diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java index 49387639d..2d39b7ca0 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java @@ -158,9 +158,21 @@ static WorkflowClient newInstance(WorkflowServiceStubs service, WorkflowClientOp * @param workflowId Workflow id. * @param runId Run id of the workflow execution. * @return Stub that implements workflowInterface and can be used to signal, update, or query it. + * @deprecated Use {@link #newWorkflowStub(Class, WorkflowTargetOptions)} instead. */ + @Deprecated T newWorkflowStub(Class workflowInterface, String workflowId, Optional runId); + /** + * Creates workflow client stub for a known execution. Use it to send signals or queries to a + * running workflow. Do not call methods annotated with @WorkflowMethod. + * + * @param workflowInterface interface that given workflow implements. + * @param workflowTargetOptions options that specify target workflow execution. + * @return Stub that implements workflowInterface and can be used to signal or query it. + */ + T newWorkflowStub(Class workflowInterface, WorkflowTargetOptions workflowTargetOptions); + /** * Creates workflow untyped client stub that can be used to start a single workflow execution. Use * it to send signals or queries to a running workflow. Do not call methods annotated @@ -191,7 +203,9 @@ static WorkflowClient newInstance(WorkflowServiceStubs service, WorkflowClientOp * workflowId is assumed. * @param workflowType type of the workflow. Optional as it is used for error reporting only. * @return Stub that can be used to start workflow and later to signal or query it. + * @deprecated Use {@link #newUntypedWorkflowStub(WorkflowTargetOptions, Optional)} instead. */ + @Deprecated WorkflowStub newUntypedWorkflowStub( String workflowId, Optional runId, Optional workflowType); @@ -202,9 +216,30 @@ WorkflowStub newUntypedWorkflowStub( * @param execution workflow id and optional run id for execution * @param workflowType type of the workflow. Optional as it is used for error reporting only. * @return Stub that can be used to start workflow and later to signal or query it. + * @deprecated Use {@link #newUntypedWorkflowStub(WorkflowTargetOptions, Optional)} instead. */ WorkflowStub newUntypedWorkflowStub(WorkflowExecution execution, Optional workflowType); + /** + * Creates workflow untyped client stub for a known execution. Use it to send signals or queries + * to a running workflow. Do not call methods annotated with @WorkflowMethod. + * + * @param workflowTargetOptions options that specify target workflow execution. + * @return Stub that can be used to start workflow and later to signal or query it. + */ + WorkflowStub newUntypedWorkflowStub(WorkflowTargetOptions workflowTargetOptions); + + /** + * Creates workflow untyped client stub for a known execution. Use it to send signals or queries + * to a running workflow. Do not call methods annotated with @WorkflowMethod. + * + * @param workflowTargetOptions options that specify target workflow execution. + * @param workflowType type of the workflow. Optional as it is used for error reporting only. + * @return Stub that can be used to start workflow and later to signal or query it. + */ + WorkflowStub newUntypedWorkflowStub( + WorkflowTargetOptions workflowTargetOptions, Optional workflowType); + /** * Creates new {@link ActivityCompletionClient} that can be used to complete activities * asynchronously. Only relevant for activity implementations that called {@link diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index c9aa37326..9b9a36897 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -149,24 +149,51 @@ public T newWorkflowStub(Class workflowInterface, String workflowId) { return newWorkflowStub(workflowInterface, workflowId, Optional.empty()); } + public T newWorkflowStub( + Class workflowInterface, WorkflowTargetOptions workflowTargetOptions) { + return newWorkflowStub(workflowInterface, workflowTargetOptions, false); + } + @Override + @SuppressWarnings("deprecation") public T newWorkflowStub( Class workflowInterface, String workflowId, Optional runId) { + return newWorkflowStub( + workflowInterface, + WorkflowTargetOptions.newBuilder() + .setWorkflowId(workflowId) + .setRunId(runId.orElse(null)) + .build(), + true); + } + + public T newWorkflowStub( + Class workflowInterface, + WorkflowTargetOptions workflowTargetOptions, + boolean legacyTargeting) { checkAnnotation( workflowInterface, WorkflowMethod.class, QueryMethod.class, SignalMethod.class, UpdateMethod.class); - if (Strings.isNullOrEmpty(workflowId)) { + if (Strings.isNullOrEmpty(workflowTargetOptions.getWorkflowId())) { throw new IllegalArgumentException("workflowId is null or empty"); } - WorkflowExecution execution = - WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(runId.orElse("")).build(); + WorkflowExecution.Builder execution = + WorkflowExecution.newBuilder().setWorkflowId(workflowTargetOptions.getWorkflowId()); + if (!Strings.isNullOrEmpty(workflowTargetOptions.getRunId())) { + execution.setRunId(workflowTargetOptions.getRunId()); + } WorkflowInvocationHandler invocationHandler = new WorkflowInvocationHandler( - workflowInterface, this.getOptions(), workflowClientCallsInvoker, execution); + workflowInterface, + this.getOptions(), + workflowClientCallsInvoker, + execution.build(), + legacyTargeting, + workflowTargetOptions.getFirstExecutionRunId()); @SuppressWarnings("unchecked") T result = (T) @@ -194,6 +221,7 @@ public WorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions } @Override + @SuppressWarnings("deprecation") public WorkflowStub newUntypedWorkflowStub( String workflowId, Optional runId, Optional workflowType) { WorkflowExecution execution = @@ -205,10 +233,46 @@ public WorkflowStub newUntypedWorkflowStub( @SuppressWarnings("deprecation") public WorkflowStub newUntypedWorkflowStub( WorkflowExecution execution, Optional workflowType) { + return newUntypedWorkflowStub( + workflowType, + true, + WorkflowTargetOptions.newBuilder() + .setWorkflowId(execution.getWorkflowId()) + .setRunId(execution.getRunId()) + .build()); + } + + @Override + public WorkflowStub newUntypedWorkflowStub(WorkflowTargetOptions workflowTargetOptions) { + return newUntypedWorkflowStub(Optional.empty(), false, workflowTargetOptions); + } + + @Override + public WorkflowStub newUntypedWorkflowStub( + WorkflowTargetOptions workflowTargetOptions, Optional workflowType) { + return newUntypedWorkflowStub(workflowType, false, workflowTargetOptions); + } + + @SuppressWarnings("deprecation") + WorkflowStub newUntypedWorkflowStub( + Optional workflowType, + boolean legacyTargeting, + WorkflowTargetOptions workflowTargetOptions) { + WorkflowExecution.Builder execution = + WorkflowExecution.newBuilder().setWorkflowId(workflowTargetOptions.getWorkflowId()); + if (!Strings.isNullOrEmpty(workflowTargetOptions.getRunId())) { + execution.setRunId(workflowTargetOptions.getRunId()); + } WorkflowStub result = - new WorkflowStubImpl(options, workflowClientCallsInvoker, workflowType, execution); + new WorkflowStubImpl( + options, + workflowClientCallsInvoker, + workflowType, + execution.build(), + legacyTargeting, + workflowTargetOptions.getFirstExecutionRunId()); for (WorkflowClientInterceptor i : interceptors) { - result = i.newUntypedWorkflowStub(execution, workflowType, result); + result = i.newUntypedWorkflowStub(execution.build(), workflowType, result); } return result; } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java index 0da7a835c..cd14942a6 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java @@ -20,6 +20,7 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.*; +import javax.annotation.Nullable; /** * Dynamic implementation of a strongly typed workflow interface that can be used to start, signal @@ -107,11 +108,19 @@ static void closeAsyncInvocation() { Class workflowInterface, WorkflowClientOptions clientOptions, WorkflowClientCallsInterceptor workflowClientCallsInvoker, - WorkflowExecution execution) { + WorkflowExecution execution, + boolean legacyTargeting, + @Nullable String firstExecutionRunId) { workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface, false); Optional workflowType = workflowMetadata.getWorkflowType(); WorkflowStub stub = - new WorkflowStubImpl(clientOptions, workflowClientCallsInvoker, workflowType, execution); + new WorkflowStubImpl( + clientOptions, + workflowClientCallsInvoker, + workflowType, + execution, + legacyTargeting, + firstExecutionRunId); for (WorkflowClientInterceptor i : clientOptions.getInterceptors()) { stub = i.newUntypedWorkflowStub(execution, workflowType, stub); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java index ad41605e5..36b3ad819 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java @@ -160,10 +160,25 @@ WorkflowUpdateHandle startUpdateWithStart( R executeUpdateWithStart( UpdateOptions updateOptions, Object[] updateArgs, Object[] startArgs); + /** + * Sends a signal to a workflow, starting the workflow if it is not already running. + * + * @param signalName name of the signal handler. Usually it is a method name. + * @param signalArgs signal method arguments + * @param startArgs workflow start arguments + * @return workflow execution + */ WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs); + /** + * @return workflow type name if it was provided when the stub was created. + */ Optional getWorkflowType(); + /** + * @return current workflow execution. Returns null if the workflow has not been started yet. + */ + @Nullable WorkflowExecution getExecution(); /** @@ -406,6 +421,9 @@ CompletableFuture getResultAsync( */ WorkflowExecutionDescription describe(); + /** + * @return workflow options if they were provided when the stub was created. + */ Optional getOptions(); /** diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 675cc5b18..8aeefc06c 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -35,12 +35,16 @@ class WorkflowStubImpl implements WorkflowStub { // if null, this stub is created to bound to an existing execution. // This stub is created to bound to an existing execution otherwise. private final @Nullable WorkflowOptions options; + private final boolean legacyTargeting; + private final @Nullable String firstExecutionRunId; WorkflowStubImpl( WorkflowClientOptions clientOptions, WorkflowClientCallsInterceptor workflowClientInvoker, Optional workflowType, - WorkflowExecution execution) { + WorkflowExecution execution, + boolean legacyTargeting, + @Nullable String firstExecutionRunId) { this.clientOptions = clientOptions; this.workflowClientInvoker = workflowClientInvoker; this.workflowType = workflowType; @@ -48,6 +52,8 @@ class WorkflowStubImpl implements WorkflowStub { throw new IllegalArgumentException("null or empty workflowId"); } this.execution.set(execution); + this.legacyTargeting = legacyTargeting; + this.firstExecutionRunId = firstExecutionRunId; this.options = null; } @@ -60,12 +66,14 @@ class WorkflowStubImpl implements WorkflowStub { this.workflowClientInvoker = workflowClientInvoker; this.workflowType = Optional.of(workflowType); this.options = options; + this.legacyTargeting = false; + this.firstExecutionRunId = null; } @Override public void signal(String signalName, Object... args) { checkStarted(); - WorkflowExecution targetExecution = currentExecutionWithoutRunId(); + WorkflowExecution targetExecution = currentExecutionCheckLegacy(); try { workflowClientInvoker.signal( new WorkflowClientCallsInterceptor.WorkflowSignalInput( @@ -338,6 +346,7 @@ public R update(String updateName, Class resultClass, Object... args) { .setUpdateName(updateName) .setWaitForStage(WorkflowUpdateStage.COMPLETED) .setResultClass(resultClass) + .setFirstExecutionRunId(firstExecutionRunId) .build(); return startUpdate(options, args).getResultAsync().get(); } catch (InterruptedException e) { @@ -385,21 +394,17 @@ private WorkflowClientCallsInterceptor.StartUpdateInput startUpdateInput( Strings.isNullOrEmpty(options.getUpdateId()) ? UUID.randomUUID().toString() : options.getUpdateId(); - WorkflowClientCallsInterceptor.StartUpdateInput input = - new WorkflowClientCallsInterceptor.StartUpdateInput<>( - targetExecution, - workflowType, - options.getUpdateName(), - Header.empty(), - updateId, - args, - options.getResultClass(), - options.getResultType(), - options.getFirstExecutionRunId(), - WaitPolicy.newBuilder() - .setLifecycleStage(options.getWaitForStage().getProto()) - .build()); - return input; + return new WorkflowClientCallsInterceptor.StartUpdateInput<>( + targetExecution, + workflowType, + options.getUpdateName(), + Header.empty(), + updateId, + args, + options.getResultClass(), + options.getResultType(), + options.getFirstExecutionRunId(), + WaitPolicy.newBuilder().setLifecycleStage(options.getWaitForStage().getProto()).build()); } @Override @@ -435,10 +440,11 @@ public void cancel() { @Override public void cancel(@Nullable String reason) { checkStarted(); - WorkflowExecution targetExecution = currentExecutionWithoutRunId(); + WorkflowExecution targetExecution = currentExecutionCheckLegacy(); try { workflowClientInvoker.cancel( - new WorkflowClientCallsInterceptor.CancelInput(targetExecution, reason)); + new WorkflowClientCallsInterceptor.CancelInput( + targetExecution, firstExecutionRunId, reason)); } catch (Exception e) { Throwable failure = throwAsWorkflowFailureException(e, targetExecution); throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure); @@ -448,10 +454,11 @@ public void cancel(@Nullable String reason) { @Override public void terminate(@Nullable String reason, Object... details) { checkStarted(); - WorkflowExecution targetExecution = currentExecutionWithoutRunId(); + WorkflowExecution targetExecution = currentExecutionCheckLegacy(); try { workflowClientInvoker.terminate( - new WorkflowClientCallsInterceptor.TerminateInput(targetExecution, reason, details)); + new WorkflowClientCallsInterceptor.TerminateInput( + targetExecution, firstExecutionRunId, reason, details)); } catch (Exception e) { Throwable failure = throwAsWorkflowFailureException(e, targetExecution); throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure); @@ -532,6 +539,14 @@ private WorkflowExecution currentExecutionWithoutRunId() { } } + private WorkflowExecution currentExecutionCheckLegacy() { + if (legacyTargeting) { + return currentExecutionWithoutRunId(); + } else { + return execution.get(); + } + } + private R throwAsWorkflowFailureExceptionForQuery( Throwable failure, @SuppressWarnings("unused") Class returnType, @@ -589,6 +604,7 @@ private Throwable throwAsWorkflowFailureException( private void populateExecutionAfterStart(WorkflowExecution startedExecution) { this.startedExecution.set(startedExecution); + // this.firstExecutionRunId.set(startedExecution.getRunId()); // bind to an execution without a runId, so queries follow runId chains by default this.execution.set(WorkflowExecution.newBuilder(startedExecution).setRunId("").build()); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowTargetOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowTargetOptions.java new file mode 100644 index 000000000..a229db942 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowTargetOptions.java @@ -0,0 +1,122 @@ +package io.temporal.client; + +import io.temporal.api.common.v1.WorkflowExecution; +import java.util.Objects; + +/** Options for targeting a specific workflow execution. */ +public final class WorkflowTargetOptions { + public static WorkflowTargetOptions.Builder newBuilder() { + return new WorkflowTargetOptions.Builder(); + } + + public static WorkflowTargetOptions.Builder newBuilder(WorkflowTargetOptions options) { + return new WorkflowTargetOptions.Builder(options); + } + + public static WorkflowTargetOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final WorkflowTargetOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = WorkflowTargetOptions.newBuilder().build(); + } + + private final String workflowId; + private final String runId; + private final String firstExecutionRunId; + + private WorkflowTargetOptions(String workflowId, String runId, String firstExecutionRunId) { + this.workflowId = workflowId; + this.runId = runId; + this.firstExecutionRunId = firstExecutionRunId; + } + + public String getWorkflowId() { + return workflowId; + } + + public String getRunId() { + return runId; + } + + public String getFirstExecutionRunId() { + return firstExecutionRunId; + } + + public static final class Builder { + private String workflowId; + private String runId; + private String firstExecutionRunId; + + private Builder() {} + + private Builder(WorkflowTargetOptions options) { + this.workflowId = options.workflowId; + this.runId = options.runId; + this.firstExecutionRunId = options.firstExecutionRunId; + } + + /** Sets the workflowId of the target workflow. */ + public Builder setWorkflowId(String workflowId) { + this.workflowId = workflowId; + return this; + } + + /** Sets the runId of a specific execution of a workflow. */ + public Builder setRunId(String runId) { + this.runId = runId; + return this; + } + + /** + * Sets the runId of the first execution of a workflow. This is useful for targeting workflows + * that have been continued as new. + */ + public Builder setFirstExecutionRunId(String firstExecutionRunId) { + this.firstExecutionRunId = firstExecutionRunId; + return this; + } + + /** Sets both workflowId and runId from a WorkflowExecution object. */ + public Builder setWorkflowExecution(WorkflowExecution execution) { + this.workflowId = execution.getWorkflowId(); + this.runId = execution.getRunId(); + return this; + } + + public WorkflowTargetOptions build() { + return new WorkflowTargetOptions(workflowId, runId, firstExecutionRunId); + } + } + + @Override + public String toString() { + return "WorkflowTargetOptions{" + + "workflowId='" + + workflowId + + '\'' + + ", runId='" + + runId + + '\'' + + ", firstExecutionRunId='" + + firstExecutionRunId + + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + WorkflowTargetOptions that = (WorkflowTargetOptions) o; + return Objects.equals(workflowId, that.workflowId) + && Objects.equals(runId, that.runId) + && Objects.equals(firstExecutionRunId, that.firstExecutionRunId); + } + + @Override + public int hashCode() { + return Objects.hash(workflowId, runId, firstExecutionRunId); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 8391412a4..e3593e9c2 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -433,6 +433,7 @@ public R getResult() { final class CancelInput { private final WorkflowExecution workflowExecution; + private final @Nullable String firstExecutionRunId; private final @Nullable String reason; /** @@ -445,7 +446,15 @@ public CancelInput(WorkflowExecution workflowExecution) { } public CancelInput(WorkflowExecution workflowExecution, @Nullable String reason) { + this(workflowExecution, null, reason); + } + + public CancelInput( + WorkflowExecution workflowExecution, + @Nullable String firstExecutionRunId, + @Nullable String reason) { this.workflowExecution = workflowExecution; + this.firstExecutionRunId = firstExecutionRunId; this.reason = reason; } @@ -457,6 +466,11 @@ public WorkflowExecution getWorkflowExecution() { public String getReason() { return reason; } + + @Nullable + public String getFirstExecutionRunId() { + return firstExecutionRunId; + } } final class StartUpdateInput { @@ -606,12 +620,22 @@ final class CancelOutput {} final class TerminateInput { private final WorkflowExecution workflowExecution; + private final @Nullable String firstExecutionRunId; private final @Nullable String reason; private final Object[] details; public TerminateInput( WorkflowExecution workflowExecution, @Nullable String reason, Object[] details) { + this(workflowExecution, null, reason, details); + } + + public TerminateInput( + WorkflowExecution workflowExecution, + @Nullable String firstExecutionRunId, + @Nullable String reason, + Object[] details) { this.workflowExecution = workflowExecution; + this.firstExecutionRunId = firstExecutionRunId; this.reason = reason; this.details = details; } @@ -620,6 +644,11 @@ public WorkflowExecution getWorkflowExecution() { return workflowExecution; } + @Nullable + public String getFirstExecutionRunId() { + return firstExecutionRunId; + } + @Nullable public String getReason() { return reason; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 78c64b944..896fdc076 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -645,6 +645,9 @@ public CancelOutput cancel(CancelInput input) { if (input.getReason() != null) { request.setReason(input.getReason()); } + if (input.getFirstExecutionRunId() != null) { + request.setFirstExecutionRunId(input.getFirstExecutionRunId()); + } genericClient.requestCancel(request.build()); return new CancelOutput(); } @@ -659,6 +662,9 @@ public TerminateOutput terminate(TerminateInput input) { if (input.getReason() != null) { request.setReason(input.getReason()); } + if (input.getFirstExecutionRunId() != null) { + request.setFirstExecutionRunId(input.getFirstExecutionRunId()); + } DataConverter dataConverterWithWorkflowContext = clientOptions .getDataConverter() diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/CancelTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/CancelTest.java index b472a360f..b68790a92 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/CancelTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/CancelTest.java @@ -32,8 +32,6 @@ public void cancellationOfNonExistentWorkflow() { assertThrows(WorkflowNotFoundException.class, untyped::cancel); } - // Testing the current behavior, this test MAY break after fixing: - // https://github.com/temporalio/temporal/issues/2860 @Test public void secondCancellationImmediately() { TestWorkflows.TestWorkflow1 workflow = @@ -44,8 +42,6 @@ public void secondCancellationImmediately() { untyped.cancel(); } - // Testing the current behavior, this test WILL break after fixing: - // https://github.com/temporalio/temporal/issues/2860 @Test public void secondCancellationAfterWorkflowIsCancelled() { TestWorkflows.TestWorkflow1 workflow = diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java index 63a607280..2c70b3a94 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/StartTest.java @@ -9,6 +9,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowTargetOptions; import io.temporal.common.WorkflowExecutionHistory; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.testing.internal.SDKTestOptions; @@ -16,7 +17,6 @@ import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*; import java.time.Duration; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Rule; @@ -200,7 +200,8 @@ private void assertResult(String expected, WorkflowExecution execution) { String result = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(String.class); assertEquals(expected, result); } @@ -209,7 +210,8 @@ private void assertResult(int expected, WorkflowExecution execution) { int result = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(int.class); assertEquals(expected, result); } @@ -217,7 +219,8 @@ private void assertResult(int expected, WorkflowExecution execution) { private void waitForProc(WorkflowExecution execution) { testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(Void.class); } } diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowIdConflictPolicyTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowIdConflictPolicyTest.java index db34bb029..35fa7f0e3 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowIdConflictPolicyTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowIdConflictPolicyTest.java @@ -58,7 +58,9 @@ public void policyTerminateExisting() { testWorkflowRule .getWorkflowClient() .newUntypedWorkflowStub( - workflowExecution1, + WorkflowTargetOptions.newBuilder() + .setWorkflowExecution(workflowExecution1) + .build(), Optional.of(TestWorkflows.TestSignaledWorkflow.class.toString())) .getResult(String.class)); } diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowStubRespectRunIdTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowStubRespectRunIdTest.java new file mode 100644 index 000000000..4af3e5008 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/WorkflowStubRespectRunIdTest.java @@ -0,0 +1,191 @@ +package io.temporal.client.functional; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.client.*; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.QueryMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowStubRespectRunIdTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(AwaitingWorkflow.class).build(); + + @Test + public void terminateFollowFirstRunId() throws InterruptedException { + TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class); + WorkflowClient.start(workflow::execute, "input1"); + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + waitForContinueAsNew(untyped.getExecution()); + Assert.assertThrows( + "If the workflow continued as new, terminating by execution without firstExecutionRunId should fail", + WorkflowNotFoundException.class, + () -> + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowExecution(untyped.getExecution()) + .build()) + .terminate("termination")); + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowExecution(untyped.getExecution()) + .setFirstExecutionRunId(untyped.getExecution().getRunId()) + .build()) + .terminate("termination"); + Assert.assertThrows( + "Workflow should not be terminated", + WorkflowFailedException.class, + () -> untyped.getResult(String.class)); + } + + @Test + public void cancelFollowFirstRunId() throws InterruptedException { + TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class); + WorkflowClient.start(workflow::execute, "input1"); + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + waitForContinueAsNew(untyped.getExecution()); + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(untyped.getExecution()).build()) + .cancel(); + testWorkflowRule.assertNoHistoryEvent( + untyped.getExecution().getWorkflowId(), + untyped.getExecution().getRunId(), + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED); + + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowExecution(untyped.getExecution()) + .setFirstExecutionRunId(untyped.getExecution().getRunId()) + .build()) + .cancel(); + Assert.assertThrows( + "Workflow should be cancelled", + WorkflowFailedException.class, + () -> untyped.getResult(String.class)); + testWorkflowRule.assertHistoryEvent( + untyped.getExecution().getWorkflowId(), + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED); + } + + @Test + public void signalRespectRunId() throws InterruptedException { + TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class); + WorkflowClient.start(workflow::execute, "input1"); + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + waitForContinueAsNew(untyped.getExecution()); + Assert.assertThrows( + WorkflowNotFoundException.class, + () -> + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowExecution(untyped.getExecution()) + .build()) + .signal("signal")); + + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowId(untyped.getExecution().getWorkflowId()) + .build()) + .signal("signal"); + } + + private void waitForContinueAsNew(WorkflowExecution execution) throws InterruptedException { + final int maxAttempts = 5; // 100 * 100ms = 10s + final long sleepMs = 1000L; + int attempts = 0; + + WorkflowStub latestStub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowId(execution.getWorkflowId()) + .build()); + + while (attempts++ < maxAttempts) { + try { + String currentRunId = latestStub.describe().getExecution().getRunId(); + if (!execution.getRunId().equals(currentRunId)) { + return; + } + } catch (Exception e) { + // Ignore and retry until timeout (query may fail while continue-as-new is in progress) + } + Thread.sleep(sleepMs); + } + + throw new AssertionError( + "ContinueAsNew event was not observed for workflowId: " + execution.getWorkflowId()); + } + + @Test + public void queryRespectRunId() throws InterruptedException { + TestWorkflowWithQuery workflow = testWorkflowRule.newWorkflowStub(TestWorkflowWithQuery.class); + WorkflowClient.start(workflow::execute, "input1"); + WorkflowStub untyped = WorkflowStub.fromTyped(workflow); + waitForContinueAsNew(untyped.getExecution()); + String actualRunId = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowExecution(untyped.getExecution()) + .build()) + .query("getRunId", String.class); + Assert.assertEquals(untyped.getExecution().getRunId(), actualRunId); + + actualRunId = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder() + .setWorkflowId(untyped.getExecution().getWorkflowId()) + .build()) + .query("getRunId", String.class); + Assert.assertNotEquals(untyped.getExecution().getRunId(), actualRunId); + } + + @WorkflowInterface + public interface TestWorkflowWithQuery { + @WorkflowMethod() + String execute(String arg); + + @QueryMethod() + String getRunId(); + } + + public static class AwaitingWorkflow implements TestWorkflowWithQuery { + + @Override + public String execute(String arg) { + if (!Workflow.getInfo().getContinuedExecutionRunId().isPresent()) { + Workflow.continueAsNew(); + } + Workflow.await(() -> false); + return "done"; + } + + @Override + public String getRunId() { + return Workflow.getInfo().getRunId(); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotMaxConcurrentTests.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotMaxConcurrentTests.java index 70cb012ac..944d5d515 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotMaxConcurrentTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotMaxConcurrentTests.java @@ -10,6 +10,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowTargetOptions; import io.temporal.common.RetryOptions; import io.temporal.common.reporter.TestStatsReporter; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -20,7 +21,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; @@ -164,7 +164,9 @@ public void TestSlotsNotExceeded() { // wait for all of them to finish for (WorkflowExecution execution : executions) { - WorkflowStub workflowStub = client.newUntypedWorkflowStub(execution, Optional.empty()); + WorkflowStub workflowStub = + client.newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()); workflowStub.getResult(String.class); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/SyncTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/SyncTest.java index 6b95583b1..6f17d24da 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/SyncTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/SyncTest.java @@ -6,6 +6,7 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowTargetOptions; import io.temporal.failure.CanceledFailure; import io.temporal.failure.TerminatedFailure; import io.temporal.testing.internal.SDKTestOptions; @@ -76,7 +77,9 @@ public void testSyncUntypedAndStackTrace() { workflowStub = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, workflowStub.getWorkflowType()); + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build(), + workflowStub.getWorkflowType()); stackTrace = workflowStub.query(QUERY_TYPE_STACK_TRACE, String.class); assertTrue(stackTrace, stackTrace.contains("TestSyncWorkflowImpl.execute")); assertTrue(stackTrace, stackTrace.contains("activityWithDelay")); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalDuringLastWorkflowTaskTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalDuringLastWorkflowTaskTest.java index 55220dd5e..5a226a949 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalDuringLastWorkflowTaskTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalDuringLastWorkflowTaskTest.java @@ -5,11 +5,11 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowTargetOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.WorkerOptions; import io.temporal.workflow.shared.TestWorkflows.TestSignaledWorkflow; import java.time.Duration; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -56,7 +56,8 @@ public void testSignalDuringLastWorkflowTask() throws ExecutionException, Interr "Signal Input", testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(String.class)); assertCompleted.complete(true); }); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java index 407853b26..a3bde4bf9 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/signalTests/SignalTest.java @@ -59,7 +59,8 @@ public void testSignal() { // Test client created using WorkflowExecution QueryableWorkflow client2 = workflowClient.newWorkflowStub( - QueryableWorkflow.class, execution.getWorkflowId(), Optional.of(execution.getRunId())); + QueryableWorkflow.class, + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()); assertEquals("Hello ", client2.getState()); testWorkflowRule.sleep(Duration.ofMillis(500)); @@ -68,7 +69,10 @@ public void testSignal() { assertEquals("World!", client2.getState()); assertEquals( "Hello World!", - workflowClient.newUntypedWorkflowStub(execution, Optional.empty()).getResult(String.class)); + workflowClient + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) + .getResult(String.class)); client2.execute(); } @@ -102,7 +106,10 @@ public void testSignalWithStart() { assertEquals("World!", client2.getState()); assertEquals( "Hello World!", - workflowClient.newUntypedWorkflowStub(execution, Optional.empty()).getResult(String.class)); + workflowClient + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) + .getResult(String.class)); // Check if that it starts closed workflow (AllowDuplicate is default IdReusePolicy) QueryableWorkflow client3 = workflowClient.newWorkflowStub(QueryableWorkflow.class, options); @@ -160,7 +167,9 @@ public void testSignalUntyped() { assertEquals( "Hello World!", workflowClient - .newUntypedWorkflowStub(execution, Optional.of(workflowType)) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build(), + Optional.of(workflowType)) .getResult(String.class)); assertEquals("Hello World!", workflowStub.getResult(String.class)); assertEquals("World!", workflowStub.query("getState", String.class)); @@ -173,7 +182,9 @@ public void testSignalUntyped() { .setQueryRejectCondition(QueryRejectCondition.QUERY_REJECT_CONDITION_NOT_OPEN) .build()); WorkflowStub workflowStubNotOptionRejectCondition = - client.newUntypedWorkflowStub(execution, Optional.of(workflowType)); + client.newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build(), + Optional.of(workflowType)); try { workflowStubNotOptionRejectCondition.query("getState", String.class); fail("unreachable"); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/DynamicUpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/DynamicUpdateTest.java index b21a713ae..17a509968 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/DynamicUpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/DynamicUpdateTest.java @@ -13,7 +13,6 @@ import io.temporal.workflow.shared.TestWorkflows; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; import org.junit.Assert; @@ -59,7 +58,8 @@ public void dynamicUpdate() throws ExecutionException, InterruptedException { String result = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(String.class); assertEquals(" update complete", result); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java index 9e8e10bfb..9e08de11c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/SpeculativeUpdateTest.java @@ -15,7 +15,6 @@ import io.temporal.workflow.shared.TestActivities; import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate; import java.time.Duration; -import java.util.Optional; import java.util.Random; import java.util.UUID; import org.junit.Rule; @@ -60,7 +59,8 @@ public void speculativeUpdateRejected() { String result = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(String.class); assertEquals("", result); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java index 63b39a0fe..b3af69c42 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/TypedUpdateTest.java @@ -35,7 +35,7 @@ public void testTypedStubSync() { TestMultiArgWorkflowUpdateFunctions.TestMultiArgUpdateWorkflow workflow = workflowClient.newWorkflowStub( TestMultiArgWorkflowUpdateFunctions.TestMultiArgUpdateWorkflow.class, options); - WorkflowExecution execution = WorkflowClient.start(workflow::execute); + WorkflowClient.start(workflow::execute); Assert.assertEquals("func", workflow.func()); Assert.assertEquals("input", workflow.func1("input")); @@ -54,12 +54,7 @@ public void testTypedStubSync() { workflow.proc6("input", 2, 3, 4, 5, 6); workflow.complete(); - String result = - testWorkflowRule - .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) - .getResult(String.class); - assertEquals("procinputinput2input23input234input2345input23456", result); + assertEquals("procinputinput2input23input234input2345input23456", workflow.execute()); } @Test @@ -129,11 +124,6 @@ public void testTypedAsync() throws ExecutionException, InterruptedException { .get(); workflow.complete(); - String result = - testWorkflowRule - .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) - .getResult(String.class); - assertEquals("procinputinput2input23input234input2345input23456", result); + assertEquals("procinputinput2input23input234input2345input23456", workflow.execute()); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateAnnotationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateAnnotationTest.java index 7861d00d1..e739028fa 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateAnnotationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateAnnotationTest.java @@ -5,13 +5,13 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.UpdateMethod; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; -import java.util.Optional; import org.junit.Rule; import org.junit.Test; @@ -36,11 +36,7 @@ public void testUpdateOnlyInterface() { workflowClient.newWorkflowStub(UpdateWorkflowInterface.class, execution.getWorkflowId()); updateOnlyWorkflow.update(); - String result = - testWorkflowRule - .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) - .getResult(String.class); + String result = WorkflowStub.fromTyped(updateOnlyWorkflow).getResult(String.class); assertEquals("success", result); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateBadValidatorTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateBadValidatorTest.java index 8c20f83dd..a2419f46f 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateBadValidatorTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateBadValidatorTest.java @@ -13,7 +13,6 @@ import io.temporal.workflow.shared.TestActivities; import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; -import java.util.Optional; import java.util.UUID; import org.junit.Rule; import org.junit.Test; @@ -58,12 +57,7 @@ public void testBadUpdateValidator() { workflow.complete(); - String result = - testWorkflowRule - .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) - .getResult(String.class); - assertEquals("", result); + assertEquals("", workflow.execute()); } public static class TestUpdateWithBadValidatorWorkflowImpl diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateInfoTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateInfoTest.java index 857d19265..4fab2ca12 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateInfoTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateInfoTest.java @@ -40,11 +40,11 @@ public void testUpdateInfo() throws ExecutionException, InterruptedException { .setUpdateName("update") .setWaitForStage(WorkflowUpdateStage.COMPLETED); - WorkflowUpdateHandle handle1 = + WorkflowUpdateHandle handle1 = stub.startUpdate(updateOptionsBuilder.setUpdateId("update id 1").build(), 0, ""); assertEquals("update:update id 1", handle1.getResultAsync().get()); - WorkflowUpdateHandle handle2 = + WorkflowUpdateHandle handle2 = stub.startUpdate(updateOptionsBuilder.setUpdateId("update id 2").build(), 0, ""); assertEquals("update:update id 2", handle2.getResultAsync().get()); @@ -57,7 +57,8 @@ public void testUpdateInfo() throws ExecutionException, InterruptedException { String result = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(String.class); assertEquals(" update id 1 update id 2", result); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateRetryExceptionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateRetryExceptionTest.java index 508b47044..6e102b58a 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateRetryExceptionTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateRetryExceptionTest.java @@ -61,13 +61,7 @@ public void testUpdateExceptionRetries() { "message='simulated 3', type='Failure', nonRetryable=false", e.getCause().getMessage()); } workflow.complete(); - - String result = - testWorkflowRule - .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) - .getResult(String.class); - assertEquals("", result); + assertEquals("", workflow.execute()); } public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java index a5830ab0a..1489b2128 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java @@ -84,7 +84,8 @@ public void testUpdate() { String result = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(String.class); assertEquals("Execute-Hello Update Execute-Hello Update 2", result); } @@ -133,7 +134,8 @@ public void testUpdateIntercepted() { String result = testWorkflowRule .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) + .newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(execution).build()) .getResult(String.class); assertEquals("Execute-Hello Update Execute-Hello Update 2", result); } @@ -287,7 +289,11 @@ public void testUpdateResets() { // Create a new workflow stub with the new run ID workflow = workflowClient.newWorkflowStub( - WorkflowWithUpdate.class, workflowId, Optional.of(resetResponse.getRunId())); + WorkflowWithUpdate.class, + WorkflowTargetOptions.newBuilder() + .setWorkflowId(workflowId) + .setRunId(resetResponse.getRunId()) + .build()); assertEquals("Execute-Hello Update 2", workflow.update(0, "Hello Update 2")); // Complete would throw an exception if the update was not applied to the reset workflow. workflow.complete(); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityTest.java index 5b3581f9b..0e591a0e3 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithLocalActivityTest.java @@ -14,7 +14,6 @@ import io.temporal.workflow.shared.TestWorkflows; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.UUID; import org.junit.Rule; import org.junit.Test; @@ -61,11 +60,7 @@ public void testUpdateWithLocalActivities() { workflow.complete(); - String result = - testWorkflowRule - .getWorkflowClient() - .newUntypedWorkflowStub(execution, Optional.empty()) - .getResult(String.class); + String result = WorkflowStub.fromTyped(workflow).getResult(String.class); assertEquals( "Hello Update sleepActivity0 activity Hello Update 2 sleepActivity1 activity", result); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionContinueAsNewTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionContinueAsNewTest.java index 50db1a0f1..9531c2905 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionContinueAsNewTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionContinueAsNewTest.java @@ -8,6 +8,7 @@ import io.temporal.api.history.v1.HistoryEvent; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowTargetOptions; import io.temporal.common.WorkflowExecutionHistory; import io.temporal.internal.history.VersionMarkerUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -16,7 +17,6 @@ import io.temporal.workflow.WorkflowMethod; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.junit.Rule; import org.junit.Test; @@ -45,7 +45,7 @@ public void versionNotCarriedOverOnContinueAsNew() { testWorkflowRule .getWorkflowClient() .newUntypedWorkflowStub( - run1.getWorkflowId(), Optional.of(run1.getRunId()), Optional.empty()); + WorkflowTargetOptions.newBuilder().setWorkflowExecution(run1).build()); WorkflowStub latestUntyped = testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub(run1.getWorkflowId()); WorkflowExecution run2 = latestUntyped.getExecution(); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java index 57ec25a7a..f749c68e0 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java @@ -293,6 +293,10 @@ public WorkflowExecutionHistory getExecutionHistory(String workflowId) { return testWorkflowRule.getWorkflowClient().fetchHistory(workflowId); } + public WorkflowExecutionHistory getExecutionHistory(String workflowId, String runId) { + return testWorkflowRule.getWorkflowClient().fetchHistory(workflowId, runId); + } + /** Returns list of all events of the given EventType found in the history. */ public List getHistoryEvents(String workflowId, EventType eventType) { List result = new ArrayList<>(); @@ -318,7 +322,11 @@ public HistoryEvent getHistoryEvent(String workflowId, EventType eventType) { /** Asserts that an event of the given EventType is found in the history. */ public void assertHistoryEvent(String workflowId, EventType eventType) { - History history = getExecutionHistory(workflowId).getHistory(); + assertHistoryEvent(workflowId, null, eventType); + } + + public void assertHistoryEvent(String workflowId, String runId, EventType eventType) { + History history = getExecutionHistory(workflowId, runId).getHistory(); for (HistoryEvent event : history.getEventsList()) { if (eventType == event.getEventType()) { return; @@ -329,7 +337,12 @@ public void assertHistoryEvent(String workflowId, EventType eventType) { /** Asserts that an event of the given EventType is not found in the history. */ public void assertNoHistoryEvent(String workflowId, EventType eventType) { - History history = getExecutionHistory(workflowId).getHistory(); + assertNoHistoryEvent(workflowId, null, eventType); + } + + /** Asserts that an event of the given EventType is not found in the history. */ + public void assertNoHistoryEvent(String workflowId, String runId, EventType eventType) { + History history = getExecutionHistory(workflowId, runId).getHistory(); assertNoHistoryEvent(history, eventType); } diff --git a/temporal-testing/src/test/java/io/temporal/testing/TestWorkflowEnvironmentSleepTest.java b/temporal-testing/src/test/java/io/temporal/testing/TestWorkflowEnvironmentSleepTest.java index 06140ba87..5bb33f1f4 100644 --- a/temporal-testing/src/test/java/io/temporal/testing/TestWorkflowEnvironmentSleepTest.java +++ b/temporal-testing/src/test/java/io/temporal/testing/TestWorkflowEnvironmentSleepTest.java @@ -6,11 +6,7 @@ import io.grpc.StatusRuntimeException; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.TimeoutType; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowFailedException; -import io.temporal.client.WorkflowOptions; -import io.temporal.client.WorkflowServiceException; -import io.temporal.client.WorkflowStub; +import io.temporal.client.*; import io.temporal.failure.TimeoutFailure; import io.temporal.worker.Worker; import io.temporal.workflow.SignalMethod; @@ -18,7 +14,6 @@ import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.time.Duration; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.junit.After; @@ -207,7 +202,9 @@ public void timeskippingWorksForBothTypesOfUntypedStubs() { client.newUntypedWorkflowStub("ConfigurableSleepWorkflow", workflowAOptions); WorkflowExecution executionB = stubB.start(durationToSleep); - WorkflowStub stubBPrime = client.newUntypedWorkflowStub(executionB, Optional.empty()); + WorkflowStub stubBPrime = + client.newUntypedWorkflowStub( + WorkflowTargetOptions.newBuilder().setWorkflowExecution(executionB).build()); waitForWorkflow(stubBPrime, "newUntypedStubForWorkflowExecution", durationToWait); }