evoq_process_manager behaviour (evoq v1.14.1)

View Source

Process manager (saga) behavior for evoq.

Process managers coordinate long-running business processes that span multiple aggregates. They:

- Subscribe to events by type
- Correlate events to process instances
- Dispatch commands based on events
- Support compensation for saga rollback

Callbacks

Required:

- interested_in() -> [binary()]: the event types this PM processes
- correlate(Event, Metadata) -> correlation_result(): determines the process instance for an event
- handle(State, Event, Metadata) -> handle_result(): processes the event and optionally dispatches commands
- apply(State, Event) -> NewState: applies the event to the process manager state

Optional:

- init(ProcessId) -> {ok, State}: returns the starting state for the given process instance
- compensate(State, FailedCommand) -> compensation_result(): generates compensating commands for saga rollback

Example

  -module(order_fulfillment_pm).
  -behaviour(evoq_process_manager).
 
  interested_in() -> [<<"OrderPlaced">>, <<"PaymentReceived">>, <<"ItemShipped">>].
 
  correlate(#{data := #{order_id := OrderId}}, _Meta) ->
      {continue, OrderId}.
 
  handle(State, #{event_type := <<"OrderPlaced">>, data := #{order_id := OrderId}}, _Meta) ->
      %% Start payment process
      Cmd = evoq_command:new(process_payment, payment, OrderId, #{...}),
      {ok, State, [Cmd]}.

Summary

Functions

get_event_types(PMModule): Get event types this process manager is interested in.

start(PMModule, Config): Register a process manager (start subscription).

start(PMModule, Config, Opts): Register a process manager with options. This registers the PM module with the router so it receives events.

Types

compensation_result/0

-type compensation_result() ::
          {ok,
           CompensatingCommands ::
               [#evoq_command{command_id :: binary() | undefined,
                              command_type :: atom() | undefined,
                              aggregate_type :: atom() | undefined,
                              aggregate_id :: binary() | undefined,
                              payload :: map(),
                              metadata :: map(),
                              causation_id :: binary() | undefined,
                              correlation_id :: binary() | undefined,
                              idempotency_key :: binary() | undefined}]} |
          skip.

correlation_result/0

-type correlation_result() ::
          {start, ProcessId :: binary()} |
          {continue, ProcessId :: binary()} |
          {stop, ProcessId :: binary()} |
          false.

handle_result/0

-type handle_result() ::
          {ok, NewState :: term()} |
          {ok,
           NewState :: term(),
           Commands ::
               [#evoq_command{command_id :: binary() | undefined,
                              command_type :: atom() | undefined,
                              aggregate_type :: atom() | undefined,
                              aggregate_id :: binary() | undefined,
                              payload :: map(),
                              metadata :: map(),
                              causation_id :: binary() | undefined,
                              correlation_id :: binary() | undefined,
                              idempotency_key :: binary() | undefined}]} |
          {error, Reason :: term()}.

Callbacks

apply/2

-callback apply(State :: term(), Event :: map()) -> NewState :: term().

compensate/2

(optional)
-callback compensate(State :: term(),
                     FailedCommand ::
                         #evoq_command{command_id :: binary() | undefined,
                                       command_type :: atom() | undefined,
                                       aggregate_type :: atom() | undefined,
                                       aggregate_id :: binary() | undefined,
                                       payload :: map(),
                                       metadata :: map(),
                                       causation_id :: binary() | undefined,
                                       correlation_id :: binary() | undefined,
                                       idempotency_key :: binary() | undefined}) ->
                        compensation_result().

correlate/2

-callback correlate(Event :: map(), Metadata :: map()) -> correlation_result().

handle/3

-callback handle(State :: term(), Event :: map(), Metadata :: map()) -> handle_result().

init/1

(optional)
-callback init(ProcessId :: binary()) -> {ok, State :: term()}.

interested_in/0

-callback interested_in() -> [EventType :: binary()].

Functions

get_event_types(PMModule)

-spec get_event_types(atom()) -> [binary()].

Get event types this process manager is interested in.

start(PMModule, Config)

-spec start(atom(), map()) -> ok.

Register a process manager (start subscription).

start(PMModule, Config, Opts)

-spec start(atom(), map(), map()) -> ok.

Register a process manager with options. This registers the PM module with the router so it receives events.