ExFirebase v0.3.1 ExFirebase.Messaging.QueueProducer View Source

The Producer in the Queue GenStage pipeline. It manages a queue of all payloads waiting to be sent.

Link to this section Summary

Functions

Invoked to handle synchronous call/3 messages

Invoked to handle asynchronous cast/2 messages

Invoked on :producer stages

Invoked when the server is started

Link to this section Functions

Link to this function add(payload) View Source
add(map() | [map()]) :: :ok
Link to this function handle_call(atom, from, queue) View Source

Invoked to handle synchronous call/3 messages.

call/3 will block until a reply is received (unless the call times out or nodes are disconnected).

request is the request message sent by a call/3, from is a two-element tuple containing the caller’s PID and a term that uniquely identifies the call, and state is the current state of the GenStage.

Returning {:reply, reply, [events], new_state} sends the response reply to the caller after events are dispatched (or buffered) and continues the loop with new state new_state. In case you want to deliver the reply before processing events, use reply/2 and return {:noreply, [event], state}.

Returning {:noreply, [event], new_state} does not send a response to the caller and processes the given events before continuing the loop with new state new_state. The response must be sent with reply/2.

Hibernating is also supported as an atom to be returned from either :reply and :noreply tuples.

Returning {:stop, reason, reply, new_state} stops the loop and terminate/2 is called with reason reason and state new_state. Then the reply is sent as the response to the call and the process exits with reason reason.

Returning {:stop, reason, new_state} is similar to {:stop, reason, reply, new_state} except that no reply is sent to the caller.

If this callback is not implemented, the default implementation by use GenStage will return {:stop, {:bad_call, request}, state}.

Callback implementation for GenStage.handle_call/3.

Invoked to handle asynchronous cast/2 messages.

request is the request message sent by a cast/2 and state is the current state of the GenStage.

Returning {:noreply, [event], new_state} dispatches the events and continues the loop with new state new_state.

Returning {:noreply, [event], new_state, :hibernate} is similar to {:noreply, new_state} except the process is hibernated before continuing the loop. See the return values for GenServer.handle_call/3 for more information on hibernation.

Returning {:stop, reason, new_state} stops the loop and terminate/2 is called with the reason reason and state new_state. The process exits with reason reason.

If this callback is not implemented, the default implementation by use GenStage will return {:stop, {:bad_cast, request}, state}.

Callback implementation for GenStage.handle_cast/2.

Link to this function handle_demand(demand, queue) View Source

Invoked on :producer stages.

This callback is invoked on :producer stages with the demand from consumers/dispatcher. The producer that implements this callback must either store the demand, or return the amount of requested events.

Must always be explicitly implemented by :producer stages.

Examples

def handle_demand(demand, state) do
  # We check if we're able to satisfy the demand and fetch
  # events if we aren't.
  events =
    if length(state.events) >= demand do
      state.events
    else
      # fetch_events()
    end

  # We dispatch only the requested number of events.
  {to_dispatch, remaining} = Enum.split(events, demand)

  {:noreply, to_dispatch, %{state | events: remaining}}
end

Callback implementation for GenStage.handle_demand/2.

Invoked when the server is started.

start_link/3 (or start/3) will block until this callback returns. args is the argument term (second argument) passed to start_link/3 (or start/3).

In case of successful start, this callback must return a tuple where the first element is the stage type, which is one of:

  • :producer
  • :consumer
  • :producer_consumer (if the stage is acting as both)

For example:

def init(args) do
  {:producer, some_state}
end

The returned tuple may also contain 3 or 4 elements. The third element may be the :hibernate atom or a set of options defined below.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.

Options

This callback may return options. Some options are specific to the chosen stage type while others are shared across all types.

:producer options

  • :demand - when :forward, the demand is always forwarded to the c:handle_demand/2 callback. When :accumulate, demand is accumulated until its mode is set to :forward via demand/2. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. Defaults to :forward.

:producer and :producer_consumer options

  • :buffer_size - the size of the buffer to store events without demand. Can be :infinity to signal no limit on the buffer size. Check the “Buffer events” section of the module documentation. Defaults to 10_000 for :producer, :infinity for :producer_consumer.

  • :buffer_keep - returns whether the :first or :last entries should be kept on the buffer in case the buffer size is exceeded. Defaults to :last.

  • :dispatcher - the dispatcher responsible for handling demands. Defaults to GenStage.DemandDispatch. May be either an atom representing a dispatcher module or a two-element tuple with the dispatcher module and the dispatcher options.

:consumer and :producer_consumer options

  • :subscribe_to - a list of producers to subscribe to. Each element represents either the producer module or a tuple with the producer module and the subscription options (as defined in sync_subscribe/2).

Callback implementation for GenStage.init/1.