RocketMQ

RocketMQ 发布订阅组件的详细文档

组件格式

要设置 RocketMQ 发布订阅,需创建一个类型为 pubsub.rocketmq 的组件。请参阅发布订阅代理组件文件以了解 ConsumerID 如何自动生成。阅读如何操作:发布和订阅指南了解如何创建和应用发布订阅配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rocketmq-pubsub
spec:
  type: pubsub.rocketmq
  version: v1
  metadata:
    - name: instanceName
      value: dapr-rocketmq-test
    - name: consumerGroup
      value: dapr-rocketmq-test-g-c
    - name: producerGroup 
      value: dapr-rocketmq-test-g-p
    - name: consumerID
      value: channel1
    - name: nameSpace
      value: dapr-test
    - name: nameServer
      value: "127.0.0.1:9876,127.0.0.2:9876"
    - name: retries
      value: 3
    - name: consumerModel
      value: "clustering"
    - name: consumeOrderly
      value: false

规范元数据字段

字段必填详情默认值示例
instanceNameN实例名称time.Now().String()dapr-rocketmq-test
consumerGroupN消费者组名称。推荐。如果 producerGroupnull,则使用 groupNamedapr-rocketmq-test-g-c
producerGroup (consumerID)N生产者组名称。推荐。如果 producerGroupnull,则使用 consumerID。如果 consumerID 也为 null,则使用 groupNamedapr-rocketmq-test-g-p
consumerIDN消费者 ID(消费者标签)将一个或多个消费者组织到一个组中。具有相同消费者 ID 的消费者作为一个虚拟消费者工作;例如,一条消息仅由组中的一个消费者处理一次。如果未提供 consumerID,Dapr 运行时会将其设置为 Dapr 应用程序 ID(appID)的值。可设置为字符串值(如上面示例中的 "channel1")或字符串格式值(如 "{podName}" 等)。查看您可以在组件元数据中使用的所有模板标签。
groupNameN消费者/生产者组名称。已弃用dapr-rocketmq-test-g
nameSpaceNRocketMQ 命名空间dapr-rocketmq
nameServerDomainNRocketMQ 名称服务器域名https://my-app.net:8080/nsaddr
nameServerNRocketMQ 名称服务器,用 “,” 或 “;” 分隔127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877
accessKeyN访问密钥(用户名)"admin"
secretKeyN密钥(密码)"password"
securityTokenN安全令牌
retriesN向代理发送消息的重试次数33
producerQueueSelector (queueSelector)N生产者队列选择器。队列选择器有五种实现:hashrandommanualroundRobindaprdaprhash
consumerModelN定义消息如何传递给每个消费者客户端的消息模型。RocketMQ 支持两种消息模型:clusteringbroadcastingclusteringbroadcastingclustering
fromWhere (consumeFromWhere)N消费者启动时的消费点。有三个消费点:CONSUME_FROM_LAST_OFFSETCONSUME_FROM_FIRST_OFFSETCONSUME_FROM_TIMESTAMPCONSUME_FROM_LAST_OFFSETCONSUME_FROM_LAST_OFFSET
consumeTimestampN以秒精度回溯消费时间。时间格式为 yyyymmddhhmmss。例如,20131223171201 表示时间为 17:12:01,日期为 2013 年 12 月 23 日time.Now().Add(time.Minute * (-30)).Format("20060102150405")20131223171201
consumeOrderlyN确定是否使用 FIFO 顺序的有序消息。falsefalse
consumeMessageBatchMaxSizeN批量消费大小,超出范围 [1, 1024]51210
consumeConcurrentlyMaxSpanN并发最大跨度偏移量。这对顺序消费没有影响。范围:[1, 65535]10001000
maxReconsumeTimesN最大重新消费次数。-1 表示 16 次。如果消息在成功之前重新消费的次数超过 {@link maxReconsumeTimes},它们将被定向到删除队列。顺序消息为 MaxInt32;并发消息为 1616
autoCommitN启用自动提交truefalse
consumeTimeoutN消息可能阻塞消费线程的最长时间。时间单位:分钟1515
consumerPullTimeoutNSocket 超时时间(毫秒)
pullIntervalN消息拉取间隔100100
pullBatchSizeN一次从代理拉取的消息数量。如果 pullBatchSizenull,则使用 ConsumerBatchSizepullBatchSize 超出范围 [1, 1024]3210
pullThresholdForQueueN队列级别的流控阈值。每个消息队列默认最多缓存 1000 条消息。考虑 PullBatchSize - 瞬时值可能会超过限制。范围:[1, 65535]10241000
pullThresholdForTopicN主题级别的流控阈值。如果 pullThresholdForTopic 不受限,pullThresholdForQueue 的值将被覆盖并基于 pullThresholdForTopic 计算。例如,如果 pullThresholdForTopic 的值为 1000,并且为此消费者分配了 10 个消息队列,那么 pullThresholdForQueue 将被设置为 100。范围:[1, 6553500]-1(不限)10
pullThresholdSizeForQueueN限制队列级别的缓存消息大小。考虑 pullBatchSize - 瞬时值可能会超过限制。消息的大小仅通过消息体测量,因此不准确。范围:[1, 1024]100100
pullThresholdSizeForTopicN限制主题级别的缓存消息大小。如果 pullThresholdSizeForTopic 不受限,pullThresholdSizeForQueue 的值将被覆盖并基于 pullThresholdSizeForTopic 计算。例如,如果 pullThresholdSizeForTopic 的值为 1000 MiB,并且为此消费者分配了 10 个消息队列,那么 pullThresholdSizeForQueue 将被设置为 100 MiB。范围:[1, 102400]-1100
content-typeN消息内容类型。"text/plain""application/cloudevents+json; charset=utf-8""application/octet-stream"
logLevelN日志级别warninfo
sendTimeOutN向 RocketMQ 的代理发送消息的超时时间,以纳秒为单位测量。已弃用3 秒10000000000
sendTimeOutSecN发布消息的超时时长(秒)。如果 sendTimeOutSecnull,则使用 sendTimeOut3 秒3
mspPropertiesN此集合中的 RocketMQ 消息属性在数据分离中传递给 APP。用 “,” 分隔多个属性key,mkey

出于向后兼容的原因,元数据中支持以下值,但不鼓励使用。

字段(支持但已弃用)必填详情示例
groupNameNRocketMQ 发布者的生产者组名称"my_unique_group_name"
sendTimeOutN发布消息的超时时长(纳秒)0
consumerBatchSizeN一次从代理拉取的消息数量32

设置 RocketMQ

请参阅 https://rocketmq.apache.org/docs/quick-start/ 以设置本地 RocketMQ 实例。

每次调用元数据字段

分区键

调用 RocketMQ 发布订阅时,可以通过在请求 URL 中使用 metadata 查询参数来提供可选的分区键。

您需要在 metadata 中指定 rocketmq-tag"rocketmq-key"rocketmq-shardingkeyrocketmq-queue

示例:

curl -X POST http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'

QueueSelector

RocketMQ 组件总共包含五个队列选择器。RocketMQ 客户端提供以下队列选择器:

  • HashQueueSelector
  • RandomQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

要了解有关这些 RocketMQ 客户端队列选择器的更多信息,请阅读 RocketMQ 文档

Dapr RocketMQ 组件实现了以下队列选择器:

  • DaprQueueSelector

本文重点介绍 DaprQueueSelector 的设计。

DaprQueueSelector

DaprQueueSelector 集成了三个队列选择器:

  • HashQueueSelector
  • RoundRobinQueueSelector
  • ManualQueueSelector

DaprQueueSelector 从请求参数中获取队列 ID。您可以通过运行以下命令来设置队列 ID:

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1

上述方法实现了 ManualQueueSelector

接下来,DaprQueueSelector 尝试:

  • 获取 ShardingKey
  • ShardingKey 进行哈希处理以确定队列 ID。

您可以通过以下方式设置 ShardingKey

http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key

如果 ShardingKey 不存在,则使用 RoundRobin 算法来确定队列 ID。

相关链接