How-To: Author a workflow
This article provides a high-level overview of how to author workflows that are executed by the Dapr Workflow engine.
Note
If you haven't already, try out the workflow quickstart for a quick walk-through on how to use workflows.
Author workflows as code
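Dapr Workflow logic is implemented using general purpose programming languages, allowing you to:
- Use your preferred programming language (no need to learn a new DSL or YAML schema).
- Have access to the language's standard libraries.
- Build your own libraries and abstractions.
- Use debuggers and examine local variables.
- Write unit tests for your workflows, just like any other part of your application logic.
The Dapr sidecar doesn't load any workflow definitions. Rather, the sidecar simply drives the execution of the workflows, leaving all the workflow activities to be part of the application.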
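Because a workflow is ordinary code, it can be exercised in a unit test without a sidecar. The sketch below assumes a generator-style workflow like the Python SDK examples later in this guide. `FakeContext`, `run_workflow`, `add_one`, `double`, and `sample_workflow` are hypothetical names invented for illustration and are not part of the Dapr SDK.

```python
from typing import Any, Callable, List, Tuple


class FakeContext:
    """Hypothetical stand-in for a workflow context: records calls instead of scheduling them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    def call_activity(self, activity: Callable, input: Any = None):
        self.calls.append((activity.__name__, input))
        # Return a "task" token that the test driver below resolves.
        return (activity, input)


def add_one(ctx, value: int) -> int:
    return value + 1


def double(ctx, value: int) -> int:
    return value * 2


def sample_workflow(ctx, wf_input: int):
    # Each yield hands a task to the driver and receives its result back.
    a = yield ctx.call_activity(add_one, input=wf_input)
    b = yield ctx.call_activity(double, input=a)
    return b


def run_workflow(workflow: Callable, wf_input: Any):
    """Drive the generator to completion, running each activity inline."""
    ctx = FakeContext()
    gen = workflow(ctx, wf_input)
    try:
        task = next(gen)
        while True:
            activity, arg = task
            task = gen.send(activity(None, arg))
    except StopIteration as stop:
        return stop.value, ctx.calls


result, calls = run_workflow(sample_workflow, 3)
print(result, calls)
```

A test can then assert on both the return value and the order in which activities were requested, which is usually the behavior worth pinning down in orchestration code.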
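Write the workflow activities
Workflow activities are the basic unit of work in a workflow and are the tasks that get orchestrated in a business process.
Define the workflow activities you'd like your workflow to perform. An activity is a function definition that can take input and return output. The following example creates a counter (activity) called hello_act that notifies users of the current counter value. hello_act is a function that receives a WorkflowActivityContext as its first parameter.
@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
    global counter
    counter += wf_input
    print(f'New counter value is: {counter}!', flush=True)
Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the WorkflowActivityContext class, which implements the workflow activities.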
export default class WorkflowActivityContext {
private readonly _innerContext: ActivityContext;
constructor(innerContext: ActivityContext) {
if (!innerContext) {
throw new Error("ActivityContext cannot be undefined");
}
this._innerContext = innerContext;
}
public getWorkflowInstanceId(): string {
return this._innerContext.orchestrationId;
}
public getWorkflowActivityId(): number {
return this._innerContext.taskId;
}
}
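Define the workflow activities you'd like your workflow to perform. Activities are a class definition and can take inputs and outputs. Activities also participate in dependency injection, like binding to a Dapr client.
The activities called in the example below are:
- NotifyActivity: Receives notification of a new order.
- ReserveInventoryActivity: Checks for sufficient inventory to meet the new order.
- ProcessPaymentActivity: Processes payment for the order. Includes NotifyActivity to send notification of a successful order.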
NotifyActivity
public class NotifyActivity : WorkflowActivity<Notification, object>
{
//...
public NotifyActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<NotifyActivity>();
}
//...
}
See the full NotifyActivity.cs workflow activity example.
ReserveInventoryActivity
public class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
//...
public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
this.client = client;
}
//...
}
See the full ReserveInventoryActivity.cs workflow activity example.
ProcessPaymentActivity
public class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
{
//...
public ProcessPaymentActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
}
//...
}
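Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public DemoWorkflowActivity class, which implements the workflow activities.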
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class DemoWorkflowActivity implements WorkflowActivity {
@Override
public DemoActivityOutput run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
logger.info("Starting Activity: " + ctx.getName());
var message = ctx.getInput(DemoActivityInput.class).getMessage();
var newMessage = message + " World!, from Activity";
logger.info("Message Received from input: " + message);
logger.info("Sending message to output: " + newMessage);
logger.info("Sleeping for 5 seconds to simulate long running operation...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("Activity finished");
var output = new DemoActivityOutput(message, newMessage);
logger.info("Activity returned: " + output);
return output;
}
}
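Define workflow activities
Define each workflow activity you'd like your workflow to perform. The activity input can be unmarshalled from the context with ctx.GetInput. Activities should be defined as functions that take a ctx workflow.ActivityContext parameter and return an interface and an error.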
func BusinessActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
// Do something here
return "result", nil
}
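Define the workflow
Define your workflow function with the parameter ctx *workflow.WorkflowContext and return any and error. Invoke your defined activities from within your workflow.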
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(BusinessActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}
if err := ctx.WaitForExternalEvent("businessEvent", time.Minute*60).Await(&output); err != nil {
return nil, err
}
if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
return nil, err
}
return output, nil
}
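Register workflows and activities
Before your application can execute workflows, you must register both the workflow orchestrator and its activities with a workflow registry. This ensures Dapr knows which functions to call when executing your workflow.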
func main() {
// Create a workflow registry
r := workflow.NewRegistry()
// Register the workflow orchestrator
if err := r.AddWorkflow(BusinessWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("BusinessWorkflow registered")
// Register the workflow activities
if err := r.AddActivity(BusinessActivity); err != nil {
log.Fatal(err)
}
fmt.Println("BusinessActivity registered")
// Create workflow client and start worker
wclient, err := client.NewWorkflowClient()
if err != nil {
log.Fatal(err)
}
fmt.Println("Worker initialized")
ctx, cancel := context.WithCancel(context.Background())
if err = wclient.StartWorker(ctx, r); err != nil {
log.Fatal(err)
}
fmt.Println("runner started")
// Your application logic continues here...
// Example: Start a workflow
instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", instanceID)
// Stop workflow worker when done
cancel()
fmt.Println("workflow worker successfully shutdown")
}
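Key points about registration:
- Use workflow.NewRegistry() to create a workflow registry
- Use r.AddWorkflow() to register workflow functions
- Use r.AddActivity() to register activity functions
- Use client.NewWorkflowClient() to create a workflow client
- Call wclient.StartWorker() to begin processing workflows
- Use wclient.ScheduleWorkflow to schedule a named instance of a workflow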
Write the workflow
Next, register and call the activities in a workflow.
The hello_world_wf function takes a DaprWorkflowContext as its first parameter, along with the workflow input. Its yield statements carry the core logic of the workflow: each one hands a task (an activity call, child workflow, timer, or event wait) to the workflow runtime and resumes once that task completes.
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
    print(f'{wf_input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
    # Change in event handling: Use when_any to handle both event and timeout
    event = ctx.wait_for_external_event(event_name)
    timeout = ctx.create_timer(timedelta(seconds=30))
    winner = yield when_any([event, timeout])
    if winner == timeout:
        print('Workflow timed out waiting for event')
        return 'Timeout'
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)
    return 'Completed'
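To see why each `yield` matters, it can help to picture how a replay-based engine might drive a generator. The toy driver below is a simplified model, not Dapr's engine: `replay`, `toy_workflow`, `add`, and `greet` are hypothetical names, and tasks are plain `(function, argument)` tuples. It shows that on a second run, results recorded in the history are fed back in and the activities are not executed again, which is why side effects belong in activities and the workflow body should stay deterministic.

```python
from typing import Any, Callable, List


def replay(workflow: Callable, wf_input: Any,
           history: List[Any], executed: List[str]) -> Any:
    """Run the workflow from the top, feeding recorded results first.

    `history` holds results of already-completed tasks and grows as new
    tasks execute; `executed` records which activities really ran.
    """
    gen = workflow(wf_input)
    step = 0
    try:
        task = next(gen)
        while True:
            if step < len(history):
                result = history[step]      # replayed: activity is NOT re-run
            else:
                fn, arg = task
                result = fn(arg)            # first execution: run the activity
                executed.append(fn.__name__)
                history.append(result)
            step += 1
            task = gen.send(result)
    except StopIteration as stop:
        return stop.value


def add(x: int) -> int:
    return x + 1


def greet(x: int) -> str:
    return f"hello {x}"


def toy_workflow(wf_input: int):
    n = yield (add, wf_input)
    msg = yield (greet, n)
    return msg


history: List[Any] = []
executed: List[str] = []
first = replay(toy_workflow, 1, history, executed)
second = replay(toy_workflow, 1, history, executed)  # simulates a restart
print(first, second, executed)
```

Both runs return the same value, yet each activity appears in `executed` only once. If the workflow body itself performed I/O or read the clock, the second pass could diverge from the recorded history.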
Next, register the workflow with the WorkflowRuntime class and start the workflow runtime.
export default class WorkflowRuntime {
//..
// Register workflow implementation for handling orchestrations
public registerWorkflow(workflow: TWorkflow): WorkflowRuntime {
const name = getFunctionName(workflow);
const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
const workflowContext = new WorkflowContext(ctx);
return workflow(workflowContext, input);
};
this.worker.addNamedOrchestrator(name, workflowWrapper);
return this;
}
// Register workflow activities
public registerActivity(fn: TWorkflowActivity<TInput, TOutput>): WorkflowRuntime {
const name = getFunctionName(fn);
const activityWrapper = (ctx: ActivityContext, input: TInput): TOutput => {
const wfActivityContext = new WorkflowActivityContext(ctx);
return fn(wfActivityContext, input);
};
this.worker.addNamedActivity(name, activityWrapper);
return this;
}
// Start the workflow runtime processing items and block.
public async start() {
await this.worker.start();
}
}
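The OrderProcessingWorkflow class is derived from a base class called Workflow with input and output parameter types. It also includes a RunAsync method that carries the core logic of the workflow and calls the workflow activities.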
class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
//...
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));
//...
InventoryResult result = await context.CallActivityAsync<InventoryResult>(
nameof(ReserveInventoryActivity),
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
//...
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} processed successfully!"));
// End the workflow with a success result
return new OrderResult(Processed: true);
}
}
Next, register the workflow with the WorkflowRuntimeBuilder and start the workflow runtime.
public class DemoWorkflowWorker {
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
builder.registerActivity(DemoWorkflowActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
System.exit(0);
}
}
Define your workflow function with the parameter ctx *workflow.WorkflowContext and return any and error. Invoke your defined activities from within your workflow.
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(BusinessActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}
if err := ctx.WaitForExternalEvent("businessEvent", time.Minute*60).Await(&output); err != nil {
return nil, err
}
if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
return nil, err
}
return output, nil
}
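Write the application
Finally, compose the application using the workflow.
In the following example, for a basic Python hello world application using the Python SDK, your project code would include:
- The dapr.ext.workflow Python package, imported to receive the Python SDK workflow capabilities.
- A builder with extensions, imported from that package in the code below: WorkflowRuntime, DaprWorkflowContext, and WorkflowActivityContext.
- API calls. In the example below, these calls start, pause, resume, purge, and complete the workflow.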
from datetime import timedelta
from time import sleep
from dapr.ext.workflow import (
    WorkflowRuntime,
    DaprWorkflowContext,
    WorkflowActivityContext,
    RetryPolicy,
    DaprWorkflowClient,
    when_any,
)
from dapr.conf import Settings
from dapr.clients.exceptions import DaprInternalError
settings = Settings()
counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
child_workflow_name = 'child_wf'
input_data = 'Hi Counter!'
event_name = 'event1'
event_data = 'eventData'
non_existent_id_error = 'no such instance exists'
retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2,
    max_retry_interval=timedelta(seconds=10),
    retry_timeout=timedelta(seconds=100),
)
wfr = WorkflowRuntime()
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
    print(f'{wf_input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
    # Change in event handling: Use when_any to handle both event and timeout
    event = ctx.wait_for_external_event(event_name)
    timeout = ctx.create_timer(timedelta(seconds=30))
    winner = yield when_any([event, timeout])
    if winner == timeout:
        print('Workflow timed out waiting for event')
        return 'Timeout'
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)
    return 'Completed'
@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
    global counter
    counter += wf_input
    print(f'New counter value is: {counter}!', flush=True)
@wfr.activity(name='hello_retryable_act')
def hello_retryable_act(ctx: WorkflowActivityContext):
    global retry_count
    if (retry_count % 2) == 0:
        print(f'Retry count value is: {retry_count}!', flush=True)
        retry_count += 1
        raise ValueError('Retryable Error')
    print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
    retry_count += 1
@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: DaprWorkflowContext):
    global child_orchestrator_string, child_orchestrator_count
    if not ctx.is_replaying:
        child_orchestrator_count += 1
        print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
        child_orchestrator_string += str(child_orchestrator_count)
    yield ctx.call_activity(
        act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
    )
    if child_orchestrator_count < 3:
        raise ValueError('Retryable Error')
@wfr.activity(name='act_for_child_wf')
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
    global child_orchestrator_string, child_act_retry_count
    inp_char = chr(96 + inp)
    print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
    child_orchestrator_string += inp_char
    if child_act_retry_count % 2 == 0:
        child_act_retry_count += 1
        raise ValueError('Retryable Error')
    child_act_retry_count += 1
def main():
    wfr.start()
    wf_client = DaprWorkflowClient()
    print('==========Start Counter Increase as per Input:==========')
    wf_client.schedule_new_workflow(
        workflow=hello_world_wf, input=input_data, instance_id=instance_id
    )
    wf_client.wait_for_workflow_start(instance_id)
    # Sleep to let the workflow run initial activities
    sleep(12)
    assert counter == 11
    assert retry_count == 2
    assert child_orchestrator_string == '1aa2bb3cc'
    # Pause Test
    wf_client.pause_workflow(instance_id=instance_id)
    metadata = wf_client.get_workflow_state(instance_id=instance_id)
    print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')
    # Resume Test
    wf_client.resume_workflow(instance_id=instance_id)
    metadata = wf_client.get_workflow_state(instance_id=instance_id)
    print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')
    sleep(2)  # Give the workflow time to reach the event wait state
    wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
    print('========= Waiting for Workflow completion', flush=True)
    try:
        state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
        if state.runtime_status.name == 'COMPLETED':
            print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
        else:
            print(f'Workflow failed! Status: {state.runtime_status.name}')
    except TimeoutError:
        print('*** Workflow timed out!')
    wf_client.purge_workflow(instance_id=instance_id)
    try:
        wf_client.get_workflow_state(instance_id=instance_id)
    except DaprInternalError as err:
        if non_existent_id_error in err._message:
            print('Instance Successfully Purged')
    # Keep the process alive so the workflow can also be driven from the Dapr CLI
    sleep(10000)
    wfr.shutdown()
if __name__ == '__main__':
    main()
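The `RetryPolicy` in the example above combines a first retry interval, a backoff coefficient, a maximum interval, and a maximum number of attempts. As a rough illustration of how these parameters interact, the sketch below assumes the common exponential-backoff rule (delay = first interval × coefficient^retry index, capped at the maximum interval). This guide does not state the SDK's exact computation, so treat `backoff_delays` as a hypothetical helper rather than the SDK's implementation.

```python
from datetime import timedelta
from typing import List


def backoff_delays(first_retry_interval: timedelta,
                   max_number_of_attempts: int,
                   backoff_coefficient: float,
                   max_retry_interval: timedelta) -> List[timedelta]:
    """Return the assumed wait before each retry (attempts minus the first try)."""
    delays = []
    for retry in range(max_number_of_attempts - 1):
        delay = first_retry_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, max_retry_interval))
    return delays


# Same values as the retry_policy in the example above.
delays = backoff_delays(timedelta(seconds=1), 3, 2, timedelta(seconds=10))
print([d.total_seconds() for d in delays])

# With more attempts, the cap on the interval becomes visible.
capped = backoff_delays(timedelta(seconds=1), 6, 2, timedelta(seconds=10))
print([d.total_seconds() for d in capped])
```

Under this assumption, three attempts mean at most two waits, and the `max_retry_interval` only starts to matter once the doubled delay exceeds it.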
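The following example is a basic JavaScript application using the JavaScript SDK. As in this example, your project code would include:
- A builder with extensions, imported from @dapr/dapr in the code below: WorkflowRuntime, WorkflowContext, and WorkflowActivityContext.
- API calls. The following example is a simple project consuming the workflow APIs: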
mkdir my-wf && cd my-wf
npm init -y
npm i @dapr/dapr @microsoft/durabletask-js
npm i -D typescript ts-node @types/node
Create the following tsconfig.json file:
{
"compilerOptions": {
"target": "ES2020",
"module": "CommonJS",
"moduleResolution": "Node",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"outDir": "dist"
},
"include": ["src"]
}
Create the following src/app.ts file:
import {
WorkflowRuntime,
WorkflowActivityContext,
WorkflowContext,
DaprWorkflowClient,
TWorkflow
} from "@dapr/dapr";
const workflowClient = new DaprWorkflowClient();
const workflowRuntime = new WorkflowRuntime();
// simple activity
const hello = async (_: WorkflowActivityContext, name: string) => `Hello ${name}!`;
// simple workflow: call the activity 3 times
const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
const out: string[] = [];
out.push(yield ctx.callActivity(hello, "Tokyo"));
out.push(yield ctx.callActivity(hello, "Seattle"));
out.push(yield ctx.callActivity(hello, "London"));
out.push(yield ctx.waitForExternalEvent("continue"));
return out;
};
async function main() {
workflowRuntime.registerWorkflow(sequence).registerActivity(hello);
await workflowRuntime.start();
const id = await workflowClient.scheduleNewWorkflow(sequence);
console.log("Scheduled:", id);
await workflowClient.raiseEvent(id, "continue", "Go go go!");
const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
console.log("Done:", state?.runtimeStatus, "output:", state?.serializedOutput);
await new Promise(f => setTimeout(f, 100000));
await workflowRuntime.stop();
await workflowClient.stop();
}
main().catch((e) => { console.error(e); });
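In the following Program.cs example, for a basic ASP.NET order processing application using the .NET SDK, your project code would include:
- A NuGet package called Dapr.Workflow to receive the .NET SDK capabilities
- A builder with an extension method called AddDaprWorkflow
  - This allows you to register workflows and workflow activities (tasks that workflows can schedule)
- HTTP API calls
  - One for submitting a new order
  - One for checking the status of an existing order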
using Dapr.Workflow;
//...
// Dapr Workflows are registered as part of the service configuration
builder.Services.AddDaprWorkflow(options =>
{
// Note that it's also possible to register a lambda function as the workflow
// or activity implementation instead of a class.
options.RegisterWorkflow<OrderProcessingWorkflow>();
// These are the activities that get invoked by the workflow(s).
options.RegisterActivity<NotifyActivity>();
options.RegisterActivity<ReserveInventoryActivity>();
options.RegisterActivity<ProcessPaymentActivity>();
});
WebApplication app = builder.Build();
// POST starts new order workflow instance
app.MapPost("/orders", async (DaprWorkflowClient client, [FromBody] OrderPayload orderInfo) =>
{
if (orderInfo?.Name == null)
{
return Results.BadRequest(new
{
message = "Order data was missing from the request",
example = new OrderPayload("Paperclips", 99.95),
});
}
//...
});
// GET fetches state for order workflow to report status
app.MapGet("/orders/{orderId}", async (string orderId, DaprWorkflowClient client) =>
{
WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
if (!state.Exists)
{
return Results.NotFound($"No order with ID = '{orderId}' was found.");
}
var httpResponsePayload = new
{
details = state.ReadInputAs<OrderPayload>(),
status = state.RuntimeStatus.ToString(),
result = state.ReadOutputAs<OrderResult>(),
};
//...
}).WithName("GetOrderInfoEndpoint");
app.Run();
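As in the following example, a hello-world application using the Java SDK and Dapr Workflow would include:
- A Java package called io.dapr.workflows.client to receive the Java SDK client capabilities.
- An import of io.dapr.workflows.Workflow
- The DemoWorkflow class, which extends Workflow
- Creating the workflow with input and output.
- API calls. In the example below, these calls start and call the workflow activities.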
package io.dapr.examples.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
/**
* Implementation of the DemoWorkflow for the server side.
*/
public class DemoWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
// ...
ctx.getLogger().info("Calling Activity...");
var input = new DemoActivityInput("Hello Activity!");
var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
// ...
};
}
}
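As in the following example, a hello-world application using the Go SDK and Dapr Workflow would include:
- A Go package called client to receive the Go SDK client capabilities.
- The BusinessWorkflow method
- Creating the workflow with input and output.
- API calls. In the example below, these calls start and call the workflow activities.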
package main
import (
"context"
"errors"
"fmt"
"log"
"strconv"
"time"
"github.com/dapr/durabletask-go/workflow"
"github.com/dapr/go-sdk/client"
)
var stage = 0
var failActivityTries = 0
func main() {
r := workflow.NewRegistry()
if err := r.AddWorkflow(BusinessWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("BusinessWorkflow registered")
if err := r.AddActivity(BusinessActivity); err != nil {
log.Fatal(err)
}
fmt.Println("BusinessActivity registered")
if err := r.AddActivity(FailActivity); err != nil {
log.Fatal(err)
}
fmt.Println("FailActivity registered")
wclient, err := client.NewWorkflowClient()
if err != nil {
log.Fatal(err)
}
fmt.Println("Worker initialized")
ctx, cancel := context.WithCancel(context.Background())
if err = wclient.StartWorker(ctx, r); err != nil {
log.Fatal(err)
}
fmt.Println("runner started")
// Start workflow test
// Set the start time to the current time to not wait for the workflow to
// "start". This is useful for increasing the throughput of creating
// workflows.
// workflow.WithStartTime(time.Now())
instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput("1"))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", instanceID)
// Pause workflow test
err = wclient.SuspendWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}
respFetch, err := wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to fetch workflow: %v", err)
}
if respFetch.RuntimeStatus != workflow.StatusSuspended {
log.Fatalf("workflow not paused: %s: %v", respFetch.RuntimeStatus, respFetch)
}
fmt.Printf("workflow paused\n")
// Resume workflow test
err = wclient.ResumeWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respFetch.RuntimeStatus != workflow.StatusRunning {
log.Fatalf("workflow not running")
}
fmt.Println("workflow resumed")
fmt.Printf("stage: %d\n", stage)
// Raise Event
err = wclient.RaiseEvent(ctx, instanceID, "businessEvent", workflow.WithEventPayload("testData"))
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
fmt.Println("workflow event raised")
time.Sleep(time.Second) // allow workflow to advance
fmt.Printf("stage: %d\n", stage)
_, err = wclient.WaitForWorkflowCompletion(ctx, instanceID)
if err != nil {
log.Fatalf("failed to wait for workflow: %v", err)
}
fmt.Printf("fail activity executions: %d\n", failActivityTries)
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %v\n", respFetch.String())
// Purge workflow test
err = wclient.PurgeWorkflowState(ctx, instanceID)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err == nil || respFetch != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
fmt.Println("workflow purged")
fmt.Printf("stage: %d\n", stage)
// Terminate workflow test
id, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput("1"))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", id)
metadata, err := wclient.WaitForWorkflowStart(ctx, id)
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %s\n", metadata.String())
err = wclient.TerminateWorkflow(ctx, id)
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
}
fmt.Println("workflow terminated")
err = wclient.PurgeWorkflowState(ctx, id)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
fmt.Println("workflow purged")
<-ctx.Done()
cancel()
fmt.Println("workflow worker successfully shutdown")
}
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(BusinessActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}
err := ctx.WaitForExternalEvent("businessEvent", time.Minute*60).Await(&output)
if err != nil {
return nil, err
}
if err := ctx.CallActivity(BusinessActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}
if err := ctx.CallActivity(FailActivity, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 1 * time.Second,
})).Await(nil); err == nil {
return nil, fmt.Errorf("unexpected no error executing fail activity")
}
return output, nil
}
func BusinessActivity(ctx workflow.ActivityContext) (any, error) {
var input string
if err := ctx.GetInput(&input); err != nil {
return "", err
}
iinput, err := strconv.Atoi(input)
if err != nil {
return "", err
}
stage += iinput
return fmt.Sprintf("Stage: %d", stage), nil
}
func FailActivity(ctx workflow.ActivityContext) (any, error) {
failActivityTries += 1
return nil, errors.New("dummy activity error")
}
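Important
Because of how replay-based workflows execute, you'll write logic that does things like I/O and interacting with systems inside activities. Meanwhile, the workflow method is just for orchestrating those activities.
Run the workflow & inspect the workflow execution with the Diagrid Dashboard
Start the workflow application via your IDE or the Dapr CLI (use Dapr multi-app run if you want to start multiple applications, or the regular dapr run command if you start only one), then schedule a new workflow instance.
Use the local Diagrid Dashboard to visualize and inspect your workflow state, and drill down to see the detailed workflow execution history. The dashboard runs as a container and is connected to the state store used by Dapr workflows (by default, a local Redis instance).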

Start the Diagrid Dashboard container using Docker:
docker run -p 8080:8080 ghcr.io/diagridio/diagrid-dashboard:latest
Note
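If you use a state store other than the default Redis instance, you need to provide some additional arguments to run the container; see the Diagrid Dashboard reference docs.
Open the dashboard in a browser at http://localhost:8080.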
Test your workflow using the Dapr CLI
After authoring your workflow, you can test it using the Dapr CLI:
Run the workflow application
dapr run --app-id workflow-app python3 app.py
Make sure the application is running:
dapr list
Run the workflow
dapr workflow run hello_world_wf --app-id workflow-app --input 'hello world' --instance-id test-run
Check the workflow status
dapr workflow list --app-id workflow-app -o wide
List completed workflows
dapr workflow list --app-id workflow-app --filter-status COMPLETED -o wide
View the workflow history
dapr workflow history --app-id workflow-app test-run
Run the workflow application
dapr run --app-id workflow-app npx ts-node src/app.ts
Make sure the application is running:
dapr list
Run the workflow
dapr workflow run sequence --app-id workflow-app --input 'hello world' --instance-id test-run
Check the workflow status
dapr workflow list --app-id workflow-app -o wide
Raise the waiting external event
dapr workflow raise-event --app-id workflow-app test-run/continue
List completed workflows
dapr workflow list --app-id workflow-app --filter-status COMPLETED -o wide
View the workflow history
dapr workflow history --app-id workflow-app test-run
Run the workflow application
dapr run --app-id workflow-app dotnet run
Make sure the application is running:
dapr list
Run the workflow
dapr workflow run OrderProcessingWorkflow --app-id workflow-app --instance-id test-run --input '{"name": "Paperclips", "totalCost": 99.95}'
Check the workflow status
dapr workflow list --app-id workflow-app -o wide
Raise the waiting external event
dapr workflow raise-event --app-id workflow-app test-run/incoming-purchase-order --input '{"name": "Paperclips", "totalCost": 99.95}'
List completed workflows
dapr workflow list --app-id workflow-app --filter-status COMPLETED -o wide
View the workflow history
dapr workflow history --app-id workflow-app test-run
Run the workflow application
dapr run --app-id workflow-app -- java -jar target/WorkflowService-0.0.1-SNAPSHOT.jar
Make sure the application is running:
dapr list
Run the workflow
dapr workflow run DemoWorkflow --app-id workflow-app --instance-id test-run --input "input data"
Check the workflow status
dapr workflow list --app-id workflow-app -o wide
Raise the waiting external events
dapr workflow raise-event --app-id workflow-app test-run/TestEvent --input 'TestEventPayload'
dapr workflow raise-event --app-id workflow-app test-run/event1 --input 'TestEvent 1 Payload'
dapr workflow raise-event --app-id workflow-app test-run/event2 --input 'TestEvent 2 Payload'
dapr workflow raise-event --app-id workflow-app test-run/event3 --input 'TestEvent 3 Payload'
List completed workflows
dapr workflow list --app-id workflow-app --filter-status COMPLETED -o wide
View the workflow history
dapr workflow history --app-id workflow-app test-run
Run the workflow application
dapr run --app-id workflow-app go run main.go
Make sure the application is running:
dapr list
Run the workflow
dapr workflow run BusinessWorkflow --app-id workflow-app --input '1' --instance-id test-run
Check the workflow status
dapr workflow list --app-id workflow-app -o wide
Raise the waiting external event
dapr workflow raise-event --app-id workflow-app test-run/businessEvent
List completed workflows
dapr workflow list --app-id workflow-app --filter-status COMPLETED -o wide
View the workflow history
dapr workflow history test-run --app-id workflow-app
Monitor workflow execution
dapr workflow list --app-id workflow-app --filter-status RUNNING -o wide
dapr workflow list --app-id workflow-app --filter-status FAILED -o wide
dapr workflow list --app-id workflow-app --filter-status COMPLETED -o wide
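If you check these statuses often, the three commands above can be generated from a script. The sketch below only builds the command lines, using the same flags shown in this section. `workflow_list_command` and `monitoring_commands` are hypothetical helper names, and nothing here calls the Dapr CLI.

```python
import shlex
from typing import List


def workflow_list_command(app_id: str, status: str) -> List[str]:
    """Build the argv for `dapr workflow list` filtered by one runtime status."""
    return ["dapr", "workflow", "list",
            "--app-id", app_id,
            "--filter-status", status,
            "-o", "wide"]


def monitoring_commands(app_id: str) -> List[str]:
    """Return one shell-quoted command line per status of interest."""
    statuses = ("RUNNING", "FAILED", "COMPLETED")
    return [shlex.join(workflow_list_command(app_id, s)) for s in statuses]


for line in monitoring_commands("workflow-app"):
    print(line)
```

To actually run a command, the argv list from `workflow_list_command` could be passed to `subprocess.run`, assuming the Dapr CLI is installed and on your PATH.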
Test external events
# Raise an event your workflow is waiting for
dapr workflow raise-event <instance-id>/ApprovalReceived \
--app-id workflow-app \
--input '{"approved": true, "approver": "manager@company.com"}'
Debug failed workflows
# List failed workflows
dapr workflow list --app-id workflow-app --filter-status FAILED --output wide
# Get the detailed history of a failed workflow
dapr workflow history <failed-instance-id> --app-id workflow-app --output json
# Re-run the workflow after fixing the issue
dapr workflow rerun <failed-instance-id> --app-id workflow-app --input '<new-input-json-data>'
Next steps
Now that you've authored a workflow, learn how to manage it.
Managing workflows >>