View Source Commanded.Aggregates.Aggregate (Commanded v1.4.1)
Aggregate is a GenServer
process used to provide access to an
instance of an event sourced aggregate.
It allows execution of commands against an aggregate instance, and handles persistence of created events to the configured event store. Concurrent commands sent to an aggregate instance are serialized and executed in the order received.
The Commanded.Commands.Router
module will locate, or start, an aggregate
instance when a command is dispatched. By default, an aggregate process will
run indefinitely once started. Its lifespan may be controlled by using the
Commanded.Aggregates.AggregateLifespan
behaviour.
snapshotting
Snapshotting
You can configure state snapshots for an aggregate in config. By default snapshots are not taken for an aggregate. The following options are available to enable snapshots:
snapshot_every
- snapshot aggregate state every so many events. Usenil
to disable snapshotting, or exclude the configuration entirely.snapshot_version
- a non-negative integer indicating the version of the aggregate state snapshot. Incrementing this version forces any earlier recorded snapshots to be ignored when rebuilding aggregate state.
example
Example
In config/config.exs
enable snapshots for MyApp.ExampleAggregate
after
every ten events:
config :my_app, MyApp.Application,
snapshotting: %{
MyApp.ExampleAggregate => [
snapshot_every: 10,
snapshot_version: 1
]
}
telemetry
Telemetry
[:commanded, :aggregate, :execute, :start]
- Description: Emitted when an aggregate starts executing a command
- Measurements:
%{system_time: integer()}
- Metadata:
%{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t()}
[:commanded, :aggregate, :execute, :stop]
- Description: Emitted when an aggregate stops executing a command
- Measurements:
%{duration: non_neg_integer()}
- Metadata:
%{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t(), events: [map()], error: nil | any()}
[:commanded, :aggregate, :execute, :exception]
- Description: Emitted when an aggregate raises an exception
- Measurements:
%{duration: non_neg_integer()}
- Metadata:
%{application: Commanded.Application.t(), aggregate_uuid: String.t(), aggregate_state: struct(), aggregate_version: non_neg_integer(), caller: pid(), execution_context: Commanded.Aggregates.ExecutionContext.t(), kind: :throw | :error | :exit, reason: any(), stacktrace: list()}
Link to this section Summary
Functions
Returns a specification to start this module under a supervisor.
Execute the given command against the aggregate.
Link to this section Types
Link to this section Functions
Returns a specification to start this module under a supervisor.
See Supervisor
.
execute(application, aggregate_module, aggregate_uuid, context, timeout \\ 5000)
View SourceExecute the given command against the aggregate.
aggregate_module
- the aggregate's module (e.g.BankAccount
).aggregate_uuid
- uniquely identifies an instance of the aggregate.context
- includes command execution arguments (seeCommanded.Aggregates.ExecutionContext
for details).timeout
- an non-negative integer which specifies how many milliseconds to wait for a reply, or the atom :infinity to wait indefinitely. The default value is five seconds (5,000ms).
return-values
Return values
Returns {:ok, aggregate_version, events}
on success, or {:error, error}
on failure.
aggregate_version
- the updated version of the aggregate after executing the command.events
- events produced by the command, can be an empty list.