操作指南:管理工作流

管理和运行工作流

现在您已经在应用程序中编写了工作流及其活动,可以使用 CLI 或 API 调用来启动、终止、重运行和获取工作流信息。

使用 Dapr CLI 管理工作流

Dapr CLI 提供了在自托管和 Kubernetes 环境中管理工作流实例的命令。

另请参阅工作流保留策略了解如何为已完成的工作流配置保留策略。

基本工作流操作

启动工作流

# 使用 `orderprocessing` 应用程序,启动一个新的工作流实例并传入输入数据
dapr workflow run OrderProcessingWorkflow \
  --app-id orderprocessing \
  --input '{"orderId": "12345", "amount": 100.50}'

# 使用特定实例 ID 启动新工作流
dapr workflow run OrderProcessingWorkflow \
  --app-id orderprocessing \
  --instance-id order-12345 \
  --input '{"orderId": "12345"}'

# 计划在 2024 年 12 月 25 日上午 10:00:00(协调世界时 UTC)启动新工作流
dapr workflow run OrderProcessingWorkflow \
  --app-id orderprocessing \
  --start-time "2024-12-25T10:00:00Z"

列出工作流实例

# 列出应用的所有工作流
dapr workflow list

# 按状态筛选
dapr workflow list --filter-status RUNNING

# 按工作流名称和应用 ID 筛选
dapr workflow list --app-id orderprocessing --filter-name OrderProcessingWorkflow

# 按时间筛选(过去 24 小时内启动的工作流)
dapr workflow list --filter-max-age 24h

# 获取详细输出
dapr workflow list -o wide

查看工作流历史

# 获取执行历史
dapr workflow history order-12345

# 在特定应用 ID 上以 JSON 格式获取历史
dapr workflow history order-12345 --app-id orderprocessing --output json

控制工作流执行

# 暂停正在运行的工作流
dapr workflow suspend order-12345 \
  --app-id orderprocessing \
  --reason "Waiting for manual approval"

# 恢复已暂停的工作流
dapr workflow resume order-12345 \
  --app-id orderprocessing \
  --reason "Approved by manager"

# 终止工作流
dapr workflow terminate order-12345 \
  --app-id orderprocessing \
  --output '{"reason": "Cancelled by customer"}'

触发外部事件

# 为等待中的工作流触发事件
dapr workflow raise-event order-12345/PaymentReceived \
  --app-id orderprocessing \
  --input '{"paymentId": "pay-67890", "amount": 100.50}'

重运行工作流

# 从头重运行
dapr workflow rerun order-12345

# 从特定事件 ID 重运行(通过 history 命令发现)
dapr workflow rerun order-12345 --event-id 5

# 使用新的指定实例 ID 重运行
dapr workflow rerun order-12345 --new-instance-id order-12345-retry

清理已完成的工作流

请注意,从 CLI 清理工作流也会删除所有关联的 Scheduler 提醒。

# 清理特定实例
dapr workflow purge order-12345

# 清理 30 天前已完成的所有工作流
dapr workflow purge --all-older-than 720h

# 清理所有终结状态的工作流(谨慎使用!)
dapr workflow purge --app-id orderprocessing --all

# 在没有运行工作流客户端的情况下强制清理(极度谨慎使用!)
dapr workflow purge order-12345 --force

Kubernetes 操作

所有命令都支持 Kubernetes 部署的 -k 标志:

# 在 Kubernetes 中列出工作流
dapr workflow list \
  --kubernetes \
  --namespace production \
  --app-id orderprocessing

# 在 Kubernetes 中暂停工作流
dapr workflow suspend order-12345 \
  --kubernetes \
  --namespace production \
  --app-id orderprocessing \
  --reason "Maintenance window"

列出工作流

在自托管模式下,只需运行:

dapr workflow list

在 Kubernetes 模式下,需要指定 --kubernetes/-k 标志以及命名空间和应用 ID:

dapr workflow list -k

工作流管理最佳实践

  1. 监控正在运行的工作流:使用筛选列表跟踪长期运行的实例

    dapr workflow list --app-id orderprocessing --filter-status RUNNING --filter-max-age 24h
    
  2. 使用实例 ID:分配有意义的实例 ID 以便更轻松地跟踪

    dapr workflow run OrderWorkflow --app-id orderprocessing --instance-id "order-$(date +%s)"
    
  3. 导出以供分析:导出工作流数据以供分析

    dapr workflow list --app-id orderprocessing --output json > workflows.json
    

使用 Dapr CLI 管理工作流提醒

工作流提醒存储在 Scheduler 中,可以使用 dapr scheduler CLI 进行管理。

列出工作流提醒

dapr scheduler list --filter workflow
NAME                                           BEGIN     COUNT  LAST TRIGGER
workflow/my-app/instance1/timer-0-ABC123       +50.0h    0
workflow/my-app/instance2/timer-0-XYZ789       +50.0h    0

获取提醒详情

dapr scheduler get workflow/my-app/instance1/timer-0-ABC123 -o yaml

删除工作流提醒

删除单个提醒:

dapr scheduler delete workflow/my-app/instance1/timer-0-ABC123

删除给定工作流应用的所有提醒:

dapr scheduler delete-all workflow/my-app

删除特定工作流实例的所有提醒:

dapr scheduler delete-all workflow/my-app/instance1

备份和恢复提醒

导出所有提醒:

dapr scheduler export -o workflow-reminders-backup.bin

从备份文件恢复:

dapr scheduler import -f workflow-reminders-backup.bin

在代码中管理工作流。在编写工作流指南的工作流示例中,工作流使用以下 API 注册到代码中:

  • schedule_new_workflow:启动工作流实例
  • get_workflow_state:获取工作流状态信息
  • pause_workflow:暂停或挂起工作流实例,以便后续恢复
  • resume_workflow:恢复已暂停的工作流实例
  • raise_workflow_event:向工作流触发事件
  • purge_workflow:移除与特定工作流实例相关的所有元数据
  • wait_for_workflow_completion:等待特定工作流实例完成
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient

# 合理的参数
instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"

# 启动工作流
wf_client.schedule_new_workflow(
        workflow=hello_world_wf, input=input_data, instance_id=instance_id
    )

# 获取工作流信息
wf_client.get_workflow_state(instance_id=instance_id)

# 暂停工作流
wf_client.pause_workflow(instance_id=instance_id)
    metadata = wf_client.get_workflow_state(instance_id=instance_id)

# 恢复工作流
wf_client.resume_workflow(instance_id=instance_id)

# 向工作流触发事件
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)

# 清理工作流
wf_client.purge_workflow(instance_id=instance_id)

# 等待工作流完成
wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)

在代码中管理工作流。在编写工作流指南的工作流示例中,工作流使用以下 API 注册到代码中:

  • client.workflow.start:启动工作流实例
  • client.workflow.get:获取工作流状态信息
  • client.workflow.pause:暂停或挂起工作流实例,以便后续恢复
  • client.workflow.resume:恢复已暂停的工作流实例
  • client.workflow.purge:移除与特定工作流实例相关的所有元数据
  • client.workflow.terminate:终止或停止工作流的特定实例
import { DaprClient } from "@dapr/dapr";

async function printWorkflowStatus(client: DaprClient, instanceId: string) {
  const workflow = await client.workflow.get(instanceId);
  console.log(
    `Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${
      workflow.runtimeStatus
    }`,
  );
  console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`);
  console.log("--------------------------------------------------\n\n");
}

async function start() {
  const client = new DaprClient();

  // 启动新的工作流实例
  const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
    Name: "Paperclips",
    TotalCost: 99.95,
    Quantity: 4,
  });
  console.log(`Started workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 暂停工作流实例
  await client.workflow.pause(instanceId);
  console.log(`Paused workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 恢复工作流实例
  await client.workflow.resume(instanceId);
  console.log(`Resumed workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 终止工作流实例
  await client.workflow.terminate(instanceId);
  console.log(`Terminated workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // 等待工作流完成,30 秒!
  await new Promise((resolve) => setTimeout(resolve, 30000));
  await printWorkflowStatus(client, instanceId);

  // 清理工作流实例
  await client.workflow.purge(instanceId);
  console.log(`Purged workflow instance ${instanceId}`);
  // 这会抛出错误,因为工作流实例已不存在
  await printWorkflowStatus(client, instanceId);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});

在代码中管理工作流。在编写工作流指南的 OrderProcessingWorkflow 示例中,工作流已注册到代码中。现在您可以启动、终止和获取运行中工作流的信息:

string orderId = "exampleOrderId";
OrderPayload input = new OrderPayload("Paperclips", 99.95);
Dictionary<string, string> workflowOptions; // 这是一个可选参数

// 使用 orderId 作为工作流 ID 启动工作流。这返回一个字符串,包含特定工作流实例的实例 ID,无论我们是自行提供还是由系统生成
await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), orderId, input, workflowOptions);

// 获取工作流信息。此响应包含工作流状态、启动时间等信息!
WorkflowState currentState = await daprWorkflowClient.GetWorkflowStateAsync(orderId, orderId);

// 终止工作流
await daprWorkflowClient.TerminateWorkflowAsync(orderId);

// 触发事件(传入的采购订单),工作流将等待该事件
await daprWorkflowClient.RaiseEventAsync(orderId, "incoming-purchase-order", input);

// 暂停
await daprWorkflowClient.SuspendWorkflowAsync(orderId);

// 恢复
await daprWorkflowClient.ResumeWorkflowAsync(orderId);

// 清理工作流,从关联实例中移除所有收件箱和历史信息
await daprWorkflowClient.PurgeInstanceAsync(orderId);

在代码中管理工作流。在 Java SDK 的工作流示例中,工作流使用以下 API 注册到代码中:

  • scheduleNewWorkflow:启动新的工作流实例
  • getInstanceState:获取工作流状态信息
  • waitForInstanceStart:暂停或挂起工作流实例,以便后续恢复
  • raiseEvent:为运行中的工作流实例触发事件/任务
  • waitForInstanceCompletion:等待工作流完成任务
  • purgeInstance:移除与特定工作流实例相关的所有元数据
  • terminateWorkflow:终止工作流
  • purgeInstance:移除与特定工作流相关的所有元数据
package io.dapr.examples.workflows;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;

// ...
public class DemoWorkflowClient {

  // ...
  public static void main(String[] args) throws InterruptedException {
    DaprWorkflowClient client = new DaprWorkflowClient();

    try (client) {
      // 启动工作流
      String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
      
      // 获取工作流的状态信息
      WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);

      // 等待或暂停工作流实例启动
      try {
        WorkflowInstanceStatus waitForInstanceStartResult =
            client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
      }

      // 为工作流触发事件;您可以并行触发多个事件
      client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
      client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
      client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
      client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");

      // 等待工作流完成任务
      try {
        WorkflowInstanceStatus waitForInstanceCompletionResult =
            client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
      } 

      // 清理工作流实例,移除与其关联的所有元数据
      boolean purgeResult = client.purgeInstance(instanceId);

      // 终止工作流实例
      client.terminateWorkflow(instanceToTerminateId, null);

    System.exit(0);
  }
}

在代码中管理工作流。在 Go SDK 的工作流示例中,工作流使用以下 API 注册到代码中:

  • StartWorkflow:启动新的工作流实例
  • GetWorkflow:获取工作流状态信息
  • PauseWorkflow:暂停或挂起工作流实例,以便后续恢复
  • RaiseEventWorkflow:为运行中的工作流实例触发事件/任务
  • ResumeWorkflow:等待工作流完成任务
  • PurgeWorkflow:移除与特定工作流实例相关的所有元数据
  • TerminateWorkflow:终止工作流
// 启动工作流
type StartWorkflowRequest struct {
	InstanceID        string // 可选的实例标识符
	WorkflowComponent string
	WorkflowName      string
	Options           map[string]string // 可选的元数据
	Input             any               // 可选的输入
	SendRawInput      bool              // 设置为 True 以禁用输入上的序列化
}

type StartWorkflowResponse struct {
	InstanceID string
}

// 获取工作流状态
type GetWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

type GetWorkflowResponse struct {
	InstanceID    string
	WorkflowName  string
	CreatedAt     time.Time
	LastUpdatedAt time.Time
	RuntimeStatus string
	Properties    map[string]string
}

// 清理工作流
type PurgeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 终止工作流
type TerminateWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 暂停工作流
type PauseWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 恢复工作流
type ResumeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// 为运行中的工作流触发事件
type RaiseEventWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
	EventName         string
	EventData         any
	SendRawData       bool // 设置为 True 以禁用数据上的序列化
}

使用 HTTP 调用管理工作流。下面的示例将编写工作流示例中的属性与随机实例 ID 结合使用。

启动工作流

使用 ID 12345678 启动工作流,运行:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678"

请注意,工作流实例 ID 只能包含字母数字字符、下划线和破折号。

终止工作流

使用 ID 12345678 终止工作流,运行:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/terminate"

触发事件

对于支持订阅外部事件的工作流组件(如 Dapr Workflow engine),您可以使用以下"触发事件" API 将命名事件传递到特定的工作流实例。

curl -X POST "http://localhost:3500/v1.0/workflows/<workflowComponentName>/<instanceID>/raiseEvent/<eventName>"

eventName 可以是任意函数。

暂停或恢复工作流

为了计划停机时间、等待输入等,您可以暂停然后恢复工作流。使用 ID 12345678 暂停工作流直到触发恢复,运行:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/pause"

使用 ID 12345678 恢复工作流,运行:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/resume"

清理工作流

清理 API 可用于从底层状态存储中永久删除工作流元数据,包括所有存储的输入、输出和工作流历史记录。这通常有助于实现数据保留策略和释放资源。

只有处于 COMPLETED、FAILED 或 TERMINATED 状态的工作流实例才能被清理。如果工作流处于任何其他状态,调用清理将返回错误。

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/purge"

获取工作流信息

使用 ID 12345678 获取工作流信息(输出和输入),运行:

curl -X GET "http://localhost:3500/v1.0/workflows/dapr/12345678"

下一步

现在您已了解如何管理工作流,学习如何在多个应用程序中执行工作流

多应用程序工作流>>

相关链接