View Source Volley.PersistentSubscription (Volley v1.0.0)

A GenStage/Broadway producer for persistent subscriptions

Persistent subscriptions are a feature of EventStoreDB which offload the responsibilities of tracking processed events, back-pressure, and subscriber dispatch to the EventStoreDB server. This allows subscribers to more easily implement complex subscription strategies, such as allowing multiple subscribers across services without gossip, avoiding head-of-line blocking, and enabling concurrent and batch processing schemes and rate-limiting.

Broadway features an acknowledgement interface which integrates well with the persistent subscription Spear.ack/3 and Spear.nack/4 system. Consumers intending to use this producer should prefer writing handlers with Broadway (over GenStage) where possible.

configuration

Configuration

  • :broadway? - (default: false) whether to emit events as Broadway.Message.t/0 messages or as Spear.Event.t/0 events. true should be set for this option if this producer is being used in a Broadway topology, false for use in a GenStage pipeline. When set as true, this producer will set the ack interface for each message to Volley.PersistentSubscription.Acknowledger with the proper connection details for sending acks and nacks. When false, the Spear.Event.metadata.subscription field will be replaced with a Volley.PersistentSubscription.Subscription.t/0 struct with any necessary connection details.

  • :subscriptions - (default: []) a list of subscription configurations. Broadway does not currently allow more than one producer in a topology, however one may wish to subscribe to multiple persistent subscriptions, potentially across EventStoreDBs. Since back-pressure is controlled on the EventStoreDB side, a handler may specify multiple subscriptions in a single producer without any special considerations. The schema of each subscription is as follows

    • :connection - (required) a Spear.Connection.t/0 process which can either be specified as a PID or any valid GenServer.name/0

    • :stream_name - (required) the EventStoreDB stream

    • :group_name - (required) the EventStoreDB group name

    • :opts - (default: []) options to pass to Spear.connect_to_persistent_subscription/5. The main use of this options field is to configure the :buffer_size option (default: 1). The :buffer_size option controls the number of events allowed to be sent to this producer before any events are acknowledged.

Remaining options are passed to GenStage.start_link/3 and the {:producer, state, opts} tuple in GenStage.init/1.

examples

Examples

defmodule MyHandler do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    subscription_opts = [
      broadway?: true,
      subscriptions: [
        [
          stream_name: "MyStream",
          group_name: inspect(__MODULE__),
          connection: MyApp.SpearClient,
          opts: [
            # 10 events allowed in-flight at a time
            buffer_size: 10
          ]
        ]
      ]
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {Volley.PersistentSubscription, subscription_opts}
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 5]
      ]
    )
  end

  @impl Broadway
  def handle_message(:default, %Message{} = message, _context) do
    message
    |> Message.put_batcher(:default)
  end

  @impl Broadway
  def handle_batch(:default, messages, _batch_info, context) do
    # do something batchy with messages...
  end
end