Skip to content

Add Failure Policy for Jobs SDK #1448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Aug 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bb56b8d
Update CONTRIBUTING.md
siri-varma Apr 29, 2025
f09bf44
Merge branch 'dapr:master' into master
siri-varma Apr 29, 2025
64d2fe3
Merge branch 'dapr:master' into master
siri-varma May 5, 2025
d50775c
Merge branch 'dapr:master' into master
siri-varma May 8, 2025
0b3e757
Merge branch 'dapr:master' into master
siri-varma May 10, 2025
c30f537
Merge branch 'dapr:master' into master
siri-varma May 23, 2025
29eac2e
Merge branch 'master' of https://github.com/siri-varma/java-sdk
siri-varma Jul 13, 2025
b713413
Add failrue policy
siri-varma Jul 13, 2025
8c05789
Add tests
siri-varma Jul 13, 2025
3e5a384
Add Tests
siri-varma Jul 13, 2025
8828335
Merge branch 'master' into users/svegiraju/add-failure-policy
siri-varma Jul 15, 2025
a0b8a48
Merge branch 'master' into users/svegiraju/add-failure-policy
artur-ciocanu Jul 16, 2025
6cd3bca
Merge branch 'master' into users/svegiraju/add-failure-policy
siri-varma Jul 19, 2025
3b9fc1c
Upgrading to 1.15.7 (#1458)
salaboy Jul 18, 2025
6a07cb3
Rename classes
siri-varma Jul 21, 2025
7e64c73
Merge branch 'master' into users/svegiraju/add-failure-policy
artur-ciocanu Jul 22, 2025
9c0c66e
Merge branch 'master' into users/svegiraju/add-failure-policy
artur-ciocanu Jul 23, 2025
1eb4ad0
Merge branch 'master' into users/svegiraju/add-failure-policy
siri-varma Jul 24, 2025
cbf5320
add rc
siri-varma Aug 1, 2025
cb0131e
add rc
siri-varma Aug 1, 2025
6902b30
fix checkstyle
siri-varma Aug 1, 2025
dd6fe55
Fix things
siri-varma Aug 1, 2025
66488ae
Test latest
siri-varma Aug 1, 2025
6ad648c
fix checkstyle
siri-varma Aug 1, 2025
aafee46
Merge pull request #2 from siri-varma/users/svegiraju/test-latest
siri-varma Aug 1, 2025
d023c9f
Merge branch 'master' into users/svegiraju/add-failure-policy
artur-ciocanu Aug 4, 2025
67c9d4a
Merge branch 'master' into users/svegiraju/add-failure-policy
siri-varma Aug 4, 2025
5b11ef8
Address comments
siri-varma Aug 5, 2025
8a0065d
Merge branch 'users/svegiraju/add-failure-policy' of https://github.c…
siri-varma Aug 5, 2025
28017f8
Address comments
siri-varma Aug 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
package io.dapr.it.testcontainers.jobs;

import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConstantFailurePolicy;
import io.dapr.client.domain.DeleteJobRequest;
import io.dapr.client.domain.DropFailurePolicy;
import io.dapr.client.domain.FailurePolicyType;
import io.dapr.client.domain.GetJobRequest;
import io.dapr.client.domain.GetJobResponse;
import io.dapr.client.domain.JobSchedule;
Expand All @@ -25,6 +28,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.runner.notification.Failure;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
Expand All @@ -34,6 +38,7 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -97,6 +102,9 @@ public void testJobScheduleCreationWithDueTime() {

GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block();

daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();

assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
assertEquals("Job", getJobResponse.getName());
}
Expand All @@ -112,6 +120,9 @@ public void testJobScheduleCreationWithSchedule() {

GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block();

daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();

assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression());
assertEquals("Job", getJobResponse.getName());
Expand All @@ -134,6 +145,9 @@ public void testJobScheduleCreationWithAllParameters() {

GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block();

daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();

assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression());
assertEquals("Job", getJobResponse.getName());
Expand All @@ -143,6 +157,57 @@ public void testJobScheduleCreationWithAllParameters() {
getJobResponse.getTtl().toString());
}

@Test
public void testJobScheduleCreationWithDropFailurePolicy() {
Instant currentTime = Instant.now();
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC);

String cronExpression = "2 * 3 * * FRI";

daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
.setData("Job data".getBytes())
.setRepeat(3)
.setFailurePolicy(new DropFailurePolicy())
.setSchedule(JobSchedule.fromString(cronExpression))).block();

GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block();

daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();

assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType());
}

@Test
public void testJobScheduleCreationWithConstantFailurePolicy() {
Instant currentTime = Instant.now();
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC);

String cronExpression = "2 * 3 * * FRI";

daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
.setData("Job data".getBytes())
.setRepeat(3)
.setFailurePolicy(new ConstantFailurePolicy(3)
.setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS)))
.setSchedule(JobSchedule.fromString(cronExpression))).block();

GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block();

daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();

ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy();
assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType());
assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries());
assertEquals(Duration.of(10, ChronoUnit.SECONDS).getNano(),
jobFailurePolicyConstant.getDurationBetweenRetries().getNano());
}

@Test
public void testDeleteJobRequest() {
Instant currentTime = Instant.now();
Expand Down
60 changes: 60 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.ComponentMetadata;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.ConstantFailurePolicy;
import io.dapr.client.domain.ConversationInput;
import io.dapr.client.domain.ConversationOutput;
import io.dapr.client.domain.ConversationRequest;
import io.dapr.client.domain.ConversationResponse;
import io.dapr.client.domain.DaprMetadata;
import io.dapr.client.domain.DeleteJobRequest;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.DropFailurePolicy;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
import io.dapr.client.domain.FailurePolicy;
import io.dapr.client.domain.FailurePolicyType;
import io.dapr.client.domain.GetBulkSecretRequest;
import io.dapr.client.domain.GetBulkStateRequest;
import io.dapr.client.domain.GetConfigurationRequest;
Expand Down Expand Up @@ -105,6 +109,7 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1336,6 +1341,10 @@ public Mono<Void> scheduleJob(ScheduleJobRequest scheduleJobRequest) {
scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime()));
}

if (scheduleJobRequest.getFailurePolicy() != null) {
scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
}

scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite());

Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono =
Expand Down Expand Up @@ -1380,6 +1389,10 @@ public Mono<GetJobResponse> getJob(GetJobRequest getJobRequest) {
getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime()));
}

if (job.hasFailurePolicy()) {
getJobResponse.setFailurePolicy(getJobFailurePolicy(job.getFailurePolicy()));
}

return getJobResponse
.setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null)
.setData(job.hasData() ? job.getData().getValue().toByteArray() : null)
Expand All @@ -1390,6 +1403,53 @@ public Mono<GetJobResponse> getJob(GetJobRequest getJobRequest) {
}
}

private FailurePolicy getJobFailurePolicy(CommonProtos.JobFailurePolicy jobFailurePolicy) {
if (jobFailurePolicy.hasDrop()) {
return new DropFailurePolicy();
}

CommonProtos.JobFailurePolicyConstant jobFailurePolicyConstant = jobFailurePolicy.getConstant();
if (jobFailurePolicyConstant.hasInterval() && jobFailurePolicyConstant.hasMaxRetries()) {
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries())
.setDurationBetweenRetries(Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
ChronoUnit.NANOS));
}

if (jobFailurePolicyConstant.hasMaxRetries()) {
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries());
}

return new ConstantFailurePolicy(
Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
ChronoUnit.NANOS));
}

private CommonProtos.JobFailurePolicy getJobFailurePolicy(FailurePolicy failurePolicy) {
CommonProtos.JobFailurePolicy.Builder jobFailurePolicyBuilder = CommonProtos.JobFailurePolicy.newBuilder();

if (failurePolicy.getFailurePolicyType() == FailurePolicyType.DROP) {
jobFailurePolicyBuilder.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build());
return jobFailurePolicyBuilder.build();
}

CommonProtos.JobFailurePolicyConstant.Builder constantPolicyBuilder =
CommonProtos.JobFailurePolicyConstant.newBuilder();
ConstantFailurePolicy jobConstantFailurePolicy = (ConstantFailurePolicy)failurePolicy;

if (jobConstantFailurePolicy.getMaxRetries() != null) {
constantPolicyBuilder.setMaxRetries(jobConstantFailurePolicy.getMaxRetries());
}

if (jobConstantFailurePolicy.getDurationBetweenRetries() != null) {
constantPolicyBuilder.setInterval(com.google.protobuf.Duration.newBuilder()
.setNanos(jobConstantFailurePolicy.getDurationBetweenRetries().getNano()).build());
}

jobFailurePolicyBuilder.setConstant(constantPolicyBuilder.build());

return jobFailurePolicyBuilder.build();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client.domain;

import java.time.Duration;

/**
* A failure policy that applies a constant retry interval for job retries.
* This implementation of {@link FailurePolicy} retries a job a fixed number of times
* with a constant delay between each retry attempt.
*/
public class ConstantFailurePolicy implements FailurePolicy {

private Integer maxRetries;
private Duration durationBetweenRetries;

/**
* Constructs a {@code JobConstantFailurePolicy} with the specified maximum number of retries.
*
* @param maxRetries the maximum number of retries
*/
public ConstantFailurePolicy(Integer maxRetries) {
this.maxRetries = maxRetries;
}

/**
* Constructs a {@code JobConstantFailurePolicy} with the specified duration between retries.
*
* @param durationBetweenRetries the duration to wait between retries
*/
public ConstantFailurePolicy(Duration durationBetweenRetries) {
this.durationBetweenRetries = durationBetweenRetries;
}

/**
* Sets the duration to wait between retry attempts.
*
* @param durationBetweenRetries the duration between retries
* @return a {@code JobFailurePolicyConstant}.
*/
public ConstantFailurePolicy setDurationBetweenRetries(Duration durationBetweenRetries) {
this.durationBetweenRetries = durationBetweenRetries;
return this;
}

/**
* Sets the maximum number of retries allowed.
*
* @param maxRetries the number of retries
* @return a {@code JobFailurePolicyConstant}.
*/
public ConstantFailurePolicy setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
}

/**
* Returns the configured duration between retry attempts.
*
* @return the duration between retries
*/
public Duration getDurationBetweenRetries() {
return this.durationBetweenRetries;
}

/**
* Returns the configured maximum number of retries.
*
* @return the maximum number of retries
*/
public Integer getMaxRetries() {
return this.maxRetries;
}

/**
* Returns the type of failure policy.
*
* @return {@link FailurePolicyType#CONSTANT}
*/
@Override
public FailurePolicyType getFailurePolicyType() {
return FailurePolicyType.CONSTANT;
}
}
32 changes: 32 additions & 0 deletions sdk/src/main/java/io/dapr/client/domain/DropFailurePolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client.domain;

/**
* A failure policy that drops the job upon failure without retrying.
* This implementation of {@link FailurePolicy} immediately discards failed jobs
* instead of retrying them.
*/
public class DropFailurePolicy implements FailurePolicy {

/**
* Returns the type of failure policy.
*
* @return {@link FailurePolicyType#DROP}
*/
@Override
public FailurePolicyType getFailurePolicyType() {
return FailurePolicyType.DROP;
}
}
21 changes: 21 additions & 0 deletions sdk/src/main/java/io/dapr/client/domain/FailurePolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client.domain;

/**
* Set a failure policy for the job.
*/
public interface FailurePolicy {
FailurePolicyType getFailurePolicyType();
}
20 changes: 20 additions & 0 deletions sdk/src/main/java/io/dapr/client/domain/FailurePolicyType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client.domain;

public enum FailurePolicyType {
DROP,

CONSTANT
}
Loading