Causation and Correlation Tracking
Causation tracking enables tracing event lineage through distributed systems. This guide covers how to track, query, and visualize event relationships.
Overview
The erl_esdb_causation module provides:
| Function | Purpose |
|---|---|
| get_effects/2 | Find events caused by an event |
| get_cause/2 | Find the event that caused this one |
| get_chain/2 | Trace causation chain to root |
| get_correlated/2 | Find all events in a saga |
| build_graph/2 | Build visualization graph |
| to_dot/1 | Export as Graphviz DOT |
Architecture
Metadata Convention
Events track lineage through standard metadata fields:
#event{
    event_id = <<"evt-456">>,
    event_type = <<"OrderShipped">>,
    data = #{...},
    metadata = #{
        causation_id => <<"evt-123">>,    %% Direct cause
        correlation_id => <<"saga-789">>, %% Business process
        actor_id => <<"user-001">>        %% Who triggered this
    }
}
Field Definitions
| Field | Type | Purpose |
|---|---|---|
| causation_id | binary | Event ID that directly caused this event |
| correlation_id | binary | Groups events in a saga/process |
| actor_id | binary | Identity that triggered the event |
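
The field definitions above can be illustrated with a small helper that builds metadata for new events. This is a minimal sketch, not part of erl_esdb: the module name `causation_meta_sketch` and both functions are hypothetical, and copying `actor_id` from the parent is an assumption about how you want actors propagated.

```erlang
-module(causation_meta_sketch).
-export([root/2, derive/2]).

%% Metadata for an entry-point event. There is no cause yet, so only
%% correlation_id and actor_id are set.
-spec root(binary(), binary()) -> map().
root(CorrelationId, ActorId) ->
    #{correlation_id => CorrelationId, actor_id => ActorId}.

%% Metadata for an event caused by the event with ID ParentId.
%% causation_id points at the parent; correlation_id and actor_id are
%% copied from the parent's metadata when present (assumption: the actor
%% should follow the whole chain).
-spec derive(binary(), map()) -> map().
derive(ParentId, ParentMeta) ->
    Inherited = maps:with([correlation_id, actor_id], ParentMeta),
    Inherited#{causation_id => ParentId}.
```

Calling `derive(<<"evt-123">>, ParentMeta)` yields metadata whose `causation_id` is `<<"evt-123">>` and whose `correlation_id` matches the parent, which is the shape the queries in the next section rely on.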
Causation Queries
Finding Effects
Get all events caused by a specific event:
{ok, Effects} = erl_esdb_causation:get_effects(my_store, <<"evt-123">>),
%% Returns all events where metadata.causation_id = "evt-123"
Finding Cause
Get the event that caused this one:
{ok, CauseEvent} = erl_esdb_causation:get_cause(my_store, <<"evt-456">>),
%% Looks up event with ID matching this event's causation_id
Tracing the Chain
Walk backward from an event to its root cause:
{ok, Chain} = erl_esdb_causation:get_chain(my_store, <<"evt-789">>),
%% Returns [RootEvent, ..., IntermediateEvents, ..., TargetEvent]
%% Ordered from root to target
Finding Correlated Events
Get all events in a saga or business process:
{ok, SagaEvents} = erl_esdb_causation:get_correlated(my_store, <<"saga-001">>),
%% Returns all events where metadata.correlation_id = "saga-001"
%% Sorted by epoch_us (temporal order)
Graph Building
Build Causation Graph
%% Build from a specific event
{ok, EventGraph} = erl_esdb_causation:build_graph(my_store, <<"evt-123">>),
%% Or build from a correlation ID
%% (bound to a different variable: rebinding Graph would fail with badmatch)
{ok, SagaGraph} = erl_esdb_causation:build_graph(my_store, <<"saga-001">>),
%% Graph structure:
%% #{
%%     nodes => [Event1, Event2, ...],
%%     edges => [{CauseId, EffectId}, ...],
%%     root => RootEventId | undefined
%% }
Export to Graphviz
Generate DOT format for visualization:
{ok, Graph} = erl_esdb_causation:build_graph(my_store, <<"saga-001">>),
DotBinary = erl_esdb_causation:to_dot(Graph),
%% Write to file
file:write_file("causation.dot", DotBinary),
%% Render with graphviz:
%% dot -Tpng -o causation.png causation.dot
%% dot -Tsvg -o causation.svg causation.dot
How It Works
Storage (No Index)
Currently, causation queries scan all streams:
scan_for_metadata(StoreId, Field, Value) ->
    {ok, StreamIds} = erl_esdb_streams:list_streams(StoreId),
    Events = lists:foldl(
        fun(StreamId, Acc) ->
            MatchingEvents = scan_stream_for_metadata(StoreId, StreamId, Field, Value),
            Acc ++ MatchingEvents
        end,
        [],
        StreamIds
    ),
    {ok, lists:sort(fun(A, B) -> A#event.epoch_us =< B#event.epoch_us end, Events)}.
Performance Note: This is O(n) where n = total events across all streams. For production systems with many events, consider adding secondary indexes.
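
To make the secondary-index idea concrete, here is a minimal in-memory sketch. It is not how erl_esdb stores anything: the module `causation_index_sketch`, its functions, and the `{EventId, Metadata}` input shape are all hypothetical, and a real index would have to be persisted and updated on every append.

```erlang
-module(causation_index_sketch).
-export([build/2, lookup/2]).

%% Build an index for one metadata field (for example causation_id).
%% Entries is a list of {EventId, Metadata} pairs in append order.
%% The result maps each field value to the IDs of the events carrying it.
-spec build(atom(), [{binary(), map()}]) -> #{binary() => [binary()]}.
build(Field, Entries) ->
    Index = lists:foldl(
        fun({EventId, Metadata}, Acc) ->
            case maps:find(Field, Metadata) of
                {ok, Value} ->
                    %% Prepending is O(1); append order is restored below.
                    maps:update_with(
                        Value,
                        fun(Ids) -> [EventId | Ids] end,
                        [EventId],
                        Acc
                    );
                error ->
                    %% Event does not carry this field, so it is not indexed.
                    Acc
            end
        end,
        #{},
        Entries
    ),
    maps:map(fun(_Value, Ids) -> lists:reverse(Ids) end, Index).

%% Look up the event IDs for a field value; a miss returns the empty list.
-spec lookup(binary(), #{binary() => [binary()]}) -> [binary()].
lookup(Value, Index) ->
    maps:get(Value, Index, []).
```

With such an index, a lookup like "all effects of evt-1" becomes a single map access instead of a scan over every stream, at the cost of extra work on each write.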
Chain Building
Chains are built by walking backward through causation links:
build_chain_backward(StoreId, Event, Acc) ->
    case maps:get(causation_id, Event#event.metadata, undefined) of
        undefined ->
            Acc; %% Reached root
        CausationId ->
            case find_event_by_id(StoreId, CausationId) of
                {ok, CauseEvent} ->
                    build_chain_backward(StoreId, CauseEvent, [CauseEvent | Acc]);
                {error, _} ->
                    Acc %% Chain broken (event deleted?)
            end
    end.
Use Cases
1. Debugging Distributed Flows
%% An order failed - trace back to root cause
{ok, Chain} = erl_esdb_causation:get_chain(my_store, <<"order-failed-evt">>),
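lists:foreach(
    fun(Event) ->
        %% epoch_us is a microsecond timestamp (an integer), so it is
        %% printed with ~p; ~s would raise badarg on a non-string term
        io:format("~p: ~s (~s)~n", [
            Event#event.epoch_us,
            Event#event.event_type,
            Event#event.stream_id
        ])
    end,
    Chain
).
2. Saga Visualization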
%% Visualize a checkout saga
{ok, Graph} = erl_esdb_causation:build_graph(my_store, <<"checkout-saga-123">>),
Dot = erl_esdb_causation:to_dot(Graph),
file:write_file("/tmp/checkout.dot", Dot),
os:cmd("dot -Tsvg -o /tmp/checkout.svg /tmp/checkout.dot").
3. Audit Trail
%% Who/what triggered this sensitive operation?
{ok, Event} = find_event(my_store, <<"sensitive-action-evt">>),
{ok, Chain} = erl_esdb_causation:get_chain(my_store, Event#event.event_id),
RootEvent = hd(Chain),
ActorId = maps:get(actor_id, RootEvent#event.metadata, <<"unknown">>),
io:format("Sensitive action originated from actor: ~s~n", [ActorId]).
4. Impact Analysis
%% What happened because of this event?
{ok, Effects} = erl_esdb_causation:get_effects(my_store, <<"payment-received-evt">>),
io:format("Payment triggered ~p downstream events~n", [length(Effects)]).
Setting Causation IDs
When appending events, include causation metadata:
%% In a command handler
handle_command(Command, State, CausationContext) ->
    Event = #{
        event_type => <<"OrderPlaced">>,
        data => #{order_id => Command#cmd.order_id, items => Command#cmd.items},
        metadata => #{
            causation_id => CausationContext#ctx.event_id,
            correlation_id => CausationContext#ctx.correlation_id,
            actor_id => CausationContext#ctx.user_id
        }
    },
    {ok, _} = erl_esdb_streams:append(Store, Stream, ExpectedVersion, [Event]).
Telemetry
Causation queries emit telemetry:
%% Event: [erl_esdb, causation, query]
%% Measurements:
%% #{duration => integer(), event_count => integer()}
%% Metadata:
%% #{store_id => atom(), id => binary(), query_type => atom()}
%% Query types: causation_effects, causation_cause, causation_chain, causation_correlated
DOT Format Example
Generated DOT output:
digraph causation {
    "evt-001" [label="evt-001\nCommandReceived"];
    "evt-002" [label="evt-002\nOrderCreated"];
    "evt-003" [label="evt-003\nPaymentRequested"];
    "evt-004" [label="evt-004\nPaymentReceived"];
    "evt-005" [label="evt-005\nOrderShipped"];
    "evt-001" -> "evt-002";
    "evt-002" -> "evt-003";
    "evt-003" -> "evt-004";
    "evt-004" -> "evt-005";
}
Rendered as:
CommandReceived → OrderCreated → PaymentRequested → PaymentReceived → OrderShipped
Error Handling
| Error | Cause | Resolution |
|---|---|---|
| {error, not_found} | Event ID does not exist | Verify event ID |
| {error, no_cause} | Event has no causation_id | This is a root event |
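
The two errors in the table call for different handling: `no_cause` is an expected outcome for root events, while `not_found` usually signals a bad ID. A minimal sketch of that distinction follows. The module `causation_errors_sketch` and the result atoms are hypothetical, and it assumes the query returns either `{ok, Event}` or one of the error tuples listed above.

```erlang
-module(causation_errors_sketch).
-export([classify/1]).

%% Turn a causation query result into an explicit outcome.
%% The catch-all clause keeps unexpected errors visible rather than
%% crashing with function_clause.
-spec classify({ok, term()} | {error, term()}) ->
    {caused_by, term()} | root_event | unknown_event | {failed, term()}.
classify({ok, CauseEvent}) ->
    {caused_by, CauseEvent};
classify({error, no_cause}) ->
    %% Not a failure: the event simply started the chain.
    root_event;
classify({error, not_found}) ->
    %% The event ID does not exist in the store.
    unknown_event;
classify({error, Other}) ->
    {failed, Other}.
```

A caller could wrap a query such as `erl_esdb_causation:get_cause(my_store, EventId)` in `classify/1` and branch on the returned atom.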
Best Practices
1. Always Set Correlation ID
%% Generate at saga/process start, propagate through all events
CorrelationId = uuid:uuid_to_string(uuid:get_v4()),
%% Use this same ID for all events in the business process
2. Preserve Causation Context
%% When handling an event that triggers new events
handle_event(SourceEvent, State) ->
    NewEventMetadata = #{
        causation_id => SourceEvent#event.event_id,
        correlation_id => maps:get(correlation_id, SourceEvent#event.metadata)
    },
    ...
3. Include Actor ID at Entry Points
%% At API/command entry points
Metadata = #{
    correlation_id => new_correlation_id(),
    actor_id => RequestContext#ctx.authenticated_user
}.
Future Enhancements
Potential improvements for production scale:
- Secondary Index: Add Khepri index on causation_id and correlation_id
- Materialized Graph: Pre-compute causation graph per correlation
- Streaming Query: Process events in batches to reduce memory
- Time-Bounded Search: Limit search to recent time window
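
As an illustration of the materialized-graph idea, the sketch below groups events by `correlation_id` and pre-computes one graph per group, using the same `nodes`/`edges`/`root` keys as the graph structure shown earlier. Everything here is hypothetical: the module `causation_graph_sketch`, the `{EventId, Metadata}` input shape, the use of event IDs rather than full events as nodes, and the choice of the first event without a `causation_id` as root.

```erlang
-module(causation_graph_sketch).
-export([build/1, materialize/1]).

%% Build one graph from a list of {EventId, Metadata} pairs.
-spec build([{binary(), map()}]) -> map().
build(Entries) ->
    Ids = [Id || {Id, _Meta} <- Entries],
    %% One edge per event that names a cause; events without a
    %% causation_id do not match the pattern and are skipped.
    Edges = [{Cause, Id} || {Id, #{causation_id := Cause}} <- Entries],
    Roots = [Id || {Id, Meta} <- Entries, not maps:is_key(causation_id, Meta)],
    Root =
        case Roots of
            [First | _] -> First;
            [] -> undefined
        end,
    #{nodes => Ids, edges => Edges, root => Root}.

%% Pre-compute a graph for every correlation_id found in Entries.
%% foldr with prepend keeps each group in its original order.
-spec materialize([{binary(), map()}]) -> #{binary() => map()}.
materialize(Entries) ->
    Groups = lists:foldr(
        fun({_Id, Meta} = Entry, Acc) ->
            case maps:find(correlation_id, Meta) of
                {ok, Corr} ->
                    maps:update_with(Corr, fun(Es) -> [Entry | Es] end, [Entry], Acc);
                error ->
                    Acc
            end
        end,
        #{},
        Entries
    ),
    maps:map(fun(_Corr, Group) -> build(Group) end, Groups).
```

The trade-off is the usual one for materialized views: reads become a single lookup per correlation, but the stored graph must be refreshed whenever a new event joins that correlation.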
See Also
- Stream Links - Derived streams from causation patterns
- Temporal Queries - Time-based event retrieval
- Storage Internals - Khepri path structure