Dapr Workflow Python SDK 入门

如何使用 Dapr Python SDK 快速上手工作流

让我们创建一个 Dapr 工作流并通过控制台调用它。借助提供的工作流示例,你将:

  • 运行一个 Python 控制台应用程序,该程序演示包含活动、子工作流和外部事件的工作流编排
  • 了解如何处理重试、超时以及工作流状态管理
  • 使用 Python 工作流 SDK 来启动、暂停、恢复和清理工作流实例

本示例使用自托管模式下通过 dapr init 初始化的默认配置。

在 Python 示例项目中,simple.py 文件包含应用程序的设置,包括:

  • 工作流定义
  • 工作流活动定义
  • 工作流和工作流活动的注册

前置条件

设置环境

首先克隆 [Python SDK 仓库]。

git clone https://github.com/dapr/python-sdk.git

从 Python SDK 根目录导航到 Dapr Workflow 示例。

cd examples/workflow

运行以下命令,安装使用 Dapr Python SDK 运行此工作流示例所需的所有依赖。

pip3 install -r workflow/requirements.txt

在本地运行应用程序

要运行 Dapr 应用程序,你需要启动 Python 程序和一个 Dapr 边车。在终端中运行:

dapr run --app-id wf-simple-example --dapr-grpc-port 50001 --resources-path components -- python3 simple.py

注意: 由于 Windows 上未定义 Python3.exe,你可能需要使用 python simple.py 而不是 python3 simple.py

预期输出

- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: Suspended"
- "== APP == Get response from hello_world_wf after resume call: Running"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: "Completed"

发生了什么?

当你运行应用程序时,会演示几个关键的工作流功能:

  1. 工作流和活动注册:应用程序使用 Python 装饰器自动向运行时注册工作流和活动。这种基于装饰器的方法提供了一种简洁、声明式的方式来定义你的工作流组件:

    @wfr.workflow(name='hello_world_wf')
    def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
        # Workflow definition...
    
    @wfr.activity(name='hello_act')
    def hello_act(ctx: WorkflowActivityContext, wf_input):
        # Activity definition...
    
  2. 运行时设置:应用程序初始化工作流运行时和客户端:

    wfr = WorkflowRuntime()
    wfr.start()
    wf_client = DaprWorkflowClient()
    
  3. 活动执行:工作流执行一系列活动来递增计数器:

    @wfr.workflow(name='hello_world_wf')
    def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
        yield ctx.call_activity(hello_act, input=1)
        yield ctx.call_activity(hello_act, input=10)
    
  4. 重试逻辑:工作流演示了使用重试策略进行错误处理:

    retry_policy = RetryPolicy(
        first_retry_interval=timedelta(seconds=1),
        max_number_of_attempts=3,
        backoff_coefficient=2,
        max_retry_interval=timedelta(seconds=10),
        retry_timeout=timedelta(seconds=100),
    )
    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
    
  5. 子工作流:子工作流使用自己的重试策略执行:

    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
    
  6. 外部事件处理:工作流等待一个带有超时的外部事件:

    event = ctx.wait_for_external_event(event_name)
    timeout = ctx.create_timer(timedelta(seconds=30))
    winner = yield when_any([event, timeout])
    
  7. 工作流生命周期管理:示例演示如何暂停和恢复工作流:

    wf_client.pause_workflow(instance_id=instance_id)
    metadata = wf_client.get_workflow_state(instance_id=instance_id)
    # ... check status ...
    wf_client.resume_workflow(instance_id=instance_id)
    
  8. 事件触发:恢复后,工作流触发一个事件:

    wf_client.raise_workflow_event(
        instance_id=instance_id,
        event_name=event_name,
        data=event_data
    )
    
  9. 完成与清理:最后,工作流等待完成并进行清理:

    state = wf_client.wait_for_workflow_completion(
        instance_id,
        timeout_in_seconds=30
    )
    wf_client.purge_workflow(instance_id=instance_id)
    

后续步骤