GenStage.BroadcastDispatcher (gen_stage v1.3.2)
View SourceA dispatcher that accumulates demand from all consumers before broadcasting events to all of them.
This dispatcher guarantees that events are dispatched to all consumers without exceeding the demand of any given consumer.
The :selector option
If a producer uses GenStage.BroadcastDispatcher, its subscribers
can specify an optional :selector function that receives the event
and returns a boolean in the subscription options.
Assume producer and consumer are stages exchanging events of type
%{:key => String.t, any => any}, then by calling
GenStage.sync_subscribe(consumer,
to: producer,
selector: fn %{key: key} -> String.starts_with?(key, "foo-") end)consumer will receive only the events broadcast from producer
for which the selector function returns a truthy value.
The :selector option can be specified in sync and async subscriptions,
as well as in the :subscribe_to list in the return tuple of
GenStage.init/1. For example:
def init(:ok) do
{:consumer, :ok, subscribe_to:
[{producer, selector: fn %{key: key} -> String.starts_with?(key, "foo-") end}]}
endDemand while setting up
[Producer Consumer 1]
/ \
[Producer] - - [Consumer]
\ /
[Producer Consumer 2]When starting Producer Consumer 1 before Producer Consumer 2 (or even
regular consumers), it is the first batch of events is only delivered to
Producer Consumer 1 since Producer Consummer 2 is not registered yet.
It is therefore recommended to start the producer with
{:producer, state, demand: :accumulate}, which pauses demand in the producers,
and after all stages have been initialized, call GenStage.demand/2 to resume
the producer.