Commanded.Event.Handler behaviour (Commanded v1.4.6)
Defines the behaviour an event handler must implement and provides a convenience macro that implements the behaviour, allowing you to handle only the events you are interested in processing.
You should start your event handlers using a Supervisor to ensure they are restarted on error.
Example
defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler"

  def handle(%AnEvent{}, _metadata) do
    # ... process the event
    :ok
  end
end
Start your event handler process (or use a Supervisor):
{:ok, _handler} = ExampleHandler.start_link()
Event handler name
The name you specify is used when subscribing to the event store. You must use a unique name for each event handler and process manager you start. Also, you should not change the name once the handler has been deployed. A new subscription will be created if you change the name and the event handler will receive already handled events.
You can use the module name of your event handler using the __MODULE__ special form:
defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__
end
Subscription options
You can choose to start the event handler's event store subscription from the :origin, the :current position, or an exact event number using the start_from option. The default is to use the origin so your handler will receive all events.
Use the :current position when you don't want newly created event handlers to go through all previous events. An example would be adding an event handler to send transactional emails to an already deployed system containing many historical events.
The start_from option only applies when the subscription is initially created, the first time the handler starts. Whenever the handler restarts, the subscription will resume from the next event after the last successfully processed event. Restarting an event handler does not restart its subscription.
Example
Set the start_from option (:origin, :current, or an explicit event number) when using Commanded.Event.Handler:
defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler",
    start_from: :origin
end
You can optionally override :start_from by passing it as an option when starting your handler:
{:ok, _handler} = ExampleHandler.start_link(start_from: :current)
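The override can be pictured with plain keyword lists. This is a minimal sketch, assuming the compile-time and runtime options are combined in the manner of Keyword.merge/2; that is an assumption about the internals made for illustration, and the variable names are hypothetical.

```elixir
# Options given to `use Commanded.Event.Handler` (compile time).
compile_time = [application: ExampleApp, name: "ExampleHandler", start_from: :origin]

# Options given to start_link/1 (runtime).
runtime = [start_from: :current]

# Assumption: runtime values take precedence, as with Keyword.merge/2.
config = Keyword.merge(compile_time, runtime)

# The runtime value replaces the compile-time one; other keys are kept.
:current = Keyword.fetch!(config, :start_from)
"ExampleHandler" = Keyword.fetch!(config, :name)
```

Remember that start_from only matters the first time the subscription is created, so overriding it for an already deployed handler has no effect.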
Subscribing to an individual stream
By default, event handlers will subscribe to all events appended to any stream. Provide a subscribe_to option to subscribe to a single stream.
defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__,
    subscribe_to: "stream1234"
end
This will ensure the handler only receives events appended to that stream.
Runtime event handler configuration
Runtime options can be provided to the event handler's start_link/1 function or its child spec. The init/1 callback function can also be used to define runtime configuration.
Example
Provide runtime configuration to start_link/1:
{:ok, _pid} =
  ExampleHandler.start_link(
    application: ExampleApp,
    name: "ExampleHandler"
  )
Or when supervised:
Supervisor.start_link([
  {ExampleHandler, application: ExampleApp, name: "ExampleHandler"}
], strategy: :one_for_one)
Event handler state
An event handler can define and update state which is held in the GenServer process memory. It is passed to the handle/2 function as part of the metadata using the :state key. The state is transient and will be lost whenever the process restarts.
Initial state can be set in the init/1 callback function by adding a :state key to the config. It can also be provided when starting the handler process:
ExampleHandler.start_link(state: initial_state)
Or when supervised:
Supervisor.start_link([
  {ExampleHandler, state: initial_state}
], strategy: :one_for_one)
State can be updated by returning {:ok, new_state} from any handle/2 function. Returning an :ok reply will keep the state unchanged.
Handler state is also included in the Commanded.Event.FailureContext struct passed to the error/3 callback function.
Example
defmodule StatefulEventHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__

  def init(config) do
    config = Keyword.put_new(config, :state, %{})

    {:ok, config}
  end

  def handle(_event, metadata) do
    %{state: state} = metadata

    # mutate_state/1 stands in for your own state update logic
    new_state = mutate_state(state)

    {:ok, new_state}
  end
end
Concurrency
An event handler may be configured to start multiple processes to handle the events concurrently. By default one process will be started, processing events one at a time in order. The :concurrency option determines how many event handler processes are started. It must be a positive integer.
Note that with concurrent processing, events will likely be processed out of order. If you need to enforce an order, such as per stream or by using a field from an event, you can define a partition_by/2 callback function in the event handler module. The function will receive each event and its metadata and must return a consistent term indicating the event's partition. Events which return the same term are guaranteed to be processed in order by the same event handler instance, while events with different partitions may be processed concurrently by another instance. An attempt will be made to distribute events as evenly as possible to all running event handler instances.
Only :eventual consistency is supported when multiple handler processes are configured with a :concurrency greater than one.
Example
defmodule ConcurrentProcessingEventHandler do
  alias Commanded.EventStore.RecordedEvent

  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__,
    concurrency: 10

  def init(config) do
    # Fetch the index of this event handler instance (0..9 in this example)
    _index = Keyword.fetch!(config, :index)

    {:ok, config}
  end

  def handle(_event, _metadata) do
    :ok
  end

  # Partition events by their stream
  def partition_by(_event, metadata) do
    %{stream_id: stream_id} = metadata

    stream_id
  end
end
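To see why partition_by/2 must return a consistent term, here is a minimal sketch of one way a term could be mapped to a handler instance, using :erlang.phash2/2. The PartitionSketch module is hypothetical and the hashing scheme is an assumption for illustration; the text above does not specify how events are actually distributed.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper for illustration; not part of Commanded.

  # Map any partition term to an instance index in 0..(concurrency - 1).
  def instance_for(partition_term, concurrency)
      when is_integer(concurrency) and concurrency > 0 do
    # :erlang.phash2/2 returns the same value for the same term on every call,
    # within the range 0..(concurrency - 1).
    :erlang.phash2(partition_term, concurrency)
  end
end

# The same stream identifier always maps to the same instance index,
# so its events would be handled in order by a single instance.
first = PartitionSketch.instance_for("stream-1", 10)
second = PartitionSketch.instance_for("stream-1", 10)
true = first == second
true = first in 0..9
```

If partition_by/2 returned a term that changes between calls for the same stream, such as a timestamp, events from that stream could land on different instances and lose their ordering.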
Consistency
For each event handler you can define its consistency, as either :strong or :eventual.
This setting is used when dispatching commands and specifying the consistency option.
When you dispatch a command using :strong consistency, after successful command dispatch the process will block until all event handlers configured to use :strong consistency have processed the domain events created by the command. This is useful when you have a read model updated by an event handler that you wish to query for data affected by the command dispatch. With :strong consistency you are guaranteed that the read model will be up-to-date after the command has successfully dispatched. It can be safely queried for data updated by any of the events created by the command.
The default setting is :eventual consistency. Command dispatch will return immediately upon confirmation of event persistence, not waiting for any event handlers.
Note strong consistency does not imply a transaction covers the command dispatch and event handling. It only guarantees that the event handler will have processed all events produced by the command: if event handling fails the events will have still been persisted.
Example
Define an event handler with :strong consistency:
defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler",
    consistency: :strong
end
Dynamic application
An event handler's application can be provided as an option to start_link/1. This can be used to start the same handler multiple times, each using a separate Commanded application and event store.
Example
Start an event handler process for each tenant in a multi-tenanted app, guaranteeing that the data and processing remain isolated between tenants.
for tenant <- [:tenant1, :tenant2, :tenant3] do
  {:ok, _app} = MyApp.Application.start_link(name: tenant)
  {:ok, _handler} = ExampleHandler.start_link(application: tenant)
end
Typically you would start the event handlers using a supervisor:
children =
  for tenant <- [:tenant1, :tenant2, :tenant3] do
    {ExampleHandler, application: tenant}
  end

Supervisor.start_link(children, strategy: :one_for_one)
The above example requires three named Commanded applications to have already been started.
Telemetry
[:commanded, :event, :handle, :start]
- Description: Emitted when an event handler starts handling an event
- Measurements: %{system_time: integer()}
- Metadata: %{application: Commanded.Application.t(), context: map(), handler_name: String.t(), handler_module: atom(), handler_state: map(), recorded_event: RecordedEvent.t()}
[:commanded, :event, :handle, :stop]
- Description: Emitted when an event handler stops handling an event
- Measurements: %{duration: non_neg_integer()}
- Metadata: %{:application => Commanded.Application.t(), :context => map(), :handler_name => String.t(), :handler_module => atom(), :handler_state => map(), :recorded_event => RecordedEvent.t(), optional(:error) => any()}
[:commanded, :event, :handle, :exception]
- Description: Emitted when an event handler raises an exception
- Measurements: %{duration: non_neg_integer()}
- Metadata: %{application: Commanded.Application.t(), context: map(), handler_name: String.t(), handler_module: atom(), handler_state: map(), recorded_event: RecordedEvent.t(), kind: :throw | :error | :exit, reason: any(), stacktrace: list()}
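As a minimal sketch of consuming these events, the hypothetical module below logs how long each handled event took. It assumes the duration measurement is in native time units, which is a common telemetry convention but is not stated above, so check before relying on it. The module name, handler id, and log format are all illustrative.

```elixir
defmodule HandlerTelemetrySketch do
  # Hypothetical module for illustration; not part of Commanded.
  require Logger

  # Matches the :stop and :exception events, which both carry a :duration measurement.
  def handle_event([:commanded, :event, :handle, outcome], %{duration: duration}, metadata, _config)
      when outcome in [:stop, :exception] do
    Logger.info(format(metadata.handler_name, outcome, duration))
  end

  # Build the log line. Assumption: duration is in native time units.
  def format(handler_name, outcome, duration) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "#{handler_name} #{outcome} after #{ms}ms"
  end
end

# Attaching requires the :telemetry library to be available:
#
#   :telemetry.attach_many(
#     "handler-telemetry-sketch",
#     [[:commanded, :event, :handle, :stop], [:commanded, :event, :handle, :exception]],
#     &HandlerTelemetrySketch.handle_event/4,
#     nil
#   )
```

The function is passed as a remote capture (&Module.function/4) rather than an anonymous function, which is the form the telemetry library recommends for attached handlers.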
Types
@type consistency() :: :eventual | :strong
@type domain_event() :: struct()
@type metadata() :: map()
@type subscribe_from() :: :origin | :current | non_neg_integer()
Callbacks
@callback after_start(handler_state :: term()) :: :ok | {:ok, state :: map()} | {:stop, reason :: any()}
Optional initialisation callback function called when the handler starts.
Can be used to start any related processes when the event handler is started.
This callback function must return :ok, {:ok, state} to return new state, or {:stop, reason} to stop the handler process. Any other return value will terminate the event handler with an error.
Example
defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler"

  # Optional initialisation
  def after_start(handler_state) do
    new_handler_state = Map.put(handler_state, :foo, "bar")

    {:ok, new_handler_state}
  end

  def handle(%AnEvent{}, _metadata) do
    # Process the event ...
    :ok
  end
end
@callback before_reset() :: :ok
Called before an event handler gets reset
@callback error( error :: term(), failed_event :: domain_event(), failure_context :: Commanded.Event.FailureContext.t() ) :: {:retry, context :: map() | Commanded.Event.FailureContext.t()} | {:retry, delay :: non_neg_integer(), context :: map() | Commanded.Event.FailureContext.t()} | :skip | {:stop, reason :: term()}
Called when an event handle/2 callback returns an error.
The error/3 function allows you to control how event handling failures are handled. The function is passed the error returned by the event handler (e.g. {:error, :failure}), the event causing the error, and a context map containing state passed between retries.
Use pattern matching on the error and/or failed event to explicitly handle certain errors or events. Use the context map to track any transient state you need to access between retried failures.
You can return one of the following responses depending upon the error severity:
- {:retry, context} - retry the failed event, provide a context map, or updated Commanded.Event.FailureContext struct, containing any state to be passed to subsequent failures. This could be used to count the number of failures, stopping after too many.
- {:retry, delay, context} - retry the failed event, after sleeping for the requested delay (in milliseconds). Context is a map or Commanded.Event.FailureContext struct as described in {:retry, context} above.
- :skip - skip the failed event by acknowledging receipt.
- {:stop, reason} - stop the event handler with the given reason.
The default behaviour if you don't provide an error/3 callback is to stop the event handler using the exact error reason returned from the handle/2 function. If the event handler is supervised using restart permanent or transient, stopping on error will cause the handler to be restarted. It will likely crash again as it will reprocess the problematic event. This can lead to cascading failures going up the supervision tree.
Example error handling
defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__

  require Logger

  alias Commanded.Event.FailureContext

  def handle(%AnEvent{}, _metadata) do
    # simulate event handling failure
    {:error, :failed}
  end

  def error({:error, :failed}, %AnEvent{} = event, %FailureContext{context: context}) do
    context = record_failure(context)

    case Map.get(context, :failures) do
      too_many when too_many >= 3 ->
        # skip bad event after third failure
        Logger.warning("Skipping bad event, too many failures: " <> inspect(event))
        :skip

      _ ->
        # retry event, failure count is included in context map
        {:retry, context}
    end
  end

  defp record_failure(context) do
    Map.update(context, :failures, 1, fn failures -> failures + 1 end)
  end
end
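The {:retry, delay, context} response can be combined with the failure count to back off between attempts. The following is a minimal sketch of that decision as a pure function. RetrySketch, its limits, and the :too_many_failures reason are hypothetical names chosen for illustration, not part of Commanded.

```elixir
defmodule RetrySketch do
  # Hypothetical helper for illustration; not part of Commanded.
  @max_failures 3
  @base_delay_ms 100

  # Given the context map carried between retries, return one of the
  # documented error/3 responses: {:retry, delay, context} or {:stop, reason}.
  def next_action(context) when is_map(context) do
    context = Map.update(context, :failures, 1, &(&1 + 1))
    failures = context.failures

    if failures >= @max_failures do
      {:stop, :too_many_failures}
    else
      # Double the delay after each failure: 100 ms, then 200 ms.
      delay = @base_delay_ms * Integer.pow(2, failures - 1)
      {:retry, delay, context}
    end
  end
end

{:retry, 100, %{failures: 1}} = RetrySketch.next_action(%{})
{:retry, 200, %{failures: 2}} = RetrySketch.next_action(%{failures: 1})
{:stop, :too_many_failures} = RetrySketch.next_action(%{failures: 2})
```

An error/3 clause could then delegate to it by matching %FailureContext{context: context} as in the example above and returning RetrySketch.next_action(context). Integer.pow/2 needs Elixir 1.12 or later.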
@callback handle(domain_event(), metadata()) :: :ok | {:ok, new_state :: any()} | {:error, :already_seen_event} | {:error, reason :: any()}
Handle a domain event and its metadata.
Return :ok on success, {:error, :already_seen_event} to ack and skip the event, or {:error, reason} on failure.
@callback init() :: :ok | {:stop, reason :: any()}
Optional callback function called to configure the handler before it starts.
It is passed the merged compile-time and runtime config, and must return the updated config as {:ok, config}.
Note this function is called before the event handler process is started and is not run from the handler's process. You cannot use self() to access the handler's PID.
Example
The init/1 function is used to define the handler's application and name based upon a value provided at runtime:
defmodule ExampleHandler do
  use Commanded.Event.Handler

  def init(config) do
    {tenant, config} = Keyword.pop!(config, :tenant)

    config =
      config
      |> Keyword.put(:application, Module.concat([ExampleApp, tenant]))
      |> Keyword.put(:name, Module.concat([__MODULE__, tenant]))

    {:ok, config}
  end
end
Usage:
{:ok, _pid} = ExampleHandler.start_link(tenant: :tenant1)
@callback partition_by(domain_event(), metadata()) :: any()
Determine which partition an event belongs to.
Only applicable when an event handler has been configured with more than one instance via the :concurrency option.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.