Skip to content

Add options to GetVersion for executeWithVersion and executeWithMinVersion #1008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.uber.cadence.workflow.ActivityFailureException;
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.Functions.Func1;
import com.uber.cadence.workflow.GetVersionOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -320,6 +321,16 @@ private void handleVersionMarker(MarkerRecordedEventAttributes attributes) {

GetVersionResult getVersion(
String changeId, DataConverter converter, int minSupported, int maxSupported) {
return getVersion(
changeId, converter, minSupported, maxSupported, GetVersionOptions.newBuilder().build());
}

GetVersionResult getVersion(
String changeId,
DataConverter converter,
int minSupported,
int maxSupported,
GetVersionOptions options) {
Predicate<MarkerRecordedEventAttributes> changeIdEquals =
(attributes) -> {
MarkerHandler.MarkerInterface markerData =
Expand All @@ -328,6 +339,9 @@ GetVersionResult getVersion(
};
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));

// Determine which version to use based on options
int versionToUse = determineVersionToUse(minSupported, maxSupported, options);

final MarkerHandler.HandleResult result =
versionHandler.handle(
changeId,
Expand All @@ -336,14 +350,14 @@ GetVersionResult getVersion(
if (stored.isPresent()) {
return Optional.empty();
}
return Optional.of(converter.toData(maxSupported));
return Optional.of(converter.toData(versionToUse));
});

final boolean isNewlyAdded = result.isNewlyStored();
Map<String, Object> searchAttributesForChangeVersion = null;
if (isNewlyAdded) {
searchAttributesForChangeVersion =
createSearchAttributesForChangeVersion(changeId, maxSupported, versionMap);
createSearchAttributesForChangeVersion(changeId, versionToUse, versionMap);
}

Integer version = versionMap.get(changeId);
Expand All @@ -361,6 +375,22 @@ GetVersionResult getVersion(
return new GetVersionResult(version, isNewlyAdded, searchAttributesForChangeVersion);
}

private int determineVersionToUse(int minSupported, int maxSupported, GetVersionOptions options) {
if (isReplaying()) {
return WorkflowInternal.DEFAULT_VERSION;
}

if (options.getCustomVersion().isPresent()) {
return options.getCustomVersion().get();
}

if (options.isUseMinVersion()) {
return minSupported;
}

return maxSupported;
}

private void validateVersion(String changeID, int version, int minSupported, int maxSupported) {
if ((version < minSupported || version > maxSupported)
&& version != WorkflowInternal.DEFAULT_VERSION) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.Functions.Func1;
import com.uber.cadence.workflow.GetVersionOptions;
import com.uber.cadence.workflow.Promise;
import com.uber.m3.tally.Scope;
import java.time.Duration;
Expand Down Expand Up @@ -194,6 +195,23 @@ Optional<byte[]> mutableSideEffect(
*/
int getVersion(String changeID, DataConverter dataConverter, int minSupported, int maxSupported);

/**
* Enhanced version of getVersion with additional options for version control.
*
* @param changeID identifier of a particular change
* @param dataConverter data converter for serialization
* @param minSupported min version supported for the change
* @param maxSupported max version supported for the change
* @param options version control options
* @return version
*/
int getVersion(
String changeID,
DataConverter dataConverter,
int minSupported,
int maxSupported,
GetVersionOptions options);

Random newRandom();

/** @return scope to be used for metrics reporting. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.uber.cadence.internal.worker.SingleWorkerOptions;
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.Functions.Func1;
import com.uber.cadence.workflow.GetVersionOptions;
import com.uber.cadence.workflow.Promise;
import com.uber.cadence.workflow.Workflow;
import com.uber.m3.tally.Scope;
Expand Down Expand Up @@ -302,6 +303,23 @@ public int getVersion(
return results.getVersion();
}

@Override
public int getVersion(
String changeID,
DataConverter converter,
int minSupported,
int maxSupported,
GetVersionOptions options) {
final ClockDecisionContext.GetVersionResult results =
workflowClock.getVersion(changeID, converter, minSupported, maxSupported, options);
if (results.shouldUpdateCadenceChangeVersion()) {
upsertSearchAttributes(
InternalUtils.convertMapToSearchAttributes(
results.getSearchAttributesForChangeVersion()));
}
return results.getVersion();
}

@Override
public long currentTimeMillis() {
return workflowClock.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.Functions.Func1;
import com.uber.cadence.workflow.GetVersionOptions;
import com.uber.cadence.workflow.Promise;
import com.uber.m3.tally.Scope;
import java.time.Duration;
Expand Down Expand Up @@ -695,6 +696,16 @@ public int getVersion(
throw new UnsupportedOperationException("not implemented");
}

@Override
public int getVersion(
String changeID,
DataConverter converter,
int minSupported,
int maxSupported,
GetVersionOptions options) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public Random newRandom() {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.uber.cadence.workflow.ContinueAsNewOptions;
import com.uber.cadence.workflow.Functions;
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.GetVersionOptions;
import com.uber.cadence.workflow.Promise;
import com.uber.cadence.workflow.SignalExternalWorkflowException;
import com.uber.cadence.workflow.Workflow;
Expand Down Expand Up @@ -620,6 +621,12 @@ public int getVersion(String changeID, int minSupported, int maxSupported) {
return context.getVersion(changeID, converter, minSupported, maxSupported);
}

@Override
public int getVersion(
String changeID, int minSupported, int maxSupported, GetVersionOptions options) {
return context.getVersion(changeID, converter, minSupported, maxSupported, options);
}

void fireTimers() {
timers.fireTimers(context.currentTimeMillis());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ public int getVersion(String changeID, int minSupported, int maxSupported) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public int getVersion(
String changeID, int minSupported, int maxSupported, GetVersionOptions options) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void continueAsNew(
Optional<String> workflowType, Optional<ContinueAsNewOptions> options, Object[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.uber.cadence.workflow.ExternalWorkflowStub;
import com.uber.cadence.workflow.Functions;
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.GetVersionOptions;
import com.uber.cadence.workflow.Promise;
import com.uber.cadence.workflow.QueryMethod;
import com.uber.cadence.workflow.Workflow;
Expand Down Expand Up @@ -253,6 +254,20 @@ public static int getVersion(String changeID, int minSupported, int maxSupported
return getWorkflowInterceptor().getVersion(changeID, minSupported, maxSupported);
}

/**
* Enhanced version of getVersion with additional options for version control.
*
* @param changeID identifier of a particular change
* @param minSupported min version supported for the change
* @param maxSupported max version supported for the change
* @param options version control options
* @return version
*/
public static int getVersion(
String changeID, int minSupported, int maxSupported, GetVersionOptions options) {
return getWorkflowInterceptor().getVersion(changeID, minSupported, maxSupported, options);
}

public static <U> Promise<List<U>> promiseAllOf(Collection<Promise<U>> promises) {
return new AllOfPromise<>(promises);
}
Expand Down
101 changes: 101 additions & 0 deletions src/main/java/com/uber/cadence/workflow/GetVersionOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.workflow;

import java.util.Optional;

/**
* Options for configuring GetVersion behavior. This class provides a builder pattern for
* configuring version control options.
*
* <p>Example usage:
*
* <pre><code>
* // Force a specific version
* GetVersionOptions options = GetVersionOptions.newBuilder()
* .executeWithVersion(2)
* .build();
*
* // Use minimum supported version
* GetVersionOptions options = GetVersionOptions.newBuilder()
* .executeWithMinVersion()
* .build();
* </code></pre>
*/
public final class GetVersionOptions {
private final Optional<Integer> customVersion;
private final boolean useMinVersion;

private GetVersionOptions(Optional<Integer> customVersion, boolean useMinVersion) {
this.customVersion = customVersion;
this.useMinVersion = useMinVersion;
}

/** Returns the custom version if specified, otherwise empty. */
public Optional<Integer> getCustomVersion() {
return customVersion;
}

/** Returns true if the minimum version should be used instead of maximum version. */
public boolean isUseMinVersion() {
return useMinVersion;
}

/** Creates a new builder for GetVersionOptions. */
public static Builder newBuilder() {
return new Builder();
}

/** Builder for GetVersionOptions. */
public static class Builder {
private Optional<Integer> customVersion = Optional.empty();
private boolean useMinVersion = false;

/**
* Forces a specific version to be returned when executed for the first time, instead of
* returning maxSupported version.
*
* @param version the specific version to use
* @return this builder
*/
public Builder executeWithVersion(int version) {
this.customVersion = Optional.of(version);
return this;
}

/**
* Makes GetVersion return minSupported version when executed for the first time, instead of
* returning maxSupported version.
*
* @return this builder
*/
public Builder executeWithMinVersion() {
this.useMinVersion = true;
return this;
}

/**
* Builds the GetVersionOptions instance.
*
* @return the configured GetVersionOptions
*/
public GetVersionOptions build() {
return new GetVersionOptions(customVersion, useMinVersion);
}
}
}
Loading