Publishing & subscribing messages with CloudEvents

Learn why Dapr uses CloudEvents, how they work in Dapr pub/sub, and how to create CloudEvents.

To enable message routing and provide additional context with each message, Dapr uses the CloudEvents 1.0 specification as its message format. Dapr automatically wraps any message that an application sends to a topic in a CloudEvents envelope, using the Content-Type header value for the datacontenttype attribute.

Dapr uses CloudEvents to provide additional context to the event payload, enabling features like:

  • Tracing
  • Content-type for proper deserialization of event data
  • Verification of sender application

You can choose any of three methods to publish a CloudEvent via pub/sub:

  1. Send a pub/sub event, which is then wrapped by Dapr in a CloudEvent envelope.
  2. Replace specific CloudEvents attributes provided by Dapr by overriding the standard CloudEvent properties.
  3. Write your own CloudEvent envelope as part of the pub/sub event.

Dapr-generated CloudEvents example

When you send a publish operation to Dapr, it automatically wraps the message in a CloudEvent envelope containing the following fields:

  • id
  • source
  • specversion
  • type
  • traceparent
  • traceid
  • tracestate
  • topic
  • pubsubname
  • time
  • datacontenttype (optional)

The following example demonstrates a CloudEvent generated by Dapr for a publish operation to the orders topic that includes:

  • A W3C traceid unique to the message
  • The data and the fields for the CloudEvent where the data content is serialized as JSON

{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "checkout",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
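
To make the shape of the envelope concrete, here is a minimal Python sketch that builds a dictionary with the same fields as the example above. It is illustrative only and is not how Dapr implements wrapping: `wrap_in_cloudevent` is a hypothetical helper, the trace fields are left out for brevity, and the values for `id` and `time` are generated locally.

```python
import json
import uuid
from datetime import datetime, timezone


def wrap_in_cloudevent(data, topic, pubsub_name, source, content_type="application/json"):
    """Hypothetical helper: mimic the shape of a Dapr-generated envelope."""
    return {
        "id": str(uuid.uuid4()),            # unique per message
        "source": source,                   # the publishing app
        "specversion": "1.0",               # CloudEvents spec version
        "type": "com.dapr.event.sent",      # type shown in the example above
        "topic": topic,
        "pubsubname": pubsub_name,
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "datacontenttype": content_type,    # taken from the Content-Type header
        "data": data,                       # the original payload
    }


envelope = wrap_in_cloudevent({"orderId": 1}, "orders", "order_pub_sub", "checkout")
print(json.dumps(envelope, indent=2))
```

The original payload ends up under `data`, and everything else is context added around it.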

As another example of a v1.0 CloudEvent, the following shows data as XML content in a CloudEvent message serialized as JSON:

{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data" : "<note><to></to><from>user2</from><message>Order</message></note>",
  "id" : "id-1234-5678-9101",
  "specversion" : "1.0",
  "datacontenttype" : "text/xml",
  "subject" : "Test XML Message",
  "source" : "https://example.com/message",
  "type" : "xml.message",
  "time" : "2020-09-23T06:23:21Z"
}
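
The two examples above suggest how a subscriber might use datacontenttype to decide how to deserialize data. The following Python sketch is one possible approach using only the standard library. `decode_event_data` is a hypothetical helper, and the set of content types it recognizes is an assumption chosen for illustration.

```python
import json
import xml.etree.ElementTree as ET


def decode_event_data(event):
    """Hypothetical helper: decode `data` according to `datacontenttype`."""
    # Drop parameters such as "; charset=utf-8" before comparing.
    content_type = event.get("datacontenttype", "").split(";")[0].strip().lower()
    data = event.get("data")
    if content_type in ("text/xml", "application/xml") and isinstance(data, str):
        return ET.fromstring(data)   # parsed XML root element
    if content_type == "application/json" and isinstance(data, str):
        return json.loads(data)      # JSON delivered as a string
    return data                      # already structured, or unknown type


xml_event = {
    "datacontenttype": "text/xml",
    "data": "<note><to></to><from>user2</from><message>Order</message></note>",
}
note = decode_event_data(xml_event)
print(note.findtext("from"))
```

For the JSON example, `data` is already a structured object once the envelope itself has been parsed, so the helper returns it unchanged.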

Replace Dapr-generated CloudEvents values

Dapr automatically generates several CloudEvent properties. You can replace these generated properties by providing the following optional metadata key/value pairs:

  • cloudevent.id: overrides id
  • cloudevent.source: overrides source
  • cloudevent.type: overrides type
  • cloudevent.traceid: overrides traceid
  • cloudevent.tracestate: overrides tracestate
  • cloudevent.traceparent: overrides traceparent

The ability to replace CloudEvents properties using these metadata properties applies to all pub/sub components.

Example

For example, to replace the source and id values from the CloudEvent example above in code:


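import json

from dapr.clients import DaprClient

with DaprClient() as client:
    order = {'orderId': 1}
    # Publish an event/message using Dapr PubSub, overriding the CloudEvent id and source
    result = client.publish_event(
        pubsub_name='order_pub_sub',
        topic_name='orders',
        data=json.dumps(order),
        data_content_type='application/json',
        publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
    )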

var order = new Order(i);
using var client = new DaprClientBuilder().Build();

// Override cloudevent metadata
var metadata = new Dictionary<string,string>() {
    { "cloudevent.source", "payment" },
    { "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
};

// Publish an event/message using Dapr PubSub
await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
Console.WriteLine("Published data: " + order);

await Task.Delay(TimeSpan.FromSeconds(1));

The JSON payload then reflects the new source and id values:

{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "payment",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
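
If you publish over the HTTP API rather than an SDK, one way to pass the same overrides is as `metadata.`-prefixed query parameters on the publish URL. The Python sketch below only builds that URL and is offered under assumptions: `publish_url` is a hypothetical helper, the query parameter convention should be checked against the pub/sub API reference, and port 3601 is borrowed from the other examples on this page.

```python
from urllib.parse import urlencode

# Attributes that this page lists as replaceable via cloudevent.* metadata.
OVERRIDABLE = {"id", "source", "type", "traceid", "tracestate", "traceparent"}


def publish_url(base_url, pubsub_name, topic, overrides):
    """Hypothetical helper: build a publish URL carrying CloudEvent overrides."""
    unknown = set(overrides) - OVERRIDABLE
    if unknown:
        raise ValueError(f"not overridable: {sorted(unknown)}")
    # Assumption: metadata travels as metadata.<key>=<value> query parameters.
    query = urlencode({f"metadata.cloudevent.{k}": v for k, v in overrides.items()})
    url = f"{base_url}/v1.0/publish/{pubsub_name}/{topic}"
    return f"{url}?{query}" if query else url


url = publish_url(
    "http://localhost:3601",
    "order_pub_sub",
    "orders",
    {"id": "d99b228f-6c73-4e78-8c4d-3f80a043d317", "source": "payment"},
)
print(url)
```

Rejecting unknown keys early keeps typos such as `cloudevent.sorce` from being silently ignored.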

Publish your own CloudEvent

If you want to use your own CloudEvent, make sure to specify the datacontenttype as application/cloudevents+json.

If the CloudEvent that was authored by the app does not contain the minimum required fields in the CloudEvent specification, the message is rejected. Dapr adds the following fields to the CloudEvent if they are missing:

  • time
  • traceid
  • traceparent
  • tracestate
  • topic
  • pubsubname
  • source
  • type
  • specversion

You can add additional fields to a custom CloudEvent that are not part of the official CloudEvent specification. Dapr will pass these fields as-is.
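
Before publishing a custom CloudEvent, it can help to check it locally. The CloudEvents 1.0 specification marks id, source, specversion, and type as required attributes. The Python sketch below checks for all four, which is stricter than Dapr itself may be, since Dapr fills in some of these when they are missing. `missing_required` is a hypothetical helper, and `tenantId` is a made-up extension field.

```python
# Required context attributes in the CloudEvents 1.0 specification.
REQUIRED_ATTRIBUTES = ("id", "source", "specversion", "type")


def missing_required(event):
    """Hypothetical helper: return the required attributes that are absent or empty."""
    return [name for name in REQUIRED_ATTRIBUTES if not event.get(name)]


event = {
    "specversion": "1.0",
    "type": "com.dapr.cloudevent.sent",
    "source": "testcloudeventspubsub",
    "id": "someCloudEventId",
    "data": {"orderId": "100"},
    "tenantId": "contoso",  # hypothetical custom field, not part of the spec
}

print(missing_required(event))
print(missing_required({"specversion": "1.0", "data": {}}))
```

The custom `tenantId` field does not affect the check, which matches the pass-through behavior described above.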

Example


Publish a CloudEvent to the orders topic using the Dapr CLI:

dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'

Publish a CloudEvent to the orders topic using curl:

curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'

Publish a CloudEvent to the orders topic using PowerShell:

Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
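
For comparison, the same HTTP request can be sketched in Python with only the standard library. The URL, header, and event body are copied from the commands above. `send` is a hypothetical helper that is defined but not called here, because it needs a Dapr sidecar listening on port 3601.

```python
import json
import urllib.request

event = {
    "specversion": "1.0",
    "type": "com.dapr.cloudevent.sent",
    "source": "testcloudeventspubsub",
    "subject": "Cloud Events Test",
    "id": "someCloudEventId",
    "time": "2021-08-02T09:00:00Z",
    "datacontenttype": "application/cloudevents+json",
    "data": {"orderId": "100"},
}

# The Content-Type header signals that the body is already a CloudEvent.
request = urllib.request.Request(
    "http://localhost:3601/v1.0/publish/order-pub-sub/orders",
    data=json.dumps(event).encode("utf-8"),
    headers={"Content-Type": "application/cloudevents+json"},
    method="POST",
)


def send(req):
    """Hypothetical helper: send the request and return the HTTP status code."""
    with urllib.request.urlopen(req) as response:
        return response.status
```

Calling `send(request)` while the sidecar is running publishes the event.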

Event deduplication

When using CloudEvents created by Dapr, the envelope contains an id field that the app can use to deduplicate messages. Dapr does not handle deduplication automatically, but it does support message brokers that natively provide message deduplication.
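
As a minimal sketch of app-side deduplication, a subscriber could remember the id of each event it has processed and skip repeats. The Python example below keeps ids in an in-memory set, which is an assumption made for brevity. A real service would more likely use a shared store with expiry so that ids survive restarts and are visible to every replica. `handle_once` is a hypothetical helper.

```python
def handle_once(event, seen_ids, handler):
    """Hypothetical helper: run `handler` only for events whose id is new.

    Returns True if the event was processed and False if it was skipped.
    """
    event_id = event.get("id")
    if event_id is not None and event_id in seen_ids:
        return False                 # duplicate delivery, skip it
    handler(event)
    if event_id is not None:
        seen_ids.add(event_id)       # record only after successful handling
    return True


seen = set()
processed = []
first = {"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565", "data": {"orderId": 1}}

print(handle_once(first, seen, processed.append))
print(handle_once(dict(first), seen, processed.append))
```

Recording the id only after the handler returns means a failed attempt can be retried on redelivery.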

Last modified September 10, 2024: rm escape (#4348) (6ee5968)