NATS Streaming

Detailed documentation on the NATS Streaming pubsub component

⚠️ Deprecation notice

Component format

To set up NATS Streaming pub/sub, create a component of type pubsub.natsstreaming. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: natsstreaming-pubsub
spec:
  type: pubsub.natsstreaming
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: natsStreamingClusterID
    value: "clusterId"
  - name: concurrencyMode
    value: parallel
  - name: consumerID # Optional. If not supplied, runtime will create one.
    value: "channel1"
  # Subscription configuration
  - name: subscriptionType
    value: <REPLACE-WITH-SUBSCRIPTION-TYPE> # Required. Allowed values: topic, queue.
  - name: ackWaitTime
    value: "" # Optional.
  - name: maxInFlight
    value: "" # Optional.
  - name: durableSubscriptionName
    value: "" # Optional.
  # The following subscription options set the start position. Only one can be used.
  - name: deliverNew
    value: <bool>
  - name: startAtSequence
    value: 1
  - name: startWithLastReceived
    value: false
  - name: deliverAll
    value: false
  - name: startAtTimeDelta
    value: ""
  - name: startAtTime
    value: ""
  - name: startAtTimeFormat
    value: ""

Spec metadata fields

| Field | Required | Details | Example |
|---|---|---|---|
| natsURL | Y | NATS server address URL | "nats://localhost:4222" |
| natsStreamingClusterID | Y | NATS cluster ID | "clusterId" |
| subscriptionType | Y | Subscription type. Allowed values: "topic", "queue" | "topic" |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If consumerID is not provided, the Dapr runtime sets it to the Dapr application ID (appID) value. | "channel1" |
| ackWaitTime | N | Time NATS Streaming waits for an acknowledgement before redelivering a message. See the NATS Streaming acknowledgements documentation. | "300ms" |
| maxInFlight | N | Maximum number of unacknowledged messages NATS Streaming delivers to the subscription at one time. See the NATS Streaming documentation. | "25" |
| durableSubscriptionName | N | Name that identifies a durable subscription. | "my-durable" |
| deliverNew | N | Subscription option. Only one can be used. Delivers new messages only. | "true", "false" |
| startAtSequence | N | Subscription option. Only one can be used. Sets the desired start sequence position and state. | "100000", "230420" |
| startWithLastReceived | N | Subscription option. Only one can be used. Sets the start position to the last received message. | "true", "false" |
| deliverAll | N | Subscription option. Only one can be used. Delivers all available messages. | "true", "false" |
| startAtTimeDelta | N | Subscription option. Only one can be used. Sets the desired start time position and state using a delta from the current time. | "10m", "23s" |
| startAtTime | N | Subscription option. Only one can be used. Sets the desired start time position and state. | "Feb 3, 2013 at 7:54pm (PST)" |
| startAtTimeFormat | N | Must be used with startAtTime. Sets the format for the time. | "Jan 2, 2006 at 3:04pm (MST)" |
| concurrencyMode | N | Calls the subscriber sequentially ("single" message at a time) or concurrently (in "parallel"). Default: "parallel" | "single", "parallel" |
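
To illustrate how the subscription fields combine, here is a minimal sketch of a durable queue subscription that starts from a point in time. Every value is taken from the example column above, and only one start-position option (startAtTime, paired with startAtTimeFormat) is set. The format string appears to follow Go's reference-time layout convention; treat that as an assumption and verify it against your Dapr version.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: natsstreaming-pubsub
spec:
  type: pubsub.natsstreaming
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: natsStreamingClusterID
    value: "clusterId"
  - name: subscriptionType
    value: "queue"
  - name: durableSubscriptionName
    value: "my-durable"
  - name: ackWaitTime
    value: "300ms"
  - name: maxInFlight
    value: "25"
  # Exactly one start-position option is set: startAtTime.
  - name: startAtTime
    value: "Feb 3, 2013 at 7:54pm (PST)"
  # Required whenever startAtTime is used.
  - name: startAtTimeFormat
    value: "Jan 2, 2006 at 3:04pm (MST)"
```

The other start-position fields (deliverNew, startAtSequence, startWithLastReceived, deliverAll, startAtTimeDelta) are left out entirely rather than set to empty or false values, so there is no ambiguity about which one applies.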

Create a NATS server


Run a NATS server locally using Docker:

docker run -d --name nats-streaming -p 4222:4222 -p 8222:8222 nats-streaming

Interact with the server using the client port: localhost:4222.
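
As a minimal sketch, a component pointing at this local server could look like the following. It assumes the container was started with default settings, in which case the NATS Streaming cluster ID is test-cluster. The component name is reused from the example earlier on this page, and the subscriptionType value is an arbitrary choice for illustration.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: natsstreaming-pubsub
spec:
  type: pubsub.natsstreaming
  version: v1
  metadata:
  # Client port published by the docker run command above.
  - name: natsURL
    value: "nats://localhost:4222"
  # Assumption: default cluster ID of an unconfigured nats-streaming server.
  - name: natsStreamingClusterID
    value: "test-cluster"
  - name: subscriptionType
    value: "topic"
```

If the server was started with a custom cluster ID, natsStreamingClusterID must be changed to match it.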


Install NATS on Kubernetes using kubectl:

# Single-server NATS
kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-server/single-server-nats.yml

# Single-server NATS Streaming
kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-streaming-server/single-server-stan.yml

This installs a single NATS Streaming server and a single NATS server into the default namespace. To interact with NATS, find the service with:

kubectl get svc stan

For example, if you installed NATS using the commands above, the NATS Streaming address would be:

<YOUR-HOST>:4222