gen_stage v0.11.0 GenStage.Dispatcher behaviour

This module defines the behaviour used by :producer and :producer_consumer to dispatch events.

When using a :producer or :producer_consumer, the dispatcher may be configured on init as follows:

{:producer, state, dispatcher: GenStage.BroadcastDispatcher}

Some dispatchers may require options to be given on initialization. Those can be passed with a tuple:

{:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}

GenStage ships with the following dispatcher implementations:

  • GenStage.DemandDispatcher - dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering. This is the default dispatcher.

  • GenStage.BroadcastDispatcher - dispatches all events to all consumers. The demand is only sent upstream once all consumers ask for data.

  • GenStage.PartitionDispatcher - dispatches all events to a fixed number of consumers that work as partitions according to a hash function.

Summary

Callbacks

ask(demand, from, state) - Called every time a consumer sends demand

cancel(from, state) - Called every time a subscription is cancelled or the consumer goes down

dispatch(events, length, state) - Called every time a producer wants to dispatch an event

init(opts) - Called on initialization with the options given on GenStage.init/1

notify(msg, state) - Used to send a notification to all consumers

subscribe(opts, from, state) - Called every time the producer gets a new subscriber

Callbacks

ask(demand, from, state)
ask(demand :: pos_integer, from :: {pid, reference}, state :: term) :: {:ok, actual_demand :: non_neg_integer, new_state} when new_state: term

Called every time a consumer sends demand.

The demand will always be a positive integer (more than 0). This callback must return the actual_demand as part of its return tuple. The returned demand is then sent to producers.

It is guaranteed the reference given in from points to a reference previously given in subscribe.
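
As a rough illustration of this contract, the sketch below tracks pending demand per subscription and forwards the full amount upstream. The module name AskSketch and the state shape (a map from subscription reference to {pid, pending_demand}) are assumptions made for this example and are not part of GenStage. A real dispatcher would also declare @behaviour GenStage.Dispatcher.

```elixir
defmodule AskSketch do
  # Hypothetical state: %{reference => {pid, pending_demand}}.
  # In a real dispatcher: @behaviour GenStage.Dispatcher

  # Records the new demand for the subscription identified by `ref` and
  # reports the same amount as the actual demand to send upstream.
  def ask(demand, {_pid, ref}, state) when is_integer(demand) and demand > 0 do
    new_state =
      Map.update!(state, ref, fn {pid, pending} -> {pid, pending + demand} end)

    {:ok, demand, new_state}
  end
end
```

Map.update!/3 raises if the reference is unknown, which leans on the guarantee that from always points to a reference previously given in subscribe.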

cancel(from, state)
cancel(from :: {pid, reference}, state :: term) :: {:ok, demand :: non_neg_integer, new_state} when new_state: term

Called every time a subscription is cancelled or the consumer goes down.

It is guaranteed the reference given in from points to a reference previously given in subscribe.

dispatch(events, length, state)
dispatch(events :: [term, ...], length :: pos_integer, state :: term) :: {:ok, leftover_events :: [term], new_state} when new_state: term

Called every time a producer wants to dispatch an event.

The events will always be a non-empty list. This callback may receive more events than previously asked for and therefore must return any events it cannot effectively deliver as part of its return tuple. Any leftover_events will be stored by producers in their buffer.

It is important to emphasize that leftover_events can happen in any dispatcher implementation. After all, a consumer can subscribe, ask for events and crash. The events the consumer asked for will eventually be dispatched when the consumer no longer exists, so they must be returned as leftover_events until another consumer subscribes.
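
The leftover rule can be sketched with a deliberately simplified dispatcher that serves at most one consumer. The module name DispatchSketch and the state shape ({consumer, pending_demand}, where consumer is nil or {pid, ref}) are assumptions for illustration only. The message format follows the Process.send/3 recommendation given for this callback.

```elixir
defmodule DispatchSketch do
  # Hypothetical state: {consumer, pending_demand}, where consumer is
  # either nil (nobody subscribed) or {pid, ref}.
  # In a real dispatcher: @behaviour GenStage.Dispatcher

  # No consumer: every event is a leftover for the producer to buffer.
  def dispatch(events, _length, {nil, _pending} = state) do
    {:ok, events, state}
  end

  # One consumer: deliver only as many events as it asked for and
  # return the rest as leftovers.
  def dispatch(events, length, {{pid, ref} = consumer, pending}) do
    deliver_count = min(length, pending)
    {deliver_now, leftover} = Enum.split(events, deliver_count)

    if deliver_now != [] do
      Process.send(pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect])
    end

    {:ok, leftover, {consumer, pending - deliver_count}}
  end
end
```

With a pending demand of 2 and three incoming events, this sketch sends the first two to the consumer and hands the third back as a leftover.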

It is guaranteed the reference given in from points to a reference previously given in subscribe. It is also recommended for events to be sent with Process.send/3 and the [:noconnect] option as the consumers are all monitored by the producer. For example:

Process.send(consumer, {:"$gen_consumer", {self(), consumer_ref}, events}, [:noconnect])

init(opts)
init(opts :: keyword) :: {:ok, state} when state: any

Called on initialization with the options given on GenStage.init/1.

notify(msg, state)
notify(msg :: term, state :: term) :: {:ok, new_state} when new_state: term

Used to send a notification to all consumers.

In case the dispatcher is doing buffering, notify must keep the ordering guarantees. It is recommended for the notification to be sent with Process.send/3 and the [:noconnect] option as the consumers are all monitored by the producer. For example:

Process.send(consumer, {consumer_ref, msg}, [:noconnect])

subscribe(opts, from, state)
subscribe(opts :: keyword, from :: {pid, reference}, state :: term) :: {:ok, demand :: non_neg_integer, new_state} when new_state: term

Called every time the producer gets a new subscriber.
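
The subscribe and cancel callbacks naturally form a pair: one registers a subscription and the other removes it. The sketch below shows that bookkeeping under stated assumptions. The module name SubscriptionSketch and the map-based state are hypothetical, and returning 0 as the demand assumes this dispatcher has no extra demand to forward when subscriptions change, which may not hold for every dispatcher.

```elixir
defmodule SubscriptionSketch do
  # Hypothetical state: %{reference => {pid, pending_demand}}.
  # In a real dispatcher: @behaviour GenStage.Dispatcher

  # Starts with no subscriptions; options are ignored in this sketch.
  def init(_opts), do: {:ok, %{}}

  # Registers the new subscriber with zero pending demand.
  def subscribe(_opts, {pid, ref}, state) do
    {:ok, 0, Map.put(state, ref, {pid, 0})}
  end

  # Forgets the subscription, whether it was cancelled or the
  # consumer went down.
  def cancel({_pid, ref}, state) do
    {:ok, 0, Map.delete(state, ref)}
  end
end
```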