Commanded.Aggregates.Aggregate (Commanded v1.3.1) View Source

Aggregate is a GenServer process used to provide access to an instance of an event sourced aggregate.

It allows execution of commands against an aggregate instance, and handles persistence of created events to the configured event store. Concurrent commands sent to an aggregate instance are serialized and executed in the order received.

The Commanded.Commands.Router module will locate, or start, an aggregate instance when a command is dispatched. By default, an aggregate process will run indefinitely once started. Its lifespan may be controlled by using the Commanded.Aggregates.AggregateLifespan behaviour.

Snapshotting

You can configure state snapshots for an aggregate in config. By default snapshots are not taken for an aggregate. The following options are available to enable snapshots:

  • snapshot_every - snapshot aggregate state every so many events. Use nil to disable snapshotting, or exclude the configuration entirely.

  • snapshot_version - a non-negative integer indicating the version of the aggregate state snapshot. Incrementing this version forces any earlier recorded snapshots to be ignored when rebuilding aggregate state.

Example

In config/config.exs enable snapshots for MyApp.ExampleAggregate after every ten events:

config :my_app, MyApp.Application,
  snapshotting: %{
    MyApp.ExampleAggregate => [
      snapshot_every: 10,
      snapshot_version: 1
    ]
  }

Telemetry

  • [:commanded, :aggregate, :execute, :start]

    • Description: Emitted when an aggregate starts executing a command
    • Measurements: %{system_time: integer()}
    • Metadata: %{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t()}
  • [:commanded, :aggregate, :execute, :stop]

    • Description: Emitted when an aggregate stops executing a command
    • Measurements: %{duration: non_neg_integer()}
    • Metadata: %{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t(), events: [map()], error: nil | any()}
  • [:commanded, :aggregate, :execute, :exception]

    • Description: Emitted when an aggregate raises an exception
    • Measurements: %{duration: non_neg_integer()}
    • Metadata: %{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t(), kind: :throw | :error | :exit, reason: any(), stacktrace: list()}

Link to this section Summary

Functions

Returns a specification to start this module under a supervisor.

Link to this section Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

execute(application, aggregate_module, aggregate_uuid, context, timeout \\ 5000)

View Source

Execute the given command against the aggregate.

  • aggregate_module - the aggregate's module (e.g. BankAccount).
  • aggregate_uuid - uniquely identifies an instance of the aggregate.
  • context - includes command execution arguments (see Commanded.Aggregates.ExecutionContext for details).
  • timeout - an non-negative integer which specifies how many milliseconds to wait for a reply, or the atom :infinity to wait indefinitely. The default value is five seconds (5,000ms).

Return values

Returns {:ok, aggregate_version, events} on success, or {:error, error} on failure.

  • aggregate_version - the updated version of the aggregate after executing the command.
  • events - events produced by the command, can be an empty list.
Link to this function

start_link(config, opts)

View Source