evoq_aggregate behaviour (evoq v1.14.1)

Aggregate behavior and GenServer implementation.

Aggregates are the consistency boundary in event sourcing. Each aggregate:

- Has a unique stream ID
- Declares a state module via state_module/0
- Processes commands via the execute/2 callback
- Applies events via the apply/2 callback (typically delegating to the state module)
- Supports snapshots for fast recovery
- Has a configurable lifespan (TTL, hibernate, passivate)

Callbacks

Required:

- state_module() -> module()
- init(AggregateId) -> {ok, State}
- execute(State, Command) -> {ok, [Event]} | {error, Reason}
- apply(State, Event) -> NewState

Optional:

- snapshot(State) -> SnapshotData
- from_snapshot(SnapshotData) -> State
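The callback list above can be sketched as a small aggregate module. This is only an illustration under stated assumptions: the module name bank_account, the state map, and the command_type, payload, event_type and amount keys are hypothetical and not part of evoq. Returning ?MODULE from state_module/0 is a shortcut for brevity; the interface evoq expects from a state module is not described in this section.

```erlang
-module(bank_account).
-behaviour(evoq_aggregate).

%% apply/2 shares its name with an auto-imported BIF (erlang:apply/2);
%% this directive keeps local calls unambiguous.
-compile({no_auto_import, [apply/2]}).

%% Required callbacks.
-export([state_module/0, init/1, execute/2, apply/2]).
%% Optional callbacks.
-export([snapshot/1, from_snapshot/1]).

%% Assumption: state logic lives in this same module. A real project
%% would likely name a dedicated state module here.
state_module() -> ?MODULE.

%% Every account starts with a zero balance (hypothetical state shape).
init(AggregateId) ->
    {ok, #{id => AggregateId, balance => 0}}.

%% Decide which events a command produces. State is only read here;
%% changes happen in apply/2.
execute(_State, #{command_type := deposit, payload := #{amount := Amount}})
  when is_integer(Amount), Amount > 0 ->
    {ok, [#{event_type => deposited, amount => Amount}]};
execute(#{balance := Balance},
        #{command_type := withdraw, payload := #{amount := Amount}})
  when is_integer(Amount), Amount > 0, Amount =< Balance ->
    {ok, [#{event_type => withdrawn, amount => Amount}]};
execute(_State, #{command_type := withdraw}) ->
    {error, invalid_withdrawal};
execute(_State, _Command) ->
    {error, unknown_command}.

%% Fold a single event into the state. Unknown events leave it unchanged.
apply(State = #{balance := B}, #{event_type := deposited, amount := A}) ->
    State#{balance := B + A};
apply(State = #{balance := B}, #{event_type := withdrawn, amount := A}) ->
    State#{balance := B - A};
apply(State, _UnknownEvent) ->
    State.

%% The state map is already plain data, so it doubles as the snapshot.
snapshot(State) -> State.

from_snapshot(SnapshotData) -> SnapshotData.
```

Keeping the decision in execute/2 and the state change in apply/2 means the same apply/2 clauses can serve both new events and events replayed during recovery.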

Summary

Functions

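execute_command(Pid, Evoq_command)
Execute a command against an aggregate.

execute_command_with_state(Pid, Evoq_command)
Execute a command and return the post-event aggregate state.

get_state(Pid)
Get the current state of an aggregate (for debugging).

get_version(Pid)
Get the current version of an aggregate.

start_link(AggregateModule, AggregateId)
Start an aggregate process (uses env store_id).

start_link(AggregateModule, AggregateId, StoreId)
Start an aggregate process with explicit store_id.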

Callbacks

apply/2

-callback apply(State :: term(), Event :: map()) -> NewState :: term().

execute/2

-callback execute(State :: term(), Command :: map()) -> {ok, [Event :: map()]} | {error, Reason :: term()}.

from_snapshot/1

(optional)
-callback from_snapshot(SnapshotData :: term()) -> State :: term().

init/1

-callback init(AggregateId :: binary()) -> {ok, State :: term()}.

snapshot/1

(optional)
-callback snapshot(State :: term()) -> SnapshotData :: term().

state_module/0

-callback state_module() -> module().

Functions

execute_command(Pid, Evoq_command)

-spec execute_command(pid(),
                      #evoq_command{command_id :: binary() | undefined,
                                    command_type :: atom() | undefined,
                                    aggregate_type :: atom() | undefined,
                                    aggregate_id :: binary() | undefined,
                                    payload :: map(),
                                    metadata :: map(),
                                    causation_id :: binary() | undefined,
                                    correlation_id :: binary() | undefined,
                                    idempotency_key :: binary() | undefined}) ->
                         {ok, non_neg_integer(), [map()]} | {error, term()}.

Execute a command against an aggregate.
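A minimal sketch of calling execute_command/2 and handling both documented return shapes. The module deposit_client and its functions are hypothetical. The caller is assumed to pass a ready-made #evoq_command{} record, since the header that defines the record is not shown here. The integer in the success tuple is presumably the aggregate version after the command, but the spec only states its type.

```erlang
-module(deposit_client).
-export([run/2, summarize/1]).

%% Pid is a running aggregate process; Command is an #evoq_command{}
%% record built by the caller.
run(Pid, Command) ->
    summarize(evoq_aggregate:execute_command(Pid, Command)).

%% Map the two documented return shapes onto caller-friendly terms
%% (the committed/rejected tags are made up for this sketch).
summarize({ok, Version, Events}) when is_integer(Version), is_list(Events) ->
    {committed, Version, length(Events)};
summarize({error, Reason}) ->
    {rejected, Reason}.
```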

execute_command_with_state(Pid, Evoq_command)

-spec execute_command_with_state(pid(),
                                 #evoq_command{command_id :: binary() | undefined,
                                               command_type :: atom() | undefined,
                                               aggregate_type :: atom() | undefined,
                                               aggregate_id :: binary() | undefined,
                                               payload :: map(),
                                               metadata :: map(),
                                               causation_id :: binary() | undefined,
                                               correlation_id :: binary() | undefined,
                                               idempotency_key :: binary() | undefined}) ->
                                    {ok, non_neg_integer(), [map()], term()} | {error, term()}.

Execute a command and return the post-event aggregate state.

Like execute_command/2, but the reply also includes the aggregate state after all new events have been applied. This enables session-level consistency: the caller learns the resulting state directly from the reply.
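As a hedged illustration, a caller might turn the four-element success tuple into a reply map for its own session layer. The module session_reply and the map keys are hypothetical; only the execute_command_with_state/2 call and its return shapes come from the spec above.

```erlang
-module(session_reply).
-export([execute/2, to_reply/1]).

%% Run the command and convert the outcome into a reply map.
execute(Pid, Command) ->
    to_reply(evoq_aggregate:execute_command_with_state(Pid, Command)).

%% State is the aggregate state after all new events were applied,
%% so the caller can use it without a separate read.
to_reply({ok, Version, Events, State}) ->
    #{status => ok,
      version => Version,
      event_count => length(Events),
      state => State};
to_reply({error, Reason}) ->
    #{status => error, reason => Reason}.
```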

get_state(Pid)

-spec get_state(pid()) -> {ok, term()}.

Get the current state of an aggregate (for debugging).

get_version(Pid)

-spec get_version(pid()) -> {ok, non_neg_integer()}.

Get the current version of an aggregate.

start_link(AggregateModule, AggregateId)

This function is deprecated. Use start_link/3 with explicit store_id instead.
-spec start_link(atom(), binary()) -> {ok, pid()} | {error, term()}.

Start an aggregate process (uses env store_id).

start_link(AggregateModule, AggregateId, StoreId)

-spec start_link(atom(), binary(), atom()) -> {ok, pid()} | {error, term()}.

Start an aggregate process with explicit store_id.
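A short sketch of starting an aggregate with an explicit store and then inspecting it with get_version/1 and get_state/1. The module aggregate_inspect is hypothetical, and the example assumes an evoq store is already configured and running under the given StoreId.

```erlang
-module(aggregate_inspect).
-export([start_and_inspect/3]).

%% Start the aggregate, then read its version and state.
%% Going by its name, start_link/3 links the new process to the caller.
start_and_inspect(AggregateModule, AggregateId, StoreId) ->
    case evoq_aggregate:start_link(AggregateModule, AggregateId, StoreId) of
        {ok, Pid} ->
            {ok, Version} = evoq_aggregate:get_version(Pid),
            %% get_state/1 is documented as a debugging aid.
            {ok, State} = evoq_aggregate:get_state(Pid),
            {ok, Pid, Version, State};
        {error, Reason} ->
            {error, Reason}
    end.
```

A call might look like aggregate_inspect:start_and_inspect(bank_account, <<"acct-1">>, my_store), where all three arguments are placeholder values.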