EventStore.subscribe_to_stream

subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts)

Specs

subscribe_to_stream(
  stream_uuid :: String.t(),
  subscription_name :: String.t(),
  subscriber :: pid(),
  opts :: persistent_subscription_options()
) ::
  {:ok, subscription :: pid()}
  | {:error, :already_subscribed}
  | {:error, :subscription_already_exists}
  | {:error, :too_many_subscribers}
  | {:error, reason :: term()}

Create a subscription to a single stream. By default the subscription is persistent.

The subscriber process will be notified of each batch of events appended to the single stream identified by stream_uuid.

  • stream_uuid is the stream to subscribe to. Use the $all identifier to subscribe to events from all streams.

  • subscription_name is used to uniquely identify the subscription.

  • subscriber is a process that will be sent {:events, events} notification messages.

  • opts is an optional keyword list providing additional subscription configuration:

    • name is the name of the event store, if one was provided to start_link/1.
    • start_from is a pointer to the first event to receive. It must be one of:
      • :origin for all events from the start of the stream (default).
      • :current for any new events appended to the stream after the subscription has been created.
      • any positive integer, to receive events after that stream version.
    • selector is a function used to filter events: only events for which it returns a truthy value are sent to the subscriber.
    • mapper is a function applied to each recorded event before it is sent to the subscriber.
    • concurrency_limit defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription, {:error, :too_many_subscribers} is returned.
    • buffer_size limits how many in-flight events will be sent to the subscriber process before acknowledgement of successful processing. This caps the number of unacknowledged messages sent to the subscriber and stops its message queue from filling up with events. Defaults to one in-flight event.
    • checkpoint_threshold determines how frequently a checkpoint is written to the database for the subscription after events are acknowledged. Increasing the threshold will reduce the number of database writes for busy subscriptions, but means that events might be replayed when the subscription resumes if the checkpoint cannot be written. The default is to persist the checkpoint after each acknowledgement.
    • checkpoint_after is a period of inactivity, in milliseconds, after which a checkpoint is written even if the checkpoint threshold has not been met. This ensures checkpoints are consistently written during less busy periods. It is only applicable when a checkpoint threshold has been set, as the default subscription behaviour is to checkpoint after each acknowledgement.
    • partition_by is an optional function used to partition events to subscribers. It can be used to guarantee processing order when multiple subscribers have subscribed to a single subscription. The function is passed a single argument (an EventStore.RecordedEvent struct) and must return the partition key. For example, to guarantee that events for a single stream are processed serially while different streams are processed concurrently, you could use the stream_uuid as the partition key:
        by_stream = fn %EventStore.RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end
        {:ok, _subscription} =
          EventStore.subscribe_to_stream(stream_uuid, "example", self(),
            concurrency_limit: 10,
            partition_by: by_stream
          )
    • transient is an optional boolean flag that creates a transient subscription when set to true. It defaults to false. A transient subscription is not persisted, so if it is restarted you will receive the events again starting from start_from. A typical use is a short-lived event handler that keeps its state in memory but still needs the guarantee of having received all events. It's possible to create a persistent subscription with some name, stop it, and later create a transient subscription with the same name. The transient subscription will then receive all events starting from start_from. If you later stop this transient subscription and start a persistent subscription again with the same name, you will receive the events as if the transient subscription had never existed.
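
The selector and mapper options described in the list above can be tried out locally before wiring them into a subscription. The following is a minimal sketch, not library code: the sample events are plain maps standing in for EventStore.RecordedEvent structs, the event types and stream names are made up for illustration, and it assumes the selector is applied before the mapper.

```elixir
# Plain maps standing in for EventStore.RecordedEvent structs (illustrative data only).
events = [
  %{event_number: 1, stream_uuid: "account-1", event_type: "AccountOpened"},
  %{event_number: 2, stream_uuid: "account-1", event_type: "MoneyDeposited"},
  %{event_number: 3, stream_uuid: "account-2", event_type: "AccountOpened"}
]

# selector: only events for which this returns a truthy value are delivered.
selector = fn %{event_type: event_type} -> event_type == "AccountOpened" end

# mapper: transforms each selected event before it reaches the subscriber.
mapper = fn %{event_number: event_number, stream_uuid: stream_uuid} ->
  {event_number, stream_uuid}
end

# Local approximation of what the subscriber would be sent.
delivered = events |> Enum.filter(selector) |> Enum.map(mapper)
IO.inspect(delivered)

# With a running event store, the same functions would be passed as options:
# {:ok, subscription} =
#   EventStore.subscribe_to_stream("$all", "example", self(),
#     selector: selector,
#     mapper: mapper
#   )
```

Because both functions match only on the fields they need, they accept the real struct as well as the sample maps used here.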

If the subscription already exists, it resumes from the last acknowledged event and ignores the start_from option.

Returns {:ok, subscription} when subscription succeeds.
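
The return values listed in the spec can be handled with ordinary pattern matching. The sketch below uses a hypothetical helper module, SubscribeResult, which is not part of EventStore; it only maps each documented tuple to a short description.

```elixir
defmodule SubscribeResult do
  # Hypothetical helper (not part of EventStore): describes each return value
  # listed in the subscribe_to_stream spec.
  def describe({:ok, subscription}) when is_pid(subscription), do: "subscribed"
  def describe({:error, :already_subscribed}), do: "already subscribed"
  def describe({:error, :subscription_already_exists}), do: "subscription already exists"
  def describe({:error, :too_many_subscribers}), do: "concurrency_limit reached"
  # Catch-all for the `{:error, reason :: term()}` case in the spec.
  def describe({:error, reason}), do: "failed: " <> inspect(reason)
end

# With a running event store the result of the call would be passed straight in:
# EventStore.subscribe_to_stream(stream_uuid, "example", self())
# |> SubscribeResult.describe()
# |> IO.puts()
IO.puts(SubscribeResult.describe({:error, :too_many_subscribers}))
```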

Notification messages

Subscribers will initially receive a {:subscribed, subscription} message once the subscription has been established.

After this message, events will be sent to the subscriber in batches as {:events, events}, where events is a list of EventStore.RecordedEvent structs.

Example

{:ok, subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self())

# wait for the subscription confirmation
receive do
  {:subscribed, ^subscription} ->
    IO.puts "Successfully subscribed to stream: " <> inspect(stream_uuid)
end

receive do
  {:events, events} ->
    IO.puts "Received events: " <> inspect(events)

    # acknowledge receipt
    EventStore.ack(subscription, events)
end
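
In a long-running process, the same message flow is usually handled in callbacks rather than in bare receive blocks. The following sketch shows one way to do that with a GenServer. The module name ExampleSubscriber and the injected ack option are assumptions made for illustration so that the code runs without a database; in a real application the function passed in would be EventStore.ack/2.

```elixir
defmodule ExampleSubscriber do
  use GenServer

  # `:ack` is a hypothetical option: a two-arity function (subscription, events),
  # for example &EventStore.ack/2, injected so the sketch has no database dependency.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # With a running event store, subscribe here with self() as the subscriber:
    # {:ok, _subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self())
    {:ok, %{subscription: nil, ack: Keyword.fetch!(opts, :ack), handled: 0}}
  end

  @impl true
  def handle_info({:subscribed, subscription}, state) do
    # Confirmation message: remember the subscription for later acknowledgements.
    {:noreply, %{state | subscription: subscription}}
  end

  def handle_info({:events, events}, state) do
    # Process the batch here, then acknowledge it so further events are sent.
    state.ack.(state.subscription, events)
    {:noreply, %{state | handled: state.handled + length(events)}}
  end
end
```

Acknowledging only after the batch has been processed means that an unprocessed batch is not marked as handled if the process crashes midway.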

Subscription tuning

Use the checkpoint_threshold and checkpoint_after options to configure how frequently checkpoints are written to the database. By default a subscription persists a checkpoint after each acknowledgement. This can cause high write load on the database for busy subscriptions that receive a large number of events. The problem is known as write amplification: each event written to a stream causes many additional writes as subscriptions acknowledge processing of that event.

The checkpoint_threshold option controls how frequently checkpoints are persisted. Increasing the threshold reduces the number of database writes. For example, a threshold of 100 means that a checkpoint is written at most once for every 100 events processed. The checkpoint_after option ensures that a checkpoint is still written after a period of inactivity even when the threshold has not been met, so the tail of a burst of events is checkpointed once the subscription goes quiet.
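
The effect of the threshold can be illustrated with some simple arithmetic. This is a simplified model rather than library code: the option values are illustrative, not recommendations, and the helper function max_threshold_writes is hypothetical. It ignores any extra writes triggered by checkpoint_after.

```elixir
# Option names come from the text above; the values are only examples.
opts = [
  checkpoint_threshold: 100,
  checkpoint_after: 5_000
]

# Simplified model: upper bound on checkpoint writes caused by the threshold alone.
max_threshold_writes = fn acked_events, threshold -> div(acked_events, threshold) end

# 1,000 acknowledged events with a threshold of 100.
IO.inspect(max_threshold_writes.(1_000, opts[:checkpoint_threshold]))

# The default behaviour, one checkpoint per acknowledgement, for comparison.
IO.inspect(max_threshold_writes.(1_000, 1))

# With a running event store the options would be passed as the fourth argument:
# {:ok, subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self(), opts)
```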