Dapr 工作流
Dapr Python SDK 提供了一个内置的 Dapr 工作流扩展 dapr.ext.workflow,用于创建 Dapr 服务。
安装
您可以通过以下命令下载并安装 Dapr 工作流扩展:
pip install dapr-ext-workflow
Note
开发版包将包含与 Dapr 运行时预发布版本兼容的功能和行为。在安装 <code>dapr-dev</code> 包之前,请确保已卸载任何稳定版本的 Python SDK 扩展。
pip install dapr-ext-workflow-dev
示例
from time import sleep
import dapr.ext.workflow as wf
wfr = wf.WorkflowRuntime()
@wfr.workflow(name='random_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
result2 = yield ctx.call_activity(step2, input=result1)
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
return [result1, result2]
@wfr.activity(name='step1')
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1
@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2
@wfr.activity
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work
if __name__ == '__main__':
wfr.start()
sleep(10) # wait for workflow runtime to start
wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')
wfr.shutdown()
- 了解有关编写和管理工作流的更多信息:
- 访问 Python SDK 示例 获取代码示例和尝试 Dapr 工作流的说明: