Commanded v0.19.1 Commanded.EventStore behaviour View Source

Defines the behaviour to be implemented by an event store adapter to be used by Commanded.

Link to this section Summary

Functions

Acknowledge receipt and successful processing of the given event received from a subscription to an event stream.

Append one or more events to a stream atomically.

Delete a previously recorded snapshot for a given source

Delete an existing subscription.

Get the configured event store adapter

Read a snapshot, if available, for a given source.

Record a snapshot of the data and metadata for a given source

Streams events from the given stream, in the order in which they were originally written.

Create a transient subscription to a single event stream.

Create a persistent subscription to an event stream.

Unsubscribe an existing subscriber from all event notifications.

Callbacks

Acknowledge receipt and successful processing of the given event received from a subscription to an event stream.

Append one or more events to a stream atomically.

Return a child spec defining all processes required by the event store.

Delete a previously recorded snapshot for a given source

Delete an existing subscription.

Read a snapshot, if available, for a given source.

Record a snapshot of the data and metadata for a given source

Streams events from the given stream, in the order in which they were originally written.

Create a transient subscription to a single event stream.

Create a persistent subscription to an event stream.

Unsubscribe an existing subscriber from event notifications.

Link to this section Types

Link to this type

expected_version()

View Source
expected_version() ::
  :any_version | :no_stream | :stream_exists | non_neg_integer()
Link to this type

source_uuid()

View Source
source_uuid() :: String.t()
Link to this type

start_from()

View Source
start_from() :: :origin | :current | integer()
Link to this type

stream_uuid()

View Source
stream_uuid() :: String.t()
Link to this type

subscriber()

View Source
subscriber() :: pid()
Link to this type

subscription()

View Source
subscription() :: any()
Link to this type

subscription_name()

View Source
subscription_name() :: String.t()

Link to this section Functions

Acknowledge receipt and successful processing of the given event received from a subscription to an event stream.

Link to this function

append_to_stream(stream_uuid, expected_version, events)

View Source
append_to_stream(
  stream_uuid(),
  expected_version(),
  events :: [Commanded.EventStore.EventData.t()]
) :: :ok | {:error, :wrong_expected_version} | {:error, error()}

Append one or more events to a stream atomically.

Link to this function

delete_snapshot(source_uuid)

View Source
delete_snapshot(source_uuid()) :: :ok | {:error, error()}

Delete a previously recorded snapshot for a given source

Link to this function

delete_subscription(stream_uuid, subscription_name)

View Source
delete_subscription(stream_uuid() | :all, subscription_name()) ::
  :ok | {:error, :subscription_not_found} | {:error, error()}

Delete an existing subscription.

Get the configured event store adapter

Link to this function

read_snapshot(source_uuid)

View Source
read_snapshot(source_uuid()) ::
  {:ok, snapshot()} | {:error, :snapshot_not_found}

Read a snapshot, if available, for a given source.

Link to this function

record_snapshot(snapshot)

View Source
record_snapshot(snapshot()) :: :ok | {:error, error()}

Record a snapshot of the data and metadata for a given source

Link to this function

stream_forward(stream_uuid, start_version \\ 0, read_batch_size \\ 1000)

View Source
stream_forward(
  stream_uuid(),
  start_version :: non_neg_integer(),
  read_batch_size :: non_neg_integer()
) :: Enumerable.t() | {:error, :stream_not_found} | {:error, error()}

Streams events from the given stream, in the order in which they were originally written.

Link to this function

subscribe(stream_uuid)

View Source
subscribe(stream_uuid() | :all) :: :ok | {:error, error()}

Create a transient subscription to a single event stream.

Link to this function

subscribe_to(stream_uuid, subscription_name, subscriber, start_from)

View Source
subscribe_to(
  stream_uuid() | :all,
  subscription_name(),
  subscriber(),
  start_from()
) ::
  {:ok, subscription()}
  | {:error, :subscription_already_exists}
  | {:error, error()}

Create a persistent subscription to an event stream.

Link to this function

unsubscribe(subscription)

View Source
unsubscribe(subscription()) :: :ok

Unsubscribe an existing subscriber from all event notifications.

Link to this section Callbacks

Acknowledge receipt and successful processing of the given event received from a subscription to an event stream.

Link to this callback

append_to_stream(stream_uuid, expected_version, events)

View Source
append_to_stream(
  stream_uuid(),
  expected_version(),
  events :: [Commanded.EventStore.EventData.t()]
) :: :ok | {:error, :wrong_expected_version} | {:error, error()}

Append one or more events to a stream atomically.

Return a child spec defining all processes required by the event store.

Link to this callback

delete_snapshot(source_uuid)

View Source
delete_snapshot(source_uuid()) :: :ok | {:error, error()}

Delete a previously recorded snapshot for a given source

Link to this callback

delete_subscription(arg1, subscription_name)

View Source
delete_subscription(stream_uuid() | :all, subscription_name()) ::
  :ok | {:error, :subscription_not_found} | {:error, error()}

Delete an existing subscription.

Example

:ok = Commanded.EventStore.delete_subscription(:all, "Example")
Link to this callback

read_snapshot(source_uuid)

View Source
read_snapshot(source_uuid()) ::
  {:ok, snapshot()} | {:error, :snapshot_not_found}

Read a snapshot, if available, for a given source.

Link to this callback

record_snapshot(snapshot)

View Source
record_snapshot(snapshot()) :: :ok | {:error, error()}

Record a snapshot of the data and metadata for a given source

Link to this callback

stream_forward(stream_uuid, start_version, read_batch_size)

View Source
stream_forward(
  stream_uuid(),
  start_version :: non_neg_integer(),
  read_batch_size :: non_neg_integer()
) :: Enumerable.t() | {:error, :stream_not_found} | {:error, error()}

Streams events from the given stream, in the order in which they were originally written.

Link to this callback

subscribe(arg1)

View Source
subscribe(stream_uuid() | :all) :: :ok | {:error, error()}

Create a transient subscription to a single event stream.

The event store will publish any events appended to the given stream to the subscriber process as an {:events, events} message.

The subscriber does not need to acknowledge receipt of the events.

Link to this callback

subscribe_to(arg1, subscription_name, subscriber, start_from)

View Source
subscribe_to(
  stream_uuid() | :all,
  subscription_name(),
  subscriber(),
  start_from()
) ::
  {:ok, subscription()}
  | {:error, :subscription_already_exists}
  | {:error, error()}

Create a persistent subscription to an event stream.

To subscribe to all events appended to any stream use :all as the stream when subscribing.

The event store will remember the subscribers last acknowledged event. Restarting the named subscription will resume from the next event following the last seen.

Once subscribed, the subscriber process should be sent a {:subscribed, subscription} message to allow it to defer initialisation until the subscription has started.

The subscriber process will be sent all events persisted to any stream. It will receive a {:events, events} message for each batch of events persisted for a single aggregate.

The subscriber must ack each received, and successfully processed event, using Commanded.EventStore.ack_event/2.

Examples

Subscribe to all streams:

{:ok, subscription} =
  Commanded.EventStore.subscribe_to(:all, "Example", self(), :current)

Subscribe to a single stream:

{:ok, subscription} =
  Commanded.EventStore.subscribe_to("stream1", "Example", self(), :origin)
Link to this callback

unsubscribe(subscription)

View Source
unsubscribe(subscription()) :: :ok

Unsubscribe an existing subscriber from event notifications.

This will not delete the subscription.

Example

:ok = Commanded.EventStore.unsubscribe(subscription)