Maestro.Aggregate.Root behaviour (Maestro v0.5.0)

View Source

Core behaviour and functionality provided by Maestro for processing commands and managing aggregate state.

Traditional domain entities are referred to as aggregates in the literature. At the outermost edge of a bounded context, you find an aggregate root. The goal of this library is to greatly simplify the process of implementing an event sourced application by owning the flow of non-domain data (i.e. commands, events, and snapshots) to allow you to focus on the business logic of evaluating your commands and applying the subsequent events to your domain objects.

The most crucial piece to this is the aggregate root. Maestro.Aggregate.CommandHandler defines a behaviour with the goal of isolating a single command handler's eval. Similarly, there is the Maestro.Aggregate.EventHandler behaviour which defines how to apply that event to the aggregate. With these key components modeled explicitly, the Maestro.Aggregate.Root focuses on the dataflow and ensuring that queries to aggregate state flow properly.

The aggregate root dispatches to the particular command handlers and event handlers by means of an opinionated dynamic dispatch. To ensure that these things are handled in a consistent manner, the aggregate root is modeled as a GenServer and provides the requisite lifecycle hooks.

use Maestro.Aggregate.Root takes the following options:

  • :command_prefix - module prefix for finding commands
  • :event_prefix - module prefix for finding events
  • :projections - zero or more modules that implement the ProjectionHandler behaviour for the events that are generated by this aggregate root. These projections are invoked within the transaction that commits the events.

Summary

Callbacks

If you extend the aggregate to provide other functionality, call is available to assist in pushing that functionality into the aggregate's context.

Evaluate the command within the aggregate's context. With the option :return return either the events or the state.

Forces the aggregate to retrieve any events. Since Maestro operates in a node-local manner, it's entirely possible some other node has processed commands/events.

A (potentially) stale read of the aggregate's state. If you want to ensure the state is as up-to-date as possible, see fetch/1.

When an aggregate root is created, this callback is invoked to generate the state

Create a new aggregate along with the provided initial_state function. This function should only fail if there was a problem generating an HLC timestamp.

Snapshots are stored in a single-row-per-aggregate manner and are used to make it easier/faster to hydrate the aggregate root. This function should return the map which will be JSON encoded when moving to a durable store.

Recover a past version of the aggregate's state by specifying a maximum sequence number. The aggregate's snapshot and any/all events will be used to get the state back to that point.

Using the aggregate root's prepare_snapshot function, generate and store a snapshot. Useful if there are a lot of events, big events, or just a healthy amount of aggregate state to compose.

Moving from the snapshotted representation to the aggregate root's structure can be a complicated process that requires custom hooks. Otherwise, a default implementation is provided that simply lifts the map out of the snapshot and uses it as the state of the aggregate.

Functions

struct to event type of the form "The.ModuleName -> the.module_name" dropping the provided prefix for conciseness

Look up an aggregate by its ID. The module is provided to start the right type of aggregate should it not already be started.

Types

command()

@type command() :: Maestro.Types.Command.t()

evaluate_opt()

@type evaluate_opt() :: {:return, :events | :state}

evaluate_opts()

@type evaluate_opts() :: [evaluate_opt()]

id()

@type id() :: HLClock.Timestamp.t()

sequence()

@type sequence() :: non_neg_integer()

stack()

@type stack() :: Exception.stacktrace()

t()

@type t() :: %Maestro.Aggregate.Root{
  command_prefix: module(),
  event_prefix: module(),
  id: id(),
  module: module(),
  projections: [module()],
  sequence: sequence(),
  state: any()
}

Callbacks

call(id, msg)

@callback call(id(), msg :: any()) :: any()

If you extend the aggregate to provide other functionality, call is available to assist in pushing that functionality into the aggregate's context.

evaluate(command)

@callback evaluate(command()) ::
  :ok
  | {:ok, [Maestro.Types.Event.t()]}
  | {:ok, state :: any()}
  | {:error, any(), stack()}

Evaluate the command within the aggregate's context. With the option :return return either the events or the state.

evaluate(command, evaluate_opts)

@callback evaluate(command(), evaluate_opts()) ::
  :ok
  | {:ok, [Maestro.Types.Event.t()]}
  | {:ok, state :: any()}
  | {:error, any(), stack()}

fetch(id)

@callback fetch(id()) :: {:ok, any()} | {:error, any(), stack()}

Forces the aggregate to retrieve any events. Since Maestro operates in a node-local manner, it's entirely possible some other node has processed commands/events.

get(id)

@callback get(id()) :: {:ok, any()} | {:error, any(), stack()}

A (potentially) stale read of the aggregate's state. If you want to ensure the state is as up-to-date as possible, see fetch/1.

initial_state()

(optional)
@callback initial_state() :: any()

When an aggregate root is created, this callback is invoked to generate the state

new()

@callback new() :: {:ok, id()} | {:error, any()}

Create a new aggregate along with the provided initial_state function. This function should only fail if there was a problem generating an HLC timestamp.

prepare_snapshot(root)

(optional)
@callback prepare_snapshot(root :: t()) :: map()

Snapshots are stored in a single-row-per-aggregate manner and are used to make it easier/faster to hydrate the aggregate root. This function should return the map which will be JSON encoded when moving to a durable store.

replay(id, sequence)

@callback replay(id(), sequence()) :: {:ok, any()} | {:error, any(), stack()}

Recover a past version of the aggregate's state by specifying a maximum sequence number. The aggregate's snapshot and any/all events will be used to get the state back to that point.

snapshot(id)

@callback snapshot(id()) :: :ok | {:error, any(), stack()}

Using the aggregate root's prepare_snapshot function, generate and store a snapshot. Useful if there are a lot of events, big events, or just a healthy amount of aggregate state to compose.

use_snapshot(root, snapshot)

(optional)
@callback use_snapshot(root :: t(), snapshot :: Maestro.Types.Snapshot.t()) :: any()

Moving from the snapshotted representation to the aggregate root's structure can be a complicated process that requires custom hooks. Otherwise, a default implementation is provided that simply lifts the map out of the snapshot and uses it as the state of the aggregate.

Functions

child_spec(opts)

event_type(pre, str)

@spec event_type(
  prefix :: module(),
  struct()
) :: String.t()

struct to event type of the form "The.ModuleName -> the.module_name" dropping the provided prefix for conciseness

start_link(opts)

whereis(agg_id, mod)

Look up an aggregate by its ID. The module is provided to start the right type of aggregate should it not already be started.