Azure Event Hubs

Azure Event Hubs 发布订阅组件的详细文档

组件格式

要设置 Azure Event Hubs 发布订阅,请创建类型为 pubsub.azure.eventhubs 的组件。请参阅发布订阅代理组件文件以了解 ConsumerID 是如何自动生成的。阅读操作指南:发布和订阅以了解如何创建和应用发布订阅配置。

除了下面显示的配置元数据字段外,Azure Event Hubs 还支持 Azure 身份验证机制。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    # connectionString 或 eventHubNamespace 是必需的
    # 当*不*使用 Microsoft Entra ID 时使用 connectionString
    - name: connectionString
      value: "Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}"
    # 使用 Microsoft Entra ID 时使用 eventHubNamespace
    - name: eventHubNamespace
      value: "namespace"
    - name: consumerID # 可选。如果未提供,运行时将创建一个。
      value: "channel1"
    - name: enableEntityManagement
      value: "false"
    - name: enableInOrderMessageDelivery
      value: "false"
    # 仅当 enableEntityManagement 设置为 true 时才需要以下四个属性
    - name: resourceGroupName
      value: "test-rg"
    - name: subscriptionID
      value: "Azure 订阅 ID 的值"
    - name: partitionCount
      value: "1"
    - name: messageRetentionInDays
      value: "3"
    # 检查点存储属性
    - name: storageAccountName
      value: "myeventhubstorage"
    - name: storageAccountKey
      value: "112233445566778899"
    - name: storageContainerName
      value: "myeventhubstoragecontainer"
    # 传递 storageAccountKey 的替代方案
    - name: storageConnectionString
      value: "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<account-key>"

规范元数据字段

字段必填详情示例
connectionStringY*Event Hub 或 Event Hub 命名空间的连接字符串。
* 与 eventHubNamespace 字段互斥。
* 在不使用 Microsoft Entra ID 身份验证时必需。
"Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key};EntityPath={EventHub}""Endpoint=sb://{EventHubNamespace}.servicebus.windows.net/;SharedAccessKeyName={PolicyName};SharedAccessKey={Key}"
eventHubNamespaceY*Event Hub 命名空间名称。
* 与 connectionString 字段互斥。
* 在使用 Microsoft Entra ID 身份验证时必需。
"namespace"
consumerIDN消费者 ID(消费者标签)将一个或多个消费者组织成一个组。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,一条消息只由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时会将其设置为 Dapr 应用程序 ID(appID)的值。可以设置为字符串值(如上例中的 "channel1")或字符串格式的值(如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
enableEntityManagementN布尔值,允许管理 EventHub 命名空间和存储账户。默认值:false"true", "false"
enableInOrderMessageDeliveryN输入/输出布尔值,允许按照消息发布的顺序传递消息。这假设在发布或发布时设置了 partitionKey 以确保跨分区的顺序。默认值:false
storageAccountNameY用于检查点存储的存储账户名称。"myeventhubstorage"
storageAccountKeyY*检查点存储账户的存储账户密钥。
* 使用 Microsoft Entra ID 时,如果服务主体也有权访问存储账户,则可以省略此项。
"112233445566778899"
storageConnectionStringY*检查点存储的连接字符串,指定 storageAccountKey 的替代方案"DefaultEndpointsProtocol=https;AccountName=myeventhubstorage;AccountKey=<account-key>"
storageContainerNameY存储账户名称的存储容器名称。"myeventhubstoragecontainer"
resourceGroupNameNEvent Hub 命名空间所属的资源组名称。在启用实体管理时必需"test-rg"
subscriptionIDNAzure 订阅 ID 值。在启用实体管理时必需"azure subscription id"
partitionCountN新 Event Hub 命名空间的分区数量。仅在启用实体管理时使用。默认值:"1""2"
messageRetentionInDaysN在新创建的 Event Hub 命名空间中保留消息的天数。仅在启用实体管理时使用。默认值:"1""90"

Microsoft Entra ID 身份验证

Azure Event Hubs 发布订阅组件支持使用所有 Microsoft Entra ID 机制进行身份验证。有关更多信息以及根据所选的 Microsoft Entra ID 身份验证机制需要提供的组件元数据字段,请参阅Azure 身份验证文档

配置示例

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: eventhubs-pubsub
spec:
  type: pubsub.azure.eventhubs
  version: v1
  metadata:
    # 使用的 Azure 身份验证
    - name: azureTenantId
      value: "***"
    - name: azureClientId
      value: "***"
    - name: azureClientSecret
      value: "***"
    - name: eventHubNamespace 
      value: "namespace"
    - name: enableEntityManagement
      value: "false"
    # 仅当 enableEntityManagement 设置为 true 时才需要以下四个属性
    - name: resourceGroupName
      value: "test-rg"
    - name: subscriptionID
      value: "Azure 订阅 ID 的值"
    - name: partitionCount
      value: "1"
    - name: messageRetentionInDays
    # 检查点存储属性
    # 在此情况下,我们也使用 Microsoft Entra ID 访问存储账户
    - name: storageAccountName
      value: "myeventhubstorage"
    - name: storageContainerName
      value: "myeventhubstoragecontainer"

发送和接收多条消息

Azure Event Hubs 支持使用批量发布订阅 API 在单个操作中发送和接收多条消息。

配置批量发布

要设置批量发布操作的元数据,请在 HTTP 请求或 gRPC 元数据上设置查询参数,如 API 参考中所述

元数据默认值
metadata.maxBulkPubBytes1000000

配置批量订阅

订阅主题时,您可以配置 bulkSubscribe 选项。有关更多详细信息,请参阅批量订阅消息并了解批量订阅 API

配置默认值
maxMessagesCount100
maxAwaitDurationMs10000

配置检查点频率

订阅主题时,您可以在 HTTP 或 gRPC 订阅请求中设置元数据来配置分区中的检查点频率。此元数据在分区事件序列中达到配置数量的事件后启用检查点。通过将频率设置为 0 来禁用检查点。

了解有关检查点的更多信息

元数据默认值
metadata.checkPointFrequencyPerPartition1

以下示例显示了使用 checkPointFrequencyPerPartition 元数据的声明式订阅的示例订阅文件。同样,您也可以在程序化订阅中传递元数据。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: order-pub-sub
  metadata:
    checkPointFrequencyPerPartition: 1
scopes:
- orderprocessing
- checkout

创建 Azure Event Hub

按照文档中的说明设置 Azure Event Hubs。

由于此组件使用 Azure Storage 作为检查点存储,您还需要一个 Azure Storage 账户。按照文档中的说明管理存储账户访问密钥。

请参阅文档了解如何获取 Event Hubs 连接字符串(注意这不是 Event Hubs 命名空间的连接字符串)。

为每个订阅者创建消费者组

对于每个想要订阅事件的 Dapr 应用程序,请创建一个以 Dapr 应用程序 ID 命名的 Event Hubs 消费者组。例如,一个在 Kubernetes 上运行且 dapr.io/app-id: "myapp" 的 Dapr 应用程序将需要一个名为 myapp 的 Event Hubs 消费者组。

注意:Dapr 将消费者组的名称传递给 Event Hub,因此不需要在元数据中提供。

实体管理

当在元数据中启用实体管理时,只要应用程序具有操作 Event Hub 命名空间的正确角色和权限,Dapr 就可以自动为您创建 Event Hub 和消费者组。

Event Hub 名称是发布或订阅的传入请求中的 topic 字段,而消费者组名称是订阅给定 Event Hub 的 Dapr 应用程序的名称。例如,一个在 Kubernetes 上运行且名称为 dapr.io/app-id: "myapp" 的 Dapr 应用程序需要一个名为 myapp 的 Event Hubs 消费者组。

实体管理仅在使用 Microsoft Entra ID 身份验证时才可用,而不使用连接字符串。

Dapr 将消费者组的名称传递给 Event Hub,因此不需要在元数据中提供。

接收自定义属性

默认情况下,Dapr 不会转发自定义属性。但是,通过将订阅元数据 requireAllProperties 设置为 "true",您可以接收自定义属性作为 HTTP 头。

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: order-pub-sub
  metadata:
    requireAllProperties: "true"

可以使用 Dapr SDK 实现相同的效果:

[Topic("order-pub-sub", "orders")]
[TopicMetadata("requireAllProperties", "true")]
[HttpPost("checkout")]
public ActionResult Checkout(Order order, [FromHeader] int priority)
{
    return Ok();
}

订阅 Azure IoT Hub 事件

Azure IoT Hub 提供了一个与 Event Hubs 兼容的端点,因此 Azure Event Hubs 发布订阅组件也可用于订阅 Azure IoT Hub 事件。

由 Azure IoT Hub 设备创建的设备到云事件将包含额外的 IoT Hub 系统属性,Dapr 的 Azure Event Hubs 发布订阅组件将把以下内容作为响应元数据的一部分返回:

系统属性名称描述和路由查询关键字
iothub-connection-auth-generation-id发送消息的设备的 connectionDeviceGenerationId。请参阅 IoT Hub 设备身份属性
iothub-connection-auth-method用于对发送消息的设备进行身份验证的 connectionAuthMethod
iothub-connection-device-id发送消息的设备的 deviceId。请参阅 IoT Hub 设备身份属性
iothub-connection-module-id发送消息的设备的 moduleId。请参阅 IoT Hub 设备身份属性
iothub-enqueuedtimeIoT Hub 接收到设备到云消息的 RFC3339 格式的 enqueuedTime
message-id用户可设置的 AMQP messageId

例如,传递的 HTTP 订阅消息的头将包含:

{
  'user-agent': 'fasthttp',
  'host': '127.0.0.1:3000',
  'content-type': 'application/json',
  'content-length': '120',
  'iothub-connection-device-id': 'my-test-device',
  'iothub-connection-auth-generation-id': '637618061680407492',
  'iothub-connection-auth-method': '{"scope":"module","type":"sas","issuer":"iothub","acceptingIpFilterRule":null}',
  'iothub-connection-module-id': 'my-test-module-a',
  'iothub-enqueuedtime': '2021-07-13T22:08:09Z',
  'message-id': 'my-custom-message-id',
  'x-opt-sequence-number': '35',
  'x-opt-enqueued-time': '2021-07-13T22:08:09Z',
  'x-opt-offset': '21560',
  'traceparent': '00-4655608164bc48b985b42d39865f3834-ed6cf3697c86e7bd-01'
}

相关链接