EventStore behaviour (EventStore v1.0.3)
EventStore allows you to define one or more event store modules to append, read, and subscribe to streams of events.
It uses PostgreSQL (v9.5 or later) as the underlying storage engine.
Defining an event store
We can define an event store in our own application as follows:
defmodule MyApp.EventStore do
  use EventStore, otp_app: :my_app

  # Optional `init/1` function to modify config at runtime.
  def init(config) do
    {:ok, config}
  end
end
The configuration for the event store must be in your application environment, usually defined in config/config.exs:
config :my_app, MyApp.EventStore,
  serializer: EventStore.JsonSerializer,
  username: "postgres",
  password: "postgres",
  database: "eventstore",
  hostname: "localhost",
  # OR use a URL to connect instead
  url: "postgres://postgres:postgres@localhost/eventstore",
  pool_size: 1
The event store module defines a start_link/1 function that needs to be invoked before using the event store. In general, this function is not called directly, but included as part of your application supervision tree.

If your application was generated with a supervisor (by passing --sup to mix new) you will have a lib/my_app/application.ex file containing the application start callback that defines and starts your supervisor. You just need to edit the start/2 function to start the event store in your application's supervisor:
def start(_type, _args) do
  children = [
    MyApp.EventStore
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end
Each event store module (e.g. MyApp.EventStore) provides a public API to read events from and write events to an event stream, and subscribe to event notifications.
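As a quick orientation, here is a minimal sketch of appending to and reading from a stream through the module defined above, using the append_to_stream and read_stream_forward callbacks documented below. The stream identifier, event type and payload are illustrative, not part of the library:

alias EventStore.EventData

stream_uuid = "example-stream-1"

events = [
  %EventData{
    event_type: "Elixir.MyApp.ExampleEvent",
    data: %{example: "data"},
    metadata: %{}
  }
]

# Expected version 0 means the stream is not expected to contain any events yet.
:ok = MyApp.EventStore.append_to_stream(stream_uuid, 0, events)

# Read the events back from the beginning of the stream.
{:ok, recorded_events} = MyApp.EventStore.read_stream_forward(stream_uuid, 0, 1_000)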
Please refer to the guides to learn more.
Summary
Callbacks
Acknowledge receipt of the given events received from a subscription.
Append one or more events to a stream atomically.
Returns the event store configuration stored in the :otp_app environment.
Delete an existing persistent subscription to all streams.
Delete a previously recorded snapshot for a given source.
Delete an existing persistent subscription.
A callback executed when the event store starts or when configuration is read.
Link one or more existing events to another stream.
Reads the requested number of events from all streams, in the order in which they were originally written.
Read a snapshot, if available, for a given source.
Reads the requested number of events from the given stream, in the order in which they were originally written.
Record a snapshot of the data and metadata for a given source.
Starts any connection pooling or supervision and returns {:ok, pid} or just :ok if nothing needs to be done.
Shuts down the event store.
Streams events from all streams, in the order in which they were originally written.
Streams events from the given stream, in the order in which they were originally written.
Create a transient subscription to a given stream.
Create a persistent subscription to all streams.
Create a persistent subscription to a single stream.
Unsubscribe an existing subscriber from all event notifications.
Unsubscribe an existing subscriber from event notifications.
Types
expected_version()
expected_version() :: :any_version | :no_stream | :stream_exists | non_neg_integer()
Callbacks
ack(subscription, arg2)
ack(
  subscription :: pid(),
  EventStore.RecordedEvent.t() | [EventStore.RecordedEvent.t()] | non_neg_integer()
) :: :ok | {:error, reason :: term()}
Acknowledge receipt of the given events received from a subscription.
Accepts a single EventStore.RecordedEvent struct, a list of EventStore.RecordedEvent structs, or the event number of the recorded event to acknowledge.
append_to_stream(stream_uuid, expected_version, events, timeout)
append_to_stream(
  stream_uuid :: String.t(),
  expected_version(),
  events :: [EventStore.EventData.t()],
  timeout :: timeout() | nil
) ::
  :ok
  | {:error, :cannot_append_to_all_stream}
  | {:error, :stream_exists}
  | {:error, :stream_does_not_exist}
  | {:error, :wrong_expected_version}
  | {:error, reason :: term()}
Append one or more events to a stream atomically.
- stream_uuid is used to uniquely identify a stream.
- expected_version is used for optimistic concurrency checks. You can provide a non-negative integer to specify the expected stream version. This is used to ensure you can only append to the stream if it is at exactly that version. You can also provide one of the following values to alter the concurrency check behaviour:
  - :any_version - no concurrency checking; allow any stream version (including no stream).
  - :no_stream - ensure the stream does not exist.
  - :stream_exists - ensure the stream exists.
- events is a list of %EventStore.EventData{} structs.
- timeout is an optional timeout for the database transaction, in milliseconds. Defaults to 15_000ms.
Returns :ok on success, or an {:error, reason} tagged tuple. The returned error may be due to one of the following reasons:

- {:error, :wrong_expected_version} when the actual stream version differs from the provided expected version.
- {:error, :stream_exists} when the stream exists, but expected version was :no_stream.
- {:error, :stream_does_not_exist} when the stream does not exist, but expected version was :stream_exists.
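As an illustration of the concurrency check, the following sketch appends to a brand-new stream and then appends again at an explicit expected version. The stream identifier, event types and payloads are made up for this example:

events = [
  %EventStore.EventData{
    event_type: "Elixir.MyApp.AccountOpened",
    data: %{account_number: "ACC-123"},
    metadata: %{}
  }
]

# :no_stream ensures the append only succeeds if the stream does not exist yet.
:ok = MyApp.EventStore.append_to_stream("account-123", :no_stream, events)

more_events = [
  %EventStore.EventData{
    event_type: "Elixir.MyApp.MoneyDeposited",
    data: %{amount: 100},
    metadata: %{}
  }
]

# A second append must state the expected version (1 after the first event),
# otherwise {:error, :wrong_expected_version} is returned.
:ok = MyApp.EventStore.append_to_stream("account-123", 1, more_events)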
Returns the event store configuration stored in the :otp_app environment.
Delete an existing persistent subscription to all streams.
- subscription_name is used to identify the existing subscription to remove.

Returns :ok on success.
Delete a previously recorded snapshot for a given source.

Returns :ok on success, or when the snapshot does not exist.
Delete an existing persistent subscription.
- stream_uuid is the stream the subscription is subscribed to.
- subscription_name is used to identify the existing subscription to remove.

Returns :ok on success.
A callback executed when the event store starts or when configuration is read.
It must return {:ok, keyword} with the updated list of configuration.
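For instance, a minimal sketch of an init/1 callback that reads the database password from an environment variable at runtime; the variable name EVENTSTORE_PASSWORD is illustrative, not something the library defines:

def init(config) do
  # Resolve secrets at runtime rather than baking them into compile-time config.
  {:ok, Keyword.put(config, :password, System.get_env("EVENTSTORE_PASSWORD"))}
end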
link_to_stream(stream_uuid, expected_version, events, timeout)
link_to_stream(
  stream_uuid :: String.t(),
  expected_version(),
  events :: [EventStore.RecordedEvent.t()] | [non_neg_integer()],
  timeout :: timeout() | nil
) ::
  :ok
  | {:error, :cannot_append_to_all_stream}
  | {:error, :stream_exists}
  | {:error, :stream_does_not_exist}
  | {:error, :wrong_expected_version}
  | {:error, reason :: term()}
Link one or more existing events to another stream.
Allows you to construct streams containing events already appended to any other stream. This is more efficient than copying events between streams since only a reference to the existing event is created.
- stream_uuid is used to uniquely identify the target stream.
- expected_version is used for optimistic concurrency checks. You can provide a non-negative integer to specify the expected stream version. This is used to ensure you can only append to the stream if it is at exactly that version. You can also provide one of the following values to affect the concurrency check behaviour:
  - :any_version - no concurrency checking; allow any stream version (including no stream).
  - :no_stream - ensure the stream does not exist.
  - :stream_exists - ensure the stream exists.
- events_or_event_ids is a list of %EventStore.RecordedEvent{} structs or event ids.
- timeout is an optional timeout for the database transaction, in milliseconds. Defaults to 15_000ms.
Returns :ok on success, or an {:error, reason} tagged tuple. The returned error may be due to one of the following reasons:

- {:error, :wrong_expected_version} when the actual stream version differs from the provided expected version.
- {:error, :stream_exists} when the stream exists, but expected version was :no_stream.
- {:error, :stream_does_not_exist} when the stream does not exist, but expected version was :stream_exists.
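A brief sketch of linking events already recorded on one stream onto a second stream; both stream identifiers are illustrative:

# Read recorded events from an existing stream...
{:ok, recorded_events} = MyApp.EventStore.read_stream_forward("account-123", 0, 1_000)

# ...and link them onto a target stream. Only references to the existing events
# are stored; the event data itself is not copied.
:ok = MyApp.EventStore.link_to_stream("audit-account-123", :any_version, recorded_events)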
read_all_streams_forward(start_event_number, count, timeout)
read_all_streams_forward(
  start_event_number :: non_neg_integer(),
  count :: non_neg_integer(),
  timeout :: timeout() | nil
) :: {:ok, [EventStore.RecordedEvent.t()]} | {:error, reason :: term()}
Reads the requested number of events from all streams, in the order in which they were originally written.
- start_event_number optionally, the number of the first event to read. Defaults to the beginning of the stream if not set.
- count optionally, the maximum number of events to read. If not set it will be limited to returning 1,000 events from all streams.
- timeout an optional timeout for querying the database, in milliseconds. Defaults to 15_000ms.
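A minimal sketch, with the documented defaults written out explicitly:

# Read up to 1,000 events from the start of the global ordering across all streams.
{:ok, events} = MyApp.EventStore.read_all_streams_forward(0, 1_000)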
read_snapshot(source_uuid)
read_snapshot(source_uuid :: String.t()) ::
  {:ok, EventStore.Snapshots.SnapshotData.t()} | {:error, :snapshot_not_found}
Read a snapshot, if available, for a given source.
Returns {:ok, %EventStore.Snapshots.SnapshotData{}} on success, or {:error, :snapshot_not_found} when unavailable.
read_stream_forward(stream_uuid, start_version, count, timeout)
read_stream_forward(
  stream_uuid :: String.t(),
  start_version :: non_neg_integer(),
  count :: non_neg_integer(),
  timeout :: timeout() | nil
) :: {:ok, [EventStore.RecordedEvent.t()]} | {:error, reason :: term()}
Reads the requested number of events from the given stream, in the order in which they were originally written.
- stream_uuid is used to uniquely identify a stream.
- start_version optionally, the version number of the first event to read. Defaults to the beginning of the stream if not set.
- count optionally, the maximum number of events to read. If not set it will be limited to returning 1,000 events from the stream.
- timeout an optional timeout for querying the database, in milliseconds. Defaults to 15_000ms.
record_snapshot(snapshot)
record_snapshot(snapshot :: EventStore.Snapshots.SnapshotData.t()) ::
  :ok | {:error, reason :: term()}
Record a snapshot of the data and metadata for a given source.
Returns :ok on success.
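A sketch of recording a snapshot and reading it back. The source values and data are illustrative, and the configured serializer is assumed to handle the data term:

snapshot = %EventStore.Snapshots.SnapshotData{
  source_uuid: "account-123",
  source_version: 10,
  source_type: "Elixir.MyApp.Account",
  data: %{balance: 100}
}

:ok = MyApp.EventStore.record_snapshot(snapshot)

# The snapshot can later be read back by its source identifier.
{:ok, _snapshot} = MyApp.EventStore.read_snapshot("account-123")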
Starts any connection pooling or supervision and returns {:ok, pid} or just :ok if nothing needs to be done.

Returns {:error, {:already_started, pid}} if the event store is already started or {:error, term} in case anything else goes wrong.
Shuts down the event store.
stream_all_forward(start_event_number, read_batch_size, timeout)
stream_all_forward(
  start_event_number :: non_neg_integer(),
  read_batch_size :: non_neg_integer(),
  timeout :: timeout() | nil
) :: Enumerable.t()
Streams events from all streams, in the order in which they were originally written.
- start_event_number optionally, the number of the first event to read. Defaults to the beginning of the stream if not set.
- read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.
- timeout an optional timeout for querying the database (per batch), in milliseconds. Defaults to 15_000ms.
stream_forward(stream_uuid, start_version, read_batch_size, timeout)
stream_forward(
  stream_uuid :: String.t(),
  start_version :: non_neg_integer(),
  read_batch_size :: non_neg_integer(),
  timeout :: timeout() | nil
) :: Enumerable.t() | {:error, reason :: term()}
Streams events from the given stream, in the order in which they were originally written.
- start_version optionally, the version number of the first event to read. Defaults to the beginning of the stream if not set.
- read_batch_size optionally, the number of events to read at a time from storage. Defaults to reading 1,000 events per batch.
- timeout an optional timeout for querying the database (per batch), in milliseconds. Defaults to 15_000ms.
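Because the result is an Enumerable, it composes with the Stream and Enum modules and only reads batches from the database as it is consumed. A brief sketch with an illustrative stream identifier:

"account-123"
|> MyApp.EventStore.stream_forward()
|> Stream.map(fn %EventStore.RecordedEvent{data: data} -> data end)
|> Enum.to_list()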
subscribe(stream_uuid, list)
subscribe(
  stream_uuid :: String.t(),
  selector: (EventStore.RecordedEvent.t() -> any()),
  mapper: (EventStore.RecordedEvent.t() -> any())
) :: :ok | {:error, term()}
Create a transient subscription to a given stream.
- stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.
- opts is an optional keyword list providing additional subscription configuration:
  - selector to define a function to filter each event, i.e. only events for which the function returns a truthy value are sent to the subscriber.
  - mapper to define a function to map each recorded event before sending it to the subscriber.
The calling process will be notified whenever new events are appended to the given stream_uuid.

As the subscription is transient you do not need to acknowledge receipt of each event. The subscriber process will miss any events if it is restarted and resubscribes. If you need a persistent subscription with guaranteed at-least-once event delivery and back-pressure you should use EventStore.subscribe_to_stream/4.
Notification message
Events will be sent to the subscriber, in batches, as {:events, events} where events is a collection of EventStore.RecordedEvent structs.
Example
:ok = EventStore.subscribe(stream_uuid)

# receive first batch of events
receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)
end
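A further sketch using the selector and mapper options, receiving only a particular event type and unwrapping the event data before delivery; the event type name is made up for this example:

:ok =
  MyApp.EventStore.subscribe("$all",
    selector: fn %EventStore.RecordedEvent{event_type: event_type} ->
      event_type == "Elixir.MyApp.AccountOpened"
    end,
    mapper: fn %EventStore.RecordedEvent{data: data} -> data end
  )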
Create a persistent subscription to all streams.
The subscriber process will be notified of each batch of events appended to any stream.
- subscription_name is used to uniquely identify the subscription.
- subscriber is a process that will be sent {:events, events} notification messages.
- opts is an optional keyword list providing additional subscription configuration:
  - start_from is a pointer to the first event to receive. It must be one of:
    - :origin for all events from the start of the stream (default).
    - :current for any new events appended to the stream after the subscription has been created.
    - any positive integer for an event id to receive events after that exact event.
  - selector to define a function to filter each event, i.e. only events for which the function returns a truthy value are sent to the subscriber.
  - mapper to define a function to map each recorded event before sending it to the subscriber.
  - concurrency_limit defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription an {:error, :too_many_subscribers} is returned.
The subscription will resume from the last acknowledged event if it already exists. It will ignore the start_from argument in this case.

Returns {:ok, subscription} when subscription succeeds.
Example
{:ok, subscription} = EventStore.subscribe_to_all_streams("all_subscription", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts "Successfully subscribed to all streams"
end

receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)

    # acknowledge receipt
    EventStore.ack(subscription, events)
end
subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts)
Create a persistent subscription to a single stream.
The subscriber process will be notified of each batch of events appended to the single stream identified by stream_uuid.
- stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.
- subscription_name is used to uniquely identify the subscription.
- subscriber is a process that will be sent {:events, events} notification messages.
- opts is an optional keyword list providing additional subscription configuration:
  - start_from is a pointer to the first event to receive. It must be one of:
    - :origin for all events from the start of the stream (default).
    - :current for any new events appended to the stream after the subscription has been created.
    - any positive integer for a stream version to receive events after.
  - selector to define a function to filter each event, i.e. only events for which the function returns a truthy value are sent to the subscriber.
  - mapper to define a function to map each recorded event before sending it to the subscriber.
  - concurrency_limit defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription an {:error, :too_many_subscribers} is returned.
  - buffer_size limits how many in-flight events will be sent to the subscriber process before acknowledgement of successful processing. This limits the number of messages sent to the subscriber and stops their message queue from getting filled with events. Defaults to one in-flight event.
  - partition_by is an optional function used to partition events to subscribers. It can be used to guarantee processing order when multiple subscribers have subscribed to a single subscription. The function is passed a single argument (an EventStore.RecordedEvent struct) and must return the partition key. As an example, to guarantee events for a single stream are processed serially, but different streams are processed concurrently, you could use the stream_uuid as the partition key:

      by_stream = fn %EventStore.RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end

      {:ok, _subscription} =
        EventStore.subscribe_to_stream(stream_uuid, "example", self(),
          concurrency_limit: 10,
          partition_by: by_stream
        )
The subscription will resume from the last acknowledged event if it already exists. It will ignore the start_from argument in this case.

Returns {:ok, subscription} when subscription succeeds.
Notification messages
Subscribers will initially receive a {:subscribed, subscription} message once the subscription has successfully subscribed.

After this message events will be sent to the subscriber, in batches, as {:events, events} where events is a collection of EventStore.RecordedEvent structs.
Example
{:ok, subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts "Successfully subscribed to stream: " <> inspect(stream_uuid)
end

receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)

    # acknowledge receipt
    EventStore.ack(subscription, events)
end
unsubscribe_from_all_streams(subscription_name)
unsubscribe_from_all_streams(subscription_name :: String.t()) :: :ok
Unsubscribe an existing subscriber from all event notifications.
- subscription_name is used to identify the existing subscription process to stop.

Returns :ok on success.
Unsubscribe an existing subscriber from event notifications.
- stream_uuid is the stream to unsubscribe from.
- subscription_name is used to identify the existing subscription process to stop.

Returns :ok on success.
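For completeness, a sketch of unsubscribing from a single-stream subscription and then deleting it once it is no longer needed; the stream identifier and subscription name are illustrative:

# Stop the current subscriber from receiving further notifications.
:ok = MyApp.EventStore.unsubscribe_from_stream("account-123", "example")

# Remove the persistent subscription entirely.
:ok = MyApp.EventStore.delete_subscription("account-123", "example")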