Behaviour and macros for defining event-sourced state machines.
This module provides a behaviour that can be implemented by modules to define event-sourced state machines. It also includes macros to automatically generate boilerplate code for managing state, handling events, taking snapshots, and persisting events.
Usage
To use this module, define a new module and use Hume.Projection with the
desired options. Implement the required callbacks to define the initial state,
event handling logic, and snapshot management.
Options
When using Hume.Projection, you can provide the following options:
:store(required): The module implementing the event store behaviour.:use_ets: Use ETS for snapshot storage (default is false).:snapshot_every: Number of events after which to take a snapshot (default is 100).:snapshot_after: Time in milliseconds after which to take a snapshot (default is 30 seconds).:catch_up_after: Time in milliseconds after which to check for new events (default is 30 seconds).:strict_online: Whether to enforce strict event handling during online catch-up (default is true).
Callbacks
The following callbacks must be implemented by the module using Hume.Projection:
init_state/1: Initializes the state of the machine.handle_event/2: Handles an event and updates the state.last_snapshot/1: Retrieves the last snapshot of the machine.persist_snapshot/2: Persists a snapshot of the current state.
Example
defmodule MyProjection do
use Hume.Projection, use_ets: true, store: MyEventStore
@impl true
def init_state(_), do: %{}
@impl true
def handle_event({:add, key, value}, state),
do: {:ok, Map.put(state || %{}, key, value)}
@impl true
def handle_event({:remove, key}, state),
do: {:ok, Map.delete(state || %{}, key)}
endTelemetry
This module emits telemetry events for various operations, including:
[:hume, :projection, :init][:hume, :projection, :catch_up, :start][:hume, :projection, :catch_up, :stop][:hume, :projection, :snapshot][:hume, :projection, :evolve][:hume, :projection, :replay][:hume, :projection, :on_caught_up]
Shutdown
On :normal termination, the projection will automatically persist its current snapshot.
Summary
Callbacks
Handles an event and updates the state accordingly.
Initial state of the machine.
Retrieves the last snapshot of the machine.
Callback invoked when the projection has caught up with the event store.
Callback invoked when the projection process is initialized. This is optional and can be used to perform any setup required.
Persists a snapshot of the current state.
Functions
Requests the state machine to catch up by processing any new events.
Synchronously requests the state machine to catch up by processing any new events.
Evolves the state by applying a single event.
Replays a list of events starting from a given snapshot.
Retrieves the current snapshot of the state machine.
Retrieves the current state of the state machine.
Retrieves the event store module used by the projection.
Requests the state machine to take a snapshot of its current state.
Validates that the given module implements the Hume.Projection behaviour.
Types
@type macro_option() :: {:use_ets, boolean()} | {:store, module()} | {:snapshot_every, non_neg_integer()} | {:snapshot_after, non_neg_integer()} | {:catch_up_after, non_neg_integer()} | {:strict_online, boolean()}
@type offset() :: seq()
@type option() :: GenServer.option() | {:stream, stream() | [stream()]} | {:projection, projection()}
@type projection() :: term()
@type seq() :: integer()
@type state() :: term()
@type stream() :: term()
Callbacks
Handles an event and updates the state accordingly.
Parameters
- event: The event to be handled.
- state: The current state of the machine.
Returns
{:ok, new_state}if the event was handled successfully, wherenew_stateis the updated state.{:error, reason}if an error occurred while handling the event.
@callback init_state(projection()) :: state()
Initial state of the machine.
Parameters
- projection: The name or identifier of the machine.
Returns
- The initial state of the machine.
@callback last_snapshot(projection()) :: snapshot() | nil
Retrieves the last snapshot of the machine.
Returns
snapshotif a snapshot exists, wheresnapshotis a tuple containing the offset and state.nilif no snapshot exists.
@callback on_caught_up(snapshot()) :: :ok
Callback invoked when the projection has caught up with the event store.
This can be used to perform any actions needed after catching up, such as notifying other parts of the system or updating internal state.
Parameters
- snapshot: The current snapshot of the projection after catching up.
Returns
:ok
@callback on_init(projection()) :: :ok
Callback invoked when the projection process is initialized. This is optional and can be used to perform any setup required.
Parameters
- projection: The name or identifier of the machine.
Returns
:ok
@callback persist_snapshot(projection(), snapshot()) :: :ok | {:error, term()}
Persists a snapshot of the current state.
Parameters
- snapshot: A tuple containing the offset and the current state.
Returns
:okif the snapshot was taken successfully.{:error, reason}if an error occurred while taking the snapshot.
Functions
@spec catch_up(GenServer.server()) :: :ok
Requests the state machine to catch up by processing any new events.
Parameters
- server: The PID or name of the state machine process.
Returns
:okThe catch-up request has been sent.
@spec catch_up_sync(GenServer.server(), timeout()) :: snapshot()
Synchronously requests the state machine to catch up by processing any new events.
Parameters
server: The PID or name of the state machine process.timeout: The maximum time to wait for a response (default is 5000 ms).
Returns
snapshot: The current snapshot of the state machine after catching up.
Evolves the state by applying a single event.
Parameters
- mod: The module implementing the
Hume.Projectionbehaviour. - event: The event to be applied.
- snapshot: A tuple containing the current offset and state.
Returns
{:ok, snapshot}if the event is applied successfully.{:error, reason}if an error occurs during event handling or persistence.
Replays a list of events starting from a given snapshot.
Parameters
- mod: The module implementing the
Hume.Projectionbehaviour. - snapshot: A tuple containing the offset and the state to start replaying from.
- events: A ordered list of events to be replayed.
Returns
{:ok, snapshot}if all events are replayed successfully.{:error, reason}if an error occurs during event handling.
@spec snapshot(GenServer.server(), [{:timeout, timeout()} | :dirty]) :: snapshot()
Retrieves the current snapshot of the state machine.
Parameters
- machine: The PID or name of the state machine process.
- opts: Options for the call (default is an empty list).
:timeout: The maximum time to wait for a response (default is 5000 ms).:dirty: If specified, allows a dirty read of the snapshot.
Returns
snapshot: The current snapshot of the state machine.
@spec state(GenServer.server(), [{:timeout, timeout()} | :dirty]) :: state()
Retrieves the current state of the state machine.
Parameters
- machine: The PID or name of the state machine process.
- opts: Options for the call (default is an empty list).
:timeout: The maximum time to wait for a response (default is 5000 ms).:dirty: If specified, allows a dirty read of the snapshot.
Returns
state: The current state of the state machine.
Retrieves the event store module used by the projection.
Parameters
- mod: The module implementing the
Hume.Projectionbehaviour.
Returns
store: The event store module.
@spec take_snapshot(GenServer.server()) :: :ok
Requests the state machine to take a snapshot of its current state.
Parameters
- server: The PID or name of the state machine process.
Returns
:ok: The snapshot request has been sent.
@spec validate(module()) :: :ok | {:error, :invalid_module}
Validates that the given module implements the Hume.Projection behaviour.
Parameters
- mod: The module to be validated.
Returns
:okif the module implements theHume.Projectionbehaviour.{:error, reason}if the module does not implement the behaviour.