Command registration and dispatch
Commands
Create a module per command and define the fields with defstruct. A command must contain a field to uniquely identify the aggregate instance (e.g. account_number).
defmodule OpenAccount do
defstruct [:account_number, :initial_balance]
end
Command handlers
Implement the Commanded.Commands.Handler behaviour consisting of a single handle/2 function.
It receives the aggregate root state and the command to be handled. It must return the raised domain events from the aggregate root. It may return an {:error, reason} tuple on failure.
defmodule OpenAccountHandler do
@behaviour Commanded.Commands.Handler
def handle(%BankAccount{} = aggregate, %OpenAccount{account_number: account_number, initial_balance: initial_balance}) do
aggregate
|> BankAccount.open_account(account_number, initial_balance)
end
end
Command dispatch and routing
You must create a router to register each command with its associated handler.
defmodule BankRouter do
use Commanded.Commands.Router
dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number
dispatch DepositMoney, to: DepositMoneyHandler, aggregate: BankAccount, identity: :account_number
end
It is also possible to route a command directly to an aggregate root, without requiring an intermediate command handler.
defmodule BankRouter do
use Commanded.Commands.Router
dispatch OpenAccount, to: BankAccount, identity: :account_number
end
The aggregate root must implement an execute/2 function that receives the aggregate’s state and the command to execute.
You can then dispatch a command using the router.
:ok = BankRouter.dispatch(%OpenAccount{account_number: "ACC123", initial_balance: 1_000})
Timeouts
A command handler has a default timeout of 5 seconds. The same default as a GenServer process call. It must handle the command in this period, otherwise the call fails and the caller exits.
You can configure a different timeout value during command registration by providing a timeout option, defined in milliseconds:
defmodule BankRouter do
use Commanded.Commands.Router
# configure a timeout of 1 second for the open account command handler
dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number, timeout: 1_000
end
You can override the timeout value during command dispatch. This example is dispatching the open account command with a timeout of 2 seconds:
:ok = BankRouter.dispatch(%OpenAccount{account_number: "ACC123", initial_balance: 1_000}, timeout: 2_000)
Multi-command registration
Command routers support multi command registration so you can group related command handlers into the same module.
defmodule BankRouter do
use Commanded.Commands.Router
dispatch [OpenAccount,CloseAccount], to: BankAccountHandler, aggregate: BankAccount, identity: :account_number
end
Dispatch returning aggregate version
You can optionally choose to include the aggregate’s version as part of the dispatch result by setting the include_aggregate_version option to true:
{:ok, aggregate_version} = BankRouter.dispatch(command, include_aggregate_version: true)
This is useful when you need to wait for an event handler, such as a read model projection, to be up-to-date before continuing execution or querying its data.
Aggregate lifespan
By default an aggregate instance process will run indefinitely once started. You can control this by implementing the Commanded.Aggregates.AggregateLifespan behaviour in a module.
defmodule BankAccountLifespan do
@behaviour Commanded.Aggregates.AggregateLifespan
def after_command(%OpenAccount{}), do: :infinity
def after_command(%CloseAccount{}), do: 0
end
Then specify the module as the lifespan option when registering the command in the router.
defmodule BankRouter do
use Commanded.Commands.Router
dispatch [OpenAccount,CloseAccount],
to: BankAccountHandler, aggregate: BankAccount, lifespan: BankAccountLifespan, identity: :account_number
end
The timeout is specified in milliseconds, after which time the aggregate process will be stopped if no other messages are received.
You can also return :hibernate and the process is hibernated, it will continue its loop once a message is in its message queue. Hibernating an aggregate causes garbage collection and minimises the memory used by the process. Hibernating should not be used aggressively as too much time could be spent garbage collecting.
Return :infinity to keep the aggregate instance process running indefinitely.
Middleware
Allows a command router to define middleware modules that are executed before and after success or failure of each command dispatch.
This provides an extension point to add in command validation, authorization, logging, and other behaviour that you want to be called for every command the router dispatches.
defmodule BankingRouter do
use Commanded.Commands.Router
middleware CommandLogger
middleware MyCommandValidator
middleware AuthorizeCommand
dispatch OpenAccount, to: OpenAccountHandler, aggregate: BankAccount, identity: :account_number
dispatch DepositMoney, to: DepositMoneyHandler, aggregate: BankAccount, identity: :account_number
end
The middleware modules are executed in the order they’ve been defined. They will receive a Commanded.Middleware.Pipeline struct containing the command being dispatched.
Example middleware
Implement the Commanded.Middleware behaviour in your module and define the before_dispatch, after_dispatch, and after_failure callback functions.
defmodule NoOpMiddleware do
@behaviour Commanded.Middleware
alias Commanded.Middleware.Pipeline
import Pipeline
def before_dispatch(%Pipeline{command: command} = pipeline) do
pipeline
end
def after_dispatch(%Pipeline{} = pipeline) do
pipeline
end
def after_failure(%Pipeline{} = pipeline) do
pipeline
end
end
Commanded provides a Commanded.Middleware.Logger middleware for logging the name of each dispatched command and its execution duration.