Commanded.Aggregates.Aggregate behaviour (Commanded v1.4.9)

Aggregate is a GenServer process used to provide access to an instance of an event sourced aggregate.

It allows execution of commands against an aggregate instance, and handles persistence of created events to the configured event store. Concurrent commands sent to an aggregate instance are serialized and executed in the order received.

The Commanded.Commands.Router module will locate, or start, an aggregate instance when a command is dispatched. By default, an aggregate process will run indefinitely once started. Its lifespan may be controlled by using the Commanded.Aggregates.AggregateLifespan behaviour.
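As a rough illustration of controlling the lifespan, a module providing the three Commanded.Aggregates.AggregateLifespan callbacks might look like the sketch below. The module name MyApp.ExampleLifespan and the thirty minute timeout are assumptions chosen for illustration, not values taken from Commanded.

```elixir
defmodule MyApp.ExampleLifespan do
  # Hypothetical module name. In a project that depends on Commanded, also declare:
  #   @behaviour Commanded.Aggregates.AggregateLifespan

  # Assumed inactivity timeout, in milliseconds (30 minutes).
  @timeout :timer.minutes(30)

  # Lifespan returned for an event produced by the aggregate.
  def after_event(_event), do: @timeout

  # Lifespan returned for a handled command.
  def after_command(_command), do: @timeout

  # Stop the aggregate process once an error has been returned.
  def after_error(_error), do: :stop
end
```

Such a module is typically passed to the router with the lifespan option when registering a command; check the AggregateLifespan documentation for the exact callback contract in your Commanded version.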

Snapshotting

You can configure state snapshots for an aggregate in config. By default snapshots are not taken for an aggregate. The following options are available to enable snapshots:

  • snapshot_every - snapshot the aggregate state after every N events, where N is the configured value. Use nil to disable snapshotting, or omit the configuration entirely.

  • snapshot_version - a non-negative integer indicating the version of the aggregate state snapshot. Incrementing this version forces any earlier recorded snapshots to be ignored when rebuilding aggregate state.

Example

In config/config.exs enable snapshots for MyApp.ExampleAggregate after every ten events:

config :my_app, MyApp.Application,
  snapshotting: %{
    MyApp.ExampleAggregate => [
      snapshot_every: 10,
      snapshot_version: 1
    ]
  }

Telemetry

  • [:commanded, :aggregate, :execute, :start]

    • Description: Emitted when an aggregate starts executing a command
    • Measurements: %{system_time: integer()}
    • Metadata: %{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t()}
  • [:commanded, :aggregate, :execute, :stop]

    • Description: Emitted when an aggregate stops executing a command
    • Measurements: %{duration: non_neg_integer()}
    • Metadata: %{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t(), events: [map()], error: nil | any()}
  • [:commanded, :aggregate, :execute, :exception]

    • Description: Emitted when an aggregate raises an exception
    • Measurements: %{duration: non_neg_integer()}
    • Metadata: %{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t(), kind: :throw | :error | :exit, reason: any(), stacktrace: list()}
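The events listed above can be consumed with a standard :telemetry handler. The following is a minimal sketch, assuming the :telemetry package is available and that duration is reported in native time units; the module name, handler id, and log format are hypothetical.

```elixir
defmodule MyApp.AggregateTelemetry do
  # Hypothetical module; the handler id and log format are illustrative only.
  require Logger

  @events [
    [:commanded, :aggregate, :execute, :stop],
    [:commanded, :aggregate, :execute, :exception]
  ]

  # Attach a single handler to both events. Requires the :telemetry package.
  def attach do
    :telemetry.attach_many(
      "my-app-aggregate-execute",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Telemetry handlers receive the event name, measurements, metadata and config.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(summarize(event, measurements, metadata))
  end

  # Build a log line from the documented measurement and metadata keys.
  def summarize([:commanded, :aggregate, :execute, outcome], %{duration: duration}, metadata) do
    # Assumption: duration is in native time units, so convert it for display.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "aggregate #{metadata.aggregate_uuid} execute #{outcome} in #{ms}ms"
  end
end
```

Calling MyApp.AggregateTelemetry.attach() once during application start-up would be enough to begin logging. Keeping the formatting in a separate summarize/3 function makes it testable without attaching a handler.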

Types

no_return_event()

@type no_return_event() :: :ok | {:ok, []} | nil | []

return_event()

@type return_event() :: struct() | [struct()] | {:ok, struct()} | {:ok, [struct()]}

state()

@type state() :: struct()

uuid()

@type uuid() :: String.t()

Callbacks

apply(aggregate, event)

@callback apply(aggregate :: state(), event :: struct()) :: state()

Apply an event to the aggregate state. Returns the updated aggregate.

execute(aggregate, command)

(optional)
@callback execute(aggregate :: state(), command :: struct()) ::
  return_event() | no_return_event() | {:error, term()}

Optionally execute a command against the aggregate. Returns either no event, one event, a list of events, or an error tuple.
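To make the two callbacks concrete, here is a small sketch of an aggregate that exercises the documented return shapes: a single event, a list of events, no events, and an error tuple. All module names, fields, and error atoms are hypothetical and exist only for this example.

```elixir
# Hypothetical commands and events, used only for this illustration.
defmodule OpenAccount do
  defstruct [:account_number, :initial_balance]
end

defmodule DepositMoney do
  defstruct [:amount]
end

defmodule AccountOpened do
  defstruct [:account_number, :initial_balance]
end

defmodule MoneyDeposited do
  defstruct [:amount]
end

defmodule BankAccount do
  # In a project that depends on Commanded you may also declare:
  #   @behaviour Commanded.Aggregates.Aggregate
  defstruct account_number: nil, balance: 0

  # Success: return a single event struct.
  def execute(%BankAccount{account_number: nil}, %OpenAccount{initial_balance: balance} = command)
      when is_integer(balance) and balance > 0 do
    %AccountOpened{account_number: command.account_number, initial_balance: balance}
  end

  # Failure: return an error tuple; no events are produced.
  def execute(%BankAccount{account_number: nil}, %OpenAccount{}),
    do: {:error, :invalid_initial_balance}

  def execute(%BankAccount{}, %OpenAccount{}), do: {:error, :account_already_open}

  # Nothing to record: return an empty list (one of the no_return_event() shapes).
  def execute(%BankAccount{}, %DepositMoney{amount: 0}), do: []

  # Success: return a list of events.
  def execute(%BankAccount{}, %DepositMoney{amount: amount})
      when is_integer(amount) and amount > 0 do
    [%MoneyDeposited{amount: amount}]
  end

  def execute(%BankAccount{}, %DepositMoney{}), do: {:error, :invalid_amount}

  # apply/2 only updates state from an event; it performs no validation.
  def apply(%BankAccount{} = account, %AccountOpened{} = event) do
    %{account | account_number: event.account_number, balance: event.initial_balance}
  end

  def apply(%BankAccount{balance: balance} = account, %MoneyDeposited{amount: amount}) do
    %{account | balance: balance + amount}
  end
end

# Decide with execute/2, then fold the resulting event into state with apply/2.
account = %BankAccount{}
event = BankAccount.execute(account, %OpenAccount{account_number: "ACC123", initial_balance: 100})
account = BankAccount.apply(account, event)
# account is now %BankAccount{account_number: "ACC123", balance: 100}
```

In this sketch execute/2 holds the decision logic and apply/2 only mutates state, so rebuilding the aggregate from its events gives the same state as the original run.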

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

execute(application, aggregate_module, aggregate_uuid, context, timeout \\ 5000)

Execute the given command against the aggregate.

  • aggregate_module - the aggregate's module (e.g. BankAccount).
  • aggregate_uuid - uniquely identifies an instance of the aggregate.
  • context - includes command execution arguments (see Commanded.Aggregates.ExecutionContext for details).
  • timeout - a non-negative integer specifying how many milliseconds to wait for a reply, or the atom :infinity to wait indefinitely. The default value is five seconds (5,000ms).

Return values

Returns {:ok, aggregate_version, events} on success, or {:error, error} on failure.

  • aggregate_version - the updated version of the aggregate after executing the command.
  • events - the events produced by the command; this may be an empty list.
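Callers usually pattern match on the two documented result shapes. The helper below is a hypothetical sketch of that matching; only the tuple shapes come from the return values described above, and the module name and messages are invented for illustration.

```elixir
defmodule MyApp.ExecuteResult do
  # Hypothetical helper: turns a result tuple into a human-readable summary.

  # Success where the command produced no events.
  def describe({:ok, aggregate_version, []}) when is_integer(aggregate_version) do
    "no events produced; aggregate at version #{aggregate_version}"
  end

  # Success with one or more events.
  def describe({:ok, aggregate_version, events})
      when is_integer(aggregate_version) and is_list(events) do
    "#{length(events)} event(s) produced; aggregate at version #{aggregate_version}"
  end

  # Failure: the error term is passed through for inspection.
  def describe({:error, error}) do
    "command failed: #{inspect(error)}"
  end
end
```

The clause for the empty list comes first so that it is not shadowed by the more general list clause.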

start_link(config, opts)