Command registration and dispatch

Commands

Create a module per command and define the fields with defstruct. A command must contain a field to uniquely identify the aggregate instance (e.g. account_number).

defmodule OpenAccount do
  defstruct [:account_number, :initial_balance]
end

Command handlers

Implement the Commanded.Commands.Handler behaviour consisting of a single handle/2 function.

The function receives the aggregate root's state and the command to be handled. It must return the domain events raised by the aggregate root, or an {:error, reason} tuple on failure.

defmodule OpenAccountHandler do
  @behaviour Commanded.Commands.Handler

  def handle(%BankAccount{} = aggregate, %OpenAccount{account_number: account_number, initial_balance: initial_balance}) do
    aggregate
    |> BankAccount.open_account(account_number, initial_balance)
  end
end
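The failure path described above can be sketched as follows. This is a minimal illustration, not the library's own example: the BankAccountOpened event, the :invalid_initial_balance reason, and the body of BankAccount.open_account/3 are assumptions. The @behaviour line is left out so the sketch runs without the Commanded dependency.

```elixir
defmodule OpenAccount do
  defstruct [:account_number, :initial_balance]
end

# Hypothetical event; the text above does not name the events raised.
defmodule BankAccountOpened do
  defstruct [:account_number, :initial_balance]
end

defmodule BankAccount do
  defstruct [:account_number, :balance]

  # Assumed shape: the aggregate function returns the raised event.
  def open_account(%BankAccount{}, account_number, initial_balance) do
    %BankAccountOpened{account_number: account_number, initial_balance: initial_balance}
  end
end

defmodule OpenAccountHandler do
  # `@behaviour Commanded.Commands.Handler` omitted to keep this standalone.

  # Reject the command before it reaches the aggregate.
  def handle(%BankAccount{}, %OpenAccount{initial_balance: balance})
      when not is_number(balance) or balance <= 0 do
    {:error, :invalid_initial_balance}
  end

  # Otherwise delegate to the aggregate and return its event.
  def handle(%BankAccount{} = aggregate, %OpenAccount{} = command) do
    BankAccount.open_account(aggregate, command.account_number, command.initial_balance)
  end
end
```

Keeping the validation clause first means invalid commands never reach the aggregate function.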

Command dispatch and routing

You must create a router to register each command with its associated handler.

defmodule BankRouter do
  use Commanded.Commands.Router

  dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number
  dispatch DepositMoney, to: DepositMoneyHandler, aggregate: BankAccount, identity: :account_number
end

It is also possible to route a command directly to an aggregate root, without requiring an intermediate command handler.

defmodule BankRouter do
  use Commanded.Commands.Router

  dispatch OpenAccount, to: BankAccount, identity: :account_number
end

The aggregate root must implement an execute/2 function that receives the aggregate’s state and the command to execute.
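As an illustration, an aggregate root with execute/2 might look like the sketch below. The BankAccountOpened event and the :account_already_open reason are hypothetical, and returning an {:error, reason} tuple from execute/2 is assumed to mirror what a command handler may return.

```elixir
defmodule OpenAccount do
  defstruct [:account_number, :initial_balance]
end

# Hypothetical event name, used for illustration only.
defmodule BankAccountOpened do
  defstruct [:account_number, :initial_balance]
end

defmodule BankAccount do
  defstruct [:account_number, balance: 0]

  # A fresh aggregate has no account number yet, so the command is accepted
  # and the resulting event is returned.
  def execute(%BankAccount{account_number: nil}, %OpenAccount{} = command) do
    %BankAccountOpened{
      account_number: command.account_number,
      initial_balance: command.initial_balance
    }
  end

  # Any other state means the account has already been opened.
  def execute(%BankAccount{}, %OpenAccount{}) do
    {:error, :account_already_open}
  end
end
```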

You can then dispatch a command using the router.

:ok = BankRouter.dispatch(%OpenAccount{account_number: "ACC123", initial_balance: 1_000})

Timeouts

A command handler has a default timeout of 5 seconds, the same default as a GenServer call. It must handle the command within this period, otherwise the call fails and the caller exits.

You can configure a different timeout value during command registration by providing a timeout option, defined in milliseconds:

defmodule BankRouter do
  use Commanded.Commands.Router

  # configure a timeout of 1 second for the open account command handler
  dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number, timeout: 1_000
end

You can override the timeout value during command dispatch. This example dispatches the open account command with a timeout of 2 seconds:

:ok = BankRouter.dispatch(%OpenAccount{account_number: "ACC123", initial_balance: 1_000}, timeout: 2_000)
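The timeout behaviour comes from GenServer.call/3 and can be seen in isolation with plain Elixir. The sketch below does not use Commanded; SlowHandler is a hypothetical stand-in for a handler that takes longer than the allowed time. It shows the caller exiting on timeout, and how that exit can be caught if the caller needs to survive.

```elixir
defmodule SlowHandler do
  use GenServer

  def init(state), do: {:ok, state}

  # Simulates handling a command that takes `ms` milliseconds.
  def handle_call({:work, ms}, _from, state) do
    Process.sleep(ms)
    {:reply, :ok, state}
  end
end

{:ok, pid} = GenServer.start_link(SlowHandler, nil)

# Finishes well inside the 1 second timeout.
fast = GenServer.call(pid, {:work, 10}, 1_000)

# Takes 200 ms but only 50 ms are allowed, so the caller exits with a
# timeout reason; catching the exit turns it into an error tuple.
slow =
  try do
    GenServer.call(pid, {:work, 200}, 50)
  catch
    :exit, {:timeout, _call} -> {:error, :timeout}
  end

IO.inspect({fast, slow})
```

Note that the server keeps working on the request after the caller has given up, so a timeout does not mean the command was not handled.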

Multi-command registration

Command routers support multi-command registration, so you can group related command handlers into the same module.

defmodule BankRouter do
  use Commanded.Commands.Router

  dispatch [OpenAccount, CloseAccount], to: BankAccountHandler, aggregate: BankAccount, identity: :account_number
end
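A handler module registered for several commands needs one handle/2 clause per command. The following is a rough sketch of what BankAccountHandler could look like; the event structs are hypothetical and the @behaviour line is omitted so the code runs without the Commanded dependency.

```elixir
defmodule OpenAccount do
  defstruct [:account_number, :initial_balance]
end

defmodule CloseAccount do
  defstruct [:account_number]
end

# Hypothetical events, named here only for illustration.
defmodule BankAccountOpened do
  defstruct [:account_number, :initial_balance]
end

defmodule BankAccountClosed do
  defstruct [:account_number]
end

defmodule BankAccount do
  defstruct [:account_number, :balance]
end

defmodule BankAccountHandler do
  # Pattern matching on the command struct selects the clause.
  def handle(%BankAccount{}, %OpenAccount{} = command) do
    %BankAccountOpened{
      account_number: command.account_number,
      initial_balance: command.initial_balance
    }
  end

  def handle(%BankAccount{}, %CloseAccount{account_number: account_number}) do
    %BankAccountClosed{account_number: account_number}
  end
end
```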

Dispatch returning aggregate version

You can include the aggregate’s version in the dispatch result by setting the include_aggregate_version option to true:

{:ok, aggregate_version} = BankRouter.dispatch(command, include_aggregate_version: true)

This is useful when you need to wait for an event handler, such as a read model projection, to be up-to-date before continuing execution or querying its data.
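One way to use the returned version is to poll the read model until it has caught up. The sketch below is an assumption about how an application might do this, not something Commanded provides: ProjectionWaiter is a hypothetical helper, and an Agent holding a version number stands in for the read model.

```elixir
defmodule ProjectionWaiter do
  # Calls `fetch_version` (a zero-arity function returning the version the
  # read model has reached) until it is at least `target` or time runs out.
  def wait_for(fetch_version, target, timeout_ms \\ 1_000, interval_ms \\ 10) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    poll(fetch_version, target, deadline, interval_ms)
  end

  defp poll(fetch_version, target, deadline, interval_ms) do
    cond do
      fetch_version.() >= target ->
        :ok

      System.monotonic_time(:millisecond) >= deadline ->
        {:error, :timeout}

      true ->
        Process.sleep(interval_ms)
        poll(fetch_version, target, deadline, interval_ms)
    end
  end
end

# Stand-in read model that starts at version 0.
{:ok, read_model} = Agent.start_link(fn -> 0 end)

# Simulate the projection catching up to version 3 a little later.
spawn(fn ->
  Process.sleep(30)
  Agent.update(read_model, fn _ -> 3 end)
end)

caught_up = ProjectionWaiter.wait_for(fn -> Agent.get(read_model, & &1) end, 3)
never = ProjectionWaiter.wait_for(fn -> 0 end, 1, 30)
```

In a real application the aggregate_version from the dispatch result would be passed as the target.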

Aggregate lifespan

By default an aggregate instance process will run indefinitely once started. You can control this by implementing the Commanded.Aggregates.AggregateLifespan behaviour in a module.

defmodule BankAccountLifespan do
  @behaviour Commanded.Aggregates.AggregateLifespan

  def after_command(%OpenAccount{}), do: :infinity
  def after_command(%CloseAccount{}), do: 0
end

Then specify the module as the lifespan option when registering the command in the router.

defmodule BankRouter do
  use Commanded.Commands.Router

  dispatch [OpenAccount, CloseAccount],
    to: BankAccountHandler, aggregate: BankAccount, lifespan: BankAccountLifespan, identity: :account_number
end

The value returned from after_command/1 is a timeout in milliseconds. The aggregate process is stopped if it receives no other messages within that time.

You can also return :hibernate to hibernate the process. It continues its loop once a message arrives in its message queue. Hibernating an aggregate triggers garbage collection and minimises the memory used by the process. It should not be used aggressively, as too much time could be spent garbage collecting.

Return :infinity to keep the aggregate instance process running indefinitely.
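These return values have the same meaning as the timeout element of a GenServer reply tuple. The sketch below shows that mechanism in plain Elixir, on the assumption that the aggregate process relies on it; ExpiringServer is a hypothetical module and not part of Commanded.

```elixir
defmodule ExpiringServer do
  use GenServer

  def init(state), do: {:ok, state}

  # `lifespan` may be a number of milliseconds, :infinity or :hibernate,
  # and is passed through as the GenServer timeout.
  def handle_call({:command, lifespan}, _from, state) do
    {:reply, :ok, state, lifespan}
  end

  # Sent by GenServer when no message arrives within the timeout.
  def handle_info(:timeout, state), do: {:stop, :normal, state}
end

{:ok, pid} = GenServer.start(ExpiringServer, nil)
ref = Process.monitor(pid)

# :infinity keeps the process running while idle.
:ok = GenServer.call(pid, {:command, :infinity})
Process.sleep(50)
alive_after_infinity = Process.alive?(pid)

# A 20 ms lifespan stops the process once it has been idle that long.
:ok = GenServer.call(pid, {:command, 20})

stopped =
  receive do
    {:DOWN, ^ref, :process, ^pid, :normal} -> true
  after
    500 -> false
  end
```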

Middleware

A command router can define middleware modules that are executed before each command dispatch, and after its success or failure.

This provides an extension point to add in command validation, authorization, logging, and other behaviour that you want to be called for every command the router dispatches.

defmodule BankingRouter do
  use Commanded.Commands.Router

  middleware CommandLogger
  middleware MyCommandValidator
  middleware AuthorizeCommand

  dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number
  dispatch DepositMoney, to: DepositMoneyHandler, aggregate: BankAccount, identity: :account_number
end

The middleware modules are executed in the order they’ve been defined. They will receive a Commanded.Middleware.Pipeline struct containing the command being dispatched.
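The ordering can be pictured with a small standalone model. This is not Commanded's implementation: MiniPipeline, MiniRouter and the three step modules are hypothetical, and stopping at the first halted step is an assumption made for the sketch.

```elixir
defmodule MiniPipeline do
  defstruct command: nil, halted: false, trace: []
end

defmodule LogStep do
  def before_dispatch(%MiniPipeline{} = pipeline) do
    %{pipeline | trace: pipeline.trace ++ [:logged]}
  end
end

defmodule ValidateStep do
  # Accept only commands with a positive amount.
  def before_dispatch(%MiniPipeline{command: %{amount: amount}} = pipeline)
      when is_number(amount) and amount > 0 do
    %{pipeline | trace: pipeline.trace ++ [:validated]}
  end

  def before_dispatch(%MiniPipeline{} = pipeline) do
    %{pipeline | halted: true, trace: pipeline.trace ++ [:rejected]}
  end
end

defmodule AuthorizeStep do
  def before_dispatch(%MiniPipeline{} = pipeline) do
    %{pipeline | trace: pipeline.trace ++ [:authorized]}
  end
end

defmodule MiniRouter do
  # Steps run in the order listed, like middleware declared in a router.
  @steps [LogStep, ValidateStep, AuthorizeStep]

  def run(command) do
    Enum.reduce_while(@steps, %MiniPipeline{command: command}, fn step, pipeline ->
      case step.before_dispatch(pipeline) do
        %MiniPipeline{halted: true} = halted -> {:halt, halted}
        next -> {:cont, next}
      end
    end)
  end
end

accepted = MiniRouter.run(%{amount: 100})
rejected = MiniRouter.run(%{amount: 0})
```

In this model the rejected command never reaches AuthorizeStep, which is why inexpensive checks are usually placed first.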

Example middleware

Implement the Commanded.Middleware behaviour in your module and define the before_dispatch, after_dispatch, and after_failure callback functions.

defmodule NoOpMiddleware do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  import Pipeline

  # the command is available in the pipeline; prefixed with _ as it is unused here
  def before_dispatch(%Pipeline{command: _command} = pipeline) do
    pipeline
  end

  def after_dispatch(%Pipeline{} = pipeline) do
    pipeline
  end

  def after_failure(%Pipeline{} = pipeline) do
    pipeline
  end
end

Commanded provides a Commanded.Middleware.Logger middleware for logging the name of each dispatched command and its execution duration.