This is the multi-page printable view of this section. Click here to print.
Dapr Workflow .NET SDK
1 - DaprWorkflowClient 使用
生命周期管理
DaprWorkflowClient 可以访问网络资源,这些资源通过 TCP 套接字与 Dapr sidecar 以及其他用于管理和操作工作流的类型进行通信。DaprWorkflowClient 实现了 IAsyncDisposable 接口,以便快速清理资源。
依赖注入
AddDaprWorkflow() 方法用于通过 ASP.NET Core 的依赖注入机制注册 Dapr 工作流服务。此方法需要一个选项委托,用于定义您希望在应用程序中注册和使用的每个工作流和活动。
注意
此方法会尝试注册一个DaprClient 实例,但仅在尚未以其他生命周期注册的情况下才有效。例如,如果之前以单例生命周期调用了 AddDaprClient(),那么无论为工作流客户端选择何种生命周期,都会始终使用单例。DaprClient 实例用于与 Dapr sidecar 通信,如果尚未注册,则在 AddDaprWorkflow() 注册期间提供的生命周期将用于注册 DaprWorkflowClient 及其依赖项。单例注册
默认情况下,AddDaprWorkflow 方法会以单例生命周期注册 DaprWorkflowClient 和相关服务。这意味着服务只会被实例化一次。
以下是在典型的 Program.cs 文件中注册 DaprWorkflowClient 的示例:
builder.Services.AddDaprWorkflow(options => {
options.RegisterWorkflow<YourWorkflow>();
options.RegisterActivity<YourActivity>();
});
var app = builder.Build();
await app.RunAsync();
作用域注册
虽然默认的单例注册通常适用,但您可能希望指定不同的生命周期。这可以通过在 AddDaprWorkflow 中传递一个 ServiceLifetime 参数来实现。例如,您可能需要将另一个作用域服务注入到 ASP.NET Core 处理管道中,该管道需要 DaprClient 使用的上下文,如果前者服务注册为单例,则无法使用。
以下示例演示了这一点:
builder.Services.AddDaprWorkflow(options => {
options.RegisterWorkflow<YourWorkflow>();
options.RegisterActivity<YourActivity>();
}, ServiceLifecycle.Scoped);
var app = builder.Build();
await app.RunAsync();
瞬态注册
最后,Dapr 服务也可以使用瞬态生命周期注册,这意味着每次注入时都会重新初始化。这在以下示例中演示:
builder.Services.AddDaprWorkflow(options => {
options.RegisterWorkflow<YourWorkflow>();
options.RegisterActivity<YourActivity>();
}, ServiceLifecycle.Transient);
var app = builder.Build();
await app.RunAsync();
将服务注入到工作流活动中
工作流活动支持现代 C# 应用程序中常用的依赖注入。假设在启动时进行了适当的注册,任何此类类型都可以注入到工作流活动的构造函数中,并在工作流执行期间使用。这使得通过注入的 ILogger 添加日志记录或通过注入 DaprClient 或 DaprJobsClient 访问其他 Dapr 组件变得简单。
internal sealed class SquareNumberActivity : WorkflowActivity<int, int>
{
private readonly ILogger _logger;
public MyActivity(ILogger logger)
{
this._logger = logger;
}
public override Task<int> RunAsync(WorkflowActivityContext context, int input)
{
this._logger.LogInformation("Squaring the value {number}", input);
var result = input * input;
this._logger.LogInformation("Got a result of {squareResult}", result);
return Task.FromResult(result);
}
}
在工作流中使用 ILogger
由于工作流必须是确定性的,因此不能将任意服务注入其中。例如,如果您能够将标准 ILogger 注入到工作流中,并且由于错误需要重放它,日志记录的重复操作可能会导致混淆,因为这些操作实际上并没有再次发生。为了解决这个问题,工作流中提供了一种重放安全的日志记录器。它只会在工作流第一次运行时记录事件,而在重放时不会记录任何内容。
这种日志记录器可以通过工作流实例上的 WorkflowContext 中的方法获取,并可以像使用 ILogger 实例一样使用。
一个展示此功能的完整示例可以在 .NET SDK 仓库 中找到,以下是该示例的简要摘录。
public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>(); //使用此方法访问日志记录器实例
logger.LogInformation("Received order {orderId} for {quantity} {name} at ${totalCost}", orderId, order.Quantity, order.Name, order.TotalCost);
//...
}
}
2 - 如何:在 .NET SDK 中编写和管理 Dapr 工作流
我们来创建一个 Dapr 工作流并通过控制台调用它。在提供的订单处理工作流示例中,控制台会提示如何进行购买和补货。在本指南中,您将:
- 部署一个 .NET 控制台应用程序 (WorkflowConsoleApp)。
- 使用 .NET 工作流 SDK 和 API 调用来启动和查询工作流实例。
在 .NET 示例项目里:
- 主要的
Program.cs文件包含应用程序的设置,包括工作流和工作流活动的注册。 - 工作流定义位于
Workflows目录中。 - 工作流活动定义位于
Activities目录中。
先决条件
注意
Dapr.Workflows 在 v1.15 中支持 .NET 7 或更高版本。然而,从 Dapr v1.16 开始,仅支持 .NET 8 和 .NET 9。设置环境
克隆 .NET SDK 仓库。
git clone https://github.com/dapr/dotnet-sdk.git
从 .NET SDK 根目录,导航到 Dapr 工作流示例。
cd examples/Workflow
本地运行应用程序
要运行 Dapr 应用程序,您需要启动 .NET 程序和一个 Dapr sidecar。导航到 WorkflowConsoleApp 目录。
cd WorkflowConsoleApp
启动程序。
dotnet run
在一个新的终端中,再次导航到 WorkflowConsoleApp 目录,并在程序旁边运行 Dapr sidecar。
dapr run --app-id wfapp --dapr-grpc-port 4001 --dapr-http-port 3500
Dapr 会监听 HTTP 请求在
http://localhost:3500和内部工作流 gRPC 请求在http://localhost:4001。
启动工作流
要启动工作流,您有两种选择:
- 按照控制台提示的指示。
- 使用工作流 API 并直接向 Dapr 发送请求。
本指南重点介绍工作流 API 选项。
注意
- 您可以在
WorkflowConsoleApp/demo.http文件中找到以下命令。 - curl 请求的主体是作为工作流输入的采购订单信息。
- 命令中的 “12345678” 表示工作流的唯一标识符,可以替换为您选择的任何标识符。
运行以下命令以启动工作流。
curl -i -X POST http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 \
-H "Content-Type: application/json" \
-d '{"Name": "Paperclips", "TotalCost": 99.95, "Quantity": 1}'
curl -i -X POST http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 `
-H "Content-Type: application/json" `
-d '{"Name": "Paperclips", "TotalCost": 99.95, "Quantity": 1}'
如果成功,您应该会看到如下响应:
{"instanceID":"12345678"}
发送 HTTP 请求以获取已启动工作流的状态:
curl -i -X GET http://localhost:3500/v1.0/workflows/dapr/12345678
工作流设计为需要几秒钟才能完成。如果在您发出 HTTP 请求时工作流尚未完成,您将看到以下 JSON 响应(为便于阅读而格式化),工作流状态为 RUNNING:
{
"instanceID": "12345678",
"workflowName": "OrderProcessingWorkflow",
"createdAt": "2023-05-10T00:42:03.911444105Z",
"lastUpdatedAt": "2023-05-10T00:42:06.142214153Z",
"runtimeStatus": "RUNNING",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "{\"Name\": \"Paperclips\", \"TotalCost\": 99.95, \"Quantity\": 1}"
}
}
一旦工作流完成运行,您应该会看到以下输出,表明它已达到 COMPLETED 状态:
{
"instanceID": "12345678",
"workflowName": "OrderProcessingWorkflow",
"createdAt": "2023-05-10T00:42:03.911444105Z",
"lastUpdatedAt": "2023-05-10T00:42:18.527704176Z",
"runtimeStatus": "COMPLETED",
"properties": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "{\"Name\": \"Paperclips\", \"TotalCost\": 99.95, \"Quantity\": 1}",
"dapr.workflow.output": "{\"Processed\":true}"
}
}
当工作流完成时,工作流应用程序的标准输出应如下所示:
info: WorkflowConsoleApp.Activities.NotifyActivity[0]
Received order 12345678 for Paperclips at $99.95
info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
Reserving inventory: 12345678, Paperclips, 1
info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
Processing payment: 12345678, 99.95, USD
info: WorkflowConsoleApp.Activities.NotifyActivity[0]
Order 12345678 processed successfully!
如果您在本地机器上为 Dapr 配置了 Zipkin,那么您可以在 Zipkin Web UI(通常在 http://localhost:9411/zipkin/)中查看工作流跟踪跨度。
演示
观看此视频演示 .NET 工作流: