Volley.PersistentSubscription (Volley v1.0.1)
A GenStage/Broadway producer for persistent subscriptions
Persistent subscriptions are a feature of EventStoreDB which offload the responsibilities of tracking processed events, back-pressure, and subscriber dispatch to the EventStoreDB server. This allows subscribers to more easily implement complex subscription strategies, such as allowing multiple subscribers across services without gossip, avoiding head-of-line blocking, and enabling concurrent processing, batch processing, and rate-limiting.
Broadway features an acknowledgement interface which integrates well with
the persistent subscription Spear.ack/3 and Spear.nack/4 system. Consumers
intending to use this producer should prefer writing handlers with
Broadway (over GenStage) where possible.
Configuration
- :broadway? - (default: false) whether to emit events as Broadway.Message.t/0 messages or as Spear.Event.t/0 events. true should be set for this option if this producer is being used in a Broadway topology, false for use in a GenStage pipeline. When set as true, this producer will set the ack interface for each message to Volley.PersistentSubscription.Acknowledger with the proper connection details for sending acks and nacks. When false, the Spear.Event.metadata.subscription field will be replaced with a Volley.PersistentSubscription.Subscription.t/0 struct with any necessary connection details.
- :subscriptions - (default: []) a list of subscription configurations. Broadway does not currently allow more than one producer in a topology, however one may wish to subscribe to multiple persistent subscriptions, potentially across EventStoreDBs. Since back-pressure is controlled on the EventStoreDB side, a handler may specify multiple subscriptions in a single producer without any special considerations. The schema of each subscription is as follows:
  - :connection - (required) a Spear.Connection.t/0 process which can either be specified as a PID or any valid GenServer.name/0
  - :stream_name - (required) the EventStoreDB stream
  - :group_name - (required) the EventStoreDB group name
  - :opts - (default: []) options to pass to Spear.connect_to_persistent_subscription/5. The main use of this options field is to configure the :buffer_size option (default: 1). The :buffer_size option controls the number of events allowed to be sent to this producer before any events are acknowledged.
Remaining options are passed to GenStage.start_link/3 and the
{:producer, state, opts} tuple in GenStage.init/1.
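The subscription schema above can be illustrated with a small helper that builds the options for a single producer subscribed to two persistent subscriptions. This is a minimal sketch rather than part of Volley: the module MyApp.SubscriptionConfig, the connection names MyApp.OrdersSpearClient and MyApp.BillingSpearClient, and the stream names are hypothetical, and each connection is assumed to be an already-running, registered Spear.Connection process.

```elixir
defmodule MyApp.SubscriptionConfig do
  @moduledoc false

  # Hypothetical connection names. Each is assumed to be a registered
  # Spear.Connection process, possibly pointing at a different EventStoreDB.
  @orders_conn MyApp.OrdersSpearClient
  @billing_conn MyApp.BillingSpearClient

  # Builds the options keyword list for Volley.PersistentSubscription
  # with two subscriptions served by one producer.
  def producer_opts(group_name) do
    [
      broadway?: true,
      subscriptions: [
        subscription(@orders_conn, "Orders", group_name, 10),
        subscription(@billing_conn, "Billing", group_name, 5)
      ]
    ]
  end

  # One entry of the :subscriptions list, following the documented schema.
  defp subscription(connection, stream_name, group_name, buffer_size) do
    [
      connection: connection,
      stream_name: stream_name,
      group_name: group_name,
      # :opts is forwarded to Spear.connect_to_persistent_subscription/5
      opts: [buffer_size: buffer_size]
    ]
  end
end
```

The resulting list could then be handed to Broadway as module: {Volley.PersistentSubscription, MyApp.SubscriptionConfig.producer_opts("MyGroup")} in the producer configuration. Giving each subscription its own :buffer_size keeps the number of unacknowledged events bounded per subscription.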
Examples
defmodule MyHandler do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    subscription_opts = [
      broadway?: true,
      subscriptions: [
        [
          stream_name: "MyStream",
          group_name: inspect(__MODULE__),
          connection: MyApp.SpearClient,
          opts: [
            # 10 events allowed in-flight at a time
            buffer_size: 10
          ]
        ]
      ]
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {Volley.PersistentSubscription, subscription_opts}
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 5]
      ]
    )
  end

  @impl Broadway
  def handle_message(:default, %Message{} = message, _context) do
    message
    |> Message.put_batcher(:default)
  end

  @impl Broadway
  def handle_batch(:default, messages, _batch_info, _context) do
    # do something batchy with messages...

    # handle_batch/4 must return the list of messages
    messages
  end
end