Correlation IDs for Event Tracing in AsyncAPI
When an order event fans out into payment, inventory, and shipping events, a single shared identifier is what lets you stitch the whole flow back together in a tracing backend. AsyncAPI 3.0 expresses this with a correlationId object whose location is a runtime expression such as $message.header#/correlationId. This guide defines the header schema, the correlationId object, and the propagation rules that keep a trace unbroken across services. It is part of the AsyncAPI event-driven patterns section and the broader OpenAPI & AsyncAPI Schema Authoring guide.
Problem & Context
Distributed tracing needs one stable key that travels with a logical transaction across every hop. In synchronous HTTP you get this from W3C traceparent headers almost for free. In event-driven systems there is no call stack — a producer fires a message and forgets it, and a consumer that emits a downstream event has no implicit link back to the original cause unless it copies an identifier forward.
AsyncAPI 3.0 documents that identifier with a correlationId object on the message. It has two parts:
- description: human guidance on what the ID means.
- location: a runtime expression that says exactly where the value lives at runtime, e.g. $message.header#/correlationId for a header or $message.payload#/metadata/traceId for a body field.
Crucially, AsyncAPI only documents the location. It does not create, inject, or validate the value at runtime; that is the job of your services. The contract's value is that every team reads the same definition and propagates the same field, which closes the missing-correlation-IDs pitfall called out in AsyncAPI vs OpenAPI for event-driven architectures.
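To make "reading the location" concrete, here is a minimal sketch of how service code might resolve a location expression against a received message. The function name resolveLocation and the { headers, payload } message shape are assumptions for illustration; they are not part of AsyncAPI or any client library.

```javascript
// Minimal sketch: resolve a correlationId location against a message.
// `resolveLocation` and the { headers, payload } shape are hypothetical.
function resolveLocation(location, message) {
  const match = /^\$message\.(header|payload)#(\/.*)?$/.exec(location);
  if (!match) throw new Error(`Unsupported runtime expression: ${location}`);

  const root = match[1] === 'header' ? message.headers : message.payload;
  const pointer = match[2] ?? '';

  // Walk the JSON Pointer (RFC 6901), unescaping ~1 to "/" and ~0 to "~".
  return pointer
    .split('/')
    .slice(1)
    .map((token) => token.replace(/~1/g, '/').replace(/~0/g, '~'))
    .reduce((node, token) => (node == null ? undefined : node[token]), root);
}

const message = {
  headers: { correlationId: '3f2b8c1e-9d4a-4c6b-8a1f-0e2d5b7c9a10' },
  payload: { metadata: { traceId: 'trace-123' } },
};

// Header-located and payload-located IDs resolve through the same function.
console.log(resolveLocation('$message.header#/correlationId', message));
console.log(resolveLocation('$message.payload#/metadata/traceId', message));
```

A pointer that names a missing field, such as #/correlation_id here, resolves to undefined rather than throwing, which is exactly how a mismatched contract fails quietly.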
Step-by-Step Solution
1. Install the AsyncAPI CLI
npm install -g @asyncapi/cli
asyncapi --version
Expected output (version line will vary):
@asyncapi/cli/2.16.0 linux-x64 node-v20.11.1
2. Add a header schema
Give the correlation ID a typed home. Define a headers object on the message with a correlationId property. Reuse the schema component as covered in defining JSON Schema components.
components:
  schemas:
    EventHeaders:
      type: object
      required: [correlationId]
      properties:
        correlationId:
          type: string
          format: uuid
          description: Shared trace identifier for the logical transaction.
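The schema only describes the header; nothing enforces it unless a service checks inbound messages. The sketch below hand-rolls that check for the EventHeaders schema. The helper name checkEventHeaders is hypothetical, and in practice a JSON Schema validator would likely do this job, bearing in mind that many validators treat format as an annotation unless configured to enforce it.

```javascript
// Hypothetical helper: checks inbound headers against the EventHeaders schema
// (correlationId required, string, UUID-shaped). Hand-rolled for illustration.
const UUID_PATTERN =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function checkEventHeaders(headers) {
  const errors = [];
  const raw = headers?.correlationId;
  if (raw === undefined || raw === null) {
    errors.push('correlationId is required');
    return errors;
  }
  // Kafka clients usually deliver header values as Buffers, so normalize first.
  const value = Buffer.isBuffer(raw) ? raw.toString('utf8') : raw;
  if (typeof value !== 'string') {
    errors.push('correlationId must be a string');
  } else if (!UUID_PATTERN.test(value)) {
    errors.push('correlationId must be a UUID');
  }
  return errors;
}

// An empty array means the headers satisfy the schema.
console.log(checkEventHeaders({ correlationId: 'not-a-uuid' }));
```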
3. Declare a reusable correlationId object
Put the correlationId in components.correlationIds so several messages can share one definition. The location is a runtime expression: $message.header#/correlationId. The fragment after # is a JSON Pointer into the headers object.
components:
  correlationIds:
    traceId:
      description: Correlation ID carried in the message header.
      location: $message.header#/correlationId
4. Reference it from the message
Wire the message to both the header schema and the correlation ID object.
components:
  messages:
    OrderCreated:
      name: OrderCreated
      contentType: application/json
      headers:
        $ref: '#/components/schemas/EventHeaders'
      correlationId:
        $ref: '#/components/correlationIds/traceId'
      payload:
        $ref: '#/components/schemas/OrderCreatedPayload'
5. Propagate the header across services
The contract is only as good as the code that honors it. Each consumer must read the incoming correlationId and set the same value on every event it emits. Example with the kafkajs client, assuming consumer and producer are already connected and buildPaymentEvent is your own mapping function:
// consumer that re-emits a downstream event
import { randomUUID } from 'node:crypto';

await consumer.run({
  eachMessage: async ({ message }) => {
    // Reuse the inbound ID; fall back to a new one only if upstream sent none.
    const correlationId =
      message.headers?.correlationId?.toString() ?? randomUUID();
    await producer.send({
      topic: 'payments.authorized',
      messages: [{
        key: message.key,
        // copy the SAME id forward; do not mint a new one here
        headers: { correlationId },
        value: JSON.stringify(buildPaymentEvent(message)),
      }],
    });
  },
});
Only the first producer in a flow mints a new ID; every downstream service copies it.
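One way to keep that rule visible in code is to split minting and copying into separate helpers, so a downstream service cannot mint by accident. This is a sketch under assumptions: originHeaders and propagatedHeaders are hypothetical names, and throwing on a missing header is one possible policy, not something kafkajs or AsyncAPI prescribes.

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical helpers; the names are illustrative, not part of kafkajs.

// Called only by the service that starts a flow (e.g. the OrderCreated emitter).
function originHeaders() {
  return { correlationId: randomUUID() };
}

// Called by every downstream service: copy the inbound ID or fail loudly.
function propagatedHeaders(inboundHeaders) {
  const raw = inboundHeaders?.correlationId;
  if (raw === undefined || raw === null) {
    throw new Error('Inbound message has no correlationId header');
  }
  // Header values typically arrive as Buffers; forward them as strings.
  return { correlationId: raw.toString() };
}

const first = originHeaders();
const next = propagatedHeaders({ correlationId: Buffer.from(first.correlationId) });
console.log(first.correlationId === next.correlationId);
```

Failing loudly surfaces an upstream service that forgot the header, whereas a silent fallback to a new ID hides the gap and starts a second, disconnected trace. Which behavior fits depends on whether dropping the event is acceptable in your flow.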
6. Validate the document
asyncapi validate asyncapi.yaml
Expected output:
File asyncapi.yaml is valid! File asyncapi.yaml and referenced documents don't have governance problems.
Complete Working Example
A self-contained asyncapi.yaml with a typed header, a reusable correlationId using a runtime expression, and a message that references both.
asyncapi: 3.0.0
info:
  title: Orders Service
  version: 1.0.0
  description: Emits order events carrying a propagated correlation ID.
servers:
  production:
    host: kafka.internal:9092
    protocol: kafka
channels:
  ordersCreated:
    address: orders.created
    messages:
      OrderCreated:
        $ref: '#/components/messages/OrderCreated'
operations:
  publishOrderCreated:
    action: send
    channel:
      $ref: '#/channels/ordersCreated'
    messages:
      - $ref: '#/channels/ordersCreated/messages/OrderCreated'
components:
  correlationIds:
    traceId:
      description: Correlation ID carried in the message header.
      location: $message.header#/correlationId
  messages:
    OrderCreated:
      name: OrderCreated
      title: Order created event
      contentType: application/json
      headers:
        $ref: '#/components/schemas/EventHeaders'
      correlationId:
        $ref: '#/components/correlationIds/traceId'
      payload:
        $ref: '#/components/schemas/OrderCreatedPayload'
  schemas:
    EventHeaders:
      type: object
      required: [correlationId]
      properties:
        correlationId:
          type: string
          format: uuid
          description: Shared trace identifier for the logical transaction.
    OrderCreatedPayload:
      type: object
      required: [orderId, customerId, createdAt]
      properties:
        orderId:
          type: string
          format: uuid
        customerId:
          type: string
          format: uuid
        createdAt:
          type: string
          format: date-time
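Run asyncapi validate asyncapi.yaml before committing to confirm the document is structurally valid. As the first gotcha in the next section explains, a structurally valid expression can still point at a header property that does not exist, so check the pointer against the headers schema as well.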
Gotchas & Edge Cases
The runtime expression must point at something that exists. $message.header#/correlationId only resolves if the headers schema actually declares correlationId. A typo in the JSON Pointer (for example #/correlation_id) validates structurally but means nothing at runtime, so trace stitching silently fails. Keep the pointer and the header property name identical.
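A small lint step can catch this mismatch before it reaches production. The sketch below assumes you already have the message definition as a plain object with every $ref resolved; locationIsDeclared is a hypothetical helper, not an AsyncAPI CLI feature, and it ignores JSON Pointer escapes (~0, ~1) for brevity.

```javascript
// Hypothetical lint check: does the location pointer name a declared property?
// Assumes `message` is a dereferenced AsyncAPI message object.
function locationIsDeclared(message) {
  const match = /^\$message\.(header|payload)#((?:\/[^\/]*)*)$/.exec(
    message.correlationId.location,
  );
  if (!match) return false;

  let schema = match[1] === 'header' ? message.headers : message.payload;
  // Each pointer token must exist under `properties` of the current schema.
  for (const token of match[2].split('/').slice(1)) {
    schema = schema?.properties?.[token];
    if (schema === undefined) return false;
  }
  return schema !== undefined;
}

const headers = {
  type: 'object',
  properties: { correlationId: { type: 'string', format: 'uuid' } },
};

const good = { headers, correlationId: { location: '$message.header#/correlationId' } };
const typo = { headers, correlationId: { location: '$message.header#/correlation_id' } };

console.log(locationIsDeclared(good), locationIsDeclared(typo));
```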
Payload-located IDs are more fragile than header-located ones. $message.payload#/... ties the trace key to your body schema, so a payload refactor can move or rename it and break tracing. Prefer the header location; middleware, broker interceptors, and gateways can read and forward a header without deserializing the body.
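To illustrate the header advantage, here is a sketch of a relay step that forwards a message while treating the body as opaque bytes. The function relayRecord and the sample message are hypothetical; the record shape (key, headers, value) follows the kafkajs example earlier in this guide.

```javascript
// Hypothetical relay step: forwards a message without ever parsing its body.
function relayRecord(message) {
  return {
    key: message.key,
    // The trace key travels in the header, so the body stays opaque bytes.
    headers: { correlationId: message.headers.correlationId },
    value: message.value,
  };
}

const inbound = {
  key: Buffer.from('order-1'),
  headers: { correlationId: Buffer.from('3f2b8c1e-9d4a-4c6b-8a1f-0e2d5b7c9a10') },
  value: Buffer.from([0x00, 0x01, 0xff]), // not JSON; could be any binary encoding
};

const outbound = relayRecord(inbound);
console.log(outbound.headers.correlationId.toString());
```

Because the relay never calls JSON.parse, it keeps working when the payload schema or encoding changes. A payload-located ID would force every such hop to understand the body format.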
A new ID per hop destroys the trace. The most common runtime bug is each service minting a fresh correlationId instead of copying the inbound one. Only the originating producer generates a value; everyone downstream propagates it. Add a service-level test asserting the outbound ID equals the inbound ID.
FAQ
What runtime expression do I use for a correlationId in a header?
Use $message.header#/correlationId, where the part after the hash is a JSON Pointer into the headers object. Use $message.payload#/... instead when the trace key lives in the body.
Does AsyncAPI generate or inject the correlation ID at runtime?
No. The correlationId object only documents where the value lives. Your producer must set the value and every consumer must copy it onto outgoing events to keep the trace intact.
Should the correlation ID go in the header or the payload?
Prefer the header so brokers and middleware can read it without parsing the body, and so it survives schema changes. Use a payload field only when the broker cannot carry custom headers.
Related
- AsyncAPI event-driven patterns — parent guide for channels, operations, and message metadata.
- OpenAPI & AsyncAPI Schema Authoring — the full contract-authoring guide.
- Modeling Kafka topics in AsyncAPI 3.0 — channels, bindings, and keys for Kafka.
- AsyncAPI vs OpenAPI for event-driven architectures — protocol boundaries and CI validation.
- Defining JSON Schema components — reuse the header schema across messages.