Commit a7ee51c

Merge pull request #1191 from marcduiker/add-workflow-tutorials-python
Workflow tutorials for Python
2 parents 91ef095 + d13e2fb commit a7ee51c

File tree

80 files changed: +2326 additions, -1 deletion


tutorials/workflow/csharp/workflow-management/README.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -13,7 +13,7 @@ For more information on workflow management, see the [Dapr docs](https://docs.da

 ## Inspect the code

-Open the `Program.cs` file in the `tutorials/workflow/csharp/child-workflows/WorkflowManagement` folder. This file contains the endpoint definitions that use the workflow management API. The workflow that is being managed is named `NeverEndingWorkflow` and is a counter that will keep running once it's started.
+Open the `Program.cs` file in the `tutorials/workflow/csharp/workflow-management/WorkflowManagement` folder. This file contains the endpoint definitions that use the workflow management API. The workflow that is being managed is named `NeverEndingWorkflow` and is a counter that will keep running once it's started.

 ## Run the tutorial
```

Lines changed: 25 additions & 0 deletions

# Using the Dapr Workflow API with Python

This folder contains tutorials on using the Dapr Workflow API with Python. All examples can be run locally on your machine.

Before you start, it's recommended to read through the Dapr docs to get familiar with the many [Workflow features, concepts, and patterns](https://docs.dapr.io/developing-applications/building-blocks/workflow/).

## Prerequisites

- [Docker Desktop](https://www.docker.com/products/docker-desktop/)
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/) & [Initialization](https://docs.dapr.io/getting-started/install-dapr-selfhost/)
- [Python 3](https://www.python.org/downloads/)
- Optional: An IDE such as [VSCode](https://code.visualstudio.com/download) with a [REST client](https://marketplace.visualstudio.com/items?itemName=humao.rest-client).

## Tutorials

- [Workflow Basics](./fundamentals/README.md)
- [Task Chaining](./task-chaining/README.md)
- [Fan-out/Fan-in](./fan-out-fan-in/README.md)
- [Monitor](./monitor-pattern/README.md)
- [External Events](./external-system-interaction/README.md)
- [Child Workflows](./child-workflows/README.md)
- [Resiliency & Compensation](./resiliency-and-compensation/README.md)
- [Combined Patterns](./combined-patterns/README.md)
- [Workflow Management](./workflow-management/README.md)
- [Challenges & Tips](./challenges-tips/README.md)
Lines changed: 10 additions & 0 deletions

# Workflow Challenges & Tips

Workflow systems are very powerful tools but also have their challenges & limitations, as described in the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#limitations).

This section provides tips, with code snippets, to understand the limitations and get the most out of the Dapr Workflow API. Read through the following examples to learn best practices for developing Dapr workflows.

- [Deterministic workflows](deterministic_workflow.py)
- [Idempotent activities](idempotent_activity.py)
- [Versioning workflows](versioning_workflow.py)
- [Workflow & activity payload size](payload_size_workflow.py)
Lines changed: 36 additions & 0 deletions

```python
from datetime import datetime
import uuid

import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.workflow(name='non_deterministic_workflow')
def non_deterministic_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
    """
    Do not use non-deterministic operations in a workflow!
    These operations will create a new value every time the
    workflow is replayed.
    """
    order_id = str(uuid.uuid4())
    order_date = datetime.now()
    yield ctx.call_activity(submit_id, input=order_id)
    yield ctx.call_activity(submit_date, input=order_date)

    return order_id

@wf_runtime.workflow(name='deterministic_workflow')
def deterministic_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
    """
    Either wrap non-deterministic operations in an activity, or use deterministic
    alternatives on the DaprWorkflowContext instead. These operations create the
    same value when the workflow is replayed.
    """
    order_id = yield ctx.call_activity(create_order_id, input=wf_input)
    order_date = ctx.current_utc_datetime
    yield ctx.call_activity(submit_id, input=order_id)
    yield ctx.call_activity(submit_date, input=order_date)

    return order_id
```
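The replay behavior that makes this necessary can be illustrated without the Dapr runtime. Below is a toy sketch, not a Dapr API: the `history` dict stands in for the engine's persisted workflow state.

```python
import uuid

# Toy replay simulation: a workflow's history acts as a cache of activity
# results, so a value is computed once and read back on every replay.
history: dict = {}

def call_activity(name, fn):
    # First execution records the result; replays return the stored value.
    if name not in history:
        history[name] = fn()
    return history[name]

first_run = call_activity('create_order_id', lambda: str(uuid.uuid4()))
replay = call_activity('create_order_id', lambda: str(uuid.uuid4()))

# A bare uuid.uuid4() in workflow code would yield a different id on each
# replay; wrapped in an "activity", the recorded value stays stable.
assert first_run == replay
```

This is why `order_id` and `order_date` must come from an activity result or from `ctx.current_utc_datetime`, both of which are sourced from the persisted history during replay.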
Lines changed: 14 additions & 0 deletions

```python
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.activity(name='idempotent_activity')
def idempotent_activity(ctx: wf.WorkflowActivityContext, order_item: Order) -> bool:
    """
    Beware of non-idempotent operations in an activity.
    Dapr Workflow guarantees at-least-once execution of activities, so activities
    might be executed more than once in case an activity is not run to completion
    successfully. For instance, can you insert a record into a database twice
    without side effects?
    insert_sql = f"INSERT INTO Orders (Id, Description, UnitPrice, Quantity) VALUES ('{order_item.id}', '{order_item.description}', {order_item.unit_price}, {order_item.quantity})"
    It's best to check whether a record already exists before inserting it.
    """
```
Lines changed: 29 additions & 0 deletions

```python
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.workflow(name='large_payload_size_workflow')
def large_payload_size_workflow(ctx: wf.DaprWorkflowContext, doc_id: str):
    """
    Do not pass large payloads between activities.
    They are stored in the Dapr state store twice: once as the output argument
    of get_document, and once as the input argument of update_document.
    """
    document = yield ctx.call_activity(get_document, input=doc_id)
    updated_document = yield ctx.call_activity(update_document, input=document)

    # More activities to process the updated document

    return updated_document

@wf_runtime.workflow(name='small_payload_size_workflow')
def small_payload_size_workflow(ctx: wf.DaprWorkflowContext, doc_id: str):
    """
    Do pass small payloads between activities, preferably IDs only, or objects
    that are quick to (de)serialize in large volumes.
    Combine multiple actions, such as document retrieval and update, into a
    single activity, or use the Dapr State Store API to store more data.
    """
    updated_doc_id = yield ctx.call_activity(get_and_update_document, input=doc_id)

    # More activities to process the updated document

    return updated_doc_id
```
Lines changed: 64 additions & 0 deletions

```python
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

"""
This is the initial version of the workflow.
Note that the input argument for both activities is the order_item (string).
"""
@wf_runtime.workflow(name='versioning_workflow_1')
def versioning_workflow_1(ctx: wf.DaprWorkflowContext, order_item: str):
    result_a = yield ctx.call_activity(activity_a, input=order_item)
    result_b = yield ctx.call_activity(activity_b, input=order_item)

    return result_a + result_b

@wf_runtime.activity(name='activity_a')
def activity_a(ctx: wf.WorkflowActivityContext, order_item: str) -> int:
    """
    This activity processes the order item and returns an integer result.
    """
    print(f'activity_a: Received input: {order_item}.', flush=True)
    return 10

@wf_runtime.activity(name='activity_b')
def activity_b(ctx: wf.WorkflowActivityContext, order_item: str) -> int:
    """
    This activity processes the order item and returns another integer result.
    """
    print(f'activity_b: Received input: {order_item}.', flush=True)
    return 20

"""
This is the updated version of the workflow.
The input for activity_b has changed from order_item (string) to result_a (int).
If there are in-flight workflow instances that were started with the previous version
of this workflow, these will fail when the new version of the workflow is deployed
and the workflow name remains the same, since the runtime parameters do not match
with the persisted state.
It is recommended to version workflows by creating a new workflow class with a new name:
{workflowname}_1 -> {workflowname}_2
Try to avoid making breaking changes in perpetual workflows (that use the
`continue_as_new` method) since these are difficult to replace with a new version.
"""
@wf_runtime.workflow(name='versioning_workflow_2')
def versioning_workflow_2(ctx: wf.DaprWorkflowContext, order_item: str):
    result_a = yield ctx.call_activity(activity_a, input=order_item)
    result_b = yield ctx.call_activity(activity_b, input=result_a)

    return result_a + result_b

@wf_runtime.activity(name='activity_a')
def activity_a(ctx: wf.WorkflowActivityContext, order_item: str) -> int:
    """
    This activity processes the order item and returns an integer result.
    """
    print(f'activity_a: Received input: {order_item}.', flush=True)
    return 10

@wf_runtime.activity(name='activity_b')
def activity_b(ctx: wf.WorkflowActivityContext, number: int) -> int:
    """
    This activity processes a number and returns another integer result.
    """
    print(f'activity_b: Received input: {number}.', flush=True)
    return number + 10
```
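The side-by-side naming scheme can be exercised without the runtime. This toy registry (not a Dapr API) illustrates why in-flight instances keep resolving to the version they were started with while new instances pick up the new name:

```python
# Toy registry standing in for the workflow runtime's name-based lookup.
registry = {
    'versioning_workflow_1': 'v1 logic',
    'versioning_workflow_2': 'v2 logic',
}

# New instances are scheduled with the latest name...
LATEST_WORKFLOW = 'versioning_workflow_2'

def resume_inflight(persisted_name: str) -> str:
    # ...but replay always uses the name persisted with the instance,
    # so old instances never see the incompatible v2 parameters.
    return registry[persisted_name]

print(resume_inflight('versioning_workflow_1'))  # v1 logic
print(resume_inflight(LATEST_WORKFLOW))  # v2 logic
```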
Lines changed: 112 additions & 0 deletions

# Child Workflows

This tutorial demonstrates how a workflow can call child workflows that are part of the same application. Child workflows can be used to break up large workflows into smaller, reusable parts. For more information about child workflows, see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#child-workflows).

## Inspect the code

Open the `parent_child_workflow.py` file in the `tutorials/workflow/python/child-workflows/child_workflows` folder. This file contains the definitions for the workflows and activities.

The parent workflow iterates over the input array and schedules an instance of the `child_workflow` for each of the input elements. The `child_workflow` is a basic task-chaining workflow that contains a sequence of two activities. When all instances of the `child_workflow` complete, the `parent_workflow` finishes.

### Parent workflow

```mermaid
graph LR
   SW((Start
   Workflow))
   subgraph for each word in the input
   GWL[Call child workflow]
   end
   ALL[Wait until all tasks
   are completed]
   EW((End
   Workflow))
   SW --> GWL
   GWL --> ALL
   ALL --> EW
```

### Child workflow

```mermaid
graph LR
   SW((Start
   Workflow))
   A1[activity1]
   A2[activity2]
   EW((End
   Workflow))
   SW --> A1
   A1 --> A2
   A2 --> EW
```

## Run the tutorial

1. Use a terminal to navigate to the `tutorials/workflow/python/child-workflows/child_workflows` folder.
2. Install the dependencies using pip:

   ```bash
   pip3 install -r requirements.txt
   ```

3. Navigate one level back to the `child-workflows` folder and use the Dapr CLI to run the Dapr Multi-App run file:

   <!-- STEP
   name: Run multi app run template
   expected_stdout_lines:
     - 'Started Dapr with app id "childworkflows"'
   expected_stderr_lines:
   working_dir: .
   output_match_mode: substring
   background: true
   sleep: 15
   timeout_seconds: 30
   -->
   ```bash
   dapr run -f .
   ```
   <!-- END_STEP -->

4. Use the POST request in the [`childworkflows.http`](./childworkflows.http) file to start the workflow, or use this cURL command:

   ```bash
   curl -i --request POST \
     --url http://localhost:5259/start \
     --header 'content-type: application/json' \
     --data '["Item 1","Item 2"]'
   ```

   The input of the workflow is an array with two strings:

   ```json
   [
       "Item 1",
       "Item 2"
   ]
   ```

   The app logs should show both items in the input array being processed by each activity in the child workflow as follows:

   ```text
   == APP - childworkflows == activity1: Received input: Item 1.
   == APP - childworkflows == activity2: Received input: Item 1 is processed.
   == APP - childworkflows == activity1: Received input: Item 2.
   == APP - childworkflows == activity2: Received input: Item 2 is processed.
   ```

5. Use the GET request in the [`childworkflows.http`](./childworkflows.http) file to get the status of the workflow, or use this cURL command:

   ```bash
   curl --request GET --url http://localhost:3559/v1.0/workflows/dapr/<INSTANCEID>
   ```

   Where `<INSTANCEID>` is the workflow instance ID you received in the `instance_id` property in the previous step.

   The expected serialized output of the workflow is an array with two strings:

   ```txt
   "[\"Item 1 is processed as a child workflow.\",\"Item 2 is processed as a child workflow.\"]"
   ```

6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.
Lines changed: 26 additions & 0 deletions

```python
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from typing import List
from parent_child_workflow import wf_runtime, parent_workflow
import dapr.ext.workflow as wf
import uvicorn

@asynccontextmanager
async def lifespan(app: FastAPI):
    wf_runtime.start()
    yield
    wf_runtime.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(items: List[str]):
    wf_client = wf.DaprWorkflowClient()
    instance_id = wf_client.schedule_new_workflow(
        workflow=parent_workflow,
        input=items
    )
    return {"instance_id": instance_id}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=5259)
```
Lines changed: 30 additions & 0 deletions

```python
from typing import List
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.workflow(name='parent_workflow')
def parent_workflow(ctx: wf.DaprWorkflowContext, items: List[str]):
    child_wf_tasks = [
        ctx.call_child_workflow(child_workflow, input=item) for item in items
    ]
    wf_result = yield wf.when_all(child_wf_tasks)

    return wf_result

@wf_runtime.workflow(name='child_workflow')
def child_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
    result1 = yield ctx.call_activity(activity1, input=wf_input)
    wf_result = yield ctx.call_activity(activity2, input=result1)
    return wf_result

@wf_runtime.activity(name='activity1')
def activity1(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
    print(f'activity1: Received input: {act_input}.', flush=True)
    return f"{act_input} is processed"

@wf_runtime.activity(name='activity2')
def activity2(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
    print(f'activity2: Received input: {act_input}.', flush=True)
    return f"{act_input} as a child workflow."
```
