From 97fefddf88157ddab11c61c63c3075627423351a Mon Sep 17 00:00:00 2001 From: Nate Mortensen Date: Tue, 5 Aug 2025 17:00:49 -0700 Subject: [PATCH] Add support for Ephemeral TaskLists Update Sessions to use Ephemeral TaskLists behind a feature flag. This ensures that the per-host TaskList is automatically removed once it is no longer used. This should only be enabled once the server fully supports Ephemeral TaskLists as it will otherwise return errors for the unknown TaskListKind. Signed-off-by: Nathanael Mortensen --- internal/activity_task_handler.go | 2 +- internal/client.go | 1 + internal/compatibility/api_test.go | 19 ++- internal/compatibility/enum_test.go | 1 + internal/compatibility/proto/enum.go | 2 + internal/compatibility/proto/response.go | 1 + internal/compatibility/thrift/enum.go | 2 + internal/compatibility/thrift/response.go | 1 + internal/internal_activity.go | 1 + internal/internal_event_handlers.go | 9 +- internal/internal_event_handlers_test.go | 1 + internal/internal_task_handlers.go | 3 + internal/internal_task_handlers_test.go | 134 ++++++++++---------- internal/internal_task_pollers.go | 10 +- internal/internal_task_pollers_test.go | 2 +- internal/internal_utils.go | 3 +- internal/internal_worker.go | 32 ++++- internal/internal_worker_base.go | 1 + internal/internal_worker_interfaces_test.go | 5 +- internal/internal_worker_test.go | 4 +- internal/internal_workers_test.go | 109 +++++++++++++++- internal/internal_workflow_testsuite.go | 7 +- internal/workflow.go | 4 + internal/workflow_replayer.go | 5 +- internal/workflow_replayer_test.go | 18 +-- internal/workflow_shadower_worker.go | 13 +- internal/workflow_shadower_worker_test.go | 4 +- test/activity_test.go | 4 + test/integration_test.go | 31 +++++ test/workflow_test.go | 20 +++ 30 files changed, 339 insertions(+), 110 deletions(-) diff --git a/internal/activity_task_handler.go b/internal/activity_task_handler.go index 1a5f9f224..74d9fe4f5 100644 --- a/internal/activity_task_handler.go +++ b/internal/activity_task_handler.go @@ -80,7 +80,7 @@ func newActivityTaskHandlerWithCustomProvider( } return &activityTaskHandlerImpl{ clock: clock, - taskListName: params.TaskList, + taskListName: params.TaskList.GetName(), identity: params.Identity, service: service, logger: params.Logger, diff --git a/internal/client.go b/internal/client.go index e8bd46fe0..c2044df69 100644 --- a/internal/client.go +++ b/internal/client.go @@ -656,6 +656,7 @@ func getFeatureFlags(options *ClientOptions) FeatureFlags { return FeatureFlags{ WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled, PollerAutoScalerEnabled: options.FeatureFlags.PollerAutoScalerEnabled, + EphemeralTaskListsEnabled: options.FeatureFlags.EphemeralTaskListsEnabled, } } return FeatureFlags{} diff --git a/internal/compatibility/api_test.go b/internal/compatibility/api_test.go index 80ece0e95..9e0c8b944 100644 --- a/internal/compatibility/api_test.go +++ b/internal/compatibility/api_test.go @@ -629,10 +629,27 @@ func TestDescribeTaskListResponse(t *testing.T) { thrift.DescribeTaskListResponse, proto.DescribeTaskListResponse, FuzzOptions{ + CustomFuncs: []interface{}{ + func(e *apiv1.TaskListKind, c fuzz.Continue) { + validValues := []apiv1.TaskListKind{ + apiv1.TaskListKind_TASK_LIST_KIND_INVALID, + apiv1.TaskListKind_TASK_LIST_KIND_NORMAL, + apiv1.TaskListKind_TASK_LIST_KIND_STICKY, + } + *e = validValues[c.Intn(len(validValues))] + }, + func(e *apiv1.TaskListType, c fuzz.Continue) { + validValues := []apiv1.TaskListType{ + apiv1.TaskListType_TASK_LIST_TYPE_INVALID, + apiv1.TaskListType_TASK_LIST_TYPE_DECISION, + apiv1.TaskListType_TASK_LIST_TYPE_ACTIVITY, + } + *e = validValues[c.Intn(len(validValues))] + }, + }, ExcludedFields: []string{ "PartitionConfig", // [BUG] PartitionConfig field is lost during round trip - complex nested maps with TaskListPartition not preserved "TaskListStatus", // [BUG] TaskListStatus fields IsolationGroupMetrics and NewTasksPerSecond are not mapped - they become nil/0 after round trip - "TaskList", // [BUG] TaskList field is lost during round trip - mapper not preserving this field correctly }, }, ) diff --git a/internal/compatibility/enum_test.go b/internal/compatibility/enum_test.go index a3e3f044c..f34aa6145 100644 --- a/internal/compatibility/enum_test.go +++ b/internal/compatibility/enum_test.go @@ -266,6 +266,7 @@ func TestTaskListKind(t *testing.T) { apiv1.TaskListKind_TASK_LIST_KIND_INVALID, apiv1.TaskListKind_TASK_LIST_KIND_NORMAL, apiv1.TaskListKind_TASK_LIST_KIND_STICKY, + apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL, } { assert.Equal(t, item, proto.TaskListKind(thrift.TaskListKind(item))) } diff --git a/internal/compatibility/proto/enum.go b/internal/compatibility/proto/enum.go index 5ddf54ce0..f711cb93e 100644 --- a/internal/compatibility/proto/enum.go +++ b/internal/compatibility/proto/enum.go @@ -35,6 +35,8 @@ func TaskListKind(t *shared.TaskListKind) apiv1.TaskListKind { return apiv1.TaskListKind_TASK_LIST_KIND_NORMAL case shared.TaskListKindSticky: return apiv1.TaskListKind_TASK_LIST_KIND_STICKY + case shared.TaskListKindEphemeral: + return apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL } panic("unexpected enum value") } diff --git a/internal/compatibility/proto/response.go b/internal/compatibility/proto/response.go index 8e7e2b927..9d1a16b0e 100644 --- a/internal/compatibility/proto/response.go +++ b/internal/compatibility/proto/response.go @@ -74,6 +74,7 @@ func DescribeTaskListResponse(t *shared.DescribeTaskListResponse) *apiv1.Describ return &apiv1.DescribeTaskListResponse{ Pollers: PollerInfoArray(t.Pollers), TaskListStatus: TaskListStatus(t.TaskListStatus), + TaskList: TaskList(t.TaskList), } } diff --git a/internal/compatibility/thrift/enum.go b/internal/compatibility/thrift/enum.go index 0a5682f19..f7b1f5b66 100644 --- a/internal/compatibility/thrift/enum.go +++ b/internal/compatibility/thrift/enum.go @@ -34,6 +34,8 @@ func TaskListKind(t apiv1.TaskListKind) *shared.TaskListKind { return shared.TaskListKindNormal.Ptr() case apiv1.TaskListKind_TASK_LIST_KIND_STICKY: return shared.TaskListKindSticky.Ptr() + case apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL: + return shared.TaskListKindEphemeral.Ptr() } panic("unexpected enum value") } diff --git a/internal/compatibility/thrift/response.go b/internal/compatibility/thrift/response.go index 68d4a8434..24ffa3808 100644 --- a/internal/compatibility/thrift/response.go +++ b/internal/compatibility/thrift/response.go @@ -74,6 +74,7 @@ func DescribeTaskListResponse(t *apiv1.DescribeTaskListResponse) *shared.Describ return &shared.DescribeTaskListResponse{ Pollers: PollerInfoArray(t.Pollers), TaskListStatus: TaskListStatus(t.TaskListStatus), + TaskList: TaskList(t.TaskList), } } diff --git a/internal/internal_activity.go b/internal/internal_activity.go index 75a40ed43..ae2a186ec 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -59,6 +59,7 @@ type ( activityOptions struct { ActivityID *string // Users can choose IDs but our framework makes it optional to decrease the crust. TaskListName string + TaskListKind shared.TaskListKind ScheduleToCloseTimeoutSeconds int32 ScheduleToStartTimeoutSeconds int32 StartToCloseTimeoutSeconds int32 diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index cc6dbb27d..d1d82d7a4 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -121,6 +121,7 @@ type ( contextPropagators []ContextPropagator tracer opentracing.Tracer workflowInterceptorFactories []WorkflowInterceptorFactory + featureFlags FeatureFlags } localActivityTask struct { @@ -205,6 +206,7 @@ func newWorkflowExecutionEventHandler( contextPropagators []ContextPropagator, tracer opentracing.Tracer, workflowInterceptorFactories []WorkflowInterceptorFactory, + featureFlags FeatureFlags, ) workflowExecutionEventHandler { context := &workflowEnvironmentImpl{ workflowInfo: workflowInfo, @@ -222,6 +224,7 @@ func newWorkflowExecutionEventHandler( contextPropagators: contextPropagators, tracer: tracer, workflowInterceptorFactories: workflowInterceptorFactories, + featureFlags: featureFlags, } context.logger = logger.With( zapcore.Field{Key: tagWorkflowType, Type: zapcore.StringType, String: workflowInfo.WorkflowType.Name}, @@ -472,7 +475,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters executeActivityPar } activityID := scheduleTaskAttr.GetActivityId() scheduleTaskAttr.ActivityType = activityTypePtr(parameters.ActivityType) - scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName)}) + scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName), Kind: parameters.TaskListKind.Ptr()}) scheduleTaskAttr.Input = parameters.Input scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(parameters.ScheduleToCloseTimeoutSeconds) scheduleTaskAttr.StartToCloseTimeoutSeconds = common.Int32Ptr(parameters.StartToCloseTimeoutSeconds) @@ -804,6 +807,10 @@ func (wc *workflowEnvironmentImpl) GetWorkflowInterceptors() []WorkflowIntercept return wc.workflowInterceptorFactories } +func (wc *workflowEnvironmentImpl) GetFeatureFlags() FeatureFlags { + return wc.featureFlags +} + func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( event *m.HistoryEvent, isReplay bool, diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index 0b87fc801..33a8419bc 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -1065,6 +1065,7 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl nil, opentracing.NoopTracer{}, nil, + FeatureFlags{}, ).(*workflowExecutionEventHandlerImpl) } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index f6b47e0b8..8f9d33d31 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -145,6 +145,7 @@ type ( tracer opentracing.Tracer workflowInterceptorFactories []WorkflowInterceptorFactory disableStrictNonDeterminism bool + featureFlags FeatureFlags } activityProvider func(name string) activity @@ -420,6 +421,7 @@ func newWorkflowTaskHandler( tracer: params.Tracer, workflowInterceptorFactories: params.WorkflowInterceptorChainFactories, disableStrictNonDeterminism: params.WorkerBugPorts.DisableStrictNonDeterminismCheck, + featureFlags: params.FeatureFlags, } traceLog(func() { @@ -623,6 +625,7 @@ func (w *workflowExecutionContextImpl) createEventHandler() { w.wth.contextPropagators, w.wth.tracer, w.wth.workflowInterceptorFactories, + w.wth.featureFlags, ) w.eventHandler.Store(eventHandler) } diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 1db0c7bb5..b24b6b6e9 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -325,7 +325,7 @@ func (t *TaskHandlersTestSuite) testWorkflowTaskWorkflowExecutionStartedHelper(p func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowExecutionStarted() { params := workerExecutionParameters{ - TaskList: testWorkflowTaskTasklist, + TaskList: &s.TaskList{Name: common.StringPtr(testWorkflowTaskTasklist), Kind: s.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ Identity: "test-id-1", Logger: t.logger, @@ -336,7 +336,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowExecutionStarted() { func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowExecutionStarted_WithDataConverter() { params := workerExecutionParameters{ - TaskList: testWorkflowTaskTasklist, + TaskList: &s.TaskList{Name: common.StringPtr(testWorkflowTaskTasklist), Kind: s.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ Identity: "test-id-1", Logger: t.logger, @@ -347,24 +347,24 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowExecutionStarted_WithDa } func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() { - taskList := "tl1" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} checksum1 := "chck1" checksum2 := "chck2" testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(2), BinaryChecksum: common.StringPtr(checksum1)}), createTestEventTimerStarted(5, 0), createTestEventTimerFired(6, 0), - createTestEventDecisionTaskScheduled(7, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(7, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(8), createTestEventDecisionTaskCompleted(9, &s.DecisionTaskCompletedEventAttributes{ ScheduledEventId: common.Int64Ptr(7), BinaryChecksum: common.StringPtr(checksum2)}), createTestEventTimerStarted(10, 1), createTestEventTimerFired(11, 1), - createTestEventDecisionTaskScheduled(12, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(12, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(13), } task := createWorkflowTask(testEvents, 8, "BinaryChecksumWorkflow") @@ -394,16 +394,16 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() { func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { // Schedule an activity and see if we complete workflow. - taskList := "tl1" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}), createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}), @@ -440,20 +440,20 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow() { // Schedule an activity and see if we complete workflow. - taskList := "sticky-tl" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} execution := &s.WorkflowExecution{ WorkflowId: common.StringPtr("fake-workflow-id"), RunId: common.StringPtr(uuid.New()), } testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}), createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}), @@ -493,16 +493,16 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_2() { // This test appears to be just a finer-grained version of TestWorkflowTask_QueryWorkflow, though the older names // for them implied entirely different purposes. Likely it can be combined with TestWorkflowTask_QueryWorkflow // without losing anything useful. - taskList := "tl1" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}), createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}), @@ -570,16 +570,16 @@ func (t *TaskHandlersTestSuite) verifyQueryResult(response interface{}, expected } func (t *TaskHandlersTestSuite) TestCacheEvictionWhenErrorOccurs() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &s.ActivityType{Name: common.StringPtr("pkg.Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), } params := workerExecutionParameters{ @@ -609,13 +609,13 @@ func (t *TaskHandlersTestSuite) TestCacheEvictionWhenErrorOccurs() { } func (t *TaskHandlersTestSuite) TestWithMissingHistoryEvents() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), - createTestEventDecisionTaskScheduled(6, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(6, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(7), } params := workerExecutionParameters{ @@ -645,19 +645,19 @@ func (t *TaskHandlersTestSuite) TestWithMissingHistoryEvents() { } func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskFailed(4, &s.DecisionTaskFailedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), - createTestEventDecisionTaskScheduled(5, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(5, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(6), createTestEventDecisionTaskCompleted(7, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}), createTestEventActivityTaskScheduled(8, &s.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &s.ActivityType{Name: common.StringPtr("pkg.Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), } params := workerExecutionParameters{ @@ -730,10 +730,10 @@ func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(disableSticky bool) { RegisterWorkflowOptions{Name: workflowName}, ) - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), } @@ -767,16 +767,16 @@ func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(disableSticky bool) { } func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &s.ActivityType{Name: common.StringPtr("pkg.Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), } task := createWorkflowTask(testEvents, 3, "HelloWorld_Workflow") @@ -838,17 +838,17 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() { } func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicLogNonexistingID() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ // Insert an ID which does not exist ActivityId: common.StringPtr("NotAnActivityID"), ActivityType: &s.ActivityType{Name: common.StringPtr("pkg.Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), } @@ -881,15 +881,15 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicLogNonexistingI require.Equal(t.T(), zapcore.ErrorType, replayErrorField.Type) require.ErrorContains(t.T(), replayErrorField.Interface.(error), "nondeterministic workflow: "+ - "history event is ActivityTaskScheduled: (ActivityId:NotAnActivityID, ActivityType:(Name:pkg.Greeter_Activity), TaskList:(Name:taskList), Input:[]), "+ - "replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList)") + "history event is ActivityTaskScheduled: (ActivityId:NotAnActivityID, ActivityType:(Name:pkg.Greeter_Activity), TaskList:(Name:taskList, Kind:NORMAL), Input:[]), "+ + "replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList, Kind:NORMAL)") } func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), } task := createWorkflowTask(testEvents, 3, "ReturnPanicWorkflow") @@ -916,10 +916,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() { } func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), } @@ -955,7 +955,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() { } func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} parentID := "parentID" parentRunID := "parentRun" cronSchedule := "5 4 * * *" @@ -979,7 +979,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { } startedEventAttributes := &s.WorkflowExecutionStartedEventAttributes{ Input: lastCompletionResult, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, ParentWorkflowExecution: parentExecution, CronSchedule: &cronSchedule, ContinuedExecutionRunId: &continuedRunID, @@ -992,7 +992,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { } testEvents := []*s.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, startedEventAttributes), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), } task := createWorkflowTask(testEvents, 3, workflowType) @@ -1015,7 +1015,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { attr := r.Decisions[0].CompleteWorkflowExecutionDecisionAttributes var result WorkflowInfo t.NoError(getDefaultDataConverter().FromData(attr.Result, &result)) - t.EqualValues(taskList, result.TaskListName) + t.EqualValues(taskList.GetName(), result.TaskListName) t.EqualValues(parentID, result.ParentWorkflowExecution.ID) t.EqualValues(parentRunID, result.ParentWorkflowExecution.RunID) t.EqualValues(cronSchedule, *result.CronSchedule) @@ -1030,7 +1030,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { } func (t *TaskHandlersTestSuite) TestConsistentQuery_InvalidQueryTask() { - taskList := "taskList" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} params := workerExecutionParameters{ TaskList: taskList, WorkerOptions: WorkerOptions{ @@ -1057,7 +1057,7 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_InvalidQueryTask() { } func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { - taskList := "tl1" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} checksum1 := "chck1" numberOfSignalsToComplete, err := getDefaultDataConverter().ToData(2) t.NoError(err) @@ -1065,7 +1065,7 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { t.NoError(err) testEvents := []*s.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{ - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, Input: numberOfSignalsToComplete, }), createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{}), @@ -1142,9 +1142,9 @@ func (t *TaskHandlersTestSuite) assertQueryResultsEqual(expected map[string]*s.W func (t *TaskHandlersTestSuite) TestWorkflowTask_CancelActivityBeforeSent() { // Schedule an activity and see if we complete workflow. - taskList := "tl1" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{}), createTestEventDecisionTaskStarted(3), } @@ -1169,9 +1169,9 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_CancelActivityBeforeSent() { func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() { // Schedule a decision activity and see if we complete workflow. - taskList := "tl1" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{}), } task := createWorkflowTask(testEvents, 0, "HelloWorld_Workflow") @@ -1244,7 +1244,7 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() { task := createWorkflowTask(testEvents, 0, "RetryLocalActivityWorkflow") stopCh := make(chan struct{}) params := workerExecutionParameters{ - TaskList: testWorkflowTaskTasklist, + TaskList: &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ Identity: "test-id-1", Logger: t.logger, @@ -1548,7 +1548,7 @@ func (t *TaskHandlersTestSuite) TestRegression_QueriesDoNotLeakGoroutines() { // so a size of 2 actually means a size of 1. SetStickyWorkflowCacheSize(2) - taskList := "tl1" + taskList := &s.TaskList{Name: common.StringPtr("taskList"), Kind: s.TaskListKindNormal.Ptr()} params := workerExecutionParameters{ TaskList: taskList, WorkerOptions: WorkerOptions{ @@ -1562,14 +1562,14 @@ func (t *TaskHandlersTestSuite) TestRegression_QueriesDoNotLeakGoroutines() { // process a throw-away workflow to fill the cache. this is copied from TestWorkflowTask_QueryWorkflow since it's // relatively simple, but any should work fine, as long as it can be queried. testEvents := []*s.HistoryEvent{ - createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), - createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: taskList}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: taskList}), createTestEventDecisionTaskStarted(3), createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}), createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")}, - TaskList: &s.TaskList{Name: &taskList}, + TaskList: taskList, }), } cachedTask := createWorkflowTask(testEvents[0:1], 1, "HelloWorld_Workflow") diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index c2966ff1c..e655ed48d 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -99,7 +99,7 @@ type ( activityTaskPoller struct { basePoller domain string - taskListName string + taskList *s.TaskList identity string service workflowserviceclient.Interface taskHandler ActivityTaskHandler @@ -301,7 +301,7 @@ func newWorkflowTaskPoller( basePoller: basePoller{shutdownC: params.WorkerStopChannel}, service: service, domain: domain, - taskListName: params.TaskList, + taskListName: params.TaskList.GetName(), identity: params.Identity, taskHandler: taskHandler, ldaTunnel: ldaTunnelInterface, @@ -1040,7 +1040,7 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv taskHandler: taskHandler, service: service, domain: domain, - taskListName: params.TaskList, + taskList: params.TaskList, identity: params.Identity, logger: params.Logger, metricsScope: metrics.NewTaggedScope(params.MetricsScope), @@ -1061,7 +1061,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask }) request := &s.PollForActivityTaskRequest{ Domain: common.StringPtr(atp.domain), - TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(atp.taskListName)}), + TaskList: atp.taskList, Identity: common.StringPtr(atp.identity), TaskListMetadata: &s.TaskListMetadata{MaxTasksPerSecond: &atp.activitiesPerSecond}, } @@ -1164,7 +1164,7 @@ func (atp *activityTaskPoller) ProcessTask(task interface{}) error { executionStartTime := time.Now() // Process the activity task. - request, err := atp.taskHandler.Execute(atp.taskListName, activityTask.task) + request, err := atp.taskHandler.Execute(atp.taskList.GetName(), activityTask.task) if err != nil { metricsScope.Counter(metrics.ActivityExecutionFailedCounter).Inc(1) return err diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index 233be56c3..cf2cb4ba1 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -363,7 +363,7 @@ func buildActivityTaskPoller(t *testing.T, shutdown bool) (*activityTaskPoller, shutdownC: shutdownC, }, domain: _testDomainName, - taskListName: _testTaskList, + taskList: &s.TaskList{Name: common.StringPtr(_testTaskList)}, identity: _testIdentity, service: mockService, metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)}, diff --git a/internal/internal_utils.go b/internal/internal_utils.go index e31e23e36..707bffdc4 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -75,7 +75,8 @@ type ( FeatureFlags struct { WorkflowExecutionAlreadyCompletedErrorEnabled bool // Deprecated: use AutoScalerOptions instead - PollerAutoScalerEnabled bool + PollerAutoScalerEnabled bool + EphemeralTaskListsEnabled bool } ) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 11b6e5c1b..06a68462e 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -39,6 +39,7 @@ import ( "sync/atomic" "time" + "go.uber.org/cadence/internal/common" "go.uber.org/cadence/internal/common/debug" "go.uber.org/cadence/internal/common/isolationgroup" @@ -132,7 +133,7 @@ type ( WorkerOptions // Task list name to poll. - TaskList string + TaskList *shared.TaskList // Context to store user provided key/value pairs UserContext context.Context @@ -168,7 +169,7 @@ func ensureRequiredParams(params *workerExecutionParameters) { params.Tracer = opentracing.NoopTracer{} } if params.Identity == "" { - params.Identity = getWorkerIdentity(params.TaskList) + params.Identity = getWorkerIdentity(params.TaskList.GetName()) } if params.Logger == nil { // create default logger if user does not supply one. @@ -381,13 +382,27 @@ func newSessionWorker(service workflowserviceclient.Interface, } sessionEnvironment := newSessionEnvironment(params.SessionResourceID, maxConcurrentSessionExecutionSize) - creationTasklist := getCreationTasklist(params.TaskList) + creationTasklist := getCreationTasklist(params.TaskList.GetName()) params.UserContext = context.WithValue(params.UserContext, sessionEnvironmentContextKey, sessionEnvironment) - params.TaskList = sessionEnvironment.GetResourceSpecificTasklist() + resourceSpecificTasklist := sessionEnvironment.GetResourceSpecificTasklist() + if params.FeatureFlags.EphemeralTaskListsEnabled { + params.TaskList = &shared.TaskList{ + Name: common.StringPtr(resourceSpecificTasklist), + Kind: shared.TaskListKindEphemeral.Ptr(), + } + } else { + params.TaskList = &shared.TaskList{ + Name: common.StringPtr(resourceSpecificTasklist), + Kind: shared.TaskListKindNormal.Ptr(), + } + } activityWorker := newActivityWorker(service, domain, params, overrides, env, nil) params.MaxConcurrentActivityTaskPollers = 1 - params.TaskList = creationTasklist + params.TaskList = &shared.TaskList{ + Name: common.StringPtr(creationTasklist), + Kind: shared.TaskListKindNormal.Ptr(), + } creationWorker := newActivityWorker(service, domain, params, overrides, env, sessionEnvironment.GetTokenBucket()) return &sessionWorker{ @@ -1020,8 +1035,11 @@ func newAggregatedWorker( backgroundActivityContext, backgroundActivityContextCancel := context.WithCancel(ctx) workerParams := workerExecutionParameters{ - WorkerOptions: wOptions, - TaskList: taskList, + WorkerOptions: wOptions, + TaskList: &shared.TaskList{ + Name: common.StringPtr(taskList), + Kind: shared.TaskListKindNormal.Ptr(), + }, UserContext: backgroundActivityContext, UserContextCancel: backgroundActivityContextCancel, } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 051461ae5..a10776af9 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -98,6 +98,7 @@ type ( UpsertSearchAttributes(attributes map[string]interface{}) error GetRegistry() *registry GetWorkflowInterceptors() []WorkflowInterceptorFactory + GetFeatureFlags() FeatureFlags } // WorkflowDefinition wraps the code that can execute a workflow. diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index d0eb8347e..7027407b5 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "go.uber.org/cadence/internal/common" "go.uber.org/cadence/internal/common/testlogger" "github.com/golang/mock/gomock" @@ -179,7 +180,7 @@ func (s *InterfacesTestSuite) TestInterface() { domain := "testDomain" // Workflow execution parameters. workflowExecutionParameters := workerExecutionParameters{ - TaskList: "testTaskList", + TaskList: &m.TaskList{Name: common.StringPtr("testTaskList"), Kind: m.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ MaxConcurrentActivityTaskPollers: 4, MaxConcurrentDecisionTaskPollers: 4, @@ -211,7 +212,7 @@ func (s *InterfacesTestSuite) TestInterface() { // Create activity execution parameters. activityExecutionParameters := workerExecutionParameters{ - TaskList: "testTaskList", + TaskList: &m.TaskList{Name: common.StringPtr("testTaskList"), Kind: m.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ MaxConcurrentActivityTaskPollers: 10, MaxConcurrentDecisionTaskPollers: 10, diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 6f39f1cb5..e66890230 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1074,7 +1074,7 @@ func TestWorkerOptionDefaults(t *testing.T) { require.Nil(t, decisionWorker.executionParameters.ContextPropagators) expected := workerExecutionParameters{ - TaskList: taskList, + TaskList: &shared.TaskList{Name: common.StringPtr(taskList), Kind: shared.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ MaxConcurrentActivityTaskPollers: defaultConcurrentPollRoutineSize, MaxConcurrentDecisionTaskPollers: defaultConcurrentPollRoutineSize, @@ -1140,7 +1140,7 @@ func TestWorkerOptionNonDefaults(t *testing.T) { require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0) expected := workerExecutionParameters{ - TaskList: taskList, + TaskList: &shared.TaskList{Name: common.StringPtr(taskList), Kind: shared.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ MaxConcurrentActivityTaskPollers: options.MaxConcurrentActivityTaskPollers, MaxConcurrentDecisionTaskPollers: options.MaxConcurrentDecisionTaskPollers, diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 2557e3599..f82ed95c4 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -23,6 +23,8 @@ package internal import ( "context" + "strings" + "sync" "testing" "time" @@ -93,7 +95,7 @@ func (s *WorkersTestSuite) TestWorkflowWorker() { ctx, cancel := context.WithCancel(context.Background()) executionParameters := workerExecutionParameters{ - TaskList: "testTaskList", + TaskList: &m.TaskList{Name: common.StringPtr("testTaskList"), Kind: m.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ MaxConcurrentDecisionTaskPollers: 5, Logger: logger}, @@ -125,7 +127,7 @@ func (s *WorkersTestSuite) testActivityWorker(useLocallyDispatched bool) { s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions()...).Return(nil).AnyTimes() executionParameters := workerExecutionParameters{ - TaskList: "testTaskList", + TaskList: &m.TaskList{Name: common.StringPtr("testTaskList"), Kind: m.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ MaxConcurrentActivityTaskPollers: 5, Logger: testlogger.NewZap(s.T())}, @@ -170,7 +172,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { stopC := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) executionParameters := workerExecutionParameters{ - TaskList: "testTaskList", + TaskList: &m.TaskList{Name: common.StringPtr("testTaskList"), Kind: m.TaskListKindNormal.Ptr()}, WorkerOptions: AugmentWorkerOptions( WorkerOptions{ MaxConcurrentActivityTaskPollers: 5, @@ -211,7 +213,7 @@ func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() { s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions()...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes() executionParameters := workerExecutionParameters{ - TaskList: "testDecisionTaskList", + TaskList: &m.TaskList{Name: common.StringPtr("testDecisionTaskList"), Kind: m.TaskListKindNormal.Ptr()}, WorkerOptions: WorkerOptions{ MaxConcurrentDecisionTaskPollers: 5, Logger: testlogger.NewZap(s.T())}, @@ -892,6 +894,105 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() { s.True(activityCalledCount.Load() > 0) } +func (s *WorkersTestSuite) TestSessionWorker() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + pollCount := atomic.NewInt32(0) + done := make(chan struct{}) + var hostSpecificTl sync.Once + var commonTl sync.Once + s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions()...).Return(nil, nil).AnyTimes() + s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(func(ctx context.Context, request *m.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*m.PollForActivityTaskResponse, error) { + // The host-specific TL + if strings.Contains(request.TaskList.GetName(), "@") && request.TaskList.GetKind() == m.TaskListKindNormal { + hostSpecificTl.Do(func() { + if pollCount.Add(1) == 2 { + close(done) + } + }) + } + // The common creation TL + if request.TaskList.GetName() == "test-tl__internal_session_creation" && request.TaskList.GetKind() == m.TaskListKindNormal { + commonTl.Do(func() { + if pollCount.Add(1) == 2 { + close(done) + } + }) + } + return &m.PollForActivityTaskResponse{}, nil + }).AnyTimes() + options := WorkerOptions{ + Logger: testlogger.NewZap(s.T()), + DisableActivityWorker: true, + DisableWorkflowWorker: true, + EnableSessionWorker: true, + Identity: "test-worker-identity", + } + worker, err := newAggregatedWorker(s.service, domain, "test-tl", options) + s.NoError(err, "worked to create worker") + + err = worker.Start() + s.NoError(err, "worker failed to start") + defer worker.Stop() + + // Wait for both to complete + select { + case <-ctx.Done(): + s.Fail("Timed out") + case <-done: + } +} + +func (s *WorkersTestSuite) TestSessionWorker_Ephemeral() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + pollCount := atomic.NewInt32(0) + done := make(chan struct{}) + var hostSpecificTl sync.Once + var commonTl sync.Once + s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions()...).Return(nil, nil).AnyTimes() + s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(func(ctx context.Context, request *m.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*m.PollForActivityTaskResponse, error) { + // The host-specific TL is a random UUID and the + if strings.Contains(request.TaskList.GetName(), "@") && request.TaskList.GetKind() == m.TaskListKindEphemeral { + hostSpecificTl.Do(func() { + if pollCount.Add(1) == 2 { + close(done) + } + }) + } + // The common creation TL + if request.TaskList.GetName() == "test-tl__internal_session_creation" && request.TaskList.GetKind() == m.TaskListKindNormal { + commonTl.Do(func() { + if pollCount.Add(1) == 2 { + close(done) + } + }) + } + return &m.PollForActivityTaskResponse{}, nil + }).AnyTimes() + options := WorkerOptions{ + Logger: testlogger.NewZap(s.T()), + DisableActivityWorker: true, + DisableWorkflowWorker: true, + EnableSessionWorker: true, + Identity: "test-worker-identity", + FeatureFlags: FeatureFlags{EphemeralTaskListsEnabled: true}, + } + worker, err := newAggregatedWorker(s.service, domain, "test-tl", options) + s.NoError(err, "worked to create worker") + + err = worker.Start() + s.NoError(err, "worker failed to start") + defer worker.Stop() + + // Wait for both to complete + select { + case <-ctx.Done(): + s.Fail("Timed out") + case <-done: + } +} + // wait for test to complete - timeout and fail after 10 seconds to not block execution of other tests func startWorkerAndWait(s *WorkersTestSuite, worker *aggregatedWorker, doneCh *chan struct{}) { s.T().Helper() diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 997f440a4..f951719e0 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -327,6 +327,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist env.service = mockService + env.workerOptions.FeatureFlags = FeatureFlags{EphemeralTaskListsEnabled: true} if env.workerOptions.Logger == nil { env.workerOptions.Logger = env.logger } @@ -1634,7 +1635,7 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri wOptions.DataConverter = dataConverter params := workerExecutionParameters{ WorkerOptions: wOptions, - TaskList: taskList, + TaskList: &shared.TaskList{Name: common.StringPtr(taskList), Kind: shared.TaskListKindNormal.Ptr()}, UserContext: wOptions.BackgroundActivityContext, WorkerStopChannel: env.workerStopChannel, } @@ -2166,6 +2167,10 @@ func (env *testWorkflowEnvironmentImpl) GetWorkflowInterceptors() []WorkflowInte return env.workflowInterceptors } +func (env *testWorkflowEnvironmentImpl) GetFeatureFlags() FeatureFlags { + return env.workerOptions.FeatureFlags +} + func newTestSessionEnvironment(testWorkflowEnvironment *testWorkflowEnvironmentImpl, params *workerExecutionParameters, concurrentSessionExecutionSize int) *testSessionEnvironmentImpl { resourceID := params.SessionResourceID diff --git a/internal/workflow.go b/internal/workflow.go index 51313a885..5a74949d6 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -772,8 +772,12 @@ func (wc *workflowEnvironmentInterceptor) ExecuteActivity(ctx Context, typeName // Use session tasklist oldTaskListName := options.TaskListName options.TaskListName = sessionInfo.tasklist + if wc.env.GetFeatureFlags().EphemeralTaskListsEnabled { + options.TaskListKind = s.TaskListKindEphemeral + } defer func() { options.TaskListName = oldTaskListName + options.TaskListKind = s.TaskListKindNormal }() } } diff --git a/internal/workflow_replayer.go b/internal/workflow_replayer.go index f64d6a89c..6db1de6d7 100644 --- a/internal/workflow_replayer.go +++ b/internal/workflow_replayer.go @@ -324,7 +324,10 @@ func (r *WorkflowReplayer) replayWorkflowHistory( Logger: logger, DisableStickyExecution: true, }, - TaskList: replayTaskListName, + TaskList: &shared.TaskList{ + Name: common.StringPtr(replayTaskListName), + Kind: shared.TaskListKindNormal.Ptr(), + }, } metricScope := tally.NoopScope diff --git a/internal/workflow_replayer_test.go b/internal/workflow_replayer_test.go index 98dbf1dba..6818ea5d4 100644 --- a/internal/workflow_replayer_test.go +++ b/internal/workflow_replayer_test.go @@ -46,7 +46,7 @@ type workflowReplayerSuite struct { } var ( - testTaskList = "taskList" + testTaskList = &shared.TaskList{Name: common.StringPtr("testTaskList"), Kind: shared.TaskListKindNormal.Ptr()} ) func TestWorkflowReplayerSuite(t *testing.T) { @@ -311,7 +311,7 @@ func getTestReplayWorkflowFullHistory(t *testing.T) *shared.History { Events: []*shared.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{ WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflow")}, - TaskList: &shared.TaskList{Name: common.StringPtr(testTaskList)}, + TaskList: testTaskList, Input: testEncodeFunctionArgs(t, getDefaultDataConverter()), }), createTestEventDecisionTaskScheduled(2, &shared.DecisionTaskScheduledEventAttributes{}), @@ -320,7 +320,7 @@ func getTestReplayWorkflowFullHistory(t *testing.T) *shared.History { createTestEventActivityTaskScheduled(5, &shared.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &shared.ActivityType{Name: common.StringPtr("testActivity")}, - TaskList: &shared.TaskList{Name: &testTaskList}, + TaskList: testTaskList, }), createTestEventActivityTaskStarted(6, &shared.ActivityTaskStartedEventAttributes{ ScheduledEventId: common.Int64Ptr(5), @@ -347,7 +347,7 @@ func getTestReplayWorkflowPartialHistoryWithDecisionEvents(t *testing.T) *shared Events: []*shared.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{ WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflow")}, - TaskList: &shared.TaskList{Name: common.StringPtr(testTaskList)}, + TaskList: testTaskList, Input: testEncodeFunctionArgs(t, getDefaultDataConverter()), }), createTestEventDecisionTaskScheduled(2, &shared.DecisionTaskScheduledEventAttributes{}), @@ -356,7 +356,7 @@ func getTestReplayWorkflowPartialHistoryWithDecisionEvents(t *testing.T) *shared createTestEventActivityTaskScheduled(5, &shared.ActivityTaskScheduledEventAttributes{ ActivityId: common.StringPtr("0"), ActivityType: &shared.ActivityType{Name: common.StringPtr("testActivity-fm")}, - TaskList: &shared.TaskList{Name: &testTaskList}, + TaskList: testTaskList, }), }, } @@ -367,7 +367,7 @@ func getTestReplayWorkflowPartialHistoryNoDecisionEvents(t *testing.T) *shared.H Events: []*shared.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{ WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflow")}, - TaskList: &shared.TaskList{Name: common.StringPtr(testTaskList)}, + TaskList: testTaskList, Input: testEncodeFunctionArgs(t, getDefaultDataConverter()), }), createTestEventDecisionTaskScheduled(2, &shared.DecisionTaskScheduledEventAttributes{}), @@ -404,7 +404,7 @@ func getTestReplayWorkflowLocalActivityHistory(t *testing.T) *shared.History { Events: []*shared.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{ WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflowLocalActivity")}, - TaskList: &shared.TaskList{Name: common.StringPtr(testTaskList)}, + TaskList: testTaskList, Input: testEncodeFunctionArgs(t, getDefaultDataConverter()), }), createTestEventDecisionTaskScheduled(2, &shared.DecisionTaskScheduledEventAttributes{}), @@ -429,7 +429,7 @@ func getTestReplayWorkflowLocalActivityResultMismatchHistory(t *testing.T) *shar Events: []*shared.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{ WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflowLocalActivity")}, - TaskList: &shared.TaskList{Name: common.StringPtr(testTaskList)}, + TaskList: testTaskList, Input: testEncodeFunctionArgs(t, getDefaultDataConverter()), }), createTestEventDecisionTaskScheduled(2, &shared.DecisionTaskScheduledEventAttributes{}), @@ -455,7 +455,7 @@ func getTestReplayWorkflowLocalActivityTypeMismatchHistory(t *testing.T) *shared Events: []*shared.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &shared.WorkflowExecutionStartedEventAttributes{ WorkflowType: &shared.WorkflowType{Name: common.StringPtr("go.uber.org/cadence/internal.testReplayWorkflowLocalActivity")}, - TaskList: &shared.TaskList{Name: common.StringPtr(testTaskList)}, + TaskList: testTaskList, Input: testEncodeFunctionArgs(t, getDefaultDataConverter()), }), createTestEventDecisionTaskScheduled(2, &shared.DecisionTaskScheduledEventAttributes{}), diff --git a/internal/workflow_shadower_worker.go b/internal/workflow_shadower_worker.go index 5669dcfd9..6647684b8 100644 --- a/internal/workflow_shadower_worker.go +++ b/internal/workflow_shadower_worker.go @@ -72,12 +72,15 @@ func newShadowWorker( replayer.registry = registry ensureRequiredParams(¶ms) - if len(params.TaskList) != 0 { + if params.TaskList.GetName() != "" { // include domain name in tasklist to avoid confliction // since all shadow workflow will be run in a single system domain - params.TaskList = generateShadowTaskList(domain, params.TaskList) - params.MetricsScope = tagScope(params.MetricsScope, tagTaskList, params.TaskList) - params.Logger = params.Logger.With(zap.String(tagTaskList, params.TaskList)) + params.TaskList = &shared.TaskList{ + Name: common.StringPtr(generateShadowTaskList(domain, params.TaskList.GetName())), + Kind: params.TaskList.GetKind().Ptr(), + } + params.MetricsScope = tagScope(params.MetricsScope, tagTaskList, params.TaskList.GetName()) + params.Logger = params.Logger.With(zap.String(tagTaskList, params.TaskList.GetName())) } params.UserContext = context.WithValue(params.UserContext, serviceClientContextKey, service) @@ -105,7 +108,7 @@ func newShadowWorker( service: service, domain: domain, - taskList: params.TaskList, + taskList: params.TaskList.GetName(), options: shadowOptions, logger: params.Logger, featureFlags: params.FeatureFlags, diff --git a/internal/workflow_shadower_worker_test.go b/internal/workflow_shadower_worker_test.go index bb55ee1a5..52e36e3ff 100644 --- a/internal/workflow_shadower_worker_test.go +++ b/internal/workflow_shadower_worker_test.go @@ -90,7 +90,7 @@ func (s *shadowWorkerSuite) TestNewShadowWorker() { s.True(ok) taskList := shadowWorker.activityWorker.executionParameters.TaskList - s.Contains(taskList, testDomain) + s.Contains(taskList.GetName(), testDomain) } func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_InvalidShadowOption() { @@ -228,7 +228,7 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Succeed() { var workflowParams shadower.WorkflowParams getDefaultDataConverter().FromData(startRequest.Input, &workflowParams) s.Equal(testDomain, workflowParams.GetDomain()) - s.Equal(generateShadowTaskList(testDomain, testTaskList), workflowParams.GetTaskList()) + s.Equal(generateShadowTaskList(testDomain, testTaskList.GetName()), workflowParams.GetTaskList()) s.Equal(workflowQuery, workflowParams.GetWorkflowQuery()) s.Equal(samplingRate, workflowParams.GetSamplingRate()) s.Equal(shadowMode.toThriftPtr(), workflowParams.ShadowMode) diff --git a/test/activity_test.go b/test/activity_test.go index 3602f08e6..2a12cf4cb 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -157,6 +157,10 @@ func (a *Activities) InspectActivitySpan(ctx context.Context) (map[string]string return carrier, err } +func (a *Activities) GetTaskList(ctx context.Context) (string, error) { + return activity.GetInfo(ctx).TaskList, nil +} + func (a *Activities) register(worker worker.Worker) { // Kept to verify backward compatibility of activity registration. activity.RegisterWithOptions(a, activity.RegisterOptions{Name: "Activities_", DisableAlreadyRegisteredCheck: true}) diff --git a/test/integration_test.go b/test/integration_test.go index c1e6dcfb3..a5510cf06 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -161,6 +161,7 @@ func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) { Logger: zaptest.NewLogger(ts.T()), WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptorFactory{ts.tracer}, ContextPropagators: []workflow.ContextPropagator{NewStringMapPropagator([]string{testContextKey})}, + EnableSessionWorker: true, } if testName == "TestNonDeterministicWorkflowQuery" || testName == "TestNonDeterministicWorkflowFailPolicy" { @@ -169,6 +170,11 @@ func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) { // disable sticky executon so each workflow yield will require rerunning it from beginning options.DisableStickyExecution = true } + if testName == "TestSession_Ephemeral" { + options.FeatureFlags = internal.FeatureFlags{ + EphemeralTaskListsEnabled: true, + } + } ts.worker = worker.New(ts.rpcClient.Interface, domainName, ts.taskListName, options) ts.registerWorkflowsAndActivities(ts.worker) @@ -583,6 +589,31 @@ func (ts *IntegrationTestSuite) TestOverrideSpanContext() { ts.Equal("some-value", result["mockpfx-baggage-some-key"]) } +func (ts *IntegrationTestSuite) TestSession() { + var result string + _, err := ts.executeWorkflow("test-session", ts.workflows.Session, &result) + ts.NoError(err) + tl := ts.describeTaskList(result) + ts.NotNil(tl) + ts.Equal(shared.TaskListKindNormal.Ptr(), tl.Kind) +} + +func (ts *IntegrationTestSuite) TestSession_Ephemeral() { + // Ephemeral TaskList is enabled by the test name + var result string + _, err := ts.executeWorkflow("test-session-ephemeral", ts.workflows.Session, &result) + ts.NoError(err) + tl := ts.describeTaskList(result) + ts.NotNil(tl) + ts.Equal(shared.TaskListKindEphemeral.Ptr(), tl.Kind) +} + +func (ts *IntegrationTestSuite) describeTaskList(taskListName string) *shared.TaskList { + descResp, err := ts.libClient.DescribeTaskList(context.Background(), taskListName, shared.TaskListTypeActivity) + ts.Require().NoError(err) + return descResp.GetTaskList() +} + // TestVersionedWorkflowV1 tests that a workflow started on the worker with VersionedWorkflowV1 // can be replayed on worker with VersionedWorkflowV2 and VersionedWorkflowV3, // but not on VersionedWorkflowV4, VersionedWorkflowV5, VersionedWorkflowV6. diff --git a/test/workflow_test.go b/test/workflow_test.go index e53403477..6ff9780dd 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -659,6 +659,25 @@ func (w *Workflows) OverrideSpanContext(ctx workflow.Context) (map[string]string return res, err } +func (w *Workflows) Session(ctx workflow.Context) (string, error) { + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + + so := &workflow.SessionOptions{ + CreationTimeout: time.Second * 10, + ExecutionTimeout: time.Second * 10, + } + sessionCtx, err := workflow.CreateSession(ctx, so) + if err != nil { + return "", err + } + defer workflow.CompleteSession(sessionCtx) + var result string + if err = workflow.ExecuteActivity(sessionCtx, "Activities_GetTaskList").Get(ctx, &result); err != nil { + return "", err + } + return result, nil +} + func (w *Workflows) register(worker worker.Worker) { // Kept to verify backward compatibility of workflow registration. workflow.RegisterWithOptions(w.Basic, workflow.RegisterOptions{DisableAlreadyRegisteredCheck: true}) @@ -689,6 +708,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.WorkflowWithLocalActivityCtxPropagation) worker.RegisterWorkflow(w.NonDeterminismSimulatorWorkflow) worker.RegisterWorkflow(w.OverrideSpanContext) + worker.RegisterWorkflow(w.Session) }