Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,21 @@ static WorkflowClient newInstance(WorkflowServiceStubs service, WorkflowClientOp
* @param workflowId Workflow id.
* @param runId Run id of the workflow execution.
* @return Stub that implements workflowInterface and can be used to signal, update, or query it.
* @deprecated Use {@link #newWorkflowStub(Class, WorkflowTargetOptions)} instead.
*/
@Deprecated
<T> T newWorkflowStub(Class<T> workflowInterface, String workflowId, Optional<String> runId);

/**
* Creates workflow client stub for a known execution. Use it to send signals or queries to a
* running workflow. Do not call methods annotated with @WorkflowMethod.
*
* @param workflowInterface interface that given workflow implements.
* @param workflowTargetOptions options that specify target workflow execution.
* @return Stub that implements workflowInterface and can be used to signal or query it.
*/
<T> T newWorkflowStub(Class<T> workflowInterface, WorkflowTargetOptions workflowTargetOptions);

/**
* Creates workflow untyped client stub that can be used to start a single workflow execution. Use
* it to send signals or queries to a running workflow. Do not call methods annotated
Expand Down Expand Up @@ -191,7 +203,9 @@ static WorkflowClient newInstance(WorkflowServiceStubs service, WorkflowClientOp
* workflowId is assumed.
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
* @return Stub that can be used to start workflow and later to signal or query it.
* @deprecated Use {@link #newUntypedWorkflowStub(Optional, WorkflowTargetOptions)} instead.
*/
@Deprecated
WorkflowStub newUntypedWorkflowStub(
String workflowId, Optional<String> runId, Optional<String> workflowType);

Expand All @@ -202,9 +216,21 @@ WorkflowStub newUntypedWorkflowStub(
* @param execution workflow id and optional run id for execution
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
* @return Stub that can be used to start workflow and later to signal or query it.
* @deprecated Use {@link #newUntypedWorkflowStub(Optional, WorkflowTargetOptions)} instead.
*/
WorkflowStub newUntypedWorkflowStub(WorkflowExecution execution, Optional<String> workflowType);

/**
* Creates workflow untyped client stub for a known execution. Use it to send signals or queries
* to a running workflow. Do not call methods annotated with @WorkflowMethod.
*
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
* @param workflowTargetOptions options that specify target workflow execution.
* @return Stub that can be used to start workflow and later to signal or query it.
*/
WorkflowStub newUntypedWorkflowStub(
Optional<String> workflowType, WorkflowTargetOptions workflowTargetOptions);

/**
* Creates new {@link ActivityCompletionClient} that can be used to complete activities
* asynchronously. Only relevant for activity implementations that called {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,50 @@ public <T> T newWorkflowStub(Class<T> workflowInterface, String workflowId) {
return newWorkflowStub(workflowInterface, workflowId, Optional.empty());
}

public <T> T newWorkflowStub(
Class<T> workflowInterface, WorkflowTargetOptions workflowTargetOptions) {
return newWorkflowStub(workflowInterface, workflowTargetOptions, true);
}

@Override
@SuppressWarnings("deprecation")
public <T> T newWorkflowStub(
Class<T> workflowInterface, String workflowId, Optional<String> runId) {
return newWorkflowStub(
workflowInterface,
WorkflowTargetOptions.newBuilder()
.setWorkflowId(workflowId)
.setRunId(runId.orElse(null))
.build());
}

public <T> T newWorkflowStub(
Class<T> workflowInterface,
WorkflowTargetOptions workflowTargetOptions,
boolean legacyTargeting) {
checkAnnotation(
workflowInterface,
WorkflowMethod.class,
QueryMethod.class,
SignalMethod.class,
UpdateMethod.class);
if (Strings.isNullOrEmpty(workflowId)) {
if (Strings.isNullOrEmpty(workflowTargetOptions.getWorkflowId())) {
throw new IllegalArgumentException("workflowId is null or empty");
}
WorkflowExecution execution =
WorkflowExecution.newBuilder().setWorkflowId(workflowId).setRunId(runId.orElse("")).build();
WorkflowExecution.Builder execution =
WorkflowExecution.newBuilder().setWorkflowId(workflowTargetOptions.getWorkflowId());
if (!Strings.isNullOrEmpty(workflowTargetOptions.getRunId())) {
execution.setRunId(workflowTargetOptions.getRunId());
}

WorkflowInvocationHandler invocationHandler =
new WorkflowInvocationHandler(
workflowInterface, this.getOptions(), workflowClientCallsInvoker, execution);
workflowInterface,
this.getOptions(),
workflowClientCallsInvoker,
execution.build(),
legacyTargeting,
workflowTargetOptions.getFirstExecutionRunId());
@SuppressWarnings("unchecked")
T result =
(T)
Expand Down Expand Up @@ -194,6 +220,7 @@ public WorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions
}

@Override
@SuppressWarnings("deprecation")
public WorkflowStub newUntypedWorkflowStub(
String workflowId, Optional<String> runId, Optional<String> workflowType) {
WorkflowExecution execution =
Expand All @@ -205,10 +232,41 @@ public WorkflowStub newUntypedWorkflowStub(
@SuppressWarnings("deprecation")
public WorkflowStub newUntypedWorkflowStub(
WorkflowExecution execution, Optional<String> workflowType) {
return newUntypedWorkflowStub(
workflowType,
true,
WorkflowTargetOptions.newBuilder()
.setWorkflowId(execution.getWorkflowId())
.setRunId(execution.getRunId())
.build());
}

@Override
public WorkflowStub newUntypedWorkflowStub(
Optional<String> workflowType, WorkflowTargetOptions workflowTargetOptions) {
return newUntypedWorkflowStub(workflowType, false, workflowTargetOptions);
}

@SuppressWarnings("deprecation")
WorkflowStub newUntypedWorkflowStub(
Optional<String> workflowType,
boolean legacyTargeting,
WorkflowTargetOptions workflowTargetOptions) {
WorkflowExecution.Builder execution =
WorkflowExecution.newBuilder().setWorkflowId(workflowTargetOptions.getWorkflowId());
if (!Strings.isNullOrEmpty(workflowTargetOptions.getRunId())) {
execution.setRunId(workflowTargetOptions.getRunId());
}
WorkflowStub result =
new WorkflowStubImpl(options, workflowClientCallsInvoker, workflowType, execution);
new WorkflowStubImpl(
options,
workflowClientCallsInvoker,
workflowType,
execution.build(),
legacyTargeting,
workflowTargetOptions.getFirstExecutionRunId());
for (WorkflowClientInterceptor i : interceptors) {
result = i.newUntypedWorkflowStub(execution, workflowType, result);
result = i.newUntypedWorkflowStub(execution.build(), workflowType, result);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.*;
import javax.annotation.Nullable;

/**
* Dynamic implementation of a strongly typed workflow interface that can be used to start, signal
Expand Down Expand Up @@ -107,11 +108,19 @@ static void closeAsyncInvocation() {
Class<?> workflowInterface,
WorkflowClientOptions clientOptions,
WorkflowClientCallsInterceptor workflowClientCallsInvoker,
WorkflowExecution execution) {
WorkflowExecution execution,
boolean legacyTargeting,
@Nullable String firstExecutionRunId) {
workflowMetadata = POJOWorkflowInterfaceMetadata.newInstance(workflowInterface, false);
Optional<String> workflowType = workflowMetadata.getWorkflowType();
WorkflowStub stub =
new WorkflowStubImpl(clientOptions, workflowClientCallsInvoker, workflowType, execution);
new WorkflowStubImpl(
clientOptions,
workflowClientCallsInvoker,
workflowType,
execution,
legacyTargeting,
firstExecutionRunId);
for (WorkflowClientInterceptor i : clientOptions.getInterceptors()) {
stub = i.newUntypedWorkflowStub(execution, workflowType, stub);
}
Expand Down
18 changes: 18 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,25 @@ <R> WorkflowUpdateHandle<R> startUpdateWithStart(
<R> R executeUpdateWithStart(
UpdateOptions<R> updateOptions, Object[] updateArgs, Object[] startArgs);

/**
* Sends a signal to a workflow, starting the workflow if it is not already running.
*
* @param signalName name of the signal handler. Usually it is a method name.
* @param signalArgs signal method arguments
* @param startArgs workflow start arguments
* @return workflow execution
*/
WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs);

/**
* @return workflow type name if it was provided when the stub was created.
*/
Optional<String> getWorkflowType();

/**
* @return current workflow execution. Returns null if the workflow has not been started yet.
*/
@Nullable
WorkflowExecution getExecution();

/**
Expand Down Expand Up @@ -406,6 +421,9 @@ <R> CompletableFuture<R> getResultAsync(
*/
WorkflowExecutionDescription describe();

/**
* @return workflow options if they were provided when the stub was created.
*/
Optional<WorkflowOptions> getOptions();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,25 @@ class WorkflowStubImpl implements WorkflowStub {
// if null, this stub is created to bound to an existing execution.
// This stub is created to bound to an existing execution otherwise.
private final @Nullable WorkflowOptions options;
private final boolean legacyTargeting;
private final @Nullable String firstExecutionRunId;

WorkflowStubImpl(
WorkflowClientOptions clientOptions,
WorkflowClientCallsInterceptor workflowClientInvoker,
Optional<String> workflowType,
WorkflowExecution execution) {
WorkflowExecution execution,
boolean legacyTargeting,
@Nullable String firstExecutionRunId) {
this.clientOptions = clientOptions;
this.workflowClientInvoker = workflowClientInvoker;
this.workflowType = workflowType;
if (execution == null || execution.getWorkflowId().isEmpty()) {
throw new IllegalArgumentException("null or empty workflowId");
}
this.execution.set(execution);
this.legacyTargeting = legacyTargeting;
this.firstExecutionRunId = firstExecutionRunId;
this.options = null;
}

Expand All @@ -60,12 +66,14 @@ class WorkflowStubImpl implements WorkflowStub {
this.workflowClientInvoker = workflowClientInvoker;
this.workflowType = Optional.of(workflowType);
this.options = options;
this.legacyTargeting = false;
this.firstExecutionRunId = null;
}

@Override
public void signal(String signalName, Object... args) {
checkStarted();
WorkflowExecution targetExecution = currentExecutionWithoutRunId();
WorkflowExecution targetExecution = currentExecutionCheckLegacy();
try {
workflowClientInvoker.signal(
new WorkflowClientCallsInterceptor.WorkflowSignalInput(
Expand Down Expand Up @@ -338,6 +346,7 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {
.setUpdateName(updateName)
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
.setResultClass(resultClass)
.setFirstExecutionRunId(firstExecutionRunId)
.build();
return startUpdate(options, args).getResultAsync().get();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -385,21 +394,17 @@ private <R> WorkflowClientCallsInterceptor.StartUpdateInput<R> startUpdateInput(
Strings.isNullOrEmpty(options.getUpdateId())
? UUID.randomUUID().toString()
: options.getUpdateId();
WorkflowClientCallsInterceptor.StartUpdateInput<R> input =
new WorkflowClientCallsInterceptor.StartUpdateInput<>(
targetExecution,
workflowType,
options.getUpdateName(),
Header.empty(),
updateId,
args,
options.getResultClass(),
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder()
.setLifecycleStage(options.getWaitForStage().getProto())
.build());
return input;
return new WorkflowClientCallsInterceptor.StartUpdateInput<>(
targetExecution,
workflowType,
options.getUpdateName(),
Header.empty(),
updateId,
args,
options.getResultClass(),
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder().setLifecycleStage(options.getWaitForStage().getProto()).build());
}

@Override
Expand Down Expand Up @@ -435,10 +440,11 @@ public void cancel() {
@Override
public void cancel(@Nullable String reason) {
checkStarted();
WorkflowExecution targetExecution = currentExecutionWithoutRunId();
WorkflowExecution targetExecution = currentExecutionCheckLegacy();
try {
workflowClientInvoker.cancel(
new WorkflowClientCallsInterceptor.CancelInput(targetExecution, reason));
new WorkflowClientCallsInterceptor.CancelInput(
targetExecution, firstExecutionRunId, reason));
} catch (Exception e) {
Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
Expand All @@ -448,10 +454,11 @@ public void cancel(@Nullable String reason) {
@Override
public void terminate(@Nullable String reason, Object... details) {
checkStarted();
WorkflowExecution targetExecution = currentExecutionWithoutRunId();
WorkflowExecution targetExecution = currentExecutionCheckLegacy();
try {
workflowClientInvoker.terminate(
new WorkflowClientCallsInterceptor.TerminateInput(targetExecution, reason, details));
new WorkflowClientCallsInterceptor.TerminateInput(
targetExecution, firstExecutionRunId, reason, details));
} catch (Exception e) {
Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
Expand Down Expand Up @@ -532,6 +539,14 @@ private WorkflowExecution currentExecutionWithoutRunId() {
}
}

private WorkflowExecution currentExecutionCheckLegacy() {
if (legacyTargeting) {
return currentExecutionWithoutRunId();
} else {
return execution.get();
}
}

private <R> R throwAsWorkflowFailureExceptionForQuery(
Throwable failure,
@SuppressWarnings("unused") Class<R> returnType,
Expand Down Expand Up @@ -589,6 +604,7 @@ private Throwable throwAsWorkflowFailureException(

private void populateExecutionAfterStart(WorkflowExecution startedExecution) {
this.startedExecution.set(startedExecution);
// this.firstExecutionRunId.set(startedExecution.getRunId());
// bind to an execution without a runId, so queries follow runId chains by default
this.execution.set(WorkflowExecution.newBuilder(startedExecution).setRunId("").build());
}
Expand Down
Loading
Loading