Commanded.Event.Handler behaviour (Commanded v1.4.10)
Defines the behaviour an event handler must implement and provides a convenience macro that implements the behaviour, allowing you to handle only the events you are interested in processing.
You should start your event handlers using a Supervisor to ensure they are restarted on error.
Example
    defmodule ExampleHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: "ExampleHandler"

      def handle(%AnEvent{}, _metadata) do
        # ... process the event
        :ok
      end
    end
Start your event handler process (or use a Supervisor):
    {:ok, _handler} = ExampleHandler.start_link()
Event handler name
The name you specify is used when subscribing to the event store. You must use a unique name for each event handler and process manager you start. Also, you should not change the name once the handler has been deployed. A new subscription will be created if you change the name and the event handler will receive already handled events.
You can use your event handler's module name as its name via the __MODULE__
special form:
    defmodule ExampleHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: __MODULE__
    end
Subscription options
You can choose to start the event handler's event store subscription from
:origin, :current position, or an exact event number using the
start_from option. The default is to use the origin so your handler will
receive all events.
Use the :current position when you don't want newly created event handlers
to go through all previous events. An example would be adding an event handler
to send transactional emails to an already deployed system containing many
historical events.
The start_from option only applies when the subscription is initially
created, the first time the handler starts. Whenever the handler restarts the
subscription will resume from the next event after the last successfully
processed event. Restarting an event handler does not restart its
subscription.
Example
Set the start_from option (:origin, :current, or an explicit event
number) when using Commanded.Event.Handler:
    defmodule ExampleHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: "ExampleHandler",
        start_from: :origin
    end
You can optionally override :start_from by passing it as an option when
starting your handler:
    {:ok, _handler} = ExampleHandler.start_link(start_from: :current)
Subscribing to an individual stream
By default event handlers will subscribe to all events appended to any stream.
Provide a subscribe_to option to subscribe to a single stream.
    defmodule ExampleHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: __MODULE__,
        subscribe_to: "stream1234"
    end
This will ensure the handler only receives events appended to that stream.
Runtime event handler configuration
Runtime options can be provided to the event handler's start_link/1 function
or its child spec. The init/1 callback function can also be used to define
runtime configuration.
Example
Provide runtime configuration to start_link/1:
    {:ok, _pid} =
      ExampleHandler.start_link(
        application: ExampleApp,
        name: "ExampleHandler"
      )
Or when supervised:
    Supervisor.start_link([
      {ExampleHandler, application: ExampleApp, name: "ExampleHandler"}
    ], strategy: :one_for_one)
Event handler state
An event handler can define and update state which is held in the GenServer
process memory. It is passed to the handle/2 (or handle_batch/1) function as part of the
metadata using the :state key. The state is transient and will be lost
whenever the process restarts.
Initial state can be set in the init/1 callback function by adding a
:state key to the config. It can also be provided when starting the handler
process:
    ExampleHandler.start_link(state: initial_state)
Or when supervised:
    Supervisor.start_link([
      {ExampleHandler, state: initial_state}
    ], strategy: :one_for_one)
State can be updated by returning {:ok, new_state} from any handle/2
(or handle_batch/1) function. Returning an :ok reply will keep the state unchanged.
Handler state is also included in the Commanded.Event.FailureContext struct
passed to the error/3 callback function.
Example
    defmodule StatefulEventHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: __MODULE__

      def init(config) do
        config = Keyword.put_new(config, :state, %{})
        {:ok, config}
      end

      def handle(event, metadata) do
        %{state: state} = metadata
        new_state = mutate_state(state)
        {:ok, new_state}
      end
    end
Concurrency
An event handler may be configured to start multiple processes to handle the
events concurrently. By default one process will be started, processing events
one at a time in order. The :concurrency option determines how many event
handler processes are started. It must be a positive integer.
Note that with concurrent processing, events will likely be processed out of
order. If you need to enforce an order, such as per stream or by using a field
from an event, you can define a partition_by/2 callback function in the event
handler module. The function will receive each event and its metadata and must
return a consistent term indicating the event's partition. Events which return
the same term are guaranteed to be processed in order by the same event
handler instance, while events with different partitions may be processed
concurrently by another instance. An attempt will be made to distribute
events as evenly as possible to all running event handler instances.
Only :eventual consistency is supported when multiple handler processes are
configured with a :concurrency of greater than one.
Example
    defmodule ConcurrentProcessingEventHandler do
      alias Commanded.EventStore.RecordedEvent

      use Commanded.Event.Handler,
        application: ExampleApp,
        name: __MODULE__,
        concurrency: 10

      def init(config) do
        # Fetch the index of this event handler instance (0..9 in this example)
        index = Keyword.fetch!(config, :index)
        {:ok, config}
      end

      def handle(event, metadata) do
        :ok
      end

      # Partition events by their stream
      def partition_by(event, metadata) do
        %{stream_id: stream_id} = metadata
        stream_id
      end
    end
Consistency
For each event handler you can define its consistency as either
:strong or :eventual.
This setting is used when dispatching commands and specifying the
consistency option.
When you dispatch a command using :strong consistency, after successful
command dispatch the process will block until all event handlers configured to
use :strong consistency have processed the domain events created by the
command. This is useful when you have a read model updated by an event handler
that you wish to query for data affected by the command dispatch. With
:strong consistency you are guaranteed that the read model will be
up-to-date after the command has successfully dispatched. It can be safely
queried for data updated by any of the events created by the command.
The default setting is :eventual consistency. Command dispatch will return
immediately upon confirmation of event persistence, not waiting for any event
handlers.
Note strong consistency does not imply a transaction covers the command dispatch and event handling. It only guarantees that the event handler will have processed all events produced by the command: if event handling fails the events will have still been persisted.
Example
Define an event handler with :strong consistency:
    defmodule ExampleHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: "ExampleHandler",
        consistency: :strong
    end
Batching
Event handlers can process events in batches rather than one at a time. This allows writing to a target system in a single transaction and acknowledging all events at once, reducing overhead.
To enable batching, set the batch_size option and implement handle_batch/1
instead of handle/2. The callback receives a list of {event, metadata}
tuples. On returning :ok, all events in the batch are acknowledged. There
is no partial acknowledgement.
Batching and concurrency are not supported together; setting both will raise a compilation error.
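As a rough illustration of working with the list of {event, metadata} tuples, the sketch below turns a batch into row maps that could be written in one go. The OrderRows module and the OrderPlaced struct are hypothetical names invented for this example; only the tuple shape and the :stream_id metadata key come from this page.

```elixir
defmodule OrderRows do
  # Hypothetical event struct, defined here only to keep the sketch self-contained.
  defmodule OrderPlaced do
    defstruct [:order_id, :total]
  end

  # Converts a batch of {event, metadata} tuples into a list of row maps.
  # Tuples whose event is not an OrderPlaced struct are skipped, because a
  # `for` generator silently drops elements that do not match its pattern.
  def from_batch(events) when is_list(events) do
    for {%OrderPlaced{order_id: order_id, total: total}, metadata} <- events do
      %{order_id: order_id, total: total, stream_id: metadata.stream_id}
    end
  end
end
```

A handle_batch/1 implementation could call OrderRows.from_batch(events), write the rows with whatever bulk insert your data store provides, and then return :ok so the whole batch is acknowledged.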
batch_size
The batch_size option controls the EventStore subscription's in-flight
buffer — how many events the subscription can deliver without waiting for
acknowledgement. It is not an accumulation mechanism at the handler level.
The handler processes each delivery from the subscription immediately.
During live operation, when events arrive one at a time, handle_batch/1
is called with a single event — regardless of the configured batch_size.
Larger batches form naturally only when the subscription has multiple events
queued, such as during catch-up replay, back-pressure recovery, or bulk
appends.
When batch_timeout is configured, batch_size also serves as the maximum
size of the handler's internal buffer — once the buffer reaches batch_size
events, it is flushed immediately regardless of the timeout.
batch_timeout
The batch_timeout option adds time-based buffering at the handler level.
When set, events are accumulated in the handler process and flushed when
either condition is met first:
- batch_size events have accumulated, OR
- batch_timeout milliseconds have elapsed since the first buffered event
This enables use cases such as accumulating changes before writing them in one go to a projection, batching bulk database inserts, or reducing per-event overhead for side effects — even when events arrive individually during steady-state operation.
batch_timeout accepts a positive integer (milliseconds) or :infinity, which
is the default and disables buffering, matching the previous behaviour.
It requires batch_size to be configured.
Example
    defmodule OrderProjector do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: "OrderProjector",
        batch_size: 50,
        batch_timeout: 100

      def handle_batch(events) do
        # Receives up to 50 events, or whatever accumulated within 100ms
        :ok
      end
    end
Telemetry
Batch telemetry events include a flush_reason field:
- :size - batch reached batch_size
- :timeout - batch_timeout elapsed before the batch filled
- :immediate - no batch_timeout configured; events processed as delivered
Dynamic application
An event handler's application can be provided as an option to start_link/1.
This can be used to start the same handler multiple times, each using a
separate Commanded application and event store.
Example
Start an event handler process for each tenant in a multi-tenanted app, guaranteeing that the data and processing remain isolated between tenants.
    for tenant <- [:tenant1, :tenant2, :tenant3] do
      {:ok, _app} = MyApp.Application.start_link(name: tenant)
      {:ok, _handler} = ExampleHandler.start_link(application: tenant)
    end
Typically you would start the event handlers using a supervisor:
    children =
      for tenant <- [:tenant1, :tenant2, :tenant3] do
        {ExampleHandler, application: tenant}
      end

    Supervisor.start_link(children, strategy: :one_for_one)
The above example requires three named Commanded applications to have already been started.
Telemetry
[:commanded, :event, :handle, :start]
- Description: Emitted when an event handler starts handling an event
- Measurements: %{system_time: integer()}
- Metadata: %{application: Commanded.Application.t(), context: map(), handler_name: String.t(), handler_module: atom(), handler_state: map(), recorded_event: RecordedEvent.t()}
[:commanded, :event, :handle, :stop]
- Description: Emitted when an event handler stops handling an event
- Measurements: %{duration: non_neg_integer()}
- Metadata: %{:application => Commanded.Application.t(), :context => map(), :handler_name => String.t(), :handler_module => atom(), :handler_state => map(), :recorded_event => RecordedEvent.t(), optional(:error) => any()}
[:commanded, :event, :handle, :exception]
- Description: Emitted when an event handler raises an exception
- Measurements: %{duration: non_neg_integer()}
- Metadata: %{application: Commanded.Application.t(), context: map(), handler_name: String.t(), handler_module: atom(), handler_state: map(), recorded_event: RecordedEvent.t(), kind: :throw | :error | :exit, reason: any(), stacktrace: list()}
[:commanded, :event, :batch, :start]
- Description: Emitted when an event handler starts handling a batch of events
- Measurements: %{system_time: integer()}
- Metadata: %{application: Commanded.Application.t(), context: map(), handler_name: String.t(), handler_module: atom(), handler_state: map(), first_event_id: binary(), last_event_id: binary(), event_count: integer(), flush_reason: :size | :timeout | :immediate}
[:commanded, :event, :batch, :stop]
- Description: Emitted when an event handler stops handling a batch of events
- Measurements: %{duration: non_neg_integer()}
- Metadata: %{:application => Commanded.Application.t(), :context => map(), :handler_name => String.t(), :handler_module => atom(), :handler_state => map(), :first_event_id => binary(), :last_event_id => binary(), :event_count => integer(), :recorded_event => RecordedEvent.t() | nil, :flush_reason => :size | :timeout | :immediate, optional(:error) => any()}
[:commanded, :event, :batch, :exception]
- Description: Emitted when an event batch handler raises an exception
- Measurements: %{duration: non_neg_integer()}
- Metadata: %{:application => Commanded.Application.t(), :context => map(), :handler_name => String.t(), :handler_module => atom(), :handler_state => map(), :recorded_event => RecordedEvent.t() | nil, :first_event_id => binary(), :last_event_id => binary(), :event_count => integer(), :flush_reason => :size | :timeout | :immediate, :kind => :throw | :error | :exit, :reason => any(), optional(:stacktrace) => list()}
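One way to consume these events is to attach a handler with :telemetry.attach_many/4 from the telemetry library. The sketch below is only illustrative: the HandlerTelemetry module and the handler id are hypothetical, and it assumes the reported duration is in native time units, which this page does not state.

```elixir
defmodule HandlerTelemetry do
  # Hypothetical module, not part of Commanded.
  require Logger

  @events [
    [:commanded, :event, :handle, :stop],
    [:commanded, :event, :batch, :stop]
  ]

  # Attach once, for example while your application starts.
  # Requires the :telemetry library to be available at runtime.
  def attach do
    :telemetry.attach_many("handler-telemetry-logger", @events, &__MODULE__.handle_event/4, nil)
  end

  # Callback invoked by :telemetry for each attached event.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Pure formatting, kept separate so it can be exercised without :telemetry.
  def describe([:commanded, :event, kind, :stop], %{duration: duration}, %{handler_name: name}) do
    # Assumption: duration is expressed in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "#{name} #{kind} finished in #{ms}ms"
  end
end
```

Passing the handler as a remote capture (&__MODULE__.handle_event/4) rather than an anonymous function follows the telemetry library's own recommendation for attached handlers.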
Summary
Callbacks
after_start/1 - Optional initialisation callback function called when the handler starts.
before_reset/0 - Called before an event handler gets reset.
error/3 - Called when an event handle/2 callback returns an error.
handle/2 - Handle a domain event and its metadata.
handle_batch/1 - Handle a batch of domain events and their metadata.
init/1 - Optional callback function called to configure the handler before it starts.
partition_by/2 - Determine which partition an event belongs to.
Functions
child_spec/1 - Returns a specification to start this module under a supervisor.
Types
@type consistency() :: :eventual | :strong
@type domain_event() :: struct()
@type metadata() :: map()
@type subscribe_from() :: :origin | :current | non_neg_integer()
Callbacks
@callback after_start(handler_state :: term()) :: :ok | {:ok, state :: map()} | {:stop, reason :: any()}
Optional initialisation callback function called when the handler starts.
Can be used to start any related processes when the event handler is started.
This callback function must return :ok, {:ok, state} to return new state,
or {:stop, reason} to stop the handler process. Any other return value
will terminate the event handler with an error.
Example
    defmodule ExampleHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: "ExampleHandler"

      # Optional initialisation
      def after_start(handler_state) do
        new_handler_state = Map.put(handler_state, :foo, "bar")
        {:ok, new_handler_state}
      end

      def handle(%AnEvent{}, _metadata) do
        # Process the event ...
        :ok
      end
    end
@callback before_reset() :: :ok
Called before an event handler gets reset.
@callback error(
            error :: term(),
            failed_event :: domain_event() | [domain_event()] | nil,
            failure_context :: Commanded.Event.FailureContext.t()
          ) ::
            {:retry, context :: map() | Commanded.Event.FailureContext.t()}
            | {:retry, delay :: non_neg_integer(), context :: map() | Commanded.Event.FailureContext.t()}
            | :skip
            | {:stop, reason :: term()}
Called when an event handle/2 callback returns an error.
The error/3 function allows you to control how event handling failures
are handled. The function is passed the error returned by the event handler
(e.g. {:error, :failure}), the event causing the error, and a context map
containing state passed between retries.
Use pattern matching on the error and/or failed event to explicitly handle certain errors or events. Use the context map to track any transient state you need to access between retried failures.
You can return one of the following responses depending upon the error severity:
- {:retry, context} - retry the failed event, providing a context map, or updated Commanded.Event.FailureContext struct, containing any state to be passed to subsequent failures. This could be used to count the number of failures, stopping after too many.
- {:retry, delay, context} - retry the failed event, after sleeping for the requested delay (in milliseconds). Context is a map or Commanded.Event.FailureContext struct as described in {:retry, context} above.
- :skip - skip the failed event by acknowledging receipt. In batching mode, this will acknowledge and skip all events up to and including the failed event.
- {:stop, reason} - stop the event handler with the given reason.
The default behaviour if you don't provide an error/3 callback is to stop
the event handler using the exact error reason returned from the handle/2
function. If the event handler is supervised with a restart value of :permanent
or :transient, stopping on error will cause the handler to be restarted. It will
likely crash again as it will reprocess the problematic event. This can lead
to cascading failures going up the supervision tree.
Example error handling
    defmodule ExampleHandler do
      use Commanded.Event.Handler,
        application: ExampleApp,
        name: __MODULE__

      require Logger

      alias Commanded.Event.FailureContext

      def handle(%AnEvent{}, _metadata) do
        # simulate event handling failure
        {:error, :failed}
      end

      def error({:error, :failed}, %AnEvent{} = event, %FailureContext{context: context}) do
        context = record_failure(context)

        case Map.get(context, :failures) do
          too_many when too_many >= 3 ->
            # skip bad event after third failure
            Logger.warning("Skipping bad event, too many failures: " <> inspect(event))
            :skip

          _ ->
            # retry event, failure count is included in context map
            {:retry, context}
        end
      end

      defp record_failure(context) do
        Map.update(context, :failures, 1, fn failures -> failures + 1 end)
      end
    end
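The {:retry, delay, context} response can be combined with the failure count to back off between attempts. The following is a minimal sketch of such a policy as a plain helper; RetryPolicy, its limits, and the :too_many_failures reason are hypothetical and not part of Commanded.

```elixir
defmodule RetryPolicy do
  # Hypothetical helper that decides how an error/3 callback could respond.
  @max_failures 5
  @base_delay_ms 100

  # Takes the context map carried between retries and returns one of the
  # responses accepted from error/3.
  def next_action(context) when is_map(context) do
    context = Map.update(context, :failures, 1, fn failures -> failures + 1 end)
    failures = context.failures

    if failures >= @max_failures do
      # Give up once the limit is reached.
      {:stop, :too_many_failures}
    else
      # Exponential backoff: 100ms, 200ms, 400ms, 800ms.
      delay = @base_delay_ms * Integer.pow(2, failures - 1)
      {:retry, delay, context}
    end
  end
end
```

An error/3 clause could delegate to RetryPolicy.next_action(context), using the context taken from the FailureContext struct as in the example above. Integer.pow/2 requires Elixir 1.12 or later.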
@callback handle(domain_event(), metadata()) :: :ok | {:ok, new_state :: any()} | {:error, :already_seen_event} | {:error, reason :: any()}
Handle a domain event and its metadata.
Return :ok on success, {:error, :already_seen_event} to ack and skip the
event, or {:error, reason} on failure.
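A minimal sketch of deciding when to return {:error, :already_seen_event} follows. It assumes the metadata map carries an :event_number key, which this page does not confirm, and SeenEvents is a hypothetical helper rather than a Commanded module.

```elixir
defmodule SeenEvents do
  # Hypothetical helper, not part of Commanded.
  # `last_seen` is the highest event number already processed, or nil when
  # nothing has been processed yet. `metadata` is the map passed to handle/2;
  # the :event_number key is an assumption made for this sketch.
  def check(last_seen, %{event_number: event_number})
      when is_integer(last_seen) and event_number <= last_seen do
    {:error, :already_seen_event}
  end

  def check(_last_seen, %{event_number: event_number}) do
    # Not seen before: return the new high-water mark.
    {:ok, event_number}
  end
end
```

Because handler state is transient, a real handler would more likely read last_seen from the same durable store it writes to, so the check still holds after a restart.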
@callback handle_batch([{domain_event(), metadata()}]) :: :ok | {:ok, new_state :: any()} | {:error, reason :: any()}
Handle a batch of domain events and their metadata.
Return :ok on success. All events in the batch will be acknowledged.
On error, return {:error, reason} to indicate that something went wrong
with the whole batch.
Note that this interface may change as more experience with use cases for batching is gained.
@callback init(config :: Keyword.t()) :: {:ok, Keyword.t()}
Optional callback function called to configure the handler before it starts.
It is passed the merged compile-time and runtime config, and must return the
updated config as {:ok, config}.
Note this function is called before the event handler process is started and
is not run from the handler's process. You cannot use self() to access the
handler's PID.
Example
The init/1 function is used to define the handler's application and name
based upon a value provided at runtime:
    defmodule ExampleHandler do
      use Commanded.Event.Handler

      def init(config) do
        {tenant, config} = Keyword.pop!(config, :tenant)

        config =
          config
          |> Keyword.put(:application, Module.concat([ExampleApp, tenant]))
          |> Keyword.put(:name, Module.concat([__MODULE__, tenant]))

        {:ok, config}
      end
    end
Usage:
    {:ok, _pid} = ExampleHandler.start_link(tenant: :tenant1)
@callback partition_by(domain_event(), metadata()) :: any()
Determine which partition an event belongs to.
Only applicable when an event handler has been configured with more than one
instance via the :concurrency option.
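As an illustration of partitioning by a field from the event rather than by stream, the sketch below prefers an account identifier and falls back to the stream. AccountPartitioner, the MoneyDeposited struct, and the :account_id field are hypothetical names chosen for this example.

```elixir
defmodule AccountPartitioner do
  # Hypothetical event struct, defined here only to keep the sketch self-contained.
  defmodule MoneyDeposited do
    defstruct [:account_id, :amount]
  end

  # Events carrying a non-nil :account_id are partitioned by that field, so all
  # events for one account are handled in order by the same instance.
  def partition_by(%{account_id: account_id}, _metadata) when not is_nil(account_id) do
    {:account, account_id}
  end

  # Any other event falls back to partitioning by its stream.
  def partition_by(_event, %{stream_id: stream_id}) do
    {:stream, stream_id}
  end
end
```

A handler's partition_by/2 could have the same clauses directly. Tagging the returned term ({:account, id} versus {:stream, id}) keeps the two kinds of partition key from colliding.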
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.