JetStream

NATS JetStream 组件的详细文档

组件格式

要设置 JetStream 发布订阅,请创建类型为 pubsub.jetstream 的组件。请参阅发布订阅代理组件文件以了解 ConsumerID 是如何自动生成的。阅读如何:发布和订阅指南以了解如何创建和应用发布订阅配置。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: jwt # 可选。用于去中心化 JWT 认证。
    value: "eyJhbGciOiJ...6yJV_adQssw5c"
  - name: seedKey # 可选。用于去中心化 JWT 认证。
    value: "SUACS34K232O...5Z3POU7BNIL4Y"
  - name: tls_client_cert # 可选。用于 TLS 客户端认证。
    value: "/path/to/tls.crt"
  - name: tls_client_key # 可选。用于 TLS 客户端认证。
    value: "/path/to/tls.key"
  - name: token # 可选。用于基于 Token 的认证。
    value: "my-token"
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"
  - name: startSequence
    value: 1
  - name: startTime # Unix 格式
    value: 1630349391
  - name: flowControl
    value: false
  - name: ackWait
    value: 10s
  - name: maxDeliver
    value: 5
  - name: backOff
    value: "50ms, 1s, 10s"
  - name: maxAckPending
    value: 5000
  - name: replicas
    value: 1
  - name: memoryStorage
    value: false
  - name: rateLimit
    value: 1024
  - name: heartbeat
    value: 15s
  - name: ackPolicy
    value: explicit
  - name: deliverPolicy
    value: all
  - name: domain
    value: hub
  - name: apiPrefix
    value: PREFIX

规范元数据字段

字段必填详情示例
natsURLYNATS 服务器地址 URL"nats://localhost:4222"
jwtNNATS 去中心化认证 JWT"eyJhbGciOiJ...6yJV_adQssw5c"
seedKeyNNATS 去中心化认证 Seed Key"SUACS34K232O...5Z3POU7BNIL4Y"
tls_client_certNNATS TLS 客户端认证证书"/path/to/tls.crt"
tls_client_keyNNATS TLS 客户端认证密钥"/path/to/tls.key"
tokenN[NATS 基于 Token 的认证]"my-token"
nameNNATS 连接名称"my-conn-name"
streamNameN要绑定的 JetStream Stream 名称"my-stream"
durableNameNDurable name"my-durable"
queueGroupNameN队列组名称"my-queue"
startSequenceNStart Sequence1
startTimeNUnix 格式的[开始时间]1630349391
flowControlN[流控]true
ackWaitNAck Wait10s
maxDeliverN[最大投递次数]15
backOffN[退避策略]"50ms, 1s, 5s, 10s"
maxAckPendingN[最大待确认消息数]5000
replicasN[副本数]3
memoryStorageN[内存存储]false
rateLimitN[速率限制]1024
heartbeatN[心跳]10s
ackPolicyN[确认策略]explicit
deliverPolicyN可选值:all、last、new、sequence、timeall
domainN[JetStream Leafondes]HUB
apiPrefixN[JetStream Leafnodes]PREFIX

创建 NATS 服务器

你可以使用 Docker 在本地运行启用了 JetStream 的 NATS 服务器:

docker run -d -p 4222:4222 nats:latest -js

然后你可以使用客户端端口与服务器交互:localhost:4222

在 Kubernetes 上安装 NATS JetStream,使用 helm

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install --set nats.jetstream.enabled=true my-nats nats/nats

这将在 default 命名空间中安装单个 NATS 服务器。要查找与 NATS 交互的服务,请使用:

kubectl get svc my-nats

有关 helm chart 设置的更多信息,请参阅 Helm chart 文档

创建 JetStream

为特定的主题创建 NATS JetStream 是必不可少的。例如,对于本地运行的 NATS 服务器,使用:

nats -s localhost:4222 stream add myStream --subjects mySubject

示例:竞争消费者模式

假设你希望每条消息仅由一个具有相同 app-id 的应用程序或 Pod 处理。通常,consumerID 元数据规范可以帮助你定义竞争消费者。

由于 NATS JetStream 不支持 consumerID,你需要指定 durableNamequeueGroupName 来实现竞争消费者模式。例如:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"

相关链接