|
1 | 1 | package workflow |
2 | 2 |
|
3 | 3 | import ( |
| 4 | + "context" |
4 | 5 | "fmt" |
5 | 6 | "net/url" |
6 | 7 | "testing" |
@@ -524,3 +525,62 @@ func Test_EngineInvocationConcurrent(t *testing.T) { |
524 | 525 | } |
525 | 526 | } |
526 | 527 | } |
| 528 | + |
| 529 | +func Test_EngineImpl_InvokeWithContext_CustomContext(t *testing.T) { |
| 530 | + config := configuration.NewInMemory() |
| 531 | + engine := NewWorkFlowEngine(config) |
| 532 | + |
| 533 | + wfId := NewWorkflowIdentifier("ctxtest") |
| 534 | + flagset := pflag.NewFlagSet("ctx", pflag.ContinueOnError) |
| 535 | + |
| 536 | + var receivedCtx context.Context |
| 537 | + _, err := engine.Register(wfId, ConfigurationOptionsFromFlagset(flagset), func(invocation InvocationContext, input []Data) ([]Data, error) { |
| 538 | + receivedCtx = invocation.Context() |
| 539 | + return nil, nil |
| 540 | + }) |
| 541 | + assert.NoError(t, err) |
| 542 | + |
| 543 | + err = engine.Init() |
| 544 | + assert.NoError(t, err) |
| 545 | + |
| 546 | + // Create a context with a specific value and deadline to verify it's passed through |
| 547 | + type ctxKey string |
| 548 | + testKey := ctxKey("test-key") |
| 549 | + testValue := "test-value" |
| 550 | + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 551 | + defer cancel() |
| 552 | + ctx = context.WithValue(ctx, testKey, testValue) |
| 553 | + |
| 554 | + _, err = engine.Invoke(wfId, WithContext(ctx)) |
| 555 | + assert.NoError(t, err) |
| 556 | + assert.NotNil(t, receivedCtx) |
| 557 | + assert.Equal(t, testValue, receivedCtx.Value(testKey)) |
| 558 | + |
| 559 | + // Verify deadline is propagated |
| 560 | + deadline, hasDeadline := receivedCtx.Deadline() |
| 561 | + assert.True(t, hasDeadline, "context should have a deadline") |
| 562 | + assert.False(t, deadline.IsZero(), "deadline should not be zero") |
| 563 | +} |
| 564 | + |
| 565 | +func Test_EngineImpl_InvokeWithContext_DefaultContext(t *testing.T) { |
| 566 | + config := configuration.NewInMemory() |
| 567 | + engine := NewWorkFlowEngine(config) |
| 568 | + |
| 569 | + wfId := NewWorkflowIdentifier("ctxdefault") |
| 570 | + flagset := pflag.NewFlagSet("cd", pflag.ContinueOnError) |
| 571 | + |
| 572 | + var receivedCtx context.Context |
| 573 | + _, err := engine.Register(wfId, ConfigurationOptionsFromFlagset(flagset), func(invocation InvocationContext, input []Data) ([]Data, error) { |
| 574 | + receivedCtx = invocation.Context() |
| 575 | + return nil, nil |
| 576 | + }) |
| 577 | + assert.NoError(t, err) |
| 578 | + |
| 579 | + err = engine.Init() |
| 580 | + assert.NoError(t, err) |
| 581 | + |
| 582 | + // Invoke without WithContext - should get a non-nil default context |
| 583 | + _, err = engine.Invoke(wfId) |
| 584 | + assert.NoError(t, err) |
| 585 | + assert.NotNil(t, receivedCtx) |
| 586 | +} |
0 commit comments