Skip to content

Commit 4bcbbf0

Browse files
committed
initial commit
Signed-off-by: salaboy <[email protected]>
1 parent 08478a7 commit 4bcbbf0

File tree

4 files changed

+67
-18
lines changed

4 files changed

+67
-18
lines changed

client/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ plugins {
1111
}
1212

1313
group 'io.dapr'
14-
version = '1.5.10'
14+
version = '1.5.11-SNAPSHOT'
1515
archivesBaseName = 'durabletask-client'
1616

1717
def grpcVersion = '1.69.0'

client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,19 @@ default Task<Task<?>> anyOf(Task<?>... tasks) {
178178
*/
179179
Task<Void> createTimer(Duration delay);
180180

181+
/**
182+
* Creates a durable timer that expires after the specified delay.
183+
* <p>
184+
* Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
185+
* internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However,
186+
* it may be visible in framework logs and the stored history state.
187+
*
188+
* @param name of the timer
189+
* @param delay the amount of time before the timer should expire
190+
* @return a new {@code Task} that completes after the specified delay
191+
*/
192+
Task<Void> createTimer(String name, Duration delay);
193+
181194
/**
182195
* Creates a durable timer that expires after the specified timestamp with specific zone.
183196
* <p>
@@ -190,6 +203,19 @@ default Task<Task<?>> anyOf(Task<?>... tasks) {
190203
*/
191204
Task<Void> createTimer(ZonedDateTime zonedDateTime);
192205

206+
/**
207+
* Creates a durable timer that expires after the specified timestamp with specific zone.
208+
* <p>
209+
* Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
210+
* internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However,
211+
* it may be visible in framework logs and the stored history state.
212+
*
213+
* @param name for the timer
214+
* @param zonedDateTime timestamp with specific zone when the timer should expire
215+
* @return a new {@code Task} that completes after the specified delay
216+
*/
217+
Task<Void> createTimer(String name, ZonedDateTime zonedDateTime);
218+
193219
/**
194220
* Transitions the orchestration into the {@link OrchestrationRuntimeStatus#COMPLETED} state with the given output.
195221
*

client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ public <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V>
492492
// If a non-infinite timeout is specified, schedule an internal durable timer.
493493
// If the timer expires and the external event task hasn't yet completed, we'll cancel the task.
494494
if (hasTimeout) {
495-
this.createTimer(timeout).future.thenRun(() -> {
495+
this.createTimer(name, timeout).future.thenRun(() -> {
496496
if (!eventTask.isDone()) {
497497
// Book-keeping - remove the task record for the canceled task
498498
eventQueue.removeIf(t -> t.task == eventTask);
@@ -632,7 +632,26 @@ public Task<Void> createTimer(Duration duration) {
632632
Helpers.throwIfArgumentNull(duration, "duration");
633633

634634
Instant finalFireAt = this.currentInstant.plus(duration);
635-
return createTimer(finalFireAt);
635+
return createTimer(orchestratorName, finalFireAt);
636+
}
637+
638+
public Task<Void> createTimer(String name, Duration duration) {
639+
Helpers.throwIfOrchestratorComplete(this.isComplete);
640+
Helpers.throwIfArgumentNull(duration, "duration");
641+
Helpers.throwIfArgumentNull(name, "name");
642+
643+
Instant finalFireAt = this.currentInstant.plus(duration);
644+
return createTimer(name, finalFireAt);
645+
}
646+
647+
@Override
648+
public Task<Void> createTimer(String name, ZonedDateTime zonedDateTime) {
649+
Helpers.throwIfOrchestratorComplete(this.isComplete);
650+
Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime");
651+
Helpers.throwIfArgumentNull(name, "name");
652+
653+
Instant finalFireAt = zonedDateTime.toInstant();
654+
return createTimer(name, finalFireAt);
636655
}
637656

638657
@Override
@@ -641,18 +660,19 @@ public Task<Void> createTimer(ZonedDateTime zonedDateTime) {
641660
Helpers.throwIfArgumentNull(zonedDateTime, "zonedDateTime");
642661

643662
Instant finalFireAt = zonedDateTime.toInstant();
644-
return createTimer(finalFireAt);
663+
return createTimer(orchestratorName + "-timer", finalFireAt);
645664
}
646665

647-
private Task<Void> createTimer(Instant finalFireAt) {
648-
return new TimerTask(finalFireAt);
666+
private Task<Void> createTimer(String name, Instant finalFireAt) {
667+
return new TimerTask(name, finalFireAt);
649668
}
650669

651-
private CompletableTask<Void> createInstantTimer(int id, Instant fireAt) {
670+
private CompletableTask<Void> createInstantTimer(String name, int id, Instant fireAt) {
652671
Timestamp ts = DataConverter.getTimestampFromInstant(fireAt);
653672
this.pendingActions.put(id, OrchestratorAction.newBuilder()
654673
.setId(id)
655-
.setCreateTimer(CreateTimerAction.newBuilder().setFireAt(ts))
674+
.setCreateTimer(CreateTimerAction.newBuilder()
675+
.setName(name + "-timer-" + id).setFireAt(ts))
656676
.build());
657677

658678
if (!this.isReplaying) {
@@ -1022,10 +1042,10 @@ private class TimerTask extends CompletableTask<Void> {
10221042
private Instant finalFireAt;
10231043
CompletableTask<Void> task;
10241044

1025-
public TimerTask(Instant finalFireAt) {
1045+
public TimerTask(String name, Instant finalFireAt) {
10261046
super();
1027-
CompletableTask<Void> firstTimer = createTimerTask(finalFireAt);
1028-
CompletableFuture<Void> timerChain = createTimerChain(finalFireAt, firstTimer.future);
1047+
CompletableTask<Void> firstTimer = createTimerTask(name, finalFireAt);
1048+
CompletableFuture<Void> timerChain = createTimerChain(name, finalFireAt, firstTimer.future);
10291049
this.task = new CompletableTask<>(timerChain);
10301050
this.finalFireAt = finalFireAt;
10311051
}
@@ -1035,26 +1055,26 @@ public TimerTask(Instant finalFireAt) {
10351055
// currentFuture completes, we check if we have not yet reached finalFireAt. If that is the case, we create a new sub-timer
10361056
// task and make a recursive call on that new sub-timer task so that once it completes, another sub-timer task is created
10371057
// if necessary. Otherwise, we return and no more sub-timers are created.
1038-
private CompletableFuture<Void> createTimerChain(Instant finalFireAt, CompletableFuture<Void> currentFuture) {
1058+
private CompletableFuture<Void> createTimerChain(String name, Instant finalFireAt, CompletableFuture<Void> currentFuture) {
10391059
return currentFuture.thenRun(() -> {
10401060
Instant currentInstsanceMinusNanos = currentInstant.minusNanos(currentInstant.getNano());
10411061
Instant finalFireAtMinusNanos = finalFireAt.minusNanos(finalFireAt.getNano());
10421062
if (currentInstsanceMinusNanos.compareTo(finalFireAtMinusNanos) >= 0) {
10431063
return;
10441064
}
1045-
Task<Void> nextTimer = createTimerTask(finalFireAt);
1046-
createTimerChain(finalFireAt, nextTimer.future);
1065+
Task<Void> nextTimer = createTimerTask(name + "-next", finalFireAt);
1066+
createTimerChain(name, finalFireAt, nextTimer.future);
10471067
});
10481068
}
10491069

1050-
private CompletableTask<Void> createTimerTask(Instant finalFireAt) {
1070+
private CompletableTask<Void> createTimerTask(String name, Instant finalFireAt) {
10511071
CompletableTask<Void> nextTimer;
10521072
Duration remainingTime = Duration.between(currentInstant, finalFireAt);
10531073
if (remainingTime.compareTo(maximumTimerInterval) > 0) {
10541074
Instant nextFireAt = currentInstant.plus(maximumTimerInterval);
1055-
nextTimer = createInstantTimer(sequenceNumber++, nextFireAt);
1075+
nextTimer = createInstantTimer(name, sequenceNumber++, nextFireAt);
10561076
} else {
1057-
nextTimer = createInstantTimer(sequenceNumber++, finalFireAt);
1077+
nextTimer = createInstantTimer(name, sequenceNumber++, finalFireAt);
10581078
}
10591079
nextTimer.setParentTask(this);
10601080
return nextTimer;
@@ -1185,7 +1205,7 @@ public void tryRetry(TaskFailedException ex) {
11851205
Duration delay = this.getNextDelay();
11861206
if (!delay.isZero() && !delay.isNegative()) {
11871207
// Use a durable timer to create the delay between retries
1188-
this.context.createTimer(delay).await();
1208+
this.context.createTimer(getName() + "-retry",delay).await();
11891209
}
11901210

11911211
this.totalRetryTime = Duration.between(this.startTime, this.context.getCurrentInstant());

client/src/test/java/io/dapr/durabletask/IntegrationTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ void emptyOrchestration() throws TimeoutException {
7676
defaultTimeout,
7777
true);
7878

79+
7980
assertNotNull(instance);
8081
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
8182
assertEquals(input, instance.readInputAs(String.class));
@@ -174,6 +175,8 @@ void loopWithTimer() throws IOException, TimeoutException {
174175

175176

176177
}
178+
179+
177180
}
178181

179182
@Test

0 commit comments

Comments
 (0)