Skip to content

Add activity tasklist rate limiter option to worker options #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.uber.cadence.PollForActivityTaskResponse;
import com.uber.cadence.ServiceBusyError;
import com.uber.cadence.TaskList;
import com.uber.cadence.TaskListMetadata;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.m3.tally.Stopwatch;
Expand Down Expand Up @@ -56,6 +57,13 @@ public ActivityWorker.MeasurableActivityTask poll() throws TException {
pollRequest.setDomain(domain);
pollRequest.setIdentity(options.getIdentity());
pollRequest.setTaskList(new TaskList().setName(taskList));

if (options.getTaskListActivitiesPerSecond() > 0) {
TaskListMetadata metadata = new TaskListMetadata();
metadata.setMaxTasksPerSecond(options.getTaskListActivitiesPerSecond());
pollRequest.setTaskListMetadata(metadata);
}

if (log.isDebugEnabled()) {
log.debug("poll request begin: " + pollRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,15 @@ public final class SingleWorkerOptions {
public static final class Builder {

private String identity;

private DataConverter dataConverter;

private int taskExecutorThreadPoolSize = 100;

private double taskListActivitiesPerSecond;
private PollerOptions pollerOptions;

/** TODO: Dynamic expiration based on activity timeout */
private RetryOptions reportCompletionRetryOptions;

private RetryOptions reportFailureRetryOptions;

private Scope metricsScope;

private boolean enableLoggingInReplay;

public Builder() {}
Expand All @@ -52,6 +47,7 @@ public Builder(SingleWorkerOptions options) {
this.identity = options.getIdentity();
this.dataConverter = options.getDataConverter();
this.pollerOptions = options.getPollerOptions();
this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond();
this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize();
this.reportCompletionRetryOptions = options.getReportCompletionRetryOptions();
this.reportFailureRetryOptions = options.getReportFailureRetryOptions();
Expand Down Expand Up @@ -89,6 +85,21 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
return this;
}

public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) {
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
return this;
}

public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) {
this.reportCompletionRetryOptions = reportCompletionRetryOptions;
return this;
}

public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) {
this.reportFailureRetryOptions = reportFailureRetryOptions;
return this;
}

public SingleWorkerOptions build() {
if (reportCompletionRetryOptions == null) {
reportCompletionRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS;
Expand Down Expand Up @@ -119,44 +130,30 @@ public SingleWorkerOptions build() {
identity,
dataConverter,
taskExecutorThreadPoolSize,
taskListActivitiesPerSecond,
pollerOptions,
reportCompletionRetryOptions,
reportFailureRetryOptions,
metricsScope,
enableLoggingInReplay);
}

public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) {
this.reportCompletionRetryOptions = reportCompletionRetryOptions;
return this;
}

public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) {
this.reportFailureRetryOptions = reportFailureRetryOptions;
return this;
}
}

private final String identity;

private final DataConverter dataConverter;

private final int taskExecutorThreadPoolSize;

private final double taskListActivitiesPerSecond;
private final PollerOptions pollerOptions;

private final RetryOptions reportCompletionRetryOptions;

private final RetryOptions reportFailureRetryOptions;

private final Scope metricsScope;

private final boolean enableLoggingInReplay;

private SingleWorkerOptions(
String identity,
DataConverter dataConverter,
int taskExecutorThreadPoolSize,
double taskListActivitiesPerSecond,
PollerOptions pollerOptions,
RetryOptions reportCompletionRetryOptions,
RetryOptions reportFailureRetryOptions,
Expand All @@ -165,6 +162,7 @@ private SingleWorkerOptions(
this.identity = identity;
this.dataConverter = dataConverter;
this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize;
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
this.pollerOptions = pollerOptions;
this.reportCompletionRetryOptions = reportCompletionRetryOptions;
this.reportFailureRetryOptions = reportFailureRetryOptions;
Expand All @@ -180,22 +178,26 @@ public DataConverter getDataConverter() {
return dataConverter;
}

public int getTaskExecutorThreadPoolSize() {
int getTaskExecutorThreadPoolSize() {
return taskExecutorThreadPoolSize;
}

public PollerOptions getPollerOptions() {
PollerOptions getPollerOptions() {
return pollerOptions;
}

public RetryOptions getReportCompletionRetryOptions() {
RetryOptions getReportCompletionRetryOptions() {
return reportCompletionRetryOptions;
}

public RetryOptions getReportFailureRetryOptions() {
RetryOptions getReportFailureRetryOptions() {
return reportFailureRetryOptions;
}

double getTaskListActivitiesPerSecond() {
return taskListActivitiesPerSecond;
}

public Scope getMetricsScope() {
return metricsScope;
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/uber/cadence/worker/WorkerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static final class Builder {
private int maxConcurrentActivityExecutionSize = 100;
private int maxConcurrentWorkflowExecutionSize = 50;
private int maxConcurrentLocalActivityExecutionSize = 100;
private double taskListActivitiesPerSecond = 100000;
private PollerOptions activityPollerOptions;
private PollerOptions workflowPollerOptions;
private RetryOptions reportActivityCompletionRetryOptions;
Expand Down Expand Up @@ -186,6 +187,19 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
return this;
}

/**
* Optional: Sets the rate limiting on number of activities that can be executed per second.
* This is managed by the server and controls activities per second for your entire tasklist.
* Notice that the number is represented in double, so that you can set it to less than 1 if
* needed. For example, set the number to 0.1 means you want your activity to be executed once
* every 10 seconds. This can be used to protect down stream services from flooding. The zero
* value of this uses the default value. Default: 100k
*/
public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) {
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
return this;
}

public WorkerOptions build() {
if (identity == null) {
identity = ManagementFactory.getRuntimeMXBean().getName();
Expand All @@ -204,6 +218,7 @@ public WorkerOptions build() {
maxConcurrentActivityExecutionSize,
maxConcurrentWorkflowExecutionSize,
maxConcurrentLocalActivityExecutionSize,
taskListActivitiesPerSecond,
activityPollerOptions,
workflowPollerOptions,
reportActivityCompletionRetryOptions,
Expand All @@ -224,6 +239,7 @@ public WorkerOptions build() {
private final int maxConcurrentActivityExecutionSize;
private final int maxConcurrentWorkflowExecutionSize;
private final int maxConcurrentLocalActivityExecutionSize;
private final double taskListActivitiesPerSecond;
private final PollerOptions activityPollerOptions;
private final PollerOptions workflowPollerOptions;
private final RetryOptions reportActivityCompletionRetryOptions;
Expand All @@ -243,6 +259,7 @@ private WorkerOptions(
int maxConcurrentActivityExecutionSize,
int maxConcurrentWorkflowExecutionSize,
int maxConcurrentLocalActivityExecutionSize,
double taskListActivitiesPerSecond,
PollerOptions activityPollerOptions,
PollerOptions workflowPollerOptions,
RetryOptions reportActivityCompletionRetryOptions,
Expand All @@ -260,6 +277,7 @@ private WorkerOptions(
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize;
this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize;
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
this.activityPollerOptions = activityPollerOptions;
this.workflowPollerOptions = workflowPollerOptions;
this.reportActivityCompletionRetryOptions = reportActivityCompletionRetryOptions;
Expand Down Expand Up @@ -359,6 +377,8 @@ public String toString() {
+ maxConcurrentWorkflowExecutionSize
+ ", maxConcurrentLocalActivityExecutionSize="
+ maxConcurrentLocalActivityExecutionSize
+ ", taskListActivitiesPerSecond="
+ taskListActivitiesPerSecond
+ ", activityPollerOptions="
+ activityPollerOptions
+ ", workflowPollerOptions="
Expand Down