Commanded.Event.Handler behaviour (Commanded v1.4.6)

Defines the behaviour an event handler must implement and provides a convenience macro that implements the behaviour, allowing you to handle only the events you are interested in processing.

You should start your event handlers using a Supervisor to ensure they are restarted on error.

Example

defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler"

  def handle(%AnEvent{}, _metadata) do
    # ... process the event
    :ok
  end
end

Start your event handler process (or use a Supervisor):

{:ok, _handler} = ExampleHandler.start_link()

Event handler name

The name you specify is used when subscribing to the event store. You must use a unique name for each event handler and process manager you start. Also, you should not change the name once the handler has been deployed. A new subscription will be created if you change the name and the event handler will receive already handled events.

You can use the module name of your event handler using the __MODULE__ special form:

defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__
end

Subscription options

You can choose to start the event handler's event store subscription from :origin, :current position, or an exact event number using the start_from option. The default is to use the origin so your handler will receive all events.

Use the :current position when you don't want newly created event handlers to go through all previous events. An example would be adding an event handler to send transactional emails to an already deployed system containing many historical events.

The start_from option only applies when the subscription is initially created, the first time the handler starts. Whenever the handler restarts the subscription will resume from the next event after the last successfully processed event. Restarting an event handler does not restart its subscription.

Example

Set the start_from option (:origin, :current, or an explicit event number) when using Commanded.Event.Handler:

defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler",
    start_from: :origin
end

You can optionally override :start_from by passing it as an option when starting your handler:

{:ok, _handler} = ExampleHandler.start_link(start_from: :current)

Subscribing to an individual stream

By default event handlers will subscribe to all events appended to any stream. Provide a subscribe_to option to subscribe to a single stream.

defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__,
    subscribe_to: "stream1234"
end

This will ensure the handler only receives events appended to that stream.

Runtime event handler configuration

Runtime options can be provided to the event handler's start_link/1 function or its child spec. The init/1 callback function can also be used to define runtime configuration.

Example

Provide runtime configuration to start_link/1:

{:ok, _pid} =
  ExampleHandler.start_link(
    application: ExampleApp,
    name: "ExampleHandler"
  )

Or when supervised:

Supervisor.start_link([
  {ExampleHandler, application: ExampleApp, name: "ExampleHandler"}
], strategy: :one_for_one)

Event handler state

An event handler can define and update state which is held in the GenServer process memory. It is passed to the handle/2 function as part of the metadata using the :state key. The state is transient and will be lost whenever the process restarts.

Initial state can be set in the init/1 callback function by adding a :state key to the config. It can also be provided when starting the handler process:

ExampleHandler.start_link(state: initial_state)

Or when supervised:

Supervisor.start_link([
  {ExampleHandler, state: initial_state}
], strategy: :one_for_one)

State can be updated by returning {:ok, new_state} from any handle/2 function. Returning an :ok reply will keep the state unchanged.

Handler state is also included in the Commanded.Event.FailureContext struct passed to the error/3 callback function.

Example

defmodule StatefulEventHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__

  def init(config) do
    config = Keyword.put_new(config, :state, %{})

    {:ok, config}
  end

  def handle(_event, metadata) do
    %{state: state} = metadata

    # mutate_state/1 stands in for your own state transition logic
    new_state = mutate_state(state)

    {:ok, new_state}
  end
end
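
The effect of the two reply shapes on the state can be modelled without a running handler. The following is a minimal sketch in plain Elixir, not Commanded's implementation: the Deposited event, the StateThreading module and its run/2 function are hypothetical names used only for illustration.

```elixir
defmodule Deposited do
  # Hypothetical domain event, used only for this sketch.
  defstruct [:amount]
end

defmodule StateThreading do
  # Same shape as a handler's handle/2: state arrives under the :state key.
  def handle(%Deposited{amount: amount}, %{state: state}) do
    {:ok, Map.update(state, :total, amount, &(&1 + amount))}
  end

  # Events this handler is not interested in leave the state untouched.
  def handle(_event, _metadata), do: :ok

  # Simplified model of how each reply affects the state seen by the next event.
  def run(events, initial_state) do
    Enum.reduce(events, initial_state, fn event, state ->
      case handle(event, %{state: state}) do
        :ok -> state
        {:ok, new_state} -> new_state
      end
    end)
  end
end

events = [%Deposited{amount: 10}, :ignored, %Deposited{amount: 5}]
final_state = StateThreading.run(events, %{})
# final_state is %{total: 15}; the :ignored event returned :ok and changed nothing.
```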

Concurrency

An event handler may be configured to start multiple processes to handle the events concurrently. By default one process will be started, processing events one at a time in order. The :concurrency option determines how many event handler processes are started. It must be a positive integer.

Note that with concurrent processing, events will likely be processed out of order. If you need to enforce an order, such as per stream or by using a field from an event, you can define a partition_by/2 callback function in the event handler module. The function will receive each event and its metadata and must return a consistent term indicating the event's partition. Events which return the same term are guaranteed to be processed in order by the same event handler instance, while events with different partitions may be processed concurrently by another instance. An attempt will be made to distribute events as evenly as possible to all running event handler instances.

Only :eventual consistency is supported when a handler is configured with a :concurrency greater than one.

Example

defmodule ConcurrentProcessingEventHandler do
  alias Commanded.EventStore.RecordedEvent

  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__,
    concurrency: 10

  def init(config) do
    # Fetch the index of this event handler instance (0..9 in this example)
    _index = Keyword.fetch!(config, :index)

    {:ok, config}
  end

  def handle(_event, _metadata) do
    :ok
  end

  # Partition events by their stream
  def partition_by(_event, metadata) do
    %{stream_id: stream_id} = metadata

    stream_id
  end
end
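
To see why a consistent partition term preserves ordering, consider one possible way of routing a term to a handler instance. This is a sketch under stated assumptions: the PartitionSketch module and instance_for/3 are hypothetical, and hashing with :erlang.phash2/2 is an illustrative choice, since the distribution strategy Commanded actually uses is not described here.

```elixir
defmodule PartitionSketch do
  # Same shape as the partition_by/2 callback: partition events by their stream.
  def partition_by(_event, %{stream_id: stream_id}), do: stream_id

  # Hypothetical routing step: hash the partition term onto one of
  # `concurrency` instance indexes (0..concurrency - 1).
  def instance_for(event, metadata, concurrency) do
    event
    |> partition_by(metadata)
    |> :erlang.phash2(concurrency)
  end
end

first = PartitionSketch.instance_for(:opened, %{stream_id: "account-1"}, 10)
second = PartitionSketch.instance_for(:credited, %{stream_id: "account-1"}, 10)
# first == second: both events share a partition term, so one instance
# receives them and can process them in order.
```

Events from a different stream may hash to the same index or to another one; only the same-term case carries an ordering guarantee.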

Consistency

For each event handler you can define its consistency, as one of either :strong or :eventual.

This setting is used when dispatching commands and specifying the consistency option.

When you dispatch a command using :strong consistency, after successful command dispatch the process will block until all event handlers configured to use :strong consistency have processed the domain events created by the command. This is useful when you have a read model updated by an event handler that you wish to query for data affected by the command dispatch. With :strong consistency you are guaranteed that the read model will be up-to-date after the command has successfully dispatched. It can be safely queried for data updated by any of the events created by the command.

The default setting is :eventual consistency. Command dispatch will return immediately upon confirmation of event persistence, not waiting for any event handlers.

Note strong consistency does not imply a transaction covers the command dispatch and event handling. It only guarantees that the event handler will have processed all events produced by the command: if event handling fails the events will have still been persisted.

Example

Define an event handler with :strong consistency:

defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler",
    consistency: :strong
end
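
The handler setting only takes effect when the command itself is dispatched with the matching consistency option. A minimal sketch of the dispatching side, assuming ExampleApp is a Commanded application with a router for the given command; the AccountOpening module and AccountQueries.get_account/1 are hypothetical names used only for illustration:

```elixir
defmodule AccountOpening do
  # `command` is any command struct routed by ExampleApp (hypothetical here).
  def open_and_fetch(command, account_id) do
    # Blocks until every handler configured with consistency: :strong has
    # processed the events produced by this command.
    with :ok <- ExampleApp.dispatch(command, consistency: :strong) do
      # The read model maintained by ExampleHandler can now be queried.
      # AccountQueries.get_account/1 is a hypothetical query function.
      AccountQueries.get_account(account_id)
    end
  end
end
```

If dispatch returns an error, the with expression returns that error unchanged and the read model is not queried.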

Dynamic application

An event handler's application can be provided as an option to start_link/1. This can be used to start the same handler multiple times, each using a separate Commanded application and event store.

Example

Start an event handler process for each tenant in a multi-tenanted app, guaranteeing that the data and processing remain isolated between tenants.

for tenant <- [:tenant1, :tenant2, :tenant3] do
  {:ok, _app} = MyApp.Application.start_link(name: tenant)
  {:ok, _handler} = ExampleHandler.start_link(application: tenant)
end

Typically you would start the event handlers using a supervisor:

children =
  for tenant <- [:tenant1, :tenant2, :tenant3] do
    {ExampleHandler, application: tenant}
  end

Supervisor.start_link(children, strategy: :one_for_one)

The above example requires three named Commanded applications to have already been started.

Telemetry

  • [:commanded, :event, :handle, :start]

    • Description: Emitted when an event handler starts handling an event
    • Measurements: %{system_time: integer()}
    • Metadata: %{application: Commanded.Application.t(), context: map(), handler_name: String.t(), handler_module: atom(), handler_state: map(), recorded_event: RecordedEvent.t()}
  • [:commanded, :event, :handle, :stop]

    • Description: Emitted when an event handler stops handling an event
    • Measurements: %{duration: non_neg_integer()}
    • Metadata: %{:application => Commanded.Application.t(), :context => map(), :handler_name => String.t(), :handler_module => atom(), :handler_state => map(), :recorded_event => RecordedEvent.t(), optional(:error) => any()}
  • [:commanded, :event, :handle, :exception]

    • Description: Emitted when an event handler raises an exception
    • Measurements: %{duration: non_neg_integer()}
    • Metadata: %{application: Commanded.Application.t(), context: map(), handler_name: String.t(), handler_module: atom(), handler_state: map(), recorded_event: RecordedEvent.t(), kind: :throw | :error | :exit, reason: any(), stacktrace: list()}
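
These events can be consumed with the :telemetry library. The following is a minimal sketch rather than a recommended setup: the HandlerTelemetry module, its handler id, and the log format are hypothetical, and it assumes the duration measurement is in native time units, as is usual for telemetry spans.

```elixir
defmodule HandlerTelemetry do
  require Logger

  @events [
    [:commanded, :event, :handle, :stop],
    [:commanded, :event, :handle, :exception]
  ]

  # Attach once, for example when your application starts.
  def attach do
    :telemetry.attach_many(
      "example-handler-telemetry",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Called by :telemetry for each attached event.
  def handle_event([:commanded, :event, :handle, status], measurements, metadata, _config) do
    Logger.info(describe(status, measurements, metadata))
  end

  # Builds the log line; kept separate so it can be exercised without :telemetry.
  def describe(status, %{duration: duration}, %{handler_name: name}) do
    # Assumption: duration is reported in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "#{name} #{status} after #{ms}ms"
  end
end
```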

Summary

Callbacks

after_start(handler_state)
Optional initialisation callback function called when the handler starts.

before_reset()
Called before an event handler gets reset.

error(error, failed_event, failure_context)
Called when an event handle/2 callback returns an error.

handle(domain_event, metadata)
Handle a domain event and its metadata.

init() deprecated

init(config)
Optional callback function called to configure the handler before it starts.

partition_by(domain_event, metadata)
Determine which partition an event belongs to.

Functions

Returns a specification to start this module under a supervisor.

Types

@type consistency() :: :eventual | :strong
@type domain_event() :: struct()
@type metadata() :: map()
@type subscribe_from() :: :origin | :current | non_neg_integer()

Callbacks

after_start(handler_state)
@callback after_start(handler_state :: term()) ::
  :ok | {:ok, state :: map()} | {:stop, reason :: any()}

Optional initialisation callback function called when the handler starts.

Can be used to start any related processes when the event handler is started.

This callback function must return :ok, {:ok, state} to return new state, or {:stop, reason} to stop the handler process. Any other return value will terminate the event handler with an error.

Example

defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: "ExampleHandler"

  # Optional initialisation
  def after_start(handler_state) do
    new_handler_state = Map.put(handler_state, :foo, "bar")
    {:ok, new_handler_state}
  end

  def handle(%AnEvent{}, _metadata) do
    # Process the event ...
    :ok
  end
end

before_reset() (optional)
@callback before_reset() :: :ok

Called before an event handler gets reset.

error(error, failed_event, failure_context) (optional)
@callback error(
  error :: term(),
  failed_event :: domain_event(),
  failure_context :: Commanded.Event.FailureContext.t()
) ::
  {:retry, context :: map() | Commanded.Event.FailureContext.t()}
  | {:retry, delay :: non_neg_integer(),
     context :: map() | Commanded.Event.FailureContext.t()}
  | :skip
  | {:stop, reason :: term()}

Called when an event handle/2 callback returns an error.

The error/3 function allows you to control how event handling failures are handled. The function is passed the error returned by the event handler (e.g. {:error, :failure}), the event causing the error, and a Commanded.Event.FailureContext struct whose context map holds state passed between retries.

Use pattern matching on the error and/or failed event to explicitly handle certain errors or events. Use the context map to track any transient state you need to access between retried failures.

You can return one of the following responses depending upon the error severity:

  • {:retry, context} - retry the failed event, provide a context map, or updated Commanded.Event.FailureContext struct, containing any state to be passed to subsequent failures. This could be used to count the number of failures, stopping after too many.

  • {:retry, delay, context} - retry the failed event, after sleeping for the requested delay (in milliseconds). Context is a map or Commanded.Event.FailureContext struct as described in {:retry, context} above.

  • :skip - skip the failed event by acknowledging receipt.

  • {:stop, reason} - stop the event handler with the given reason.

The default behaviour if you don't provide an error/3 callback is to stop the event handler using the exact error reason returned from the handle/2 function. If the event handler is supervised with a restart option of permanent or transient, stopping on error will cause the handler to be restarted. It will likely crash again as it will reprocess the problematic event. This can lead to cascading failures going up the supervision tree.

Example error handling

defmodule ExampleHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__

  require Logger

  alias Commanded.Event.FailureContext

  def handle(%AnEvent{}, _metadata) do
    # simulate event handling failure
    {:error, :failed}
  end

  def error({:error, :failed}, %AnEvent{} = event, %FailureContext{context: context}) do
    context = record_failure(context)

    case Map.get(context, :failures) do
      too_many when too_many >= 3 ->
        # skip bad event after third failure
        Logger.warning("Skipping bad event, too many failures: " <> inspect(event))

        :skip

      _ ->
        # retry event, failure count is included in context map
        {:retry, context}
    end
  end

  defp record_failure(context) do
    Map.update(context, :failures, 1, fn failures -> failures + 1 end)
  end
end
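
The {:retry, delay, context} reply can be combined with the failure count to back off between attempts. A minimal sketch in plain Elixir, so that it runs without a handler process: the RetryPolicy module, its limits, and the doubling delay are hypothetical choices, not defaults provided by Commanded.

```elixir
defmodule RetryPolicy do
  @max_failures 5
  @base_delay_ms 100

  # Takes the error and the context map from a FailureContext and returns a
  # reply in the shape error/3 expects.
  def decide(error, context) do
    context = Map.update(context, :failures, 1, &(&1 + 1))

    case context.failures do
      failures when failures >= @max_failures ->
        # Give up and stop the handler after the fifth failure
        {:stop, {:too_many_failures, error}}

      failures ->
        # 100ms, 200ms, 400ms, ... doubling on every failure
        delay = @base_delay_ms * Integer.pow(2, failures - 1)
        {:retry, delay, context}
    end
  end
end

# Inside an event handler this could be called as:
#
#   def error(error, _event, %FailureContext{context: context}) do
#     RetryPolicy.decide(error, context)
#   end
```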

handle(domain_event, metadata)
@callback handle(domain_event(), metadata()) ::
  :ok
  | {:ok, new_state :: any()}
  | {:error, :already_seen_event}
  | {:error, reason :: any()}

Handle a domain event and its metadata.

Return :ok on success, {:ok, new_state} to also update the handler state, {:error, :already_seen_event} to ack and skip the event, or {:error, reason} on failure.

init()

This callback is deprecated. Use the after_start/1 callback instead.

@callback init() :: :ok | {:stop, reason :: any()}

init(config)

@callback init(config :: Keyword.t()) :: {:ok, Keyword.t()}

Optional callback function called to configure the handler before it starts.

It is passed the merged compile-time and runtime config, and must return the updated config as {:ok, config}.

Note this function is called before the event handler process is started and is not run from the handler's process. You cannot use self() to access the handler's PID.

Example

The init/1 function is used to define the handler's application and name based upon a value provided at runtime:

defmodule ExampleHandler do
  use Commanded.Event.Handler

  def init(config) do
    {tenant, config} = Keyword.pop!(config, :tenant)

    config =
      config
      |> Keyword.put(:application, Module.concat([ExampleApp, tenant]))
      |> Keyword.put(:name, Module.concat([__MODULE__, tenant]))

    {:ok, config}
  end
end

Usage:

{:ok, _pid} = ExampleHandler.start_link(tenant: :tenant1)
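
Because init/1 only transforms a keyword list, its logic can be tried out as a plain function outside any handler process. The sketch below mirrors the init/1 body above in a hypothetical TenantConfig module; ExampleApp and ExampleHandler are used only as name prefixes and do not need to exist for it to run.

```elixir
defmodule TenantConfig do
  # Mirrors the body of ExampleHandler.init/1 as a plain function.
  def configure(config) do
    # Raises a KeyError when no :tenant option was given
    {tenant, config} = Keyword.pop!(config, :tenant)

    config =
      config
      |> Keyword.put(:application, Module.concat([ExampleApp, tenant]))
      |> Keyword.put(:name, Module.concat([ExampleHandler, tenant]))

    {:ok, config}
  end
end

{:ok, config} = TenantConfig.configure(tenant: :tenant1, consistency: :eventual)
# config no longer contains :tenant; :application and :name are now
# tenant-specific atoms, and other options such as :consistency are kept.
```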

partition_by(domain_event, metadata) (optional)
@callback partition_by(domain_event(), metadata()) :: any()

Determine which partition an event belongs to.

Only applicable when an event handler has been configured with more than one instance via the :concurrency option.

Functions

Returns a specification to start this module under a supervisor.

See Supervisor.