gen_stage v0.11.0 GenStage.Dispatcher behaviour
This module defines the behaviour used by :producer and :producer_consumer to dispatch events.
When using a :producer or :producer_consumer, the dispatcher may be configured on init as follows:
{:producer, state, dispatcher: GenStage.BroadcastDispatcher}
Some dispatchers may require options to be given on initialization. Those can be passed with a tuple:
{:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}
GenStage ships with the following dispatcher implementations:
GenStage.DemandDispatcher - dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering. This is the default dispatcher.
GenStage.BroadcastDispatcher - dispatches all events to all consumers. The demand is only sent upstream once all consumers ask for data.
GenStage.PartitionDispatcher - dispatches all events to a fixed number of consumers that work as partitions according to a hash function.
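To illustrate what dispatching "according to a hash function" can look like, here is a minimal sketch that groups events into four partitions using :erlang.phash2/2. The module PartitionSketch and its partition/2 function are hypothetical names made up for this example; this is not the implementation of GenStage.PartitionDispatcher, only the general idea.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper, not part of GenStage: groups events by the
  # partition obtained from hashing each event into 0..(count - 1).
  def partition(events, count) when is_integer(count) and count > 0 do
    Enum.group_by(events, fn event -> :erlang.phash2(event, count) end)
  end
end

# Every key of the resulting map is a partition number in 0..3, and the
# same event always hashes to the same partition.
groups = PartitionSketch.partition(Enum.to_list(1..10), 4)
IO.inspect(groups)
```

Because the hash is deterministic, a given event is always routed to the same partition, which is what lets each consumer act as a stable partition.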
Summary
Callbacks
ask/3 - Called every time a consumer sends demand
cancel/2 - Called every time a subscription is cancelled or the consumer goes down
dispatch/3 - Called every time a producer wants to dispatch an event
init/1 - Called on initialization with the options given on GenStage.init/1
notify/2 - Used to send a notification to all consumers
subscribe/3 - Called every time the producer gets a new subscriber
Callbacks
ask(demand :: pos_integer, from :: {pid, reference}, state :: term) :: {:ok, actual_demand :: non_neg_integer, new_state} when new_state: term
Called every time a consumer sends demand.
The demand will always be a positive integer (more than 0). This callback must return the actual_demand as part of its return tuple. The returned demand is then sent to producers.
It is guaranteed the reference given in from points to a reference previously given in subscribe.
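As a rough illustration of the contract above, the following sketch tracks pending demand per subscription and forwards all of it upstream. The module AskSketch and its state shape (a map from subscription reference to {consumer_pid, pending_demand}) are assumptions made for this example, not how any shipped dispatcher stores its state.

```elixir
defmodule AskSketch do
  # Hypothetical dispatcher fragment. State is assumed to be a map of
  # subscription reference => {consumer_pid, pending_demand}.
  def ask(demand, {pid, ref}, state) when is_integer(demand) and demand > 0 do
    # Map.update!/3 raises if the reference is unknown, which mirrors the
    # guarantee that it was previously given in subscribe.
    new_state =
      Map.update!(state, ref, fn {_pid, pending} -> {pid, pending + demand} end)

    # This sketch forwards the full demand as the actual demand.
    {:ok, demand, new_state}
  end
end

ref = make_ref()
from = {self(), ref}
# Pretend subscribe has already registered this consumer with no demand.
state = %{ref => {self(), 0}}
{:ok, 5, state} = AskSketch.ask(5, from, state)
{:ok, 3, state} = AskSketch.ask(3, from, state)
# state[ref] is now {self(), 8}
```

A dispatcher is free to return a smaller actual demand than it received, for instance when it must wait for other consumers before asking upstream.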
cancel(from :: {pid, reference}, state :: term) :: {:ok, demand :: non_neg_integer, new_state} when new_state: term
Called every time a subscription is cancelled or the consumer goes down.
It is guaranteed the reference given in from points to a reference previously given in subscribe.
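A minimal sketch of a cancel implementation might simply forget the subscription. The module CancelSketch and its state shape (a map from subscription reference to {consumer_pid, pending_demand}) are hypothetical. The text above does not describe the demand element of the return tuple, so this sketch returns 0 on the assumption that it is extra demand to be sent upstream.

```elixir
defmodule CancelSketch do
  # Hypothetical dispatcher fragment. State is assumed to be a map of
  # subscription reference => {consumer_pid, pending_demand}.
  def cancel({_pid, ref}, state) do
    # Drop the cancelled subscription and request nothing further upstream
    # (an assumption about the meaning of the returned demand).
    {:ok, 0, Map.delete(state, ref)}
  end
end

ref = make_ref()
state = %{ref => {self(), 4}}
{:ok, 0, new_state} = CancelSketch.cancel({self(), ref}, state)
# new_state no longer contains ref
```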
dispatch(events :: [term, ...], length :: pos_integer, state :: term) :: {:ok, leftover_events :: [term], new_state} when new_state: term
Called every time a producer wants to dispatch an event.
The events will always be a non-empty list. This callback may receive more events than previously asked and therefore must return events it cannot effectively deliver as part of its return tuple. Any leftover_events will be stored by producers in their buffer.
It is important to emphasize that leftover_events can happen in any dispatcher implementation. After all, a consumer can subscribe, ask for events and crash. Eventually the events the consumer asked for will be delivered while the consumer no longer exists, meaning they must be returned as leftover_events until another consumer subscribes.
It is guaranteed the reference given in from points to a reference previously given in subscribe. It is also recommended for events to be sent with Process.send/3 and the [:noconnect] option as the consumers are all monitored by the producer. For example:
Process.send(consumer, {:"$gen_consumer", {self(), consumer_ref}, events}, [:noconnect])
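To make the leftover handling concrete, here is a sketch of a dispatch implementation that gives each consumer at most the number of events it asked for and returns the rest. The module DispatchSketch, its deliver/3 helper, and its state shape (a list of {consumer_pid, subscription_ref, pending_demand} tuples) are assumptions made for this example and do not reflect any shipped dispatcher.

```elixir
defmodule DispatchSketch do
  # Hypothetical dispatcher fragment. State is assumed to be a list of
  # {consumer_pid, subscription_ref, pending_demand} tuples.
  def dispatch(events, _length, state) do
    {leftover, new_state} =
      Enum.reduce(state, {events, []}, fn {pid, ref, pending}, {remaining, acc} ->
        # Take at most `pending` events for this consumer.
        {batch, rest} = Enum.split(remaining, pending)
        deliver(pid, ref, batch)
        {rest, [{pid, ref, pending - length(batch)} | acc]}
      end)

    # Whatever no consumer had demand for goes back to the producer.
    {:ok, leftover, Enum.reverse(new_state)}
  end

  # Nothing to send when the batch is empty.
  defp deliver(_pid, _ref, []), do: :ok

  defp deliver(pid, ref, batch) do
    Process.send(pid, {:"$gen_consumer", {self(), ref}, batch}, [:noconnect])
    :ok
  end
end

ref = make_ref()
# One consumer (this process) with demand for 2 events, 3 events dispatched.
{:ok, leftover, state} = DispatchSketch.dispatch([:a, :b, :c], 3, [{self(), ref, 2}])
# leftover is [:c] and the consumer's pending demand in state is now 0
```

With no subscribed consumers the state list is empty, so every event comes back as leftover, which matches the crashed-consumer scenario described above.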
init(opts :: Keyword.t) :: {:ok, state} when state: any
Called on initialization with the options given on GenStage.init/1.
notify(msg :: term, state :: term) :: {:ok, new_state} when new_state: term
Used to send a notification to all consumers.
In case the dispatcher is doing buffering, notify must keep the ordering guarantees. It is recommended for the notification to be sent with Process.send/3 and the [:noconnect] option as the consumers are all monitored by the producer. For example:
Process.send(consumer, {consumer_ref, msg}, [:noconnect])
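Putting the recommendation together, a notify implementation for a dispatcher that does no buffering could look like the sketch below. The module NotifySketch and its state shape (a list of {consumer_pid, subscription_ref} tuples) are hypothetical, and the {:ok, state} return value is assumed by analogy with the other callbacks.

```elixir
defmodule NotifySketch do
  # Hypothetical dispatcher fragment. State is assumed to be a list of
  # {consumer_pid, subscription_ref} tuples. Since this sketch buffers
  # nothing, sending immediately cannot reorder notifications and events.
  def notify(msg, state) do
    Enum.each(state, fn {pid, ref} ->
      Process.send(pid, {ref, msg}, [:noconnect])
    end)

    # Assumed return shape: the state is unchanged.
    {:ok, state}
  end
end

ref = make_ref()
{:ok, _state} = NotifySketch.notify(:hello, [{self(), ref}])
# This process now has {ref, :hello} in its mailbox.
```

A dispatcher that does buffer events would instead need to queue the notification behind the events already buffered, so that consumers see both in the order the producer emitted them.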