如何操作:实现可插拔组件

了解如何编写和实现可插拔组件

在本指南中,您将了解为何以及如何实现可插拔组件。要了解如何配置和注册可插拔组件,请参阅如何操作:注册可插拔组件

实现可插拔组件

为了实现可插拔组件,您需要在组件中实现一个 gRPC 服务。实现 gRPC 服务需要三个步骤:

查找 proto 定义文件

为每个支持的服务接口(状态存储、发布订阅、绑定、密钥存储)都提供了 Proto 定义。

目前,支持以下组件 API:

  • 状态存储
  • 发布订阅
  • 绑定
  • 密钥存储
组件类型gRPC 定义内置参考实现文档
状态存储statestate.protoRedis概念, 如何操作, api 规范
发布订阅pubsubpubsub.protoRedis概念, 如何操作, api 规范
绑定bindingsbindings.protoKafka概念, 输入如何操作, 输出如何操作, api 规范
密钥存储secretstoressecretstore.protoHashicorp/Vault概念, howto-secrets, api 规范

以下是可插拔组件状态存储([state.proto])的 gRPC 服务定义片段:

// StateStore 服务为状态存储组件提供 gRPC 接口。
service StateStore {
  // 使用给定的元数据初始化状态存储组件。
  rpc Init(InitRequest) returns (InitResponse) {}
  // 返回已实现的状态存储功能列表。
  rpc Features(FeaturesRequest) returns (FeaturesResponse) {}
  // Ping 状态存储。用于存活性检查。
  rpc Ping(PingRequest) returns (PingResponse) {}
  
  // 从状态存储中删除指定的键。
  rpc Delete(DeleteRequest) returns (DeleteResponse) {}
  // 从给定键获取数据。
  rpc Get(GetRequest) returns (GetResponse) {}
  // 设置指定键的值。
  rpc Set(SetRequest) returns (SetResponse) {}


  // 一次删除多个键。
  rpc BulkDelete(BulkDeleteRequest) returns (BulkDeleteResponse) {}
  // 一次检索多个键。
  rpc BulkGet(BulkGetRequest) returns (BulkGetResponse) {}
  // 一次设置多个键的值。
  rpc BulkSet(BulkSetRequest) returns (BulkSetResponse) {}
}

StateStore 服务的接口总共公开了 9 个方法:

  • 2 个方法用于初始化和组件能力声明(Init 和 Features)
  • 1 个方法用于健康或存活性检查(Ping)
  • 3 个方法用于 CRUD 操作(Get、Set、Delete)
  • 3 个方法用于批量 CRUD 操作(BulkGet、BulkSet、BulkDelete)

创建服务脚手架

使用 协议缓冲区和 gRPC 工具 为服务创建必要的脚手架。通过 gRPC 概念文档了解有关这些工具的更多信息。

这些工具生成针对任何支持 gRPC 的语言的代码。此代码作为服务器的基座,它提供:

  • 处理客户端调用的功能
  • 基础设施,用于:
    • 解码传入请求
    • 执行服务方法
    • 编码服务响应

生成的代码是不完整的。它缺少:

  • 目标服务定义的方法的具体实现(可插拔组件的核心)。
  • 如何处理 Unix Socket Domain 集成的代码,这是 Dapr 特有的。
  • 处理与下游服务集成的代码。

在下一步中了解有关填补这些空白的信息。

定义服务

为所需服务提供具体实现。每个组件都有用于其核心功能的 gRPC 服务定义,这与核心组件接口相同。例如:

  • 状态存储

    可插拔状态存储必须提供 StateStore 服务接口的实现。

    除了此核心功能外,某些组件可能还会在其他可选服务下公开功能。例如,您可以通过为 QueriableStateStore 服务和 TransactionalStateStore 服务定义实现来添加额外功能。

  • 发布订阅

    可插拔发布订阅组件只有一个在 pubsub.proto 中定义的核心服务接口。它们没有可选服务接口。

  • 绑定

    可插拔输入和输出绑定在 bindings.proto 上有一个单一的核心服务定义。它们没有可选服务接口。

  • 密钥存储

    可插拔密钥存储在 secretstore.proto 上有一个单一的核心服务定义。它们没有可选服务接口。

使用 gRPC 和协议缓冲区工具生成上述状态存储示例的服务脚手架代码后,您可以为 service StateStore 下定义的 9 个方法定义具体实现,以及用于初始化和与依赖项通信的代码。

此具体实现和辅助代码是可插拔组件的核心。它们定义了组件在处理来自 Dapr 的 gRPC 请求时的行为。

返回语义错误

返回语义错误也是可插拔组件协议的一部分。组件必须返回对用户应用程序具有语义含义的特定 gRPC 代码,这些错误用于从并发要求到仅提供信息的各种情况。

错误gRPC 错误代码源组件描述
ETag 不匹配codes.FailedPrecondition状态存储用于满足并发要求的错误映射
ETag 无效codes.InvalidArgument状态存储
批量删除行不匹配codes.Internal状态存储

状态管理概述中了解有关并发要求的更多信息。

以下示例演示如何在您自己的可插拔组件中返回错误,您可以根据需要更改消息。

重要提示: 为了使用 .NET 进行错误映射,请先安装 Google.Api.CommonProtos NuGet 包

Etag 不匹配

var badRequest = new BadRequest();
var des = "提供的 ETag 字段与存储中的不匹配";
badRequest.FieldViolations.Add(    
   new Google.Rpc.BadRequest.Types.FieldViolation
       {        
         Field = "etag",
         Description = des
       });

var baseStatusCode = Grpc.Core.StatusCode.FailedPrecondition;
var status = new Google.Rpc.Status{    
   Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

Etag 无效

var badRequest = new BadRequest();
var des = "ETag 字段必须仅包含字母数字字符";
badRequest.FieldViolations.Add(
   new Google.Rpc.BadRequest.Types.FieldViolation
   {
      Field = "etag",
      Description = des
   });

var baseStatusCode = Grpc.Core.StatusCode.InvalidArgument;
var status = new Google.Rpc.Status
{
   Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(badRequest));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

批量删除行不匹配

var errorInfo = new Google.Rpc.ErrorInfo();

errorInfo.Metadata.Add("expected", "100");
errorInfo.Metadata.Add("affected", "99");

var baseStatusCode = Grpc.Core.StatusCode.Internal;
var status = new Google.Rpc.Status{
    Code = (int)baseStatusCode
};

status.Details.Add(Google.Protobuf.WellKnownTypes.Any.Pack(errorInfo));

var metadata = new Metadata();
metadata.Add("grpc-status-details-bin", status.ToByteArray());
throw new RpcException(new Grpc.Core.Status(baseStatusCode, "fake-err-msg"), metadata);

就像 Dapr Java SDK 一样,Java 可插拔组件 SDK 使用 Project Reactor,它为 Java 提供异步 API。

可以通过以下方式直接返回错误:

  1. 在方法返回的 MonoFlux 中调用 .error() 方法
  2. 提供适当的异常作为参数。

您也可以引发异常,只要它被捕获并反馈到生成的 MonoFlux 中。

ETag 不匹配

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.FAILED_PRECONDITION.value())
    .setMessage("fake-err-msg-for-etag-mismatch")
    .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
        .setField("etag")
        .setDescription("提供的 ETag 字段与存储中的不匹配")
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

ETag 无效

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.INVALID_ARGUMENT.value())
    .setMessage("fake-err-msg-for-invalid-etag")
    .addDetails(Any.pack(BadRequest.FieldViolation.newBuilder()
        .setField("etag")
        .setDescription("ETag 字段必须仅包含字母数字字符")
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

批量删除行不匹配

final Status status = Status.newBuilder()
    .setCode(io.grpc.Status.Code.INTERNAL.value())
    .setMessage("fake-err-msg-for-bulk-delete-row-mismatch")
    .addDetails(Any.pack(ErrorInfo.newBuilder()
        .putAllMetadata(Map.ofEntries(
            Map.entry("affected", "99"),
            Map.entry("expected", "100")
        ))
        .build()))
    .build();
return Mono.error(StatusProto.toStatusException(status));

ETag 不匹配

st := status.New(codes.FailedPrecondition, "fake-err-msg")
desc := "提供的 ETag 字段与存储中的不匹配"
v := &errdetails.BadRequest_FieldViolation{
	Field:       etagField,
	Description: desc,
}
br := &errdetails.BadRequest{}
br.FieldViolations = append(br.FieldViolations, v)
st, err := st.WithDetails(br)

ETag 无效

st := status.New(codes.InvalidArgument, "fake-err-msg")
desc := "ETag 字段必须仅包含字母数字字符"
v := &errdetails.BadRequest_FieldViolation{
	Field:       etagField,
	Description: desc,
}
br := &errdetails.BadRequest{}
br.FieldViolations = append(br.FieldViolations, v)
st, err := st.WithDetails(br)

批量删除行不匹配

st := status.New(codes.Internal, "fake-err-msg")
br := &errdetails.ErrorInfo{}
br.Metadata = map[string]string{
	affected: "99",
	expected: "100",
}
st, err := st.WithDetails(br)

后续步骤