Express v1.3.3 Express.PushRequests.Consumer

GenStage consumer. Retrieves push requests from Express.PushRequests.Buffer, checks out a suitable worker from one of the poolboy pools, and sends a push message through it.

The maximum demand (the batch size) depends on the number of schedulers available to the BEAM VM on the current machine: System.schedulers_online() * 5

The default multiplier (5) can be adjusted via config:

config :express,
  buffer: [
    consumer_demand_multiplier: 10
  ]
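The formula above can be sketched as a small helper. This is only an illustration: DemandSketch and max_demand/1 are hypothetical names, and reading the multiplier through Application.get_env/3 is an assumption about how the setting might be looked up, not Express's actual code.

```elixir
# Hypothetical helper, not part of Express: mirrors the documented formula
# System.schedulers_online() * multiplier, with 5 as the default multiplier.
defmodule DemandSketch do
  @default_multiplier 5

  # `config` is the keyword list stored under `config :express, buffer: [...]`.
  # Falls back to an empty list when the :buffer key is not configured.
  def max_demand(config \\ Application.get_env(:express, :buffer, [])) do
    multiplier = Keyword.get(config, :consumer_demand_multiplier, @default_multiplier)
    System.schedulers_online() * multiplier
  end
end
```

With the configuration shown above, DemandSketch.max_demand(consumer_demand_multiplier: 10) evaluates to ten times the number of online schedulers.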

Summary

handle_events(push_requests, from, state)
Invoked on :producer_consumer and :consumer stages to handle events

init/1
Invoked when the server is started

Functions

handle_events(push_requests, from, state)

Invoked on :producer_consumer and :consumer stages to handle events.

Stages of these types must always implement this callback explicitly.

Return values are the same as for handle_cast/2.

Callback implementation for GenStage.handle_events/3.
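As a minimal sketch of what a :consumer implementation of this callback can look like (assuming the gen_stage dependency is available), the module below handles a batch and returns no events, since a consumer has nothing downstream to emit to. ConsumerSketch, deliver/1 and the :handled counter are hypothetical and do not reflect how Express.PushRequests.Consumer is actually written.

```elixir
# Hypothetical consumer, for illustration only.
defmodule ConsumerSketch do
  use GenStage

  @impl true
  def init(:ok) do
    # The first element of the tuple is the stage type, the second is the state.
    {:consumer, %{handled: 0}}
  end

  @impl true
  def handle_events(push_requests, _from, state) do
    # Process each event in the batch received from the producer.
    Enum.each(push_requests, &deliver/1)

    # Consumers emit no events, so the events list is always empty.
    {:noreply, [], %{state | handled: state.handled + length(push_requests)}}
  end

  # Placeholder for the real work, such as sending a push message via a worker.
  defp deliver(_push_request), do: :ok
end
```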

init/1

Invoked when the server is started.

start_link/3 (or start/3) will block until this callback returns. args is the argument term (second argument) passed to start_link/3 (or start/3).

In case of successful start, this callback must return a tuple where the first element is the stage type, which is one of:

  • :producer
  • :consumer
  • :producer_consumer (if the stage is acting as both)

For example:

def init(args) do
  {:producer, some_state}
end
The returned tuple may also contain 3 or 4 elements. The third element may be the :hibernate atom or a set of options defined below.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.


Options

This callback may return options. Some options are specific to the chosen stage type, while others are shared across all types.

:producer options

  • :demand - when :forward, the demand is always forwarded to the handle_demand/2 callback. When :accumulate, demand is accumulated until its mode is set to :forward via demand/2. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. Defaults to :forward.

:producer and :producer_consumer options

  • :buffer_size - the size of the buffer to store events without demand. Can be :infinity to signal no limit on the buffer size. See the “Buffer events” section of the GenStage module documentation. Defaults to 10_000 for :producer and :infinity for :producer_consumer.

  • :buffer_keep - determines whether the :first or :last entries are kept in the buffer when the buffer size is exceeded. Defaults to :last.

  • :dispatcher - the dispatcher responsible for handling demands. Defaults to GenStage.DemandDispatcher. May be either an atom representing a dispatcher module or a two-element tuple with the dispatcher module and the dispatcher options.

:consumer and :producer_consumer options

  • :subscribe_to - a list of producers to subscribe to. Each element represents either the producer module or a tuple with the producer module and the subscription options (as defined in sync_subscribe/2).

Callback implementation for GenStage.init/1.
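The options described above are returned as the third element of the init/1 tuple. The sketch below shows one producer and one consumer doing so, assuming the gen_stage dependency is available. Both module names and all option values are hypothetical, chosen only to illustrate the shape of the return value.

```elixir
# Hypothetical producer: returns producer-specific and buffer options.
defmodule OptionsProducerSketch do
  use GenStage

  @impl true
  def init(:ok) do
    {:producer, :no_state,
     demand: :accumulate,
     buffer_size: 1_000,
     buffer_keep: :last,
     dispatcher: GenStage.DemandDispatcher}
  end

  @impl true
  def handle_demand(_demand, state) do
    # This sketch never produces events.
    {:noreply, [], state}
  end
end

# Hypothetical consumer: subscribes to the producer above from init/1.
defmodule OptionsConsumerSketch do
  use GenStage

  @impl true
  def init(:ok) do
    # Each :subscribe_to entry is a producer, or a tuple of a producer and
    # subscription options such as :max_demand and :min_demand.
    {:consumer, :no_state,
     subscribe_to: [{OptionsProducerSketch, max_demand: 20, min_demand: 10}]}
  end

  @impl true
  def handle_events(_events, _from, state) do
    {:noreply, [], state}
  end
end
```

Subscribing by module name only works if the producer process is registered under that name, for example when it is started with name: OptionsProducerSketch.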