View Source EventStore behaviour (EventStore v1.4.7)

EventStore allows you to define one or more event store modules to append, read, and subscribe to streams of events.

It uses PostgreSQL (v9.5 or later) as the underlying storage engine.

Defining an event store

An event store module is defined in your own application as follows:

defmodule MyApp.EventStore do
  use EventStore, otp_app: :my_app

  # Optional `init/1` function to modify config at runtime.
  def init(config) do
    {:ok, config}
  end
end

Where the configuration for the event store must be in your application environment, usually defined in config/config.exs:

config :my_app, MyApp.EventStore,
  serializer: EventStore.JsonSerializer,
  username: "postgres",
  password: "postgres",
  database: "eventstore",
  hostname: "localhost"

Or use a URL to connect instead:

config :my_app, MyApp.EventStore,
  serializer: EventStore.JsonSerializer,
  url: "postgres://postgres:postgres@localhost/eventstore"

Note: To use an EventStore with Commanded you should configure the event store to use Commanded's JSON serializer which provides additional support for JSON decoding:

config :my_app, MyApp.EventStore,
  serializer: Commanded.Serialization.JsonSerializer

The event store module defines a start_link/1 function that needs to be invoked before using the event store. In general, this function is not called directly, but included as part of your application supervision tree.

If your application was generated with a supervisor (by passing --sup to mix new) you will have a lib/my_app/application.ex file containing the application start callback that defines and starts your supervisor. You just need to edit the start/2 function to start the event store in your application's supervisor:

  def start(_type, _args) do
    children = [
      MyApp.EventStore
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

Each event store module (e.g. MyApp.EventStore) provides a public API to read events from and write events to an event stream, and subscribe to event notifications.

Postgres schema

By default the public schema will be used for event store tables. An event store can be configured to use an alternate Postgres schema:

defmodule MyApp.EventStore do
  use EventStore, otp_app: :my_app, schema: "schema_name"
end

Or provide the schema as an option in the init/1 callback function:

defmodule MyApp.EventStore do
  use EventStore, otp_app: :my_app

  def init(config) do
    {:ok, Keyword.put(config, :schema, "schema_name")}
  end
end

Or define it in environment config when configuring the database connection settings:

# config/config.exs
config :my_app, MyApp.EventStore, schema: "schema_name"

This feature allows you to define and start multiple event stores sharing a single Postgres database, but with their data isolated and segregated by schema.

Note the mix event_store.<task> tasks to create, initialize, and drop an event store database will also handle creating and/or dropping the schema.

Dynamic named event store

An event store can be started multiple times by providing a name when starting. The name must be provided as an option to all event store operations to identify the correct instance.

Example

Define an event store:

defmodule MyApp.EventStore do
  use EventStore, otp_app: :my_app
end

Start multiple instances of the event store, each with a unique name:

{:ok, _pid} = EventStore.start_link(name: :eventstore1)
{:ok, _pid} = EventStore.start_link(name: :eventstore2)
{:ok, _pid} = EventStore.start_link(name: :eventstore3)

Use a dynamic event store by providing its name as an option to each function:

:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, name: :eventstore1)

{:ok, events} = EventStore.read_stream_forward(stream_uuid, 0, 1_000, name: :eventstore1)

Dynamic schemas

This feature also allows you to start each event store instance using a different schema:

{:ok, _pid} = EventStore.start_link(name: :tenant1, schema: "tenant1")
{:ok, _pid} = EventStore.start_link(name: :tenant2, schema: "tenant2")

Or start supervised:

children =
  for tenant <- [:tenant1, :tenant2, :tenant3] do
    {MyApp.EventStore, name: tenant, schema: "#{tenant}"}
  end

opts = [strategy: :one_for_one, name: MyApp.Supervisor]

Supervisor.start_link(children, opts)

The above can be used for multi-tenancy where the data for each tenant is stored in a separate, isolated schema.

Shared database connection pools

By default each event store will start its own Postgrex database connection pool. The size of the pool is configured with the pool_size config option.

When you have multiple event stores running you will also end up with multiple connection pools. If they are all connecting to the same physical Postgres database then it can be useful to share a single pool amongst all event stores. Use the shared_connection_pool config option to specify a name for the shared connection pool. Then configure the event stores you'd like to share the pool with the same name.

This can be done in config:

# config/config.exs
config :my_app, MyApp.EventStore, shared_connection_pool: :shared_pool

Or when starting the event stores, such as via a Supervisor:

Supervisor.start_link(
  [
    {MyApp.EventStore, name: :eventstore1, shared_connection_pool: :shared_pool},
    {MyApp.EventStore, name: :eventstore2, shared_connection_pool: :shared_pool},
    {MyApp.EventStore, name: :eventstore3, shared_connection_pool: :shared_pool}
  ], opts)

Using an existing database connection or transaction

In some situations you might want to execute the event store operations using an existing Postgres database connection or transaction. For instance, if you want to persist changes to one or more other tables, such as a read-model projection.

To do this you can provide a Postgrex connection process or transaction as a :conn option to any of the supported EventStore functions.

{:ok, pid} = Postgrex.start_link(config)

Postgrex.transaction(pid, fn conn ->
  :ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn)
end)

This can also be used with an Ecto Repo which is configured to use the Postgres SQL adapter. The connection process may be looked up as follows:

Repo.transaction(fn ->
  %{pid: pool} = Ecto.Adapter.lookup_meta(Repo)

  conn = Process.get({Ecto.Adapters.SQL, pool})

  :ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn)
end)

Guides

Please refer to the following guides to learn more:


Summary

Callbacks

Acknowledge receipt of the given events received from a subscription.

Append one or more events to a stream atomically.

Returns the event store configuration stored in the :otp_app environment.

Delete an existing persistent subscription to all streams.

Delete a previously recorded snapshop for a given source.

Delete an existing persistent subscription.

A callback executed when the event store starts or when configuration is read.

Link one or more existing events to another stream.

Paginate all streams.

Reads the requested number of events from all streams in the reverse order from which they were originally written.

Reads the requested number of events from all streams in the order in which they were originally written.

Read a snapshot, if available, for a given source.

Reads the requested number of events from the given stream in the reverse order from which they were originally written.

Reads the requested number of events from the given stream in the order in which they were originally written.

Record a snapshot of the data and metadata for a given source.

Starts any connection pooling or supervision and return {:ok, pid} or just :ok if nothing needs to be done.

Shuts down the event store.

Streams events from all streams in the reverse order from which they were originally written.

Streams events from all streams in the order in which they were originally written.

Streams events from the given stream in the reverse order from which they were originally written.

Streams events from the given stream in the order in which they were originally written.

Get basic information about a stream, including its version, status, and created date.

Create a transient subscription to a given stream.

Create a subscription to all streams. By default the subscription is persistent.

Create a subscription to a single stream. By default the subscription is persistent.

Unsubscribe an existing subscriber from all event notifications.

Unsubscribe an existing subscriber from event notifications.

Functions

Returns all running EventStore instances.

Types

expected_version()

@type expected_version() ::
  :any_version | :no_stream | :stream_exists | non_neg_integer()

option()

@type option() ::
  {:name, atom()}
  | {:conn, Postgrex.conn() | DBConnection.t()}
  | {:timeout, timeout()}

options()

@type options() :: [option()]

pagination_option()

@type pagination_option() ::
  option()
  | {:page_size, pos_integer()}
  | {:page_number, pos_integer()}
  | {:search, String.t()}
  | {:sort_by,
     :stream_uuid
     | :stream_id
     | :stream_version
     | :created_at
     | :deleted_at
     | :status}
  | {:sort_dir, :asc | :desc}

pagination_options()

@type pagination_options() :: [pagination_option()]

persistent_subscription_option()

@type persistent_subscription_option() ::
  transient_subscribe_option()
  | {:buffer_size, pos_integer()}
  | {:checkpoint_after, non_neg_integer()}
  | {:checkpoint_threshold, pos_integer()}
  | {:concurrency_limit, pos_integer()}
  | {:max_size, pos_integer()}
  | {:partition_by, (EventStore.RecordedEvent.t() -> any())}
  | {:start_from, :origin | :current | non_neg_integer()}
  | {:timeout, timeout()}
  | {:transient, boolean()}

persistent_subscription_options()

@type persistent_subscription_options() :: [persistent_subscription_option()]

start_from()

@type start_from() :: :origin | :current | non_neg_integer()

t()

@type t() :: module()

transient_subscribe_option()

@type transient_subscribe_option() ::
  {:name, atom()}
  | {:selector, (EventStore.RecordedEvent.t() -> any())}
  | {:mapper, (EventStore.RecordedEvent.t() -> any())}

transient_subscribe_options()

@type transient_subscribe_options() :: [transient_subscribe_option()]

Callbacks

ack(subscription, arg2)

@callback ack(
  subscription :: pid(),
  EventStore.RecordedEvent.t()
  | [EventStore.RecordedEvent.t()]
  | non_neg_integer()
) :: :ok | {:error, reason :: term()}

Acknowledge receipt of the given events received from a subscription.

Accepts a single EventStore.RecordedEvent struct, a list of EventStore.RecordedEvents, or the event number of the recorded event to acknowledge.

append_to_stream(stream_uuid, expected_version, events, opts)

@callback append_to_stream(
  stream_uuid :: String.t(),
  expected_version(),
  events :: [EventStore.EventData.t()],
  opts :: options()
) ::
  :ok
  | {:error, :cannot_append_to_all_stream}
  | {:error, :stream_exists}
  | {:error, :stream_not_found}
  | {:error, :wrong_expected_version}
  | {:error, :stream_deleted}
  | {:error, reason :: term()}

Append one or more events to a stream atomically.

  • stream_uuid is used to uniquely identify a stream.

  • expected_version is used for optimistic concurrency checks. You can provide a non-negative integer to specify the expected stream version. This is used to ensure you can only append to the stream if it is at exactly that version.

    You can also provide one of the following values to alter the concurrency check behaviour:

    • :any_version - No concurrency checking and allow any stream version (including no stream).
    • :no_stream - Ensure the stream does not exist.
    • :stream_exists - Ensure the stream exists.
  • events is a list of %EventStore.EventData{} structs.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database transaction, in milliseconds. Defaults to 15,000ms.

Returns :ok on success, or an {:error, reason} tagged tuple. The returned error may be due to one of the following reasons:

  • {:error, :wrong_expected_version} when the actual stream version differs from the provided expected version.
  • {:error, :stream_exists} when the stream exists, but expected version was :no_stream.
  • {:error, :stream_not_found} when the stream does not exist, but expected version was :stream_exists.

config()

@callback config() :: Keyword.t()

Returns the event store configuration stored in the :otp_app environment.

delete_all_streams_subscription(subscription_name, opts)

@callback delete_all_streams_subscription(
  subscription_name :: String.t(),
  opts :: options()
) ::
  :ok | {:error, term()}

Delete an existing persistent subscription to all streams.

  • subscription_name is used to identify the existing subscription to remove.

Returns :ok on success.

delete_snapshot(source_uuid, opts)

@callback delete_snapshot(source_uuid :: String.t(), opts :: options()) ::
  :ok | {:error, reason :: term()}

Delete a previously recorded snapshop for a given source.

Returns :ok on success, or when the snapshot does not exist.

delete_stream(stream_uuid, expected_version, type, opts)

@callback delete_stream(
  stream_uuid :: String.t(),
  expected_version :: :any_version | :stream_exists | non_neg_integer(),
  type :: :soft | :hard,
  opts :: Keyword.t()
) ::
  :ok
  | {:error, :stream_not_found}
  | {:error, :stream_deleted}
  | {:error, term()}

Delete an existing stream.

  • stream_uuid identity of the stream to be deleted.

  • expected_version is used for optimistic concurrency checking. You can provide a non-negative integer to specify the expected stream version. This is used to ensure you can only delete a stream if it is at exactly that version.

    You can also provide one of the following values to alter the concurrency checking behaviour:

    • :any_version - No concurrency check, allow any stream version.
    • :stream_exists - Ensure the stream exists, at any version.
  • type - used to indicate how the stream is deleted:

    • :soft - the stream is marked as deleted, but no events are removed.
    • :hard - the stream and its events are permanently deleted from the database.

    Soft deletion is the default if the type is not provided.

Returns :ok on success or an error tagged tuple on failure.

Soft delete

Will mark the stream as deleted, but will not delete its events. Events from soft deleted streams will still appear in the globally ordered all events ($all) stream and in any linked streams.

A soft deleted stream cannot be read nor appended to. Subscriptions to the deleted stream will not receive any events but subscriptions containing linked events from the deleted stream, such as the global all events stream, will still receive events from the deleted stream.

Hard delete

Will permanently delete the stream and its events. This is irreversible and will remove data. Events will be removed from the globally ordered all events stream and any linked streams.

After being hard deleted, a stream can later be appended to and read as if it had never existed.

Examples

Soft delete a stream

Delete a stream at any version:

:ok = MyApp.EventStore.delete_stream("stream1", :any_version, :soft)

Delete a stream at an expected version:

:ok = MyApp.EventStore.delete_stream("stream2", 3, :soft)

Delete stream will use soft delete by default so you can omit the type:

:ok = MyApp.EventStore.delete_stream("stream1", :any_version)

Hard delete a stream

Since hard deletes are destructive and irreversible they are disabled by default. To use hard deletes you must first enable them for the event store:

defmodule MyApp.EventStore do
  use EventStore, otp_app: :my_app, enable_hard_deletes: true
end

Or via config:

# config/config.exs
config :my_app, MyApp.EventStore, enable_hard_deletes: true

Hard delete a stream at any version:

:ok = MyApp.EventStore.delete_stream("stream1", :any_version, :hard)

Hard delete a stream that should exist:

:ok = MyApp.EventStore.delete_stream("stream2", :stream_exists, :hard)

delete_subscription(stream_uuid, subscription_name, opts)

@callback delete_subscription(
  stream_uuid :: String.t(),
  subscription_name :: String.t(),
  opts :: options()
) :: :ok | {:error, term()}

Delete an existing persistent subscription.

  • stream_uuid is the stream the subscription is subscribed to.

  • subscription_name is used to identify the existing subscription to remove.

Returns :ok on success.

init(config)

(optional)
@callback init(config :: Keyword.t()) :: {:ok, Keyword.t()}

A callback executed when the event store starts or when configuration is read.

It must return {:ok, keyword} with the updated list of configuration.

paginate_streams(opts)

@callback paginate_streams(opts :: pagination_options()) ::
  {:ok, EventStore.Page.t(EventStore.Streams.StreamInfo.t())} | {:error, any()}

Paginate all streams.

  • opts an optional keyword list containing:

    • page_size the total number of streams per page. Defaults to 50.

    • page_number the current page number. Defaults to page 1.

    • search search for a stream by its identity.

    • sort_by sort the streams by the given field. Defaults to sorting by the stream's internal id (:stream_id field)

    • sort_dir direction to sort streams by, either :asc or :desc. Defaults to :asc.

    • name the name of the event store if provided to start_link/1. Defaults to the event store module name (e.g. MyApp.EventStore).

    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.

Returns an {:ok, page} result containing a list of StreamInfo structs, or an error tagged tuple on failure.

Example

alias EventStore.Page

{:ok, %Page{entries: streams}} = MyApp.EventStore.paginate_streams()

read_all_streams_backward(start_version, count, opts)

@callback read_all_streams_backward(
  start_version :: integer(),
  count :: non_neg_integer(),
  opts :: options()
) :: {:ok, [EventStore.RecordedEvent.t()]} | {:error, reason :: term()}

Reads the requested number of events from all streams in the reverse order from which they were originally written.

  • start_version optionally, the stream version of the first event to read. Use -1 to indicate starting from the end of the stream. Defaults to the end of the stream if not set.

  • count optionally, the maximum number of events to read. Defaults to returning 1,000 events from all streams.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.

read_all_streams_forward(start_version, count, opts)

@callback read_all_streams_forward(
  start_version :: non_neg_integer(),
  count :: non_neg_integer(),
  opts :: options()
) :: {:ok, [EventStore.RecordedEvent.t()]} | {:error, reason :: term()}

Reads the requested number of events from all streams in the order in which they were originally written.

  • start_version optionally, the stream version of the first event to read. Defaults to the beginning of the stream if not set.

  • count optionally, the maximum number of events to read. Defaults to returning 1,000 events from all streams.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.

read_snapshot(source_uuid, opts)

@callback read_snapshot(source_uuid :: String.t(), opts :: options()) ::
  {:ok, EventStore.Snapshots.SnapshotData.t()} | {:error, :snapshot_not_found}

Read a snapshot, if available, for a given source.

Returns {:ok, %EventStore.Snapshots.SnapshotData{}} on success, or {:error, :snapshot_not_found} when unavailable.

read_stream_backward(stream_uuid, start_version, count, opts)

@callback read_stream_backward(
  stream_uuid :: String.t(),
  start_version :: non_neg_integer(),
  count :: non_neg_integer(),
  opts :: options()
) ::
  {:ok, [EventStore.RecordedEvent.t()]}
  | {:error, :stream_deleted}
  | {:error, reason :: term()}

Reads the requested number of events from the given stream in the reverse order from which they were originally written.

  • stream_uuid is used to uniquely identify a stream.

  • start_version optionally, the stream version of the first event to read. Use -1 to indicate starting from the end of the stream. Defaults to the end of the stream if not set.

  • count optionally, the maximum number of events to read. Defaults to to returning 1,000 events from the stream.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.

read_stream_forward(stream_uuid, start_version, count, opts)

@callback read_stream_forward(
  stream_uuid :: String.t(),
  start_version :: non_neg_integer(),
  count :: non_neg_integer(),
  opts :: options()
) ::
  {:ok, [EventStore.RecordedEvent.t()]}
  | {:error, :stream_deleted}
  | {:error, reason :: term()}

Reads the requested number of events from the given stream in the order in which they were originally written.

  • stream_uuid is used to uniquely identify a stream.

  • start_version optionally, the stream version of the first event to read. Defaults to the beginning of the stream if not set.

  • count optionally, the maximum number of events to read. Defaults to to returning 1,000 events from the stream.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.

record_snapshot(snapshot, opts)

@callback record_snapshot(
  snapshot :: EventStore.Snapshots.SnapshotData.t(),
  opts :: options()
) ::
  :ok | {:error, reason :: term()}

Record a snapshot of the data and metadata for a given source.

Returns :ok on success.

start_link(opts)

@callback start_link(opts :: Keyword.t()) ::
  {:ok, pid()} | {:error, {:already_started, pid()}} | {:error, term()}

Starts any connection pooling or supervision and return {:ok, pid} or just :ok if nothing needs to be done.

Returns {:error, {:already_started, pid}} if the event store is already started or {:error, term} in case anything else goes wrong.

stop(supervisor, timeout)

@callback stop(Supervisor.supervisor(), timeout()) :: :ok

Shuts down the event store.

stream_all_backward(start_version, opts)

@callback stream_all_backward(
  start_version :: non_neg_integer(),
  opts :: [options() | {:read_batch_size, non_neg_integer()}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term()}

Streams events from all streams in the reverse order from which they were originally written.

  • start_version optionally, the stream version of the first event to read. Use -1 to indicate starting from the end of the stream. Defaults to the end of the stream if not set.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.
    • read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.

stream_all_forward(start_version, opts)

@callback stream_all_forward(
  start_version :: non_neg_integer(),
  opts :: [options() | {:read_batch_size, non_neg_integer()}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term()}

Streams events from all streams in the order in which they were originally written.

  • start_version optionally, the stream version of the first event to read. Defaults to the beginning of the stream if not set.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.
    • read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.

stream_backward(stream_uuid, start_version, opts)

@callback stream_backward(
  stream_uuid :: String.t(),
  start_version :: integer(),
  opts :: [options() | {:read_batch_size, non_neg_integer()}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term()}

Streams events from the given stream in the reverse order from which they were originally written.

  • start_version optionally, the stream version of the first event to read. Use -1 to indicate starting from the end of the stream. Defaults to the end of the stream if not set.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.
    • read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.

stream_forward(stream_uuid, start_version, opts)

@callback stream_forward(
  stream_uuid :: String.t(),
  start_version :: integer(),
  opts :: [options() | {:read_batch_size, non_neg_integer()}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term()}

Streams events from the given stream in the order in which they were originally written.

  • start_version optionally, the stream version of the first event to read. Defaults to the beginning of the stream if not set.

  • opts an optional keyword list containing:

    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.
    • read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.

stream_info(stream_uuid, opts)

@callback stream_info(stream_uuid :: String.t() | :all, opts :: options()) ::
  {:ok, EventStore.Streams.StreamInfo.t()}
  | {:error, :stream_not_found}
  | {:error, :stream_deleted}
  | {:error, reason :: term()}

Get basic information about a stream, including its version, status, and created date.

  • opts an optional keyword list containing:
    • name the name of the event store if provided to start_link/1.
    • timeout an optional timeout for the database query, in milliseconds. Defaults to 15,000ms.

Returns {:ok, StreamInfo.t()} on success, or an {:error, reason} tagged tuple. The returned error may be due to one of the following reasons:

  • {:error, :stream_not_found} when the stream does not exist.
  • {:error, :stream_deleted} when the stream was soft deleted.

Example

alias EventStore.Streams.StreamInfo

{:ok, %StreamInfo{stream_version: stream_version}} =
  MyApp.EventStore.stream_info("stream-1234")

subscribe(stream_uuid, opts)

@callback subscribe(stream_uuid :: String.t(), opts :: transient_subscribe_options()) ::
  :ok | {:error, term()}

Create a transient subscription to a given stream.

  • stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.

  • opts is an optional keyword list providing additional subscription configuration:

    • name the name of the event store if provided to start_link/1.
    • selector to define a function to filter each event, i.e. returns only those elements for which fun returns a truthy value
    • mapper to define a function to map each recorded event before sending to the subscriber.

The calling process will be notified whenever new events are appended to the given stream_uuid.

As the subscription is transient you do not need to acknowledge receipt of each event. The subscriber process will miss any events if it is restarted and resubscribes. If you need a persistent subscription with guaranteed at-least-once event delivery and back-pressure you should use EventStore.subscribe_to_stream/4.

Notification message

Events will be sent to the subscriber, in batches, as {:events, events} where events is a collection of EventStore.RecordedEvent structs.

Example

{:ok, subscription} = EventStore.subscribe(stream_uuid)

# receive first batch of events
receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)
end

subscribe_to_all_streams(subscription_name, subscriber, opts)

@callback subscribe_to_all_streams(
  subscription_name :: String.t(),
  subscriber :: pid(),
  opts :: persistent_subscription_options()
) ::
  {:ok, subscription :: pid()}
  | {:error, :already_subscribed}
  | {:error, :subscription_already_exists}
  | {:error, :too_many_subscribers}
  | {:error, reason :: term()}

Create a subscription to all streams. By default the subscription is persistent.

See EventStore.subscribe_to_stream/4 for options.

Example

{:ok, subscription} = EventStore.subscribe_to_all_streams("all_subscription", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts "Successfully subscribed to all streams"
end

receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)

    # acknowledge receipt
    EventStore.ack(subscription, events)
end

subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts)

@callback subscribe_to_stream(
  stream_uuid :: String.t(),
  subscription_name :: String.t(),
  subscriber :: pid(),
  opts :: persistent_subscription_options()
) ::
  {:ok, subscription :: pid()}
  | {:error, :already_subscribed}
  | {:error, :subscription_already_exists}
  | {:error, :too_many_subscribers}
  | {:error, reason :: term()}

Create a subscription to a single stream. By default the subscription is persistent.

The subscriber process will be notified of each batch of events appended to the single stream identified by stream_uuid.

  • stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.

  • subscription_name is used to uniquely identify the subscription.

  • subscriber is a process that will be sent {:events, events} notification messages.

  • opts is an optional keyword list providing additional subscription configuration:

    • name the name of the event store if provided to start_link/1.

    • start_from is a pointer to the first event to receive. It must be one of:

      • :origin for all events from the start of the stream (default).
      • :current for any new events appended to the stream after the subscription has been created.
      • any positive integer for a stream version to receive events after.
    • selector to define a function to filter each event, i.e. returns only those elements for which the function returns a truthy value.

    • mapper to define a function to map each recorded event before sending to the subscriber.

    • concurrency_limit defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription an {:error, :too_many_subscribers} is returned.

    • buffer_size limits how many in-flight events will be sent to the subscriber process before acknowledgement of successful processing. This limits the number of messages sent to the subscriber and stops their message queue from getting filled with events. Defaults to one in-flight event.

    • checkpoint_threshold determines how frequently a checkpoint is written to the database for the subscription after events are acknowledged. Increasing the threshold will reduce the number of database writes for busy subscriptions, but means that events might be replayed when the subscription resumes if the checkpoint cannot be written. The default is to persist the checkpoint after each acknowledgement.

    • checkpoint_after (milliseconds) used to ensure a checkpoint is written after a period of inactivity even if the checkpoint threshold has not been met. This ensures checkpoints are consistently written during less busy periods. It is only applicable when a checkpoint threshold has been set as the default subscription behaviour is to checkpoint after each acknowledgement.

    • max_size limits the number of events queued in memory by the subscription process to prevent excessive memory usage. If the in-memory queue exceeds the max size - because the subscriber cannot keep up - then events will not be queued in memory, but instead will be read from the database on demand once the subscriber process has processed the queue. This limit also determines how many events are read from the database at a time during catch-up. Defaults to 1,000 events.

    • partition_by is an optional function used to partition events to subscribers. It can be used to guarantee processing order when multiple subscribers have subscribed to a single subscription. The function is passed a single argument (an EventStore.RecordedEvent struct) and must return the partition key. As an example to guarantee events for a single stream are processed serially, but different streams are processed concurrently, you could use the stream_uuid as the partition key.

        by_stream = fn %EventStore.RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end
      
        {:ok, _subscription} =
          EventStore.subscribe_to_stream(stream_uuid, "example", self(),
            concurrency_limit: 10,
            partition_by: by_stream
          )
    • timeout an optional timeout for database queries, in milliseconds. Defaults to 15,000ms.

    • transient is an optional boolean flag to create a transient subscription. By default this is set to false. If you want to create a transient subscription set this flag to true. Your subscription will not be persisted, so if the subscription is restarted, you will receive the events again starting from start_from.

      An example usage are short lived event handlers that keep their state in memory but still want to have the guarantee to have received all events.

      It's possible to create a persistent subscription with some name, stop it and later create a transient subscription with the same name. The transient subscription will now receive all events starting from start_from. If you later stop this transient subscription and start a persistent subscription again with the same name, you will receive the events again as if the transient subscription never existed.

The subscription will resume from the last acknowledged event if it already exists. It will ignore the start_from argument in this case.

Returns {:ok, subscription} when subscription succeeds.

Notification messages

Subscribers will initially receive a {:subscribed, subscription} message once the subscription has successfully subscribed.

After this message events will be sent to the subscriber, in batches, as {:events, events} where events is a collection of EventStore.RecordedEvent structs.

Example

{:ok, subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts "Successfully subscribed to stream: " <> inspect(stream_uuid)
end

receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)

    # acknowledge receipt
    EventStore.ack(subscription, events)
end

Subscription tuning

Use the checkpoint_threshold and checkpoint_after options to configure how frequently checkpoints are written to the database. By default a subscription will persist a checkpoint after each acknowledgement. This can cause high write load on the database for busy subscriptions which receive a large number of events. This problem is known as write amplification where each event written to a stream causes many additional writes as subscriptions acknowledge processing of the event.

The checkpoint_threshold controls how frequently checkpoints are persisted. Increasing the threshold reduces the number of database writes. For example using a threshold of 100 means that a checkpoint is written at most once for every 100 events processed. The checkpoint_after ensures that a checkpoint will still be written after a period of inactivity even when the threshold has not been met. This ensures bursts of event processing can be safely handled.

unsubscribe_from_all_streams(subscription_name, opts)

@callback unsubscribe_from_all_streams(subscription_name :: String.t(), opts :: options()) ::
  :ok

Unsubscribe an existing subscriber from all event notifications.

  • subscription_name is used to identify the existing subscription process to stop.

Returns :ok on success.

unsubscribe_from_stream(stream_uuid, subscription_name, opts)

@callback unsubscribe_from_stream(
  stream_uuid :: String.t(),
  subscription_name :: String.t(),
  opts :: options()
) :: :ok

Unsubscribe an existing subscriber from event notifications.

  • stream_uuid is the stream to unsubscribe from.

  • subscription_name is used to identify the existing subscription process to stop.

Returns :ok on success.

Functions

all_instances()

@spec all_instances() :: [{event_store :: module(), [{:name, atom()}]}]

Returns all running EventStore instances.

Note that order is not guaranteed.