EventStore v1.0.3 EventStore behaviour View Source

EventStore allows you to define one or more event store modules to append, read, and subscribe to streams of events.

It uses PostgreSQL (v9.5 or later) as the underlying storage engine.

Defining an event store

We can define an event store in our own application as follows:

defmodule MyApp.EventStore do
  use EventStore,
    otp_app: :my_app

  # Optional `init/1` function to modify config at runtime.
  def init(config) do
    {:ok, config}
  end
end

Where the configuration for the event store must be in your application environment, usually defined in config/config.exs:

config :my_app, MyApp.EventStore,
  serializer: EventStore.JsonSerializer,
  username: "postgres",
  password: "postgres",
  database: "eventstore",
  hostname: "localhost",
  # OR use a URL to connect instead
  url: "postgres://postgres:postgres@localhost/eventstore",
  pool_size: 1

The event store module defines a start_link/1 function that needs to be invoked before using the event store. In general, this function is not called directly, but included as part of your application supervision tree.

If your application was generated with a supervisor (by passing --sup to mix new) you will have a lib/my_app/application.ex file containing the application start callback that defines and starts your supervisor. You just need to edit the start/2 function to start the event store in your application's supervisor:

  def start(_type, _args) do
    children = [
      MyApp.EventStore
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

Each event store module (e.g. MyApp.EventStore) provides a public API to read events from and write events to an event stream, and subscribe to event notifications.

Please refer to the following guides to learn more:

Link to this section Summary

Callbacks

Acknowledge receipt of the given events received from a subscription.

Append one or more events to a stream atomically.

Returns the event store configuration stored in the :otp_app environment.

Delete an existing persistent subscription to all streams.

Delete a previously recorded snapshop for a given source.

Delete an existing persistent subscription.

A callback executed when the event store starts or when configuration is read.

Link one or more existing events to another stream.

Reads the requested number of events from all streams, in the order in which they were originally written.

Read a snapshot, if available, for a given source.

Reads the requested number of events from the given stream, in the order in which they were originally written.

Record a snapshot of the data and metadata for a given source.

Starts any connection pooling or supervision and return {:ok, pid} or just :ok if nothing needs to be done.

Shuts down the event store.

Streams events from all streams, in the order in which they were originally written.

Streams events from the given stream, in the order in which they were originally written.

Create a transient subscription to a given stream.

Create a persistent subscription to all streams.

Create a persistent subscription to a single stream.

Unsubscribe an existing subscriber from all event notifications.

Unsubscribe an existing subscriber from event notifications.

Link to this section Types

Link to this type

expected_version()

View Source
expected_version() ::
  :any_version | :no_stream | :stream_exists | non_neg_integer()
Link to this type

start_from()

View Source
start_from() :: :origin | :current | non_neg_integer()

Link to this section Callbacks

Link to this callback

ack(subscription, arg2)

View Source
ack(
  subscription :: pid(),
  EventStore.RecordedEvent.t()
  | [EventStore.RecordedEvent.t()]
  | non_neg_integer()
) :: :ok | {:error, reason :: term()}

Acknowledge receipt of the given events received from a subscription.

Accepts a single EventStore.RecordedEvent struct, a list of EventStore.RecordedEvents, or the event number of the recorded event to acknowledge.

Link to this callback

append_to_stream(stream_uuid, expected_version, events, timeout)

View Source
append_to_stream(
  stream_uuid :: String.t(),
  expected_version(),
  events :: [EventStore.EventData.t()],
  timeout :: timeout() | nil
) ::
  :ok
  | {:error, :cannot_append_to_all_stream}
  | {:error, :stream_exists}
  | {:error, :stream_does_not_exist}
  | {:error, :wrong_expected_version}
  | {:error, reason :: term()}

Append one or more events to a stream atomically.

  • stream_uuid is used to uniquely identify a stream.

  • expected_version is used for optimistic concurrency checks. You can provide a non-negative integer to specify the expected stream version. This is used to ensure you can only append to the stream if it is at exactly that version.

    You can also provide one of the following values to alter the concurrency check behaviour:

    • :any_version - No concurrency checking and allow any stream version (including no stream).
    • :no_stream - Ensure the stream does not exist.
    • :stream_exists - Ensure the stream exists.
  • events is a list of %EventStore.EventData{} structs.

  • timeout an optional timeout for the database transaction, in milliseconds. Defaults to 15_000ms.

Returns :ok on success, or an {:error, reason} tagged tuple. The returned error may be due to one of the following reasons:

  • {:error, :wrong_expected_version} when the actual stream version differs from the provided expected version.
  • {:error, :stream_exists} when the stream exists, but expected version was :no_stream.
  • {:error, :stream_does_not_exist} when the stream does not exist, but expected version was :stream_exists.

Returns the event store configuration stored in the :otp_app environment.

Link to this callback

delete_all_streams_subscription(subscription_name)

View Source
delete_all_streams_subscription(subscription_name :: String.t()) ::
  :ok | {:error, term()}

Delete an existing persistent subscription to all streams.

  • subscription_name is used to identify the existing subscription to remove.

Returns :ok on success.

Link to this callback

delete_snapshot(source_uuid)

View Source
delete_snapshot(source_uuid :: String.t()) :: :ok | {:error, reason :: term()}

Delete a previously recorded snapshop for a given source.

Returns :ok on success, or when the snapshot does not exist.

Link to this callback

delete_subscription(stream_uuid, subscription_name)

View Source
delete_subscription(stream_uuid :: String.t(), subscription_name :: String.t()) ::
  :ok | {:error, term()}

Delete an existing persistent subscription.

  • stream_uuid is the stream the subscription is subscribed to.

  • subscription_name is used to identify the existing subscription to remove.

Returns :ok on success.

Link to this callback

init(config)

View Source (optional)
init(config :: Keyword.t()) :: {:ok, Keyword.t()}

A callback executed when the event store starts or when configuration is read.

It must return {:ok, keyword} with the updated list of configuration.

Link to this callback

read_all_streams_forward(start_event_number, count, timeout)

View Source
read_all_streams_forward(
  start_event_number :: non_neg_integer(),
  count :: non_neg_integer(),
  timeout :: timeout() | nil
) :: {:ok, [EventStore.RecordedEvent.t()]} | {:error, reason :: term()}

Reads the requested number of events from all streams, in the order in which they were originally written.

  • start_event_number optionally, the number of the first event to read. Defaults to the beginning of the stream if not set.

  • count optionally, the maximum number of events to read. If not set it will be limited to returning 1,000 events from all streams.

  • timeout an optional timeout for querying the database, in milliseconds. Defaults to 15_000ms.

Link to this callback

read_snapshot(source_uuid)

View Source
read_snapshot(source_uuid :: String.t()) ::
  {:ok, EventStore.Snapshots.SnapshotData.t()} | {:error, :snapshot_not_found}

Read a snapshot, if available, for a given source.

Returns {:ok, %EventStore.Snapshots.SnapshotData{}} on success, or {:error, :snapshot_not_found} when unavailable.

Link to this callback

read_stream_forward(stream_uuid, start_version, count, timeout)

View Source
read_stream_forward(
  stream_uuid :: String.t(),
  start_version :: non_neg_integer(),
  count :: non_neg_integer(),
  timeout :: timeout() | nil
) :: {:ok, [EventStore.RecordedEvent.t()]} | {:error, reason :: term()}

Reads the requested number of events from the given stream, in the order in which they were originally written.

  • stream_uuid is used to uniquely identify a stream.

  • start_version optionally, the version number of the first event to read. Defaults to the beginning of the stream if not set.

  • count optionally, the maximum number of events to read. If not set it will be limited to returning 1,000 events from the stream.

  • timeout an optional timeout for querying the database, in milliseconds. Defaults to 15_000ms.

Link to this callback

record_snapshot(snapshot)

View Source
record_snapshot(snapshot :: EventStore.Snapshots.SnapshotData.t()) ::
  :ok | {:error, reason :: term()}

Record a snapshot of the data and metadata for a given source.

Returns :ok on success.

Link to this callback

start_link(opts)

View Source
start_link(opts :: Keyword.t()) ::
  {:ok, pid()} | {:error, {:already_started, pid()}} | {:error, term()}

Starts any connection pooling or supervision and return {:ok, pid} or just :ok if nothing needs to be done.

Returns {:error, {:already_started, pid}} if the event store is already started or {:error, term} in case anything else goes wrong.

Link to this callback

stop(timeout)

View Source
stop(timeout()) :: :ok

Shuts down the event store.

Link to this callback

stream_all_forward(start_event_number, read_batch_size, timeout)

View Source
stream_all_forward(
  start_event_number :: non_neg_integer(),
  read_batch_size :: non_neg_integer(),
  timeout :: timeout() | nil
) :: Enumerable.t()

Streams events from all streams, in the order in which they were originally written.

  • start_event_number optionally, the number of the first event to read. Defaults to the beginning of the stream if not set.

  • read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.

  • timeout an optional timeout for querying the database (per batch), in milliseconds. Defaults to 15_000ms.

Link to this callback

stream_forward(stream_uuid, start_version, read_batch_size, timeout)

View Source
stream_forward(
  stream_uuid :: String.t(),
  start_version :: non_neg_integer(),
  read_batch_size :: non_neg_integer(),
  timeout :: timeout() | nil
) :: Enumerable.t() | {:error, reason :: term()}

Streams events from the given stream, in the order in which they were originally written.

  • start_version optionally, the version number of the first event to read. Defaults to the beginning of the stream if not set.

  • read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.

  • timeout an optional timeout for querying the database (per batch), in milliseconds. Defaults to 15_000ms.

Link to this callback

subscribe(stream_uuid, list)

View Source
subscribe(stream_uuid :: String.t(),
  selector: (EventStore.RecordedEvent.t() -> any()),
  mapper: (EventStore.RecordedEvent.t() -> any())
) :: :ok | {:error, term()}

Create a transient subscription to a given stream.

  • stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.

  • opts is an optional map providing additional subscription configuration:

    • selector to define a function to filter each event, i.e. returns only those elements for which fun returns a truthy value
    • mapper to define a function to map each recorded event before sending to the subscriber.

The calling process will be notified whenever new events are appended to the given stream_uuid.

As the subscription is transient you do not need to acknowledge receipt of each event. The subscriber process will miss any events if it is restarted and resubscribes. If you need a persistent subscription with guaranteed at-least-once event delivery and back-pressure you should use EventStore.subscribe_to_stream/4.

Notification message

Events will be sent to the subscriber, in batches, as {:events, events} where events is a collection of EventStore.RecordedEvent structs.

Example

{:ok, subscription} = EventStore.subscribe(stream_uuid)

# receive first batch of events
receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)
end
Link to this callback

subscribe_to_all_streams(subscription_name, subscriber, opts)

View Source
subscribe_to_all_streams(
  subscription_name :: String.t(),
  subscriber :: pid(),
  opts :: keyword()
) ::
  {:ok, subscription :: pid()}
  | {:error, :already_subscribed}
  | {:error, :subscription_already_exists}
  | {:error, :too_many_subscribers}
  | {:error, reason :: term()}

Create a persistent subscription to all streams.

The subscriber process will be notified of each batch of events appended to any stream.

  • subscription_name is used to uniquely identify the subscription.

  • subscriber is a process that will be sent {:events, events} notification messages.

  • opts is an optional map providing additional subscription configuration:

    • start_from is a pointer to the first event to receive. It must be one of:

      • :origin for all events from the start of the stream (default).
      • :current for any new events appended to the stream after the subscription has been created.
      • any positive integer for an event id to receive events after that exact event.
    • selector to define a function to filter each event, i.e. returns only those elements for which fun returns a truthy value

    • mapper to define a function to map each recorded event before sending to the subscriber.

    • concurrency_limit defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription an {:error, :too_many_subscribers} is returned.

The subscription will resume from the last acknowledged event if it already exists. It will ignore the start_from argument in this case.

Returns {:ok, subscription} when subscription succeeds.

Example

{:ok, subscription} = EventStore.subscribe_to_all_streams("all_subscription", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts "Successfully subscribed to all streams"
end

receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)

    # acknowledge receipt
    EventStore.ack(subscription, events)
end
Link to this callback

subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts)

View Source
subscribe_to_stream(
  stream_uuid :: String.t(),
  subscription_name :: String.t(),
  subscriber :: pid(),
  opts :: keyword()
) ::
  {:ok, subscription :: pid()}
  | {:error, :already_subscribed}
  | {:error, :subscription_already_exists}
  | {:error, :too_many_subscribers}
  | {:error, reason :: term()}

Create a persistent subscription to a single stream.

The subscriber process will be notified of each batch of events appended to the single stream identified by stream_uuid.

  • stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.

  • subscription_name is used to uniquely identify the subscription.

  • subscriber is a process that will be sent {:events, events} notification messages.

  • opts is an optional map providing additional subscription configuration:

    • start_from is a pointer to the first event to receive. It must be one of:

      • :origin for all events from the start of the stream (default).
      • :current for any new events appended to the stream after the subscription has been created.
      • any positive integer for a stream version to receive events after.
    • selector to define a function to filter each event, i.e. returns only those elements for which fun returns a truthy value.

    • mapper to define a function to map each recorded event before sending to the subscriber.

    • concurrency_limit defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription an {:error, :too_many_subscribers} is returned.

    • buffer_size limits how many in-flight events will be sent to the subscriber process before acknowledgement of successful processing. This limits the number of messages sent to the subscriber and stops their message queue from getting filled with events. Defaults to one in-flight event.

    • partition_by is an optional function used to partition events to subscribers. It can be used to guarantee processing order when multiple subscribers have subscribed to a single subscription. The function is passed a single argument (an EventStore.RecordedEvent struct) and must return the partition key. As an example to guarantee events for a single stream are processed serially, but different streams are processed concurrently, you could use the stream_uuid as the partition key.

        by_stream = fn %EventStore.RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end
      
        {:ok, _subscription} =
          EventStore.subscribe_to_stream(stream_uuid, "example", self(),
            concurrency_limit: 10,
            partition_by: by_stream
          )

The subscription will resume from the last acknowledged event if it already exists. It will ignore the start_from argument in this case.

Returns {:ok, subscription} when subscription succeeds.

Notification messages

Subscribers will initially receive a {:subscribed, subscription} message once the subscription has successfully subscribed.

After this message events will be sent to the subscriber, in batches, as {:events, events} where events is a collection of EventStore.RecordedEvent structs.

Example

{:ok, subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts "Successfully subscribed to stream: " <> inspect(stream_uuid)
end

receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)

    # acknowledge receipt
    EventStore.ack(subscription, events)
end
Link to this callback

unsubscribe_from_all_streams(subscription_name)

View Source
unsubscribe_from_all_streams(subscription_name :: String.t()) :: :ok

Unsubscribe an existing subscriber from all event notifications.

  • subscription_name is used to identify the existing subscription process to stop.

Returns :ok on success.

Link to this callback

unsubscribe_from_stream(stream_uuid, subscription_name)

View Source
unsubscribe_from_stream(
  stream_uuid :: String.t(),
  subscription_name :: String.t()
) :: :ok

Unsubscribe an existing subscriber from event notifications.

  • stream_uuid is the stream to unsubscribe from.

  • subscription_name is used to identify the existing subscription process to stop.

Returns :ok on success.