Getting started with the Dapr Workflow Python SDK
Let’s create a Dapr workflow and invoke it using the console. With the provided workflow example, you will:
- Run a Python console application that demonstrates workflow orchestration with activities, child workflows, and external events
- Learn how to handle retries, timeouts, and workflow state management
- Use the Python workflow SDK to start, pause, resume, and purge workflow instances
This example uses the default configuration from dapr init in self-hosted mode.
In the Python example project, the simple.py file contains the setup of the app, including:
- The workflow definition
- The workflow activity definitions
- The registration of the workflow and workflow activities
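Registration in simple.py is decorator-based. To make that idea concrete, here is a minimal sketch of how a decorator can record functions under a name. `MiniRuntime` is a hypothetical stand-in written for illustration only; it is not the SDK's WorkflowRuntime and makes no claim about how the SDK is implemented.

```python
from typing import Callable, Dict


class MiniRuntime:
    """Hypothetical stand-in for a workflow runtime; not the Dapr SDK class."""

    def __init__(self) -> None:
        self.workflows: Dict[str, Callable] = {}
        self.activities: Dict[str, Callable] = {}

    def workflow(self, name: str) -> Callable[[Callable], Callable]:
        # Return a decorator that records the function under `name`.
        def decorator(fn: Callable) -> Callable:
            self.workflows[name] = fn
            return fn  # the decorated function is returned unchanged
        return decorator

    def activity(self, name: str) -> Callable[[Callable], Callable]:
        # Same pattern, but recorded in a separate registry.
        def decorator(fn: Callable) -> Callable:
            self.activities[name] = fn
            return fn
        return decorator


wfr = MiniRuntime()


@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx, wf_input):
    return 'Completed'


@wfr.activity(name='hello_act')
def hello_act(ctx, wf_input):
    return wf_input


registered_workflows = sorted(wfr.workflows)
registered_activities = sorted(wfr.activities)
```

The point of the pattern is that defining a function and registering it happen in one place, so the registry cannot drift out of sync with the definitions.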
Prerequisites
- Dapr CLI installed
- Initialized Dapr environment
- Python 3.9+ installed
- Dapr Python package and the workflow extension installed
- Verify you’re using the latest proto bindings
Set up the environment
Start by cloning the Python SDK repo.
git clone https://github.com/dapr/python-sdk.git
From the Python SDK root directory, navigate to the Dapr Workflow example.
cd examples/workflow
Run the following command to install the requirements for running this workflow sample with the Dapr Python SDK.
pip3 install -r requirements.txt
Run the application locally
To run the Dapr application, you need to start the Python program and a Dapr sidecar. In the terminal, run:
dapr run --app-id wf-simple-example --dapr-grpc-port 50001 --resources-path components -- python3 simple.py
Note: Because python3.exe is not defined on Windows, you may need to use python simple.py instead of python3 simple.py.
Expected output
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: Suspended"
- "== APP == Get response from hello_world_wf after resume call: Running"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: "Completed""
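The counter lines in this output (1, 11, 111, 1111) come from an activity that adds its input to a running counter. The sketch below imitates that with plain Python generators and no Dapr dependency. The `run` driver is hypothetical, and the inputs 100 and 1000 are assumptions inferred from the output rather than taken from the example source. The sketch also ignores the pause, resume, and event steps that separate the second and third counter lines in the real run.

```python
from typing import Any, Callable, Generator, List, Tuple

counter = 0
log: List[str] = []


def hello_act(wf_input: int) -> None:
    """Activity: add the input to a module-level counter and log it."""
    global counter
    counter += wf_input
    log.append(f'New counter value is: {counter}!')


def hello_world_wf() -> Generator[Tuple[Callable, Any], Any, str]:
    # Each yield hands one (activity, input) task to the driver.
    yield (hello_act, 1)
    yield (hello_act, 10)
    yield (hello_act, 100)   # assumed input, inferred from the output
    yield (hello_act, 1000)  # assumed input, inferred from the output
    return 'Completed'


def run(workflow: Callable[[], Generator]) -> str:
    """Hypothetical driver: run each yielded task, send its result back."""
    gen = workflow()
    try:
        task = next(gen)
        while True:
            activity, arg = task
            result = activity(arg)
            task = gen.send(result)
    except StopIteration as done:
        # The generator's return value becomes the workflow result.
        return done.value


final = run(hello_world_wf)
```

After running, `log` holds the four counter messages in order and `final` is the workflow's return value.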
What happened?
When you run the application, several key workflow features are shown:
- Workflow and Activity Registration: The application uses Python decorators to automatically register workflows and activities with the runtime. This decorator-based approach provides a clean, declarative way to define your workflow components:
    @wfr.workflow(name='hello_world_wf')
    def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
        # Workflow definition...

    @wfr.activity(name='hello_act')
    def hello_act(ctx: WorkflowActivityContext, wf_input):
        # Activity definition...
- Runtime Setup: The application initializes the workflow runtime and client:
    wfr = WorkflowRuntime()
    wfr.start()
    wf_client = DaprWorkflowClient()
- Activity Execution: The workflow executes a series of activities that increment a counter:
    @wfr.workflow(name='hello_world_wf')
    def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
        yield ctx.call_activity(hello_act, input=1)
        yield ctx.call_activity(hello_act, input=10)
- Retry Logic: The workflow demonstrates error handling with a retry policy:
    retry_policy = RetryPolicy(
        first_retry_interval=timedelta(seconds=1),
        max_number_of_attempts=3,
        backoff_coefficient=2,
        max_retry_interval=timedelta(seconds=10),
        retry_timeout=timedelta(seconds=100),
    )
    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
- Child Workflow: A child workflow is executed with its own retry policy:
    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
- External Event Handling: The workflow waits for an external event with a timeout:
    event = ctx.wait_for_external_event(event_name)
    timeout = ctx.create_timer(timedelta(seconds=30))
    winner = yield when_any([event, timeout])
- Workflow Lifecycle Management: The example demonstrates how to pause and resume the workflow:
    wf_client.pause_workflow(instance_id=instance_id)
    metadata = wf_client.get_workflow_state(instance_id=instance_id)
    # ... check status ...
    wf_client.resume_workflow(instance_id=instance_id)
- Event Raising: After resuming, the client raises an event to the workflow:
    wf_client.raise_workflow_event(
        instance_id=instance_id, event_name=event_name, data=event_data
    )
- Completion and Cleanup: Finally, the application waits for the workflow to complete and purges the workflow instance:
    state = wf_client.wait_for_workflow_completion(
        instance_id, timeout_in_seconds=30
    )
    wf_client.purge_workflow(instance_id=instance_id)
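The retry policy parameters listed under Retry Logic can be read as an exponential backoff schedule. The sketch below computes that schedule with the standard library only. It assumes the delay before retry n is first_retry_interval multiplied by backoff_coefficient to the power n - 1, capped at max_retry_interval; this formula is an assumption for illustration, not something the example states, and the `retry_delays` helper is hypothetical. The overall retry_timeout is left out of the sketch.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    first_retry_interval: timedelta,
    max_number_of_attempts: int,
    backoff_coefficient: float,
    max_retry_interval: timedelta,
) -> List[timedelta]:
    """Delays before each retry, under the assumed backoff formula."""
    delays = []
    # N attempts leave room for N - 1 retries after the first failure.
    for retry in range(max_number_of_attempts - 1):
        delay = first_retry_interval * (backoff_coefficient ** retry)
        # Never wait longer than the configured cap.
        delays.append(min(delay, max_retry_interval))
    return delays


# Values from the example's retry policy.
schedule = retry_delays(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2,
    max_retry_interval=timedelta(seconds=10),
)

# A longer, made-up run to show the cap taking effect.
longer = retry_delays(timedelta(seconds=1), 6, 2, timedelta(seconds=10))
```

Under these assumptions, the example's policy waits 1 second and then 2 seconds between its three attempts. With six attempts the fifth delay would be 16 seconds uncapped, so it is held at the 10-second maximum.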
Next steps
- Learn more about Dapr workflow
- Workflow API reference
- Try implementing more complex workflow patterns