Volley.PersistentSubscription (Volley v1.0.0)
A GenStage/Broadway producer for persistent subscriptions
Persistent subscriptions are a feature of EventStoreDB which offload the responsibilities of tracking processed events, back-pressure, and subscriber dispatch to the EventStoreDB server. This allows subscribers to more easily implement complex subscription strategies, such as allowing multiple subscribers across services without gossip, avoiding head-of-line blocking, and enabling concurrent and batch processing schemes and rate-limiting.
Broadway features an acknowledgement interface which integrates well with the persistent subscription Spear.ack/3 and Spear.nack/4 system. Consumers intending to use this producer should prefer writing handlers with Broadway (over GenStage) where possible.
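For context, the ack/nack system that this producer wraps can be sketched directly against Spear. This is a rough illustration rather than how Volley is implemented: it assumes a running connection registered as MyApp.SpearClient, an existing persistent subscription group "MyGroup" on stream "MyStream", and a hypothetical MyApp.process/1 function that returns :ok or {:error, reason}.

```elixir
# Subscribe the current process to the persistent subscription.
# "MyStream", "MyGroup", and MyApp.SpearClient are placeholder names.
{:ok, subscription} =
  Spear.connect_to_persistent_subscription(
    MyApp.SpearClient,
    self(),
    "MyStream",
    "MyGroup",
    buffer_size: 1
  )

# Events arrive as messages to the subscriber process. Each one is either
# acknowledged (processed) or negatively acknowledged (here: retried).
receive do
  %Spear.Event{} = event ->
    # MyApp.process/1 is hypothetical and stands in for real handler logic.
    case MyApp.process(event) do
      :ok ->
        Spear.ack(MyApp.SpearClient, subscription, event)

      {:error, reason} ->
        Spear.nack(MyApp.SpearClient, subscription, event,
          action: :retry,
          reason: inspect(reason)
        )
    end
end
```

With Broadway, this bookkeeping is handled by the message acknowledger, so handlers do not need to call Spear.ack/3 or Spear.nack/4 themselves.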
Configuration
:broadway? - (default: false) whether to emit events as Broadway.Message.t/0 messages or as Spear.Event.t/0 events. Set this option to true if this producer is used in a Broadway topology, or false for use in a GenStage pipeline. When true, this producer sets the ack interface for each message to Volley.PersistentSubscription.Acknowledger with the proper connection details for sending acks and nacks. When false, the Spear.Event.metadata.subscription field is replaced with a Volley.PersistentSubscription.Subscription.t/0 struct with any necessary connection details.
:subscriptions - (default: []) a list of subscription configurations. Broadway does not currently allow more than one producer in a topology; however, one may wish to subscribe to multiple persistent subscriptions, potentially across EventStoreDBs. Since back-pressure is controlled on the EventStoreDB side, a handler may specify multiple subscriptions in a single producer without any special considerations. The schema of each subscription is as follows:
  :connection - (required) a Spear.Connection.t/0 process, which can be specified either as a PID or as any valid GenServer.name/0
  :stream_name - (required) the EventStoreDB stream
  :group_name - (required) the EventStoreDB group name
  :opts - (default: []) options to pass to Spear.connect_to_persistent_subscription/5. The main use of this options field is to configure the :buffer_size option (default: 1). The :buffer_size option controls the number of events allowed to be sent to this producer before any events are acknowledged.
Remaining options are passed to GenStage.start_link/3 and the {:producer, state, opts} tuple in GenStage.init/1.
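As an illustration of the :subscriptions schema above, the following sketch configures a single producer with two persistent subscriptions on different connections. All stream, group, and connection names here are hypothetical placeholders, not names defined by Volley.

```elixir
# Options for one Volley.PersistentSubscription producer that reads from two
# persistent subscriptions. Every name below is a placeholder.
subscription_opts = [
  # Emit Broadway.Message.t/0 messages (use false for a GenStage pipeline).
  broadway?: true,
  subscriptions: [
    [
      connection: MyApp.SpearClient,
      stream_name: "Orders",
      group_name: "OrderHandlers",
      # Allow up to 10 unacknowledged events in flight for this subscription.
      opts: [buffer_size: 10]
    ],
    [
      # A second connection, possibly to a different EventStoreDB.
      connection: MyApp.OtherSpearClient,
      stream_name: "Payments",
      group_name: "PaymentHandlers"
      # :opts omitted, so the default :buffer_size of 1 applies.
    ]
  ]
]
```

This keyword list would then be passed as the second element of the `module: {Volley.PersistentSubscription, subscription_opts}` tuple in a Broadway producer configuration.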
Examples
defmodule MyHandler do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    subscription_opts = [
      broadway?: true,
      subscriptions: [
        [
          stream_name: "MyStream",
          group_name: inspect(__MODULE__),
          connection: MyApp.SpearClient,
          opts: [
            # 10 events allowed in-flight at a time
            buffer_size: 10
          ]
        ]
      ]
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {Volley.PersistentSubscription, subscription_opts}
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 5]
      ]
    )
  end

  @impl Broadway
  def handle_message(:default, %Message{} = message, _context) do
    # route every message to the :default batcher
    message
    |> Message.put_batcher(:default)
  end

  @impl Broadway
  def handle_batch(:default, messages, _batch_info, _context) do
    # do something batchy with messages...

    # Broadway expects handle_batch/4 to return the list of messages
    messages
  end
end
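To run a handler like the one above, it needs to be started under a supervisor alongside the connection it refers to. The following is a minimal sketch, not part of Volley itself: MyApp.Application and MyApp.Supervisor are hypothetical names, the connection string is a placeholder for a local EventStoreDB, and it assumes that Spear.Connection can be started as a supervised child with the :name and :connection_string options.

```elixir
defmodule MyApp.Application do
  use Application

  @impl Application
  def start(_type, _args) do
    children = [
      # Start the connection first so the producer can reach it by name.
      # The connection string is a placeholder for a local EventStoreDB.
      {Spear.Connection,
       name: MyApp.SpearClient, connection_string: "esdb://localhost:2113"},
      # `use Broadway` defines child_spec/1, so the handler module can be
      # listed directly; it is started via MyHandler.start_link/1.
      MyHandler
    ]

    # :rest_for_one restarts the handler if the connection process restarts.
    Supervisor.start_link(children,
      strategy: :rest_for_one,
      name: MyApp.Supervisor
    )
  end
end
```

The :rest_for_one strategy is one reasonable choice here because the handler depends on the connection, but other strategies may suit a given application better.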