Causation and Correlation Tracking

Causation tracking enables tracing event lineage through distributed systems. This guide covers how to track, query, and visualize event relationships.

Overview

The erl_esdb_causation module provides:

| Function | Purpose |
|----------|---------|
| get_effects/2 | Find events caused by an event |
| get_cause/2 | Find the event that caused this one |
| get_chain/2 | Trace causation chain to root |
| get_correlated/2 | Find all events in a saga |
| build_graph/2 | Build visualization graph |
| to_dot/1 | Export as Graphviz DOT |

Architecture

[Figure: Causation Graph]

Metadata Convention

Events track lineage through standard metadata fields:

#event{
    event_id = <<"evt-456">>,
    event_type = <<"OrderShipped">>,
    data = #{...},
    metadata = #{
        causation_id => <<"evt-123">>,     %% Direct cause
        correlation_id => <<"saga-789">>,  %% Business process
        actor_id => <<"user-001">>         %% Who triggered this
    }
}

Field Definitions

| Field | Type | Purpose |
|-------|------|---------|
| causation_id | binary | Event ID that directly caused this event |
| correlation_id | binary | Groups events in a saga/process |
| actor_id | binary | Identity that triggered the event |

Causation Queries

Finding Effects

Get all events caused by a specific event:

{ok, Effects} = erl_esdb_causation:get_effects(my_store, <<"evt-123">>),
%% Returns all events where metadata.causation_id = "evt-123"

Finding Cause

Get the event that caused this one:

{ok, CauseEvent} = erl_esdb_causation:get_cause(my_store, <<"evt-456">>),
%% Looks up event with ID matching this event's causation_id

Tracing the Chain

Walk backward from an event to its root cause:

{ok, Chain} = erl_esdb_causation:get_chain(my_store, <<"evt-789">>),
%% Returns [RootEvent, ..., IntermediateEvents, ..., TargetEvent]
%% Ordered from root to target

Finding Correlated Events

Get all events in a saga or business process:

{ok, SagaEvents} = erl_esdb_causation:get_correlated(my_store, <<"saga-001">>),
%% Returns all events where metadata.correlation_id = "saga-001"
%% Sorted by epoch_us (temporal order)

Graph Building

Build Causation Graph

%% Build from a specific event
{ok, EventGraph} = erl_esdb_causation:build_graph(my_store, <<"evt-123">>),

%% Or build from a correlation ID
%% (bound to a different variable so both calls can appear in one scope)
{ok, SagaGraph} = erl_esdb_causation:build_graph(my_store, <<"saga-001">>),

%% Graph structure:
%% #{
%%     nodes => [Event1, Event2, ...],
%%     edges => [{CauseId, EffectId}, ...],
%%     root => RootEventId | undefined
%% }
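
As a rough illustration of how the edges in this structure relate to the metadata convention, the following sketch derives `{CauseId, EffectId}` pairs from a list of events. It is not the library's implementation: the module name is hypothetical, events are represented as plain maps rather than `#event{}` records, and dropping edges whose cause is outside the node set is a choice made here for simplicity.

```erlang
-module(causation_edges_sketch).
-export([edges/1]).

%% Assumption for this sketch: each event is a map with an event_id key and
%% an optional metadata map. The library itself uses #event{} records.
-spec edges([map()]) -> [{binary(), binary()}].
edges(Events) ->
    %% Collect the IDs of all nodes so edges can be limited to known events.
    Ids = sets:from_list([maps:get(event_id, E) || E <- Events]),
    [{CauseId, maps:get(event_id, E)}
     || E <- Events,
        %% Single-element generator used to bind CauseId inside the comprehension.
        CauseId <- [maps:get(causation_id, maps:get(metadata, E, #{}), undefined)],
        CauseId =/= undefined,
        sets:is_element(CauseId, Ids)].
```

Edges come out in the same order as the input events, so passing events sorted by time gives a stable edge list.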

Export to Graphviz

Generate DOT format for visualization:

{ok, Graph} = erl_esdb_causation:build_graph(my_store, <<"saga-001">>),
DotBinary = erl_esdb_causation:to_dot(Graph),

%% Write to file
file:write_file("causation.dot", DotBinary),

%% Render with graphviz:
%% dot -Tpng -o causation.png causation.dot
%% dot -Tsvg -o causation.svg causation.dot

How It Works

Storage (No Index)

Currently, causation queries scan all streams:

scan_for_metadata(StoreId, Field, Value) ->
    {ok, StreamIds} = erl_esdb_streams:list_streams(StoreId),
    Events = lists:foldl(
        fun(StreamId, Acc) ->
            MatchingEvents = scan_stream_for_metadata(StoreId, StreamId, Field, Value),
            Acc ++ MatchingEvents
        end,
        [],
        StreamIds
    ),
    {ok, lists:sort(fun(A, B) -> A#event.epoch_us =< B#event.epoch_us end, Events)}.

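Performance Note: Every query reads every event in every stream, so the scan is O(n), where n is the total number of events across all streams; the matching events are then sorted by epoch_us. For production systems with many events, consider adding secondary indexes.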

Chain Building

Chains are built by walking backward through causation links:

build_chain_backward(StoreId, Event, Acc) ->
    case maps:get(causation_id, Event#event.metadata, undefined) of
        undefined ->
            Acc;  %% Reached root
        CausationId ->
            case find_event_by_id(StoreId, CausationId) of
                {ok, CauseEvent} ->
                    build_chain_backward(StoreId, CauseEvent, [CauseEvent | Acc]);
                {error, _} ->
                    Acc  %% Chain broken (event deleted?)
            end
    end.
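
The walk above stops at a root or at a broken link, but it would not terminate if malformed metadata formed a cycle. The sketch below shows one way to guard against that by remembering visited IDs. It is an illustration under stated assumptions, not the library's code: the module name is hypothetical, events are plain maps, and lookups go through an in-memory map of event ID to event instead of `find_event_by_id/2`.

```erlang
-module(chain_sketch).
-export([chain/2]).

%% Assumption for this sketch: Index is a map of EventId => Event, and each
%% Event is a map with an optional metadata map.
-spec chain(map(), binary()) -> {ok, [map()]} | {error, not_found}.
chain(Index, EventId) ->
    case maps:find(EventId, Index) of
        {ok, Event} ->
            walk(Index, Event, [Event], sets:from_list([EventId]));
        error ->
            {error, not_found}
    end.

%% Walk backward through causation_id links, prepending each cause so the
%% result is ordered from root to target.
walk(Index, Event, Acc, Seen) ->
    CauseId = maps:get(causation_id, maps:get(metadata, Event, #{}), undefined),
    case CauseId =/= undefined andalso not sets:is_element(CauseId, Seen) of
        false ->
            {ok, Acc};  %% Reached a root, or a cycle was detected
        true ->
            case maps:find(CauseId, Index) of
                {ok, Cause} ->
                    walk(Index, Cause, [Cause | Acc], sets:add_element(CauseId, Seen));
                error ->
                    {ok, Acc}  %% Chain broken: the cause is not in the index
            end
    end.
```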

Use Cases

1. Debugging Distributed Flows

%% An order failed - trace back to root cause
{ok, Chain} = erl_esdb_causation:get_chain(my_store, <<"order-failed-evt">>),
lists:foreach(
    fun(Event) ->
        %% epoch_us is a numeric timestamp, so print it with ~p rather than ~s
        io:format("~p: ~s (~s)~n", [
            Event#event.epoch_us,
            Event#event.event_type,
            Event#event.stream_id
        ])
    end,
    Chain
).

2. Saga Visualization

%% Visualize a checkout saga
{ok, Graph} = erl_esdb_causation:build_graph(my_store, <<"checkout-saga-123">>),
Dot = erl_esdb_causation:to_dot(Graph),
file:write_file("/tmp/checkout.dot", Dot),
os:cmd("dot -Tsvg -o /tmp/checkout.svg /tmp/checkout.dot").

3. Audit Trail

%% Who/what triggered this sensitive operation?
%% get_chain/2 takes the event ID directly, so no separate lookup is needed.
{ok, Chain} = erl_esdb_causation:get_chain(my_store, <<"sensitive-action-evt">>),
[RootEvent | _] = Chain,  %% Chain is ordered from root to target
ActorId = maps:get(actor_id, RootEvent#event.metadata, <<"unknown">>),
io:format("Sensitive action originated from actor: ~s~n", [ActorId]).

4. Impact Analysis

%% What happened because of this event?
{ok, Effects} = erl_esdb_causation:get_effects(my_store, <<"payment-received-evt">>),
io:format("Payment triggered ~p downstream events~n", [length(Effects)]).
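
The count above covers direct effects only. To measure the full downstream impact, the effects of each effect have to be followed as well. The sketch below does this breadth-first. It is a hedged illustration: the module name is hypothetical, and the lookup is passed in as a fun that returns the IDs of direct effects, so the traversal can be tried without a store.

```erlang
-module(impact_sketch).
-export([all_effects/2]).

%% EffectsFun(EventId) must return the IDs of the direct effects of EventId.
%% Returns every transitive effect of RootId in breadth-first order.
-spec all_effects(fun((binary()) -> [binary()]), binary()) -> [binary()].
all_effects(EffectsFun, RootId) ->
    collect(EffectsFun, [RootId], sets:from_list([RootId]), []).

collect(_EffectsFun, [], _Seen, Acc) ->
    lists:reverse(Acc);
collect(EffectsFun, [Id | Queue], Seen, Acc) ->
    %% Keep only effects not visited yet, so cycles and duplicates are skipped.
    New = [E || E <- lists:usort(EffectsFun(Id)), not sets:is_element(E, Seen)],
    Seen1 = lists:foldl(fun sets:add_element/2, Seen, New),
    collect(EffectsFun, Queue ++ New, Seen1, lists:reverse(New) ++ Acc).
```

With a real store, EffectsFun would wrap `erl_esdb_causation:get_effects/2` and extract the `event_id` of each returned event. Because each lookup is a full scan (see the performance note), a deep traversal over a large store can be expensive.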

Setting Causation IDs

When appending events, include causation metadata:

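%% In a command handler.
%% Assumption for this example: State is a map carrying the store, stream,
%% and expected version; adapt the pattern to however your handler tracks them.
handle_command(Command, #{store := Store, stream := Stream, version := ExpectedVersion}, CausationContext) ->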
    Event = #{
        event_type => <<"OrderPlaced">>,
        data => #{order_id => Command#cmd.order_id, items => Command#cmd.items},
        metadata => #{
            causation_id => CausationContext#ctx.event_id,
            correlation_id => CausationContext#ctx.correlation_id,
            actor_id => CausationContext#ctx.user_id
        }
    },
    {ok, _} = erl_esdb_streams:append(Store, Stream, ExpectedVersion, [Event]).

Telemetry

Causation queries emit telemetry:

%% Event: [erl_esdb, causation, query]
%% Measurements:
%%   #{duration => integer(), event_count => integer()}
%% Metadata:
%%   #{store_id => atom(), id => binary(), query_type => atom()}

%% Query types: causation_effects, causation_cause, causation_chain, causation_correlated
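
A handler for this event could be attached with the `telemetry` library's `telemetry:attach/4`. The sketch below logs one line per query. The module name and handler ID are hypothetical, and the unit of `duration` is not stated above, so the value is printed as-is.

```erlang
-module(causation_telemetry_sketch).
-export([attach/0, handle_event/4, format_line/2]).

%% Attach a logging handler for causation queries.
%% Requires the telemetry application to be available at runtime.
attach() ->
    telemetry:attach(
        <<"causation-query-logger">>,      %% Handler ID (hypothetical name)
        [erl_esdb, causation, query],      %% Event name documented above
        fun ?MODULE:handle_event/4,
        #{}
    ).

%% Called by telemetry each time a causation query completes.
handle_event([erl_esdb, causation, query], Measurements, Metadata, _Config) ->
    io:put_chars(format_line(Measurements, Metadata)).

%% Build the log line from the documented measurements and metadata keys.
format_line(#{duration := Duration, event_count := Count},
            #{store_id := StoreId, query_type := QueryType}) ->
    io_lib:format("~p on ~p: ~p events, duration ~p~n",
                  [QueryType, StoreId, Count, Duration]).
```

Keeping the formatting in a separate pure function makes the handler easy to check without emitting real telemetry events.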

DOT Format Example

Generated DOT output:

digraph causation {
  "evt-001" [label="evt-001\nCommandReceived"];
  "evt-002" [label="evt-002\nOrderCreated"];
  "evt-003" [label="evt-003\nPaymentRequested"];
  "evt-004" [label="evt-004\nPaymentReceived"];
  "evt-005" [label="evt-005\nOrderShipped"];
  "evt-001" -> "evt-002";
  "evt-002" -> "evt-003";
  "evt-003" -> "evt-004";
  "evt-004" -> "evt-005";
}

Rendered as:

CommandReceived -> OrderCreated -> PaymentRequested -> PaymentReceived -> OrderShipped
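
To make the mapping from graph to DOT concrete, here is a minimal sketch of a function that produces output in the format shown above. It is not the library's `to_dot/1`: the module name is hypothetical, nodes are assumed to be plain maps with `event_id` and `event_type`, and IDs are assumed to contain no double quotes, since no escaping is done.

```erlang
-module(dot_sketch).
-export([to_dot/1]).

%% Graph follows the documented shape #{nodes, edges, root}.
%% Assumption for this sketch: nodes are maps, not #event{} records.
-spec to_dot(map()) -> binary().
to_dot(#{nodes := Nodes, edges := Edges}) ->
    %% One line per node, labelled with the event ID and type.
    %% "\\n" emits a literal backslash-n, which DOT renders as a line break.
    NodeLines = [io_lib:format("  \"~s\" [label=\"~s\\n~s\"];~n", [Id, Id, Type])
                 || #{event_id := Id, event_type := Type} <- Nodes],
    %% One line per {CauseId, EffectId} edge.
    EdgeLines = [io_lib:format("  \"~s\" -> \"~s\";~n", [CauseId, EffectId])
                 || {CauseId, EffectId} <- Edges],
    iolist_to_binary(["digraph causation {\n", NodeLines, EdgeLines, "}\n"]).
```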

Error Handling

| Error | Cause | Resolution |
|-------|-------|------------|
| {error, not_found} | Event ID does not exist | Verify event ID |
| {error, no_cause} | Event has no causation_id | This is a root event |
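
A caller would typically branch on these results instead of matching on `{ok, _}` alone. The following sketch classifies the return value of a lookup such as `get_cause/2`. The module name and the returned atoms are hypothetical, and the catch-all clause is a precaution in case other error reasons exist.

```erlang
-module(cause_result_sketch).
-export([describe/1]).

%% Classify the result of a cause lookup using the documented error tuples.
-spec describe({ok, term()} | {error, term()}) ->
    has_cause | root_event | unknown_event | {unexpected, term()}.
describe({ok, _CauseEvent})    -> has_cause;
describe({error, no_cause})    -> root_event;      %% No causation_id: a root event
describe({error, not_found})   -> unknown_event;   %% The event ID does not exist
describe({error, Other})       -> {unexpected, Other}.
```

For example, `cause_result_sketch:describe(erl_esdb_causation:get_cause(my_store, EventId))` lets a root event be treated as a normal outcome instead of a crash.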

Best Practices

1. Always Set Correlation ID

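%% Generate at saga/process start, propagate through all events.
%% uuid_to_string/1 returns a character list, so convert it to a binary
%% to match the correlation_id field type.
CorrelationId = list_to_binary(uuid:uuid_to_string(uuid:get_v4())),
%% Use this same ID for all events in the business process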

2. Preserve Causation Context

%% When handling an event that triggers new events
handle_event(SourceEvent, State) ->
    NewEventMetadata = #{
        causation_id => SourceEvent#event.event_id,
        correlation_id => maps:get(correlation_id, SourceEvent#event.metadata)
    },
    ...
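
The propagation rule can be captured in a small helper so that every handler builds metadata the same way. This is a sketch, not part of the library: the module and function names are hypothetical, and carrying `actor_id` forward to downstream events is a design choice made here, not a documented requirement.

```erlang
-module(causation_meta_sketch).
-export([child_metadata/2]).

%% Build metadata for an event caused by the event with ID SourceEventId.
%% correlation_id (and actor_id, if present) are inherited from the source;
%% causation_id always points at the source event itself.
-spec child_metadata(binary(), map()) -> map().
child_metadata(SourceEventId, SourceMetadata) ->
    Inherited = maps:with([correlation_id, actor_id], SourceMetadata),
    Inherited#{causation_id => SourceEventId}.
```

Because `maps:with/2` ignores missing keys, the helper also works for source events that carry no `actor_id`.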

3. Include Actor ID at Entry Points

%% At API/command entry points
Metadata = #{
    correlation_id => new_correlation_id(),
    actor_id => RequestContext#ctx.authenticated_user
}.

Future Enhancements

Potential improvements for production scale:

  1. Secondary Index: Add Khepri index on causation_id and correlation_id
  2. Materialized Graph: Pre-compute causation graph per correlation
  3. Streaming Query: Process events in batches to reduce memory
  4. Time-Bounded Search: Limit search to recent time window
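
To illustrate the idea behind the first item, the sketch below builds an in-memory index from `causation_id` to the events it caused, turning an effects lookup into a map access instead of a full scan. It only demonstrates the data structure: the module name is hypothetical, events are plain maps, and a Khepri-backed index would differ in storage and update handling.

```erlang
-module(causation_index_sketch).
-export([build/1, effects/2]).

%% Build a map of CauseId => [Event] from a list of events.
%% Assumption for this sketch: events are maps with an optional metadata map.
-spec build([map()]) -> map().
build(Events) ->
    lists:foldl(
        fun(Event, Index) ->
            case maps:get(causation_id, maps:get(metadata, Event, #{}), undefined) of
                undefined ->
                    Index;  %% Root events have no cause to index under
                CauseId ->
                    maps:update_with(CauseId,
                                     fun(Es) -> [Event | Es] end,
                                     [Event],
                                     Index)
            end
        end,
        #{},
        Events).

%% Look up the direct effects of CauseId, in the order they were indexed.
-spec effects(map(), binary()) -> [map()].
effects(Index, CauseId) ->
    lists:reverse(maps:get(CauseId, Index, [])).
```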

See Also