Skip to content

Commit 4ee6be0

Browse files
siri-varmaartur-ciocanusalaboy
authored
Add Failure Policy for Jobs SDK (#1448)
* Update CONTRIBUTING.md Signed-off-by: Siri Varma Vegiraju <[email protected]> * Add failrue policy Signed-off-by: sirivarma <[email protected]> * Add tests Signed-off-by: sirivarma <[email protected]> * Add Tests Signed-off-by: sirivarma <[email protected]> * Upgrading to 1.15.7 (#1458) * upgrading to 1.15.7 Signed-off-by: salaboy <[email protected]> * using DAPR VERSION Signed-off-by: salaboy <[email protected]> --------- Signed-off-by: salaboy <[email protected]> Signed-off-by: siri-varma <[email protected]> * Rename classes Signed-off-by: siri-varma <[email protected]> * add rc Signed-off-by: sirivarma <[email protected]> * fix checkstyle Signed-off-by: sirivarma <[email protected]> * Fix things Signed-off-by: sirivarma <[email protected]> * Test latest Signed-off-by: sirivarma <[email protected]> * fix checkstyle Signed-off-by: sirivarma <[email protected]> * Address comments Signed-off-by: sirivarma <[email protected]> * Address comments Signed-off-by: sirivarma <[email protected]> --------- Signed-off-by: Siri Varma Vegiraju <[email protected]> Signed-off-by: sirivarma <[email protected]> Signed-off-by: salaboy <[email protected]> Signed-off-by: siri-varma <[email protected]> Co-authored-by: artur-ciocanu <[email protected]> Co-authored-by: salaboy <[email protected]>
1 parent f7ec558 commit 4ee6be0

File tree

9 files changed

+609
-5
lines changed

9 files changed

+609
-5
lines changed

sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
package io.dapr.it.testcontainers.jobs;
1515

1616
import io.dapr.client.DaprPreviewClient;
17+
import io.dapr.client.domain.ConstantFailurePolicy;
1718
import io.dapr.client.domain.DeleteJobRequest;
19+
import io.dapr.client.domain.DropFailurePolicy;
20+
import io.dapr.client.domain.FailurePolicyType;
1821
import io.dapr.client.domain.GetJobRequest;
1922
import io.dapr.client.domain.GetJobResponse;
2023
import io.dapr.client.domain.JobSchedule;
@@ -25,6 +28,7 @@
2528
import org.junit.jupiter.api.BeforeEach;
2629
import org.junit.jupiter.api.Tag;
2730
import org.junit.jupiter.api.Test;
31+
import org.junit.runner.notification.Failure;
2832
import org.springframework.beans.factory.annotation.Autowired;
2933
import org.springframework.boot.test.context.SpringBootTest;
3034
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -34,6 +38,7 @@
3438
import org.testcontainers.junit.jupiter.Container;
3539
import org.testcontainers.junit.jupiter.Testcontainers;
3640

41+
import java.time.Duration;
3742
import java.time.Instant;
3843
import java.time.ZoneOffset;
3944
import java.time.format.DateTimeFormatter;
@@ -97,6 +102,9 @@ public void testJobScheduleCreationWithDueTime() {
97102

98103
GetJobResponse getJobResponse =
99104
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
105+
106+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
107+
100108
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
101109
assertEquals("Job", getJobResponse.getName());
102110
}
@@ -112,6 +120,9 @@ public void testJobScheduleCreationWithSchedule() {
112120

113121
GetJobResponse getJobResponse =
114122
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
123+
124+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
125+
115126
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
116127
assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression());
117128
assertEquals("Job", getJobResponse.getName());
@@ -134,6 +145,9 @@ public void testJobScheduleCreationWithAllParameters() {
134145

135146
GetJobResponse getJobResponse =
136147
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
148+
149+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
150+
137151
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
138152
assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression());
139153
assertEquals("Job", getJobResponse.getName());
@@ -143,6 +157,57 @@ public void testJobScheduleCreationWithAllParameters() {
143157
getJobResponse.getTtl().toString());
144158
}
145159

160+
@Test
161+
public void testJobScheduleCreationWithDropFailurePolicy() {
162+
Instant currentTime = Instant.now();
163+
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
164+
.withZone(ZoneOffset.UTC);
165+
166+
String cronExpression = "2 * 3 * * FRI";
167+
168+
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
169+
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
170+
.setData("Job data".getBytes())
171+
.setRepeat(3)
172+
.setFailurePolicy(new DropFailurePolicy())
173+
.setSchedule(JobSchedule.fromString(cronExpression))).block();
174+
175+
GetJobResponse getJobResponse =
176+
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
177+
178+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
179+
180+
assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType());
181+
}
182+
183+
@Test
184+
public void testJobScheduleCreationWithConstantFailurePolicy() {
185+
Instant currentTime = Instant.now();
186+
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
187+
.withZone(ZoneOffset.UTC);
188+
189+
String cronExpression = "2 * 3 * * FRI";
190+
191+
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
192+
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
193+
.setData("Job data".getBytes())
194+
.setRepeat(3)
195+
.setFailurePolicy(new ConstantFailurePolicy(3)
196+
.setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS)))
197+
.setSchedule(JobSchedule.fromString(cronExpression))).block();
198+
199+
GetJobResponse getJobResponse =
200+
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
201+
202+
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
203+
204+
ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy();
205+
assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType());
206+
assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries());
207+
assertEquals(Duration.of(10, ChronoUnit.SECONDS).getNano(),
208+
jobFailurePolicyConstant.getDurationBetweenRetries().getNano());
209+
}
210+
146211
@Test
147212
public void testDeleteJobRequest() {
148213
Instant currentTime = Instant.now();

sdk/src/main/java/io/dapr/client/DaprClientImpl.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,18 @@
2727
import io.dapr.client.domain.CloudEvent;
2828
import io.dapr.client.domain.ComponentMetadata;
2929
import io.dapr.client.domain.ConfigurationItem;
30+
import io.dapr.client.domain.ConstantFailurePolicy;
3031
import io.dapr.client.domain.ConversationInput;
3132
import io.dapr.client.domain.ConversationOutput;
3233
import io.dapr.client.domain.ConversationRequest;
3334
import io.dapr.client.domain.ConversationResponse;
3435
import io.dapr.client.domain.DaprMetadata;
3536
import io.dapr.client.domain.DeleteJobRequest;
3637
import io.dapr.client.domain.DeleteStateRequest;
38+
import io.dapr.client.domain.DropFailurePolicy;
3739
import io.dapr.client.domain.ExecuteStateTransactionRequest;
40+
import io.dapr.client.domain.FailurePolicy;
41+
import io.dapr.client.domain.FailurePolicyType;
3842
import io.dapr.client.domain.GetBulkSecretRequest;
3943
import io.dapr.client.domain.GetBulkStateRequest;
4044
import io.dapr.client.domain.GetConfigurationRequest;
@@ -105,6 +109,7 @@
105109
import java.time.Instant;
106110
import java.time.ZoneOffset;
107111
import java.time.format.DateTimeFormatter;
112+
import java.time.temporal.ChronoUnit;
108113
import java.util.ArrayList;
109114
import java.util.Arrays;
110115
import java.util.Collections;
@@ -1336,6 +1341,10 @@ public Mono<Void> scheduleJob(ScheduleJobRequest scheduleJobRequest) {
13361341
scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime()));
13371342
}
13381343

1344+
if (scheduleJobRequest.getFailurePolicy() != null) {
1345+
scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
1346+
}
1347+
13391348
scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite());
13401349

13411350
Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono =
@@ -1380,6 +1389,10 @@ public Mono<GetJobResponse> getJob(GetJobRequest getJobRequest) {
13801389
getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime()));
13811390
}
13821391

1392+
if (job.hasFailurePolicy()) {
1393+
getJobResponse.setFailurePolicy(getJobFailurePolicy(job.getFailurePolicy()));
1394+
}
1395+
13831396
return getJobResponse
13841397
.setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null)
13851398
.setData(job.hasData() ? job.getData().getValue().toByteArray() : null)
@@ -1390,6 +1403,53 @@ public Mono<GetJobResponse> getJob(GetJobRequest getJobRequest) {
13901403
}
13911404
}
13921405

1406+
private FailurePolicy getJobFailurePolicy(CommonProtos.JobFailurePolicy jobFailurePolicy) {
1407+
if (jobFailurePolicy.hasDrop()) {
1408+
return new DropFailurePolicy();
1409+
}
1410+
1411+
CommonProtos.JobFailurePolicyConstant jobFailurePolicyConstant = jobFailurePolicy.getConstant();
1412+
if (jobFailurePolicyConstant.hasInterval() && jobFailurePolicyConstant.hasMaxRetries()) {
1413+
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries())
1414+
.setDurationBetweenRetries(Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
1415+
ChronoUnit.NANOS));
1416+
}
1417+
1418+
if (jobFailurePolicyConstant.hasMaxRetries()) {
1419+
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries());
1420+
}
1421+
1422+
return new ConstantFailurePolicy(
1423+
Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
1424+
ChronoUnit.NANOS));
1425+
}
1426+
1427+
private CommonProtos.JobFailurePolicy getJobFailurePolicy(FailurePolicy failurePolicy) {
1428+
CommonProtos.JobFailurePolicy.Builder jobFailurePolicyBuilder = CommonProtos.JobFailurePolicy.newBuilder();
1429+
1430+
if (failurePolicy.getFailurePolicyType() == FailurePolicyType.DROP) {
1431+
jobFailurePolicyBuilder.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build());
1432+
return jobFailurePolicyBuilder.build();
1433+
}
1434+
1435+
CommonProtos.JobFailurePolicyConstant.Builder constantPolicyBuilder =
1436+
CommonProtos.JobFailurePolicyConstant.newBuilder();
1437+
ConstantFailurePolicy jobConstantFailurePolicy = (ConstantFailurePolicy)failurePolicy;
1438+
1439+
if (jobConstantFailurePolicy.getMaxRetries() != null) {
1440+
constantPolicyBuilder.setMaxRetries(jobConstantFailurePolicy.getMaxRetries());
1441+
}
1442+
1443+
if (jobConstantFailurePolicy.getDurationBetweenRetries() != null) {
1444+
constantPolicyBuilder.setInterval(com.google.protobuf.Duration.newBuilder()
1445+
.setNanos(jobConstantFailurePolicy.getDurationBetweenRetries().getNano()).build());
1446+
}
1447+
1448+
jobFailurePolicyBuilder.setConstant(constantPolicyBuilder.build());
1449+
1450+
return jobFailurePolicyBuilder.build();
1451+
}
1452+
13931453
/**
13941454
* {@inheritDoc}
13951455
*/
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
import java.time.Duration;
17+
18+
/**
19+
* A failure policy that applies a constant retry interval for job retries.
20+
* This implementation of {@link FailurePolicy} retries a job a fixed number of times
21+
* with a constant delay between each retry attempt.
22+
*/
23+
public class ConstantFailurePolicy implements FailurePolicy {
24+
25+
private Integer maxRetries;
26+
private Duration durationBetweenRetries;
27+
28+
/**
29+
* Constructs a {@code JobConstantFailurePolicy} with the specified maximum number of retries.
30+
*
31+
* @param maxRetries the maximum number of retries
32+
*/
33+
public ConstantFailurePolicy(Integer maxRetries) {
34+
this.maxRetries = maxRetries;
35+
}
36+
37+
/**
38+
* Constructs a {@code JobConstantFailurePolicy} with the specified duration between retries.
39+
*
40+
* @param durationBetweenRetries the duration to wait between retries
41+
*/
42+
public ConstantFailurePolicy(Duration durationBetweenRetries) {
43+
this.durationBetweenRetries = durationBetweenRetries;
44+
}
45+
46+
/**
47+
* Sets the duration to wait between retry attempts.
48+
*
49+
* @param durationBetweenRetries the duration between retries
50+
* @return a {@code JobFailurePolicyConstant}.
51+
*/
52+
public ConstantFailurePolicy setDurationBetweenRetries(Duration durationBetweenRetries) {
53+
this.durationBetweenRetries = durationBetweenRetries;
54+
return this;
55+
}
56+
57+
/**
58+
* Sets the maximum number of retries allowed.
59+
*
60+
* @param maxRetries the number of retries
61+
* @return a {@code JobFailurePolicyConstant}.
62+
*/
63+
public ConstantFailurePolicy setMaxRetries(int maxRetries) {
64+
this.maxRetries = maxRetries;
65+
return this;
66+
}
67+
68+
/**
69+
* Returns the configured duration between retry attempts.
70+
*
71+
* @return the duration between retries
72+
*/
73+
public Duration getDurationBetweenRetries() {
74+
return this.durationBetweenRetries;
75+
}
76+
77+
/**
78+
* Returns the configured maximum number of retries.
79+
*
80+
* @return the maximum number of retries
81+
*/
82+
public Integer getMaxRetries() {
83+
return this.maxRetries;
84+
}
85+
86+
/**
87+
* Returns the type of failure policy.
88+
*
89+
* @return {@link FailurePolicyType#CONSTANT}
90+
*/
91+
@Override
92+
public FailurePolicyType getFailurePolicyType() {
93+
return FailurePolicyType.CONSTANT;
94+
}
95+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2021 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
/**
17+
* A failure policy that drops the job upon failure without retrying.
18+
* This implementation of {@link FailurePolicy} immediately discards failed jobs
19+
* instead of retrying them.
20+
*/
21+
public class DropFailurePolicy implements FailurePolicy {
22+
23+
/**
24+
* Returns the type of failure policy.
25+
*
26+
* @return {@link FailurePolicyType#DROP}
27+
*/
28+
@Override
29+
public FailurePolicyType getFailurePolicyType() {
30+
return FailurePolicyType.DROP;
31+
}
32+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
/**
17+
* Set a failure policy for the job.
18+
*/
19+
public interface FailurePolicy {
20+
FailurePolicyType getFailurePolicyType();
21+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.client.domain;
15+
16+
public enum FailurePolicyType {
17+
DROP,
18+
19+
CONSTANT
20+
}

0 commit comments

Comments
 (0)