Skip to content

Commit f458e8a

Browse files
authored
Improve Nexus cancellation type test assertions (#2016)
* Improve Nexus cancellation type test assertions * Remove sleeps * reduce flakes
1 parent 573d6de commit f458e8a

File tree

1 file changed

+59
-24
lines changed

1 file changed

+59
-24
lines changed

test/nexus_test.go

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"os"
1010
"slices"
1111
"strings"
12-
"sync/atomic"
1312
"testing"
1413
"time"
1514

@@ -1061,24 +1060,50 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
10611060
})
10621061
}
10631062

1064-
func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationType workflow.NexusOperationCancellationType, t *testing.T) (client.WorkflowRun, string, time.Time) {
1063+
// cancelTypeOp is a wrapper for a workflow run operation that delays responding to the cancel request so that time
1064+
// based assertions aren't flakey.
1065+
type cancelTypeOp struct {
1066+
nexus.UnimplementedOperation[string, string]
1067+
workflowRunOp nexus.Operation[string, string]
1068+
unblockCancelCh chan struct{}
1069+
}
1070+
1071+
func (o *cancelTypeOp) Name() string {
1072+
return o.workflowRunOp.Name()
1073+
}
1074+
1075+
func (o *cancelTypeOp) Start(ctx context.Context, input string, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[string], error) {
1076+
return o.workflowRunOp.Start(ctx, input, options)
1077+
}
1078+
1079+
func (o *cancelTypeOp) Cancel(ctx context.Context, token string, options nexus.CancelOperationOptions) error {
1080+
if o.unblockCancelCh != nil {
1081+
// Should only be non-nil in the TRY_CANCEL case.
1082+
<-o.unblockCancelCh
1083+
}
1084+
return o.workflowRunOp.Cancel(ctx, token, options)
1085+
}
1086+
1087+
func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationType workflow.NexusOperationCancellationType, unblockCancelCh chan struct{}, t *testing.T) (client.WorkflowRun, string, time.Time) {
10651088
handlerWf := func(ctx workflow.Context, ownID string) (string, error) {
10661089
err := workflow.Await(ctx, func() bool { return false })
10671090
// Delay completion after receiving cancellation so that assertions on end time aren't flakey.
10681091
disconCtx, _ := workflow.NewDisconnectedContext(ctx)
1069-
_ = workflow.Sleep(disconCtx, time.Second)
1092+
workflow.GetSignalChannel(disconCtx, "unblock").Receive(disconCtx, nil)
10701093
return "", err
10711094
}
10721095

1073-
handlerID := atomic.Value{}
1074-
op := temporalnexus.NewWorkflowRunOperation(
1075-
"workflow-op",
1076-
handlerWf,
1077-
func(ctx context.Context, _ string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
1078-
handlerID.Store(soo.RequestID)
1079-
return client.StartWorkflowOptions{ID: soo.RequestID}, nil
1080-
},
1081-
)
1096+
handlerID := uuid.NewString()
1097+
op := &cancelTypeOp{
1098+
unblockCancelCh: unblockCancelCh,
1099+
workflowRunOp: temporalnexus.NewWorkflowRunOperation(
1100+
"workflow-op",
1101+
handlerWf,
1102+
func(ctx context.Context, _ string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
1103+
return client.StartWorkflowOptions{ID: handlerID}, nil
1104+
},
1105+
),
1106+
}
10821107

10831108
var unblockedTime time.Time
10841109
callerWf := func(ctx workflow.Context, cancellation workflow.NexusOperationCancellationType) error {
@@ -1091,13 +1116,16 @@ func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationT
10911116
return err
10921117
}
10931118

1119+
disconCtx, _ := workflow.NewDisconnectedContext(ctx) // Use disconnected ctx so it is not auto canceled.
10941120
if cancellation == workflow.NexusOperationCancellationTypeTryCancel || cancellation == workflow.NexusOperationCancellationTypeWaitRequested {
1095-
disconCtx, _ := workflow.NewDisconnectedContext(ctx) // Use disconnected ctx so it is not auto canceled.
10961121
workflow.Go(disconCtx, func(ctx workflow.Context) {
10971122
// Wake up the caller so it is not waiting for the operation to complete to get the next WFT.
10981123
_ = workflow.Sleep(ctx, time.Millisecond)
10991124
})
11001125
}
1126+
if cancellation == workflow.NexusOperationCancellationTypeWaitCompleted {
1127+
_ = workflow.SignalExternalWorkflow(disconCtx, handlerID, "", "unblock", nil).Get(disconCtx, nil)
1128+
}
11011129

11021130
_ = fut.Get(ctx, nil)
11031131
unblockedTime = workflow.Now(ctx).UTC()
@@ -1119,11 +1147,7 @@ func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationT
11191147
}, callerWf, cancellationType)
11201148
require.NoError(t, err)
11211149
require.Eventuallyf(t, func() bool {
1122-
id := handlerID.Load()
1123-
if id == nil {
1124-
return false
1125-
}
1126-
_, descErr := tc.client.DescribeWorkflow(ctx, id.(string), "")
1150+
_, descErr := tc.client.DescribeWorkflow(ctx, handlerID, "")
11271151
return descErr == nil
11281152
}, 2*time.Second, 20*time.Millisecond, "timed out waiting for handler wf to start")
11291153
require.NoError(t, tc.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID()))
@@ -1135,7 +1159,15 @@ func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationT
11351159
var canceledErr *temporal.CanceledError
11361160
require.ErrorAs(t, err, &canceledErr)
11371161

1138-
return run, handlerID.Load().(string), unblockedTime
1162+
if unblockCancelCh != nil {
1163+
// Should only be non-nil in the TRY_CANCEL case.
1164+
close(unblockCancelCh)
1165+
}
1166+
if cancellationType != workflow.NexusOperationCancellationTypeWaitCompleted {
1167+
require.NoError(t, tc.client.SignalWorkflow(ctx, handlerID, "", "unblock", nil))
1168+
}
1169+
1170+
return run, handlerID, unblockedTime
11391171
}
11401172

11411173
func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) {
@@ -1148,7 +1180,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) {
11481180
defer cancel()
11491181
tc := newTestContext(t, ctx)
11501182

1151-
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeAbandon, t)
1183+
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeAbandon, nil, t)
11521184
require.NotZero(t, unblockedTime)
11531185

11541186
// Verify that caller never sent a cancellation request.
@@ -1172,7 +1204,8 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) {
11721204
ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout)
11731205
defer cancel()
11741206
tc := newTestContext(t, ctx)
1175-
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeTryCancel, t)
1207+
unblockCancelCh := make(chan struct{})
1208+
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeTryCancel, unblockCancelCh, t)
11761209

11771210
// Verify operation future was unblocked after cancel command was recorded.
11781211
callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
@@ -1185,6 +1218,8 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) {
11851218
foundRequestedEvent = true
11861219
require.Greater(t, unblockedTime, event.EventTime.AsTime().UTC())
11871220
}
1221+
require.NotEqual(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED, event.EventType)
1222+
require.NotEqual(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED, event.EventType)
11881223
callerCloseEvent = event
11891224
}
11901225
require.True(t, foundRequestedEvent)
@@ -1204,7 +1239,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) {
12041239
ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout)
12051240
defer cancel()
12061241
tc := newTestContext(t, ctx)
1207-
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitRequested, t)
1242+
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitRequested, nil, t)
12081243

12091244
// Verify operation future was unblocked after cancel request was delivered.
12101245
callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
@@ -1236,7 +1271,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) {
12361271
ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout)
12371272
defer cancel()
12381273
tc := newTestContext(t, ctx)
1239-
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitCompleted, t)
1274+
callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitCompleted, nil, t)
12401275

12411276
// Verify operation future was unblocked after operation was cancelled.
12421277
callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
@@ -1247,7 +1282,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) {
12471282
require.NoError(t, err)
12481283
if event.EventType == enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCELED {
12491284
foundCancelledEvent = true
1250-
require.Greater(t, unblockedTime, event.EventTime.AsTime().UTC())
1285+
require.GreaterOrEqual(t, unblockedTime, event.EventTime.AsTime().UTC())
12511286
}
12521287
callerCloseEvent = event
12531288
}

0 commit comments

Comments
 (0)