Skip to content

Commit 287a352

Browse files
committed
adding tests to orchestrator
1 parent 2a267b9 commit 287a352

File tree

6 files changed

+203
-55
lines changed

6 files changed

+203
-55
lines changed

spring-boot-examples/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@
5252
<skip>true</skip>
5353
</configuration>
5454
</plugin>
55+
<plugin>
56+
<groupId>org.apache.maven.plugins</groupId>
57+
<artifactId>maven-failsafe-plugin</artifactId>
58+
<version>3.2.2</version>
59+
</plugin>
5560
</plugins>
5661
</build>
5762
</project>
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Cross App (remote activities) workflow Example
2+
3+
This example demonstrate how you can create distributed workflows where the orchestrator doesn't host the workflow activities.
4+
This example is composed by three Spring Boot applications:
5+
- `orchestrator`: The `orchestrator` app contains the Dapr Workflow definition and expose REST endpoints to create and raise events against workflow instances.
6+
- `worker-one`: The `worker-one` app contains the `RegisterCustomerActivity` definition, which will be orchstrated by the `orchestrator` app.
7+
- `worker-two`: The `worker-two` app contains the `CustomerFollowupActivity` definition, which will be orchstrated by the `orchestrator` app.
8+
9+
To start the applications you need to run the following commands on separate terminals, starting from the `remote-activities` directory.
10+
To start the `orchestrator` app run:
11+
```bash
12+
cd orchestrator/
13+
mvn -Dspring-boot.run.arguments="--reuse=true" clean spring-boot:test-run
14+
```
15+
16+
The `orchestrator` application will run on port `8080`.
17+
18+
On a separate terminal, to start the `worker-one` app run:
19+
```bash
20+
cd worker-one/
21+
mvn -Dspring-boot.run.arguments="--reuse=true" clean spring-boot:test-run
22+
```
23+
24+
The `worker-one` application will run on port `8081`.
25+
26+
On a separate terminal, to start the `worker-two` app run:
27+
```bash
28+
cd worker-two/
29+
mvn -Dspring-boot.run.arguments="--reuse=true" clean spring-boot:test-run
30+
```
31+
32+
The `worker-two` application will run on port `8082`.
33+
34+
You can create new workflow instances of the `CustomerWorkflow` by calling the `/customers` endpoint of the `orchestrator` application.
35+
36+
```bash
37+
curl -X POST localhost:8080/customers -H 'Content-Type: application/json' -d '{ "customerName": "salaboy" }'
38+
```
39+
40+
The workflow definition [`CustomerWorkflow`](orchstrator/src/main/java/io/dapr/springboot/examples/orchestrator/CustomerWorkflow.java) that you can find inside the `orchestrator` app,
41+
performs the following orchestration when a new workflow instance is created:
42+
43+
- Call the `RegisterCustomerActivity` activity which can be found inside the `worker-one` application.
44+
- You can find in the workflow definition the configuration to make reference to an Activity that is hosted by a different Dapr application.
45+
```
46+
customer = ctx.callActivity("io.dapr.springboot.examples.workerone.RegisterCustomerActivity",
47+
customer,
48+
new WorkflowTaskOptions("worker-one"),
49+
Customer.class).
50+
await();
51+
```
52+
- Wait for an external event of type `CustomerReachOut` with a timeout of 5 minutes:
53+
```
54+
ctx.waitForExternalEvent("CustomerReachOut", Duration.ofMinutes(5), Customer.class).await();
55+
```
56+
You can call the following endpoint on the `orchestrator` app to raise the external event:
57+
```
58+
curl -X POST localhost:8080/customers/followup -H 'Content-Type: application/json' -d '{ "customerName": "salaboy" }'
59+
```
60+
- When the event is received, the workflow move forward to the last activity called `CustomerFollowUpActivity`, that can be found on the `worker-two` app.
61+
```
62+
customer = ctx.callActivity("io.dapr.springboot.examples.workertwo.CustomerFollowupActivity",
63+
customer,
64+
new WorkflowTaskOptions("worker-two"),
65+
Customer.class).
66+
await();
67+
```
68+
- The workflow completes by handing out the final version of the `Customer` object that has been modified the workflow activities. You can retrieve the `Customer` payload
69+
by running the following command:
70+
```
71+
curl -X POST localhost:8080/customers/output -H 'Content-Type: application/json' -d '{ "customerName": "salaboy" }'
72+
```
73+
74+
## Testing remote activities
75+
76+
77+

spring-boot-examples/workflows/remote-activities/orchestrator/src/main/java/io/dapr/springboot/examples/orchestrator/CustomerWorkflow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.dapr.springboot.examples.orchestrator;
1515

16+
import io.dapr.durabletask.TaskCanceledException;
17+
import io.dapr.durabletask.TaskFailedException;
1618
import io.dapr.workflows.Workflow;
1719
import io.dapr.workflows.WorkflowStub;
1820
import io.dapr.workflows.WorkflowTaskOptions;
@@ -32,7 +34,7 @@ public WorkflowStub create() {
3234
ctx.getLogger().info("Let's register the customer: {}", customer.getCustomerName());
3335

3436
customer = ctx.callActivity("io.dapr.springboot.examples.workerone.RegisterCustomerActivity", customer,
35-
new WorkflowTaskOptions("worker-one"), Customer.class).await();
37+
new WorkflowTaskOptions("worker-one"), Customer.class).await();
3638

3739
ctx.getLogger().info("Let's wait for the customer: {} to request a follow up.", customer.getCustomerName());
3840
ctx.waitForExternalEvent("CustomerReachOut", Duration.ofMinutes(5), Customer.class).await();

spring-boot-examples/workflows/remote-activities/orchestrator/src/main/java/io/dapr/springboot/examples/orchestrator/CustomersRestController.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.dapr.spring.workflows.config.EnableDaprWorkflows;
1717
import io.dapr.workflows.client.DaprWorkflowClient;
18+
import io.dapr.workflows.client.WorkflowInstanceStatus;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021
import org.springframework.beans.factory.annotation.Autowired;
@@ -73,6 +74,24 @@ public String customerNotification(@RequestBody Customer customer) {
7374
}
7475
}
7576

77+
/**
78+
* Request customer output.
79+
* @param customer associated with a workflow instance
80+
* @return Customer status after the workflow execution finished
81+
*/
82+
@PostMapping("/customers/output")
83+
public Customer getCustomerOutput(@RequestBody Customer customer) {
84+
logger.info("Customer output requested: {}", customer.getCustomerName());
85+
String workflowIdForCustomer = customersWorkflows.get(customer.getCustomerName());
86+
if (workflowIdForCustomer == null || workflowIdForCustomer.isEmpty()) {
87+
return null;
88+
} else {
89+
WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(workflowIdForCustomer, true);
90+
assert instanceState != null;
91+
return instanceState.readOutputAs(Customer.class);
92+
}
93+
}
94+
7695

7796

7897
}

spring-boot-examples/workflows/remote-activities/orchestrator/src/test/java/io/dapr/springboot/examples/orchestrator/DaprTestContainersConfig.java

Lines changed: 76 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414
package io.dapr.springboot.examples.orchestrator;
1515

1616
import com.redis.testcontainers.RedisContainer;
17-
import io.dapr.testcontainers.Component;
18-
import io.dapr.testcontainers.DaprContainer;
17+
import io.dapr.testcontainers.*;
1918
import org.junit.runner.Description;
2019
import org.junit.runners.model.Statement;
2120
import org.springframework.beans.factory.annotation.Qualifier;
22-
import org.springframework.boot.test.context.SpringBootTest;
21+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2322
import org.springframework.boot.test.context.TestConfiguration;
2423
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
2524
import org.springframework.context.annotation.Bean;
@@ -28,13 +27,15 @@
2827
import org.testcontainers.containers.GenericContainer;
2928
import org.testcontainers.containers.Network;
3029
import org.testcontainers.containers.wait.strategy.Wait;
30+
import org.testcontainers.shaded.org.checkerframework.checker.nullness.qual.Nullable;
31+
import org.testcontainers.utility.DockerImageName;
3132
import org.testcontainers.utility.MountableFile;
3233

3334
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
3637

37-
import static io.dapr.testcontainers.DaprContainerConstants.DAPR_RUNTIME_IMAGE_TAG;
38+
import static io.dapr.testcontainers.DaprContainerConstants.*;
3839

3940
@TestConfiguration(proxyBeanMethods = false)
4041
public class DaprTestContainersConfig {
@@ -81,74 +82,111 @@ private Map<String, String> getRedisProps(){
8182
return redisProps;
8283
}
8384

85+
@Bean
86+
public DaprPlacementContainer placementContainer(Network daprNetwork, Environment env){
87+
boolean reuse = env.getProperty("reuse", Boolean.class, false);
88+
return new DaprPlacementContainer(DockerImageName.parse(DAPR_PLACEMENT_IMAGE_TAG))
89+
.withNetwork(daprNetwork)
90+
.withReuse(reuse)
91+
.withNetworkAliases("placement");
92+
}
93+
94+
@Bean
95+
public DaprSchedulerContainer schedulerContainer(Network daprNetwork, Environment env){
96+
boolean reuse = env.getProperty("reuse", Boolean.class, false);
97+
return new DaprSchedulerContainer(DockerImageName.parse(DAPR_SCHEDULER_IMAGE_TAG))
98+
.withNetwork(daprNetwork)
99+
.withReuse(reuse)
100+
.withNetworkAliases("scheduler");
101+
}
102+
84103
@Bean("workerOneDapr")
104+
@ConditionalOnProperty(prefix = "tests", name = "workers.enabled", havingValue = "true")
85105
public DaprContainer workerOneDapr(Network daprNetwork, RedisContainer redisContainer, Environment env,
86-
@Qualifier("daprContainer") DaprContainer orchestratorDaprContainer) {
106+
DaprPlacementContainer daprPlacementContainer,
107+
DaprSchedulerContainer daprSchedulerContainer) {
87108
boolean reuse = env.getProperty("reuse", Boolean.class, false);
88-
89109
return new DaprContainer(DAPR_RUNTIME_IMAGE_TAG)
90110
.withAppName("worker-one")
91-
.withNetworkAliases("worker-one-dapr")
111+
.withNetworkAliases("worker-one")
92112
.withNetwork(daprNetwork)
93-
.withReusablePlacement(reuse)
94-
.withReusableScheduler(reuse)
113+
.withPlacementContainer(daprPlacementContainer)
114+
.withSchedulerContainer(daprSchedulerContainer)
95115
.withComponent(new Component("kvstore", "state.redis", "v1", getRedisProps()))
96116
// .withDaprLogLevel(DaprLogLevel.DEBUG)
97117
// .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
98-
.withAppPort(8081)
99-
.withAppHealthCheckPath("/actuator/health")
100-
.withAppChannelAddress("host.testcontainers.internal")
101-
.dependsOn(orchestratorDaprContainer)
118+
// .withAppPort(8081)
119+
// .withAppHealthCheckPath("/actuator/health")
120+
// .withAppChannelAddress("host.testcontainers.internal")
121+
.dependsOn(daprPlacementContainer)
122+
.dependsOn(daprSchedulerContainer)
102123
.dependsOn(redisContainer);
103124
}
104125
@Bean
105-
public GenericContainer<?> workerOneContainer(Network daprNetwork, @Qualifier("workerOneDapr") DaprContainer workerOneDapr){
126+
@ConditionalOnProperty(prefix = "tests", name = "workers.enabled", havingValue = "true")
127+
public GenericContainer<?> workerOneContainer(Network daprNetwork,
128+
@Qualifier("workerOneDapr") DaprContainer workerOneDapr,
129+
DaprPlacementContainer daprPlacementContainer,
130+
DaprSchedulerContainer daprSchedulerContainer){
106131
return new GenericContainer<>("openjdk:17-jdk-slim")
107132
.withCopyFileToContainer(MountableFile.forHostPath("../worker-one/target"), "/app")
108133
.withWorkingDirectory("/app")
109134
.withCommand("java",
110-
"-Ddapr.grpc.endpoint=worker-one-dapr:50001",
111-
"-Ddapr.http.endpoint=worker-one-dapr:3500",
135+
"-Ddapr.grpc.endpoint=worker-one:50001",
136+
"-Ddapr.http.endpoint=worker-one:3500",
112137
"-jar",
113138
"worker-one-1.17.0-SNAPSHOT.jar")
114139
.withNetwork(daprNetwork)
115140
.dependsOn(workerOneDapr)
141+
.dependsOn(daprPlacementContainer)
142+
.dependsOn(daprSchedulerContainer)
116143
.waitingFor(Wait.forLogMessage(".*Started WorkerOneApplication.*", 1))
117144
.withLogConsumer(outputFrame -> System.out.println("WorkerOneApplication: " + outputFrame.getUtf8String()));
118145
}
119146

120147
@Bean("workerTwoDapr")
121-
public DaprContainer workerTwoDapr(Network daprNetwork, RedisContainer redisContainer, Environment env,
122-
@Qualifier("daprContainer") DaprContainer orchestratorDaprContainer) {
148+
@ConditionalOnProperty(prefix = "tests", name = "workers.enabled", havingValue = "true")
149+
public DaprContainer workerTwoDapr(Network daprNetwork, RedisContainer redisContainer,
150+
Environment env,
151+
DaprPlacementContainer daprPlacementContainer,
152+
DaprSchedulerContainer daprSchedulerContainer) {
123153
boolean reuse = env.getProperty("reuse", Boolean.class, false);
124154

125155
return new DaprContainer(DAPR_RUNTIME_IMAGE_TAG)
126156
.withAppName("worker-two")
127-
.withNetworkAliases("worker-two-dapr")
157+
.withNetworkAliases("worker-two")
128158
.withNetwork(daprNetwork)
129-
.withReusablePlacement(reuse)
130-
.withReusableScheduler(reuse)
159+
.withPlacementContainer(daprPlacementContainer)
160+
.withSchedulerContainer(daprSchedulerContainer)
131161
.withComponent(new Component("kvstore", "state.redis", "v1", getRedisProps()))
132162
// .withDaprLogLevel(DaprLogLevel.DEBUG)
133163
// .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
134-
.withAppPort(8082)
135-
.withAppHealthCheckPath("/actuator/health")
136-
.withAppChannelAddress("host.testcontainers.internal")
137-
.dependsOn(orchestratorDaprContainer)
164+
// .withAppPort(8082)
165+
// .withAppHealthCheckPath("/actuator/health")
166+
// .withAppChannelAddress("host.testcontainers.internal")
167+
.dependsOn(daprPlacementContainer)
168+
.dependsOn(daprSchedulerContainer)
138169
.dependsOn(redisContainer);
139170
}
171+
140172
@Bean
141-
public GenericContainer<?> workerTwoContainer(Network daprNetwork, @Qualifier("workerTwoDapr") DaprContainer workerTwoDapr){
173+
@ConditionalOnProperty(prefix = "tests", name = "workers.enabled", havingValue = "true")
174+
public GenericContainer<?> workerTwoContainer(Network daprNetwork,
175+
@Qualifier("workerTwoDapr") DaprContainer workerTwoDapr,
176+
DaprPlacementContainer daprPlacementContainer,
177+
DaprSchedulerContainer daprSchedulerContainer){
142178
return new GenericContainer<>("openjdk:17-jdk-slim")
143179
.withCopyFileToContainer(MountableFile.forHostPath("../worker-two/target"), "/app")
144180
.withWorkingDirectory("/app")
145181
.withCommand("java",
146-
"-Ddapr.grpc.endpoint=worker-two-dapr:50001",
147-
"-Ddapr.http.endpoint=worker-two-dapr:3500",
182+
"-Ddapr.grpc.endpoint=worker-two:50001",
183+
"-Ddapr.http.endpoint=worker-two:3500",
148184
"-jar",
149185
"worker-two-1.17.0-SNAPSHOT.jar")
150186
.withNetwork(daprNetwork)
151187
.dependsOn(workerTwoDapr)
188+
.dependsOn(daprPlacementContainer)
189+
.dependsOn(daprSchedulerContainer)
152190
.waitingFor(Wait.forLogMessage(".*Started WorkerTwoApplication.*", 1))
153191
.withLogConsumer(outputFrame -> System.out.println("WorkerTwoApplication: " + outputFrame.getUtf8String()));
154192
}
@@ -165,20 +203,24 @@ public RedisContainer redisContainer(Network daprNetwork, Environment env){
165203

166204
@Bean
167205
@ServiceConnection
168-
public DaprContainer daprContainer(Network daprNetwork, RedisContainer redisContainer, Environment env) {
169-
boolean reuse = env.getProperty("reuse", Boolean.class, false);
206+
public DaprContainer daprContainer(Network daprNetwork, RedisContainer redisContainer,
207+
Environment env,
208+
DaprPlacementContainer daprPlacementContainer,
209+
DaprSchedulerContainer daprSchedulerContainer) {
170210

171211
return new DaprContainer(DAPR_RUNTIME_IMAGE_TAG)
172212
.withAppName("orchestrator")
173213
.withNetwork(daprNetwork)
174-
// .withReusablePlacement(reuse)
175-
// .withReusableScheduler(reuse)
214+
.withPlacementContainer(daprPlacementContainer)
215+
.withSchedulerContainer(daprSchedulerContainer)
176216
.withComponent(new Component("kvstore", "state.redis", "v1", getRedisProps()))
177-
// .withDaprLogLevel(DaprLogLevel.DEBUG)
178-
// .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
217+
//.withDaprLogLevel(DaprLogLevel.DEBUG)
218+
//.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
179219
.withAppPort(8080)
180220
.withAppHealthCheckPath("/actuator/health")
181221
.withAppChannelAddress("host.testcontainers.internal")
222+
.dependsOn(daprPlacementContainer)
223+
.dependsOn(daprSchedulerContainer)
182224
.dependsOn(redisContainer);
183225
}
184226

0 commit comments

Comments
 (0)