From 557f7ac4a40d57578bf9fd6ee41b3f280691f9b2 Mon Sep 17 00:00:00 2001
From: Nikolay Tretyak
Date: Tue, 12 Nov 2024 10:21:47 +0100
Subject: [PATCH] Don't heartbeat if poller is stopped

---
 internal/internal_task_pollers.go |   4 ++
 internal/internal_workers_test.go | 100 ++++++++++++++++++++++++++++++
 2 files changed, 104 insertions(+)

diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go
index a59600638..e4e6bcee6 100644
--- a/internal/internal_task_pollers.go
+++ b/internal/internal_task_pollers.go
@@ -415,6 +415,10 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) (retErr e
 			wfctx,
 			func(response interface{}, startTime time.Time) (*workflowTask, error) {
 				wtp.logger.Debug("Force RespondWorkflowTaskCompleted.", "TaskStartedEventID", task.task.GetStartedEventId())
+				// Don't heartbeat once the poller is stopping; give up on this task instead.
+				if wtp.stopping() {
+					return nil, errStop
+				}
 				heartbeatResponse, err := wtp.RespondTaskCompletedWithMetrics(response, nil, task.task, startTime)
 				if err != nil {
 					return nil, err
diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go
index 5622e0ca3..65d7b551c 100644
--- a/internal/internal_workers_test.go
+++ b/internal/internal_workers_test.go
@@ -591,6 +591,106 @@ func (s *WorkersTestSuite) TestLongRunningWorkflowTask() {
 	s.Equal(2, localActivityCalledCount)
 }
 
+// TestNoHeartbeatsAfterStop verifies that once the worker has been stopped it
+// does not send forced workflow task heartbeats, even though a workflow task
+// that keeps executing local activities is still in flight.
+func (s *WorkersTestSuite) TestNoHeartbeatsAfterStop() {
+	stopCh := make(chan struct{})
+	localActivityStop := func() error {
+		close(stopCh)
+		return nil
+	}
+	localActivitySleep := func(duration time.Duration) error {
+		time.Sleep(duration)
+		return nil
+	}
+
+	longWorkflowTaskWorkflowFn := func(ctx Context, input []byte) error {
+		lao := LocalActivityOptions{
+			ScheduleToCloseTimeout: time.Second * 2,
+		}
+		ctx = WithLocalActivityOptions(ctx, lao)
+		// Tell the test that the workflow task is being processed.
+		_ = ExecuteLocalActivity(ctx, localActivityStop).Get(ctx, nil)
+		// Never finish the workflow task: keep running one-second local
+		// activities so the task is still in flight when the worker is stopped.
+		for {
+			_ = ExecuteLocalActivity(ctx, localActivitySleep, time.Second).Get(ctx, nil)
+		}
+	}
+
+	startToCloseTimeout := 5 * time.Second
+	taskQueue := "long-running-workflow-task-tq"
+	testEvents := []*historypb.HistoryEvent{
+		{
+			EventId:   1,
+			EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
+			Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{WorkflowExecutionStartedEventAttributes: &historypb.WorkflowExecutionStartedEventAttributes{
+				TaskQueue:                &taskqueuepb.TaskQueue{Name: taskQueue},
+				WorkflowExecutionTimeout: durationpb.New(10 * time.Second),
+				WorkflowRunTimeout:       durationpb.New(10 * time.Second),
+				WorkflowTaskTimeout:      durationpb.New(2 * time.Second),
+				WorkflowType:             &commonpb.WorkflowType{Name: "long-running-workflow-task-workflow-type"},
+			}},
+		},
+		createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{
+			TaskQueue:           &taskqueuepb.TaskQueue{Name: taskQueue},
+			StartToCloseTimeout: durationpb.New(startToCloseTimeout)},
+		),
+		createTestEventWorkflowTaskStarted(3),
+	}
+
+	s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
+	task := &workflowservice.PollWorkflowTaskQueueResponse{
+		TaskToken: []byte("test-token"),
+		WorkflowExecution: &commonpb.WorkflowExecution{
+			WorkflowId: "long-running-workflow-task-workflow-id",
+			RunId:      "long-running-workflow-task-workflow-run-id",
+		},
+		WorkflowType: &commonpb.WorkflowType{
+			Name: "long-running-workflow-task-workflow-type",
+		},
+		PreviousStartedEventId: 0,
+		StartedEventId:         3,
+		History:                &historypb.History{Events: testEvents},
+		NextPageToken:          nil,
+	}
+	// The first poll fails, the second returns the task, and any further polls fail.
+	s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInvalidArgument("")).Times(1)
+	s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(task, nil).Times(1)
+	s.service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollWorkflowTaskQueueResponse{}, serviceerror.NewInternal("")).AnyTimes()
+	s.service.EXPECT().PollActivityTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.PollActivityTaskQueueResponse{}, nil).AnyTimes()
+
+	s.service.EXPECT().ShutdownWorker(gomock.Any(), gomock.Any(), gomock.Any()).
+		Return(&workflowservice.ShutdownWorkerResponse{}, nil).Times(1)
+
+	clientOptions := ClientOptions{
+		Identity: "test-worker-identity",
+	}
+
+	client := NewServiceClient(s.service, nil, clientOptions)
+	worker := NewAggregatedWorker(client, taskQueue, WorkerOptions{})
+	worker.RegisterWorkflowWithOptions(
+		longWorkflowTaskWorkflowFn,
+		RegisterWorkflowOptions{Name: "long-running-workflow-task-workflow-type"},
+	)
+	worker.RegisterActivity(localActivitySleep)
+	worker.RegisterActivity(localActivityStop)
+
+	s.NoError(worker.Start())
+	select {
+	case <-stopCh:
+		// The workflow task is in flight; go on and stop the worker.
+	case <-time.After(time.Second * 4):
+		s.Fail("local activity stop function not called")
+	}
+	worker.Stop()
+
+	// RespondWorkflowTaskCompleted is not mocked, so the test will fail if the
+	// stopped worker still tries to heartbeat while we wait out the timeout.
+	time.Sleep(startToCloseTimeout)
+}
+
 func (s *WorkersTestSuite) TestMultipleLocalActivities() {
 	localActivityCalledCount := 0
 	localActivitySleep := func(duration time.Duration) error {