EventStore.subscribe_to_stream
subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts)
Specs
subscribe_to_stream(
  stream_uuid :: String.t(),
  subscription_name :: String.t(),
  subscriber :: pid(),
  opts :: persistent_subscription_options()
) ::
  {:ok, subscription :: pid()}
  | {:error, :already_subscribed}
  | {:error, :subscription_already_exists}
  | {:error, :too_many_subscribers}
  | {:error, reason :: term()}
Create a subscription to a single stream. By default the subscription is persistent.
The subscriber process will be notified of each batch of events appended to the single stream identified by stream_uuid.
- stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.
- subscription_name is used to uniquely identify the subscription.
- subscriber is a process that will be sent {:events, events} notification messages.
- opts is an optional keyword list providing additional subscription configuration:
  - name is the name of the event store, if one was provided to start_link/1.
  - start_from is a pointer to the first event to receive. It must be one of:
    - :origin for all events from the start of the stream (default).
    - :current for any new events appended to the stream after the subscription has been created.
    - any positive integer for a stream version to receive events after.
  - selector defines a function used to filter events. Only events for which the function returns a truthy value are sent to the subscriber.
  - mapper defines a function used to map each recorded event before it is sent to the subscriber.
  - concurrency_limit defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription, {:error, :too_many_subscribers} is returned.
  - buffer_size limits how many in-flight events will be sent to the subscriber process before acknowledgement of successful processing. This limits the number of messages sent to the subscriber and stops its message queue from filling up with events. Defaults to one in-flight event.
  - checkpoint_threshold determines how frequently a checkpoint is written to the database for the subscription after events are acknowledged. Increasing the threshold will reduce the number of database writes for busy subscriptions, but means that events might be replayed when the subscription resumes if the checkpoint could not be written. The default is to persist the checkpoint after each acknowledgement.
  - checkpoint_after (milliseconds) is used to ensure a checkpoint is written after a period of inactivity even if the checkpoint threshold has not been met. This ensures checkpoints are consistently written during less busy periods. It is only applicable when a checkpoint threshold has been set, as the default subscription behaviour is to checkpoint after each acknowledgement.
  - partition_by is an optional function used to partition events to subscribers. It can be used to guarantee processing order when multiple subscribers have subscribed to a single subscription. The function is passed a single argument (an EventStore.RecordedEvent struct) and must return the partition key. For example, to guarantee that events for a single stream are processed serially while different streams are processed concurrently, you could use the stream_uuid as the partition key:

      by_stream = fn %EventStore.RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end

      {:ok, _subscription} =
        EventStore.subscribe_to_stream(stream_uuid, "example", self(),
          concurrency_limit: 10,
          partition_by: by_stream
        )

  - transient is an optional boolean flag to create a transient subscription. By default this is set to false. Set this flag to true to create a transient subscription. The subscription will not be persisted, so if it is restarted you will receive the events again, starting from start_from. An example use is a short-lived event handler that keeps its state in memory but still needs the guarantee of having received all events.

    It is possible to create a persistent subscription with some name, stop it, and later create a transient subscription with the same name. The transient subscription will receive all events starting from start_from. If you later stop this transient subscription and start a persistent subscription again with the same name, you will receive the events again as if the transient subscription had never existed.
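The selector and mapper options described above can be illustrated with a small simulation that does not need a running event store. This is only a sketch: SketchEvent is a hypothetical stand-in for EventStore.RecordedEvent carrying just the fields used here, and deliver/3 assumes that filtering is applied before mapping.

```elixir
# Hypothetical stand-in for EventStore.RecordedEvent (illustration only).
defmodule SketchEvent do
  defstruct [:event_number, :stream_uuid, :event_type, :data]
end

defmodule SelectorMapperSketch do
  # Selector: keep only events of one type (truthy result means "send it").
  def selector(%SketchEvent{event_type: type}), do: type == "AccountOpened"

  # Mapper: reduce each event to the parts the subscriber cares about.
  def mapper(%SketchEvent{event_number: number, data: data}), do: {number, data}

  # Assumed order of operations: filter with the selector, then map.
  def deliver(events, selector, mapper) do
    events
    |> Enum.filter(selector)
    |> Enum.map(mapper)
  end
end
```

With a real subscription, functions of the same shape would be passed in opts as selector: and mapper:, taking an EventStore.RecordedEvent struct as their argument.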
The subscription will resume from the last acknowledged event if it already exists. It will ignore the start_from argument in this case.
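The resume rule can be written down as a small decision function. This is a sketch of the documented behaviour, not the library's implementation: ResumeSketch, last_ack and current_version are hypothetical names, and each clause returns the stream version after which events would be delivered.

```elixir
defmodule ResumeSketch do
  # last_ack: last acknowledged stream version of an existing subscription,
  # or nil when the subscription does not exist yet (hypothetical model).

  # An existing subscription resumes after its last acknowledged event and
  # ignores start_from.
  def resume_after(last_ack, _start_from, _current_version) when is_integer(last_ack),
    do: last_ack

  # A new subscription honours start_from.
  def resume_after(nil, :origin, _current_version), do: 0
  def resume_after(nil, :current, current_version), do: current_version

  def resume_after(nil, version, _current_version) when is_integer(version) and version > 0,
    do: version
end
```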
Returns {:ok, subscription} when subscription succeeds.
Notification messages
Subscribers will initially receive a {:subscribed, subscription} message once the subscription has successfully subscribed.
After this message events will be sent to the subscriber, in batches, as {:events, events} where events is a collection of EventStore.RecordedEvent structs.
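In a long-running process these two messages are usually handled in callbacks rather than with bare receive blocks. The following is a minimal sketch of such a subscriber, assuming a GenServer is used. SubscriberSketch and ack_fun are hypothetical names; ack_fun is injected so the sketch runs without an event store, and in a real handler it would typically wrap EventStore.ack/2.

```elixir
defmodule SubscriberSketch do
  use GenServer

  # ack_fun is a hypothetical two-argument function (subscription, events)
  # used to acknowledge a batch; injected here to keep the sketch standalone.
  def start_link(ack_fun), do: GenServer.start_link(__MODULE__, ack_fun)

  # Returns every event received so far, in delivery order.
  def received(pid), do: GenServer.call(pid, :received)

  @impl true
  def init(ack_fun), do: {:ok, %{subscription: nil, ack_fun: ack_fun, received: []}}

  # First message: remember the subscription so later batches can be acknowledged.
  @impl true
  def handle_info({:subscribed, subscription}, state) do
    {:noreply, %{state | subscription: subscription}}
  end

  # Later messages: process the batch, then acknowledge it.
  def handle_info({:events, events}, %{subscription: subscription} = state) do
    state.ack_fun.(subscription, events)
    {:noreply, %{state | received: state.received ++ events}}
  end

  @impl true
  def handle_call(:received, _from, state), do: {:reply, state.received, state}
end
```

A real subscriber would pass its own pid as the subscriber argument of subscribe_to_stream, for example from init/1 using self().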
Example
{:ok, subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts("Successfully subscribed to stream: " <> inspect(stream_uuid))
end

receive do
  {:events, events} ->
    IO.puts("Received events: " <> inspect(events))

    # acknowledge receipt
    EventStore.ack(subscription, events)
end
Subscription tuning
Use the checkpoint_threshold
and checkpoint_after
options to configure how
frequently checkpoints are written to the database. By default a subscription
will persist a checkpoint after each acknowledgement. This can cause high
write load on the database for busy subscriptions which receive a large number
of events. This problem is known as write amplification where each event
written to a stream causes many additional writes as subscriptions acknowledge
processing of the event.
The checkpoint_threshold controls how frequently checkpoints are persisted. Increasing the threshold reduces the number of database writes. For example using a threshold of 100 means that a checkpoint is written at most once for every 100 events processed. The checkpoint_after ensures that a checkpoint will still be written after a period of inactivity even when the threshold has not been met. This ensures bursts of event processing can be safely handled.
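The effect of the threshold on write volume can be estimated with a simple counting model. This sketch is not the library's implementation: CheckpointSketch is a hypothetical module that assumes one acknowledgement per event and a checkpoint write whenever the number of unpersisted acknowledgements reaches the threshold.

```elixir
defmodule CheckpointSketch do
  # Returns {writes, pending}: the number of checkpoint writes performed and
  # the number of acknowledged events not yet covered by a persisted checkpoint.
  def simulate(ack_count, threshold)
      when is_integer(ack_count) and ack_count >= 0 and is_integer(threshold) and threshold > 0 do
    :ack
    |> List.duplicate(ack_count)
    |> Enum.reduce({0, 0}, fn :ack, {writes, pending} ->
      if pending + 1 >= threshold do
        # Threshold reached: persist the checkpoint and reset the counter.
        {writes + 1, 0}
      else
        {writes, pending + 1}
      end
    end)
  end
end
```

Under this model, CheckpointSketch.simulate(250, 100) gives {2, 50}: two writes instead of 250, with 50 acknowledged events still waiting for a checkpoint. Those remaining events are the case checkpoint_after covers, and they are also the events that might be replayed if the subscription restarts before the checkpoint is written.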