evoq_projection behaviour (evoq v1.14.1)

View Source

Projection behavior for evoq.

Projections transform events into read model updates. They subscribe to event types and maintain query-optimized views.

Design Principles

- Projections do all calculations (events -> read model) - Read models are simple key-value lookups (no joins) - Projections are idempotent (can be replayed safely) - Checkpoints track progress for resume after restart

Callbacks

Required: - interested_in() -> [binary()] Event types this projection handles

- init(Config) -> {ok, State, ReadModel} Initialize projection with read model

- project(Event, Metadata, State, ReadModel) -> {ok, NewState, NewReadModel} | {error, Reason} Transform event into read model updates

Optional: - on_error(Error, Event, FailureContext, State) -> error_action() Handle projection errors

Example

  -module(order_summary_projection).
  -behaviour(evoq_projection).
 
  interested_in() -> [<<"OrderPlaced">>, <<"OrderShipped">>].
 
  init(_Config) ->
      {ok, RM} = evoq_read_model:new(evoq_read_model_ets, #{}),
      {ok, #{}, RM}.
 
  project(#{event_type := <<"OrderPlaced">>, data := Data}, Meta, State, RM) ->
      OrderId = maps:get(order_id, Data),
      Summary = #{status => placed, items => maps:get(items, Data, [])},
      {ok, RM2} = evoq_read_model:put(OrderId, Summary, RM),
      {ok, State, RM2}.

Summary

Functions

Get the current checkpoint position.

Get event types this projection handles.

Get the read model instance.

Notify projection of an event.

Rebuild the projection from scratch. Clears the read model and replays all events.

Rebuild with options.

Start a projection with options. Options: - store_id: Event store to replay from (overrides app env) - checkpoint_store: Module for persistent checkpoint storage - start_from: origin | latest | {position, N}

Callbacks

init/1

-callback init(Config :: map()) ->
                  {ok, State :: term(), ReadModel :: evoq_read_model:read_model()} |
                  {error, Reason :: term()}.

interested_in/0

-callback interested_in() -> [EventType :: binary()].

on_error/4

(optional)
-callback on_error(Error :: term(),
                   Event :: map(),
                   FailureContext ::
                       #evoq_failure_context{handler_module :: atom(),
                                             event :: map(),
                                             error :: term(),
                                             attempt_number :: pos_integer(),
                                             first_failure_at :: integer(),
                                             last_failure_at :: integer(),
                                             stacktrace :: list()},
                   State :: term()) ->
                      evoq_error_handler:error_action().

project/4

-callback project(Event :: map(),
                  Metadata :: map(),
                  State :: term(),
                  ReadModel :: evoq_read_model:read_model()) ->
                     {ok, NewState :: term(), NewReadModel :: evoq_read_model:read_model()} |
                     {skip, State :: term(), ReadModel :: evoq_read_model:read_model()} |
                     {error, Reason :: term()}.

Functions

get_checkpoint(Pid)

-spec get_checkpoint(pid()) -> non_neg_integer().

Get the current checkpoint position.

get_event_types(Pid)

-spec get_event_types(pid()) -> [binary()].

Get event types this projection handles.

get_read_model(Pid)

-spec get_read_model(pid()) -> evoq_read_model:read_model().

Get the read model instance.

notify(Pid, EventType, Event, Metadata)

-spec notify(pid(), binary(), map(), map()) -> ok | {error, term()}.

Notify projection of an event.

rebuild(Pid)

-spec rebuild(pid()) -> ok | {error, term()}.

Rebuild the projection from scratch. Clears the read model and replays all events.

rebuild(Pid, Opts)

-spec rebuild(pid(), map()) -> ok | {error, term()}.

Rebuild with options.

start_link(ProjectionModule, Config)

-spec start_link(atom(), map()) -> {ok, pid()} | {error, term()}.

Start a projection.

start_link(ProjectionModule, Config, Opts)

-spec start_link(atom(), map(), map()) -> {ok, pid()} | {error, term()}.

Start a projection with options. Options: - store_id: Event store to replay from (overrides app env) - checkpoint_store: Module for persistent checkpoint storage - start_from: origin | latest | {position, N}