Correlation IDs for Event Tracing in AsyncAPI

When an order event fans out into payment, inventory, and shipping events, a single shared identifier is what lets you stitch the whole flow back together in a tracing backend. AsyncAPI 3.0 expresses this with a correlationId object whose location is a runtime expression such as $message.header#/correlationId. This guide defines the header schema, the correlationId object, and the propagation rules that keep a trace unbroken across services. It is part of the AsyncAPI event-driven patterns section and the broader OpenAPI & AsyncAPI Schema Authoring guide.

Correlation ID propagation across services An order service sets a correlation ID; payment and shipping services copy the same ID onto their own events so a tracing backend links all of them. Order service sets correlationId Payment service copies correlationId Shipping service copies correlationId Tracing backend groups events by correlationId

Problem & Context

Distributed tracing needs one stable key that travels with a logical transaction across every hop. In synchronous HTTP you get this from W3C traceparent headers almost for free. In event-driven systems there is no call stack — a producer fires a message and forgets it, and a consumer that emits a downstream event has no implicit link back to the original cause unless it copies an identifier forward.

AsyncAPI 3.0 documents that identifier with a correlationId object on the message. It has two parts:

  • description — human guidance on what the ID means.
  • location — a runtime expression that says exactly where the value lives at runtime, e.g. $message.header#/correlationId for a header or $message.payload#/metadata/traceId for a body field.

Crucially, AsyncAPI only documents the location. It does not create, inject, or validate the value at runtime — that is the job of your services. The contract’s value is that every team reads the same definition and propagates the same field. This is the missing-correlation-IDs pitfall called out in AsyncAPI vs OpenAPI for event-driven architectures, solved properly.

Step-by-Step Solution

1. Install the AsyncAPI CLI

npm install -g @asyncapi/cli
asyncapi --version

Expected output (version line will vary):

@asyncapi/cli/2.16.0 linux-x64 node-v20.11.1

2. Add a header schema

Give the correlation ID a typed home. Define a headers object on the message with a correlationId property. Reuse the schema component as covered in defining JSON Schema components.

components:
  schemas:
    EventHeaders:
      type: object
      required: [correlationId]
      properties:
        correlationId:
          type: string
          format: uuid
          description: Shared trace identifier for the logical transaction.

3. Declare a reusable correlationId object

Put the correlationId in components.correlationIds so several messages can share one definition. The location is a runtime expression: $message.header#/correlationId. The fragment after # is a JSON Pointer into the headers object.

components:
  correlationIds:
    traceId:
      description: Correlation ID carried in the message header.
      location: $message.header#/correlationId

4. Reference it from the message

Wire the message to both the header schema and the correlation ID object.

components:
  messages:
    OrderCreated:
      name: OrderCreated
      contentType: application/json
      headers:
        $ref: '#/components/schemas/EventHeaders'
      correlationId:
        $ref: '#/components/correlationIds/traceId'
      payload:
        $ref: '#/components/schemas/OrderCreatedPayload'

5. Propagate the header across services

The contract is only as good as the code that honors it. Each consumer must read the incoming correlationId and set the same value on every event it emits. Example with the standard kafkajs client:

// consumer that re-emits a downstream event
await consumer.run({
  eachMessage: async ({ message }) => {
    const correlationId =
      message.headers?.correlationId?.toString() ?? crypto.randomUUID();

    await producer.send({
      topic: 'payments.authorized',
      messages: [{
        key: message.key,
        // copy the SAME id forward — do not mint a new one here
        headers: { correlationId },
        value: JSON.stringify(buildPaymentEvent(message)),
      }],
    });
  },
});

Only the first producer in a flow mints a new ID; every downstream service copies it.

6. Validate the document

asyncapi validate asyncapi.yaml

Expected output:

File asyncapi.yaml is valid! File asyncapi.yaml and referenced documents don't have governance problems.

Complete Working Example

A self-contained asyncapi.yaml with a typed header, a reusable correlationId using a runtime expression, and a message that references both.

asyncapi: 3.0.0
info:
  title: Orders Service
  version: 1.0.0
  description: Emits order events carrying a propagated correlation ID.

servers:
  production:
    host: kafka.internal:9092
    protocol: kafka

channels:
  ordersCreated:
    address: orders.created
    messages:
      OrderCreated:
        $ref: '#/components/messages/OrderCreated'

operations:
  publishOrderCreated:
    action: send
    channel:
      $ref: '#/channels/ordersCreated'
    messages:
      - $ref: '#/channels/ordersCreated/messages/OrderCreated'

components:
  correlationIds:
    traceId:
      description: Correlation ID carried in the message header.
      location: $message.header#/correlationId

  messages:
    OrderCreated:
      name: OrderCreated
      title: Order created event
      contentType: application/json
      headers:
        $ref: '#/components/schemas/EventHeaders'
      correlationId:
        $ref: '#/components/correlationIds/traceId'
      payload:
        $ref: '#/components/schemas/OrderCreatedPayload'

  schemas:
    EventHeaders:
      type: object
      required: [correlationId]
      properties:
        correlationId:
          type: string
          format: uuid
          description: Shared trace identifier for the logical transaction.

    OrderCreatedPayload:
      type: object
      required: [orderId, customerId, createdAt]
      properties:
        orderId:
          type: string
          format: uuid
        customerId:
          type: string
          format: uuid
        createdAt:
          type: string
          format: date-time

Run asyncapi validate asyncapi.yaml to confirm the runtime expression resolves before committing.

Gotchas & Edge Cases

The runtime expression must point at something that exists. $message.header#/correlationId only resolves if the headers schema actually declares correlationId. A typo in the JSON Pointer (for example #/correlation_id) validates structurally but means nothing at runtime, so trace stitching silently fails. Keep the pointer and the header property name identical.

Payload-located IDs survive worse than header-located ones. $message.payload#/... ties the trace key to your body schema, so a payload refactor can move or rename it and break tracing. Prefer the header location; middleware, broker interceptors, and gateways can read and forward a header without deserializing the body.

A new ID per hop destroys the trace. The most common runtime bug is each service minting a fresh correlationId instead of copying the inbound one. Only the originating producer generates a value; everyone downstream propagates it. Add a service-level test asserting the outbound ID equals the inbound ID.

FAQ

What runtime expression do I use for a correlationId in a header?

Use $message.header#/correlationId, where the part after the hash is a JSON Pointer into the headers object. Use $message.payload#/... instead when the trace key lives in the body.

Does AsyncAPI generate or inject the correlation ID at runtime?

No. The correlationId object only documents where the value lives. Your producer must set the value and every consumer must copy it onto outgoing events to keep the trace intact.

Should the correlation ID go in the header or the payload?

Prefer the header so brokers and middleware can read it without parsing the body, and so it survives schema changes. Use a payload field only when the broker cannot carry custom headers.