Commanded.EventStore (Commanded v1.4.9)

Use the event store configured for a Commanded application.

Telemetry Events

Telemetry events are emitted for the functions listed below. Each event name has the form [:commanded, :event_store, event], followed by a span suffix (start, stop, or exception).

  • ack_event/3
  • adapter/2
  • append_to_stream/4
  • delete_snapshot/2
  • delete_subscription/3
  • read_snapshot/2
  • record_snapshot/2
  • stream_forward/2
  • stream_forward/3
  • stream_forward/4
  • subscribe/2
  • subscribe_to/5
  • subscribe_to/6
  • unsubscribe/2
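The naming scheme above can be sketched as follows. This is a minimal illustration, not Commanded's own code: it assumes the `event` element is the function name as an atom, and that the span suffixes are the atoms `:start`, `:stop`, and `:exception`. The module name `EventStoreTelemetryNames`, the handler id, and the handler function are hypothetical.

```elixir
defmodule EventStoreTelemetryNames do
  # Function names taken from the list above. Mapping each function name
  # directly to the `event` atom is an assumption of this sketch.
  @functions [
    :ack_event,
    :adapter,
    :append_to_stream,
    :delete_snapshot,
    :delete_subscription,
    :read_snapshot,
    :record_snapshot,
    :stream_forward,
    :subscribe,
    :subscribe_to,
    :unsubscribe
  ]

  @suffixes [:start, :stop, :exception]

  # Builds every [:commanded, :event_store, event, suffix] combination.
  def all do
    for function <- @functions, suffix <- @suffixes do
      [:commanded, :event_store, function, suffix]
    end
  end

  # Hypothetical handler: prints the event name and its measurements.
  def handle_event(event_name, measurements, _metadata, _config) do
    IO.inspect({event_name, measurements}, label: "event store telemetry")
  end
end

# Attach the handler only when the :telemetry library is available.
if Code.ensure_loaded?(:telemetry) do
  :telemetry.attach_many(
    "event-store-logger",
    EventStoreTelemetryNames.all(),
    &EventStoreTelemetryNames.handle_event/4,
    nil
  )
end
```

Attaching one handler to all names keeps the logging logic in one place; a narrower list can be passed to `:telemetry.attach_many/4` if only some functions are of interest.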

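Summary

Functions

ack_event(application, subscription, event)

Acknowledge receipt and successful processing of the given event received from a subscription to an event stream.

adapter(application, config)

Get the configured event store adapter for the given application.

append_to_stream(application, stream_uuid, expected_version, events, opts \\ [])

Append one or more events to a stream atomically.

delete_snapshot(application, source_uuid)

Delete a previously recorded snapshot for a given source.

delete_subscription(application, subscribe_to, handler_name)

Delete an existing subscription.

read_snapshot(application, source_uuid)

Read a snapshot, if available, for a given source.

record_snapshot(application, snapshot)

Record a snapshot of the data and metadata for a given source.

stream_forward(application, stream_uuid, start_version \\ 0, read_batch_size \\ 1000)

Streams events from the given stream, in the order in which they were originally written.

subscribe(application, stream_uuid)

Create a transient subscription to a single event stream.

subscribe_to(application, stream_uuid, subscription_name, subscriber, start_from, opts \\ [])

Create a persistent subscription to an event stream.

unsubscribe(application, subscription)

Unsubscribe an existing subscriber from event notifications.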

Types

application()

@type application() :: Commanded.Application.t()

config()

@type config() :: Keyword.t()

Functions

ack_event(application, subscription, event)

Acknowledge receipt and successful processing of the given event received from a subscription to an event stream.

adapter(application, config)

@spec adapter(application(), config()) :: {module(), config()}

Get the configured event store adapter for the given application.

append_to_stream(application, stream_uuid, expected_version, events, opts \\ [])

Append one or more events to a stream atomically.

delete_snapshot(application, source_uuid)

Delete a previously recorded snapshot for a given source.

delete_subscription(application, subscribe_to, handler_name)

Delete an existing subscription.

Example

:ok = Commanded.EventStore.delete_subscription(MyApp, :all, "Example")

read_snapshot(application, source_uuid)

Read a snapshot, if available, for a given source.

record_snapshot(application, snapshot)

Record a snapshot of the data and metadata for a given source.

stream_forward(application, stream_uuid, start_version \\ 0, read_batch_size \\ 1000)

Streams events from the given stream, in the order in which they were originally written.

subscribe(application, stream_uuid)

Create a transient subscription to a single event stream.

The event store will publish any events appended to the given stream to the subscriber process as an {:events, events} message.

The subscriber does not need to acknowledge receipt of the events.
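The transient message flow can be sketched as below. This is an illustration under stated assumptions: the module `TransientSubscriberSketch` is hypothetical, the event is a placeholder map rather than a real recorded event, and a hand-sent message stands in for the event store so the snippet runs without a configured application.

```elixir
defmodule TransientSubscriberSketch do
  # Waits for one {:events, events} message and returns the events,
  # or :timeout if nothing arrives in time. No acknowledgement is sent,
  # because transient subscriptions do not require one.
  def await_events(timeout \\ 1_000) do
    receive do
      {:events, events} -> {:ok, events}
    after
      timeout -> :timeout
    end
  end
end

# In an application, the calling process would first subscribe with:
#   Commanded.EventStore.subscribe(MyApp, "stream1")
# Here a placeholder message is sent by hand in place of the event store.
send(self(), {:events, [%{event_type: "ExampleEvent", data: %{}}]})

{:ok, events} = TransientSubscriberSketch.await_events()
IO.inspect(length(events), label: "received events")
```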

subscribe_to(application, stream_uuid, subscription_name, subscriber, start_from, opts \\ [])

Create a persistent subscription to an event stream.

To subscribe to events appended to any stream, pass :all as the stream_uuid when subscribing.

The event store will remember the subscriber's last acknowledged event. Restarting the named subscription will resume from the event following the last acknowledged one.

Once subscribed, the subscriber process should be sent a {:subscribed, subscription} message to allow it to defer initialisation until the subscription has started.

The subscriber process will be sent all events persisted to the stream. It will receive an {:events, events} message for each batch of events persisted for a single aggregate.

The subscriber must acknowledge each event it has received and successfully processed, using Commanded.EventStore.ack_event/3.

Examples

Subscribe to all streams:

{:ok, subscription} =
  Commanded.EventStore.subscribe_to(MyApp, :all, "Example", self(), :current)

Subscribe to a single stream:

{:ok, subscription} =
  Commanded.EventStore.subscribe_to(MyApp, "stream1", "Example", self(), :origin)
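The subscriber side of the protocol described above (wait for {:subscribed, subscription}, then process and acknowledge each {:events, events} batch) can be sketched as a plain receive loop. This is a minimal sketch, not the library's implementation: the module `PersistentSubscriberSketch`, the injected `ack` callback, the `:stop` message, and the fake subscription and events are all hypothetical, chosen so the snippet runs without an event store. In an application, `ack` would wrap Commanded.EventStore.ack_event/3.

```elixir
defmodule PersistentSubscriberSketch do
  # `ack` is a hypothetical callback supplied by the caller. In an
  # application it could be:
  #   fn subscription, event ->
  #     Commanded.EventStore.ack_event(MyApp, subscription, event)
  #   end
  def loop(ack, subscription \\ nil) do
    receive do
      # Sent once the subscription has started; keep the reference.
      {:subscribed, new_subscription} ->
        loop(ack, new_subscription)

      # Process each event in the batch, then acknowledge it.
      {:events, events} ->
        Enum.each(events, fn event ->
          :ok = handle(event)
          ack.(subscription, event)
        end)

        loop(ack, subscription)

      # Hypothetical message used only to end this sketch's loop.
      :stop ->
        :ok
    end
  end

  # Placeholder for application-specific event processing.
  defp handle(_event), do: :ok
end

# Simulate the event store by sending the messages by hand.
parent = self()
ack = fn subscription, event -> send(parent, {:acked, subscription, event}) end
subscriber = spawn(fn -> PersistentSubscriberSketch.loop(ack) end)

send(subscriber, {:subscribed, :fake_subscription})
send(subscriber, {:events, [:event_1, :event_2]})
send(subscriber, :stop)
```

Acknowledging only after `handle/1` returns means an event that fails processing is never acknowledged, which matches the requirement to ack only successfully processed events.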

unsubscribe(application, subscription)

Unsubscribe an existing subscriber from event notifications.

This will not delete the subscription.

Example

:ok = Commanded.EventStore.unsubscribe(MyApp, subscription)