evoq_process_manager behaviour (evoq v1.9.1)
Process manager (saga) behavior for evoq.
Process managers coordinate long-running business processes that span multiple aggregates. They:
- Subscribe to events by type
- Correlate events to process instances
- Dispatch commands based on events
- Support compensation for saga rollback
Callbacks
Required:
- interested_in() -> [binary()]
  Event types this PM processes
- correlate(Event, Metadata) -> correlation_result()
  Determines the process instance for an event
- handle(State, Event, Metadata) -> handle_result()
  Process the event and optionally dispatch commands
- apply(State, Event) -> NewState
  Apply event to process manager state
Optional:
- compensate(State, FailedCommand) -> compensation_result()
  Generate compensating commands for saga rollback
Example
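-module(order_fulfillment_pm).
-behaviour(evoq_process_manager).

-export([interested_in/0, correlate/2, handle/3]).

interested_in() -> [<<"OrderPlaced">>, <<"PaymentReceived">>, <<"ItemShipped">>].

%% Route every event for the same order to the same process instance.
correlate(#{data := #{order_id := OrderId}}, _Meta) ->
    {continue, OrderId}.

%% OrderId is bound from the event data so it can be used in the command.
handle(State, #{event_type := <<"OrderPlaced">>, data := #{order_id := OrderId}}, _Meta) ->
    %% Start payment process
    Cmd = evoq_command:new(process_payment, payment, OrderId, #{...}),
    {ok, State, [Cmd]}.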
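The example above omits the apply/2 callback listed under Required. A minimal sketch of what it might look like follows, assuming the process manager state is a plain map and that events carry the event_type and data keys used in the example. The module name and the status atoms are hypothetical and not part of evoq.

```erlang
-module(order_fulfillment_pm_state).

%% apply/2 shares its name with the auto-imported BIF erlang:apply/2,
%% so the auto-import is switched off for this module.
-compile({no_auto_import, [apply/2]}).

-export([apply/2]).

%% Assumption: the state is a map. The status values below are
%% illustrative names, not values defined by evoq.
apply(State, #{event_type := <<"OrderPlaced">>, data := #{order_id := OrderId}}) ->
    State#{order_id => OrderId, status => awaiting_payment};
apply(State, #{event_type := <<"PaymentReceived">>}) ->
    State#{status => awaiting_shipment};
apply(State, #{event_type := <<"ItemShipped">>}) ->
    State#{status => completed};
apply(State, _OtherEvent) ->
    %% Leave the state untouched for events this sketch does not model.
    State.
```

Keeping apply/2 free of side effects means the state of a process instance depends only on the events it has seen.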
Types
-type compensation_result() ::
    {ok, CompensatingCommands :: [#evoq_command{
        command_id :: binary() | undefined,
        command_type :: atom() | undefined,
        aggregate_type :: atom() | undefined,
        aggregate_id :: binary() | undefined,
        payload :: map(),
        metadata :: map(),
        causation_id :: binary() | undefined,
        correlation_id :: binary() | undefined,
        idempotency_key :: binary() | undefined}]}
    | skip.
-type handle_result() ::
    {ok, NewState :: term()}
    | {ok, NewState :: term(), Commands :: [#evoq_command{
        command_id :: binary() | undefined,
        command_type :: atom() | undefined,
        aggregate_type :: atom() | undefined,
        aggregate_id :: binary() | undefined,
        payload :: map(),
        metadata :: map(),
        causation_id :: binary() | undefined,
        correlation_id :: binary() | undefined,
        idempotency_key :: binary() | undefined}]}
    | {error, Reason :: term()}.
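The Example section already shows the {ok, NewState, Commands} shape of handle_result(). The sketch below illustrates the other two shapes. It assumes a map state with a hypothetical status key. The module name and the unexpected_payment error reason are invented for illustration only.

```erlang
-module(handle_result_sketch).
-export([handle/3]).

%% {ok, NewState}: the event is accepted and no commands are dispatched.
handle(#{status := awaiting_payment} = State,
       #{event_type := <<"PaymentReceived">>}, _Meta) ->
    {ok, State};
%% {error, Reason}: the event arrived in a state that does not expect it.
%% The reason term is free-form (Reason :: term()).
handle(#{status := Status},
       #{event_type := <<"PaymentReceived">>}, _Meta) ->
    {error, {unexpected_payment, Status}}.
```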
Callbacks
-callback compensate(State :: term(),
                     FailedCommand :: #evoq_command{
                         command_id :: binary() | undefined,
                         command_type :: atom() | undefined,
                         aggregate_type :: atom() | undefined,
                         aggregate_id :: binary() | undefined,
                         payload :: map(),
                         metadata :: map(),
                         causation_id :: binary() | undefined,
                         correlation_id :: binary() | undefined,
                         idempotency_key :: binary() | undefined}) ->
    compensation_result().
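A minimal sketch of an optional compensate/2 implementation is given here, returning both shapes of compensation_result(). To compile on its own, it declares a local copy of the evoq_command record using the field names from the type above. A real module would presumably include the record from evoq's own header file instead. The map defaults for payload and metadata, the ship_item and refund_payment command types, and the map-shaped state are assumptions made for illustration.

```erlang
-module(compensate_sketch).
-export([compensate/2]).

%% Local stand-in for evoq's record, with the fields from the documented
%% type. The #{} defaults are an assumption of this sketch.
-record(evoq_command, {
    command_id,
    command_type,
    aggregate_type,
    aggregate_id,
    payload = #{},
    metadata = #{},
    causation_id,
    correlation_id,
    idempotency_key
}).

%% If shipping failed for a known order, emit a refund command.
%% ship_item and refund_payment are hypothetical command types.
compensate(#{order_id := OrderId}, #evoq_command{command_type = ship_item}) ->
    Refund = #evoq_command{
        command_type = refund_payment,
        aggregate_type = payment,
        aggregate_id = OrderId,
        payload = #{order_id => OrderId, reason => shipping_failed}
    },
    {ok, [Refund]};
%% Nothing to undo for any other failed command.
compensate(_State, _FailedCommand) ->
    skip.
```

Returning skip from the catch-all clause keeps the callback total, so a failed command the process manager does not know how to undo does not crash it.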
-callback correlate(Event :: map(), Metadata :: map()) -> correlation_result().
-callback handle(State :: term(), Event :: map(), Metadata :: map()) -> handle_result().
-callback interested_in() -> [EventType :: binary()].
Functions
Get event types this process manager is interested in.
Register a process manager (start subscription).
Register a process manager with options. This registers the PM module with the router so it receives events.