gen_stage v0.12.2 ConsumerSupervisor behaviour View Source

A supervisor that starts children as events flow in.

A ConsumerSupervisor can be used as the consumer in a GenStage pipeline. A new child process will be started per event, where the event is appended to the arguments in the child specification.

A ConsumerSupervisor can be attached to a producer by returning :subscribe_to from init/1 or explicitly with GenStage.sync_subscribe/3 and GenStage.async_subscribe/2.

Once subscribed, the supervisor will ask the producer for max_demand events and start child processes as events arrive. As child processes terminate, the supervisor will accumulate demand and request more events once min_demand is reached. This allows the ConsumerSupervisor to work similar to a pool, except a child process is started per event. The minimum amount of concurrent children per producer is specified by min_demand and the maximum is given by max_demand.

Example

Let’s define a GenStage consumer as a ConsumerSupervisor that subscribes to a producer named Producer and starts a new process for each event received from the producer. Each new process will be started by calling Printer.start_link/1, which simply starts a task that will print the incoming event to the terminal.

defmodule Consumer do
  def start_link() do
    children = [Printer]
    opts = [strategy: :one_for_one, subscribe_to: [{Producer, max_demand: 50}]]
    ConsumerSupervisor.start_link(children, opts)
  end
end

Then in the Printer module:

defmodule Printer do
  use Task

  def start_link(event) do
    Task.start_link(fn ->
      IO.inspect {self(), event}
    end)
  end
end

Similar to Supervisor, ConsumerSupervisor also provides start_link/3, which allows developers to start a supervisor with the help of a callback module.

Name Registration

A supervisor is bound to the same name registration rules as a GenServer. Read more about it in the GenServer docs.

Link to this section Summary

Types

Options used by the start* functions

Functions

Returns a map containing count values for the supervisor

Invoked when the server is started

Receives a template to initialize and a set of options

Starts a child in the consumer supervisor

Starts a supervisor with the given children

Starts a consumer supervisor module with the given arg

Terminates the given child pid

Returns a list with information about all children

Callbacks

Callback invoked to start the supervisor and during hot code upgrades

Link to this section Types

Link to this type option() View Source
option ::
  {:registry, atom} |
  {:name, Supervisor.name} |
  {:strategy, Supervisor.Spec.strategy} |
  {:max_restarts, non_neg_integer} |
  {:max_seconds, non_neg_integer} |
  {:max_dynamic, non_neg_integer | :infinity} |
  {:subscribe_to, [GenStage.stage | {GenStage.stage, keyword}]}

Options used by the start* functions

Link to this section Functions

Link to this function count_children(supervisor) View Source
count_children(Supervisor.supervisor) :: %{specs: non_neg_integer, active: non_neg_integer, supervisors: non_neg_integer, workers: non_neg_integer}

Returns a map containing count values for the supervisor.

The map contains the following keys:

  • :specs - always 1 as consumer supervisors have a single specification

  • :active - the count of all actively running child processes managed by this supervisor

  • :supervisors - the count of all supervisors whether or not the child process is still alive

  • :workers - the count of all workers, whether or not the child process is still alive

Link to this function format_status(arg1, list) View Source

The same as GenServer.format_status/2.

Callback implementation for GenStage.format_status/2.

Invoked when the server is started.

start_link/3 (or start/3) will block until this callback returns. args is the argument term (second argument) passed to start_link/3 (or start/3).

In case of successful start, this callback must return a tuple where the first element is the stage type, which is one of:

  • :producer
  • :consumer
  • :producer_consumer (if the stage is acting as both)

For example:

def init(args) do
  {:producer, some_state}
end

The returned tuple may also contain 3 or 4 elements. The third element may be the :hibernate atom or a set of options defined below.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop or calling terminate/2.

Returning {:stop, reason} will cause start_link/3 to return {:error, reason} and the process to exit with reason reason without entering the loop or calling terminate/2.

Options

This callback may return options. Some options are specific to the chosen stage type while others are shared across all types.

:producer options

  • :demand - when :forward, the demand is always forwarded to the c:handle_demand/2 callback. When :accumulate, demand is accumulated until its mode is set to :forward via demand/2. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. Defaults to :forward.

:producer and :producer_consumer options

  • :buffer_size - the size of the buffer to store events without demand. Can be :infinity to signal no limit on the buffer size. Check the “Buffer events” section of the module documentation. Defaults to 10_000 for :producer, :infinity for :producer_consumer.

  • :buffer_keep - returns whether the :first or :last entries should be kept on the buffer in case the buffer size is exceeded. Defaults to :last.

  • :dispatcher - the dispatcher responsible for handling demands. Defaults to GenStage.DemandDispatch. May be either an atom representing a dispatcher module or a two-element tuple with the dispatcher module and the dispatcher options.

:consumer and :producer_consumer options

  • :subscribe_to - a list of producers to subscribe to. Each element represents either the producer module or a tuple with the producer module and the subscription options (as defined in sync_subscribe/2).

Callback implementation for GenStage.init/1.

Receives a template to initialize and a set of options.

This is typically invoked at the end of the init/1 callback of module-based supervisors.

This function returns a the child specification and the supervisor flags.

Link to this function start_child(supervisor, args) View Source

Starts a child in the consumer supervisor.

The child process will be started by appending the given list of args to the existing function arguments in the child specification.

This child is started separately from any producer and does not count towards the demand of any of them.

If the child process starts, function returns {:ok, child} or {:ok, child, info}, the pid is added to the supervisor and the function returns the same value.

If the child process starts, function returns ignore, an error tuple or an erroneous value, or if it fails, the child is discarded and :ignore or {:error, error} where error is a term containing information about the error is returned.

Starts a supervisor with the given children.

A strategy is required to be given as an option. Furthermore, the :max_restarts, :max_seconds, and :subscribe_to values can be configured as described in the documentation for the init/1 callback.

The options can also be used to register a supervisor name. The supported values are described under the Name Registration section in the GenServer module docs.

Note that the consumer supervisor is linked to the parent process and will exit not only on crashes but also if the parent process exits with :normal reason.

Link to this function start_link(mod, args, opts \\ []) View Source
start_link(module, any, [option]) :: Supervisor.on_start

Starts a consumer supervisor module with the given arg.

To start the supervisor, the init/1 callback will be invoked in the given module, with arg passed to it. The init/1 callback must return a supervision specification which can be created with the help of the Supervisor.Spec module.

If the init/1 callback returns :ignore, this function returns :ignore as well and the supervisor terminates with reason :normal. If it fails or returns an incorrect value, this function returns {:error, term} where term is a term with information about the error, and the supervisor terminates with reason term.

The :name option can also be given in order to register a supervisor name, the supported values are described under the Name Registration section in the GenServer module docs.

Link to this function terminate_child(supervisor, pid) View Source
terminate_child(Supervisor.supervisor, pid) ::
  :ok |
  {:error, :not_found}

Terminates the given child pid.

If successful, the function returns :ok. If there is no such pid, the function returns {:error, :not_found}.

Link to this function which_children(supervisor) View Source
which_children(Supervisor.supervisor) :: [{:undefined, pid | :restarting, Supervisor.Spec.worker, Supervisor.Spec.modules}]

Returns a list with information about all children.

Note that calling this function when supervising a large number of children under low memory conditions can cause an out of memory exception.

This function returns a list of tuples containing:

  • id - as defined in the child specification but is always set to :undefined for consumer supervisors

  • child - the pid of the corresponding child process or the atom :restarting if the process is about to be restarted

  • type - :worker or :supervisor as defined in the child specification

  • modules - as defined in the child specification

Link to this section Callbacks

Link to this callback init(args) View Source
init(args :: term) ::
  {:ok, [:supervisor.child_spec], options :: keyword} |
  :ignore

Callback invoked to start the supervisor and during hot code upgrades.

Options

  • :strategy - the restart strategy option. Only :one_for_one is supported by consumer supervisors.

  • :max_restarts - the maximum amount of restarts allowed in a time frame. Defaults to 3 times.

  • :max_seconds - the time frame in which :max_restarts applies in seconds. Defaults to 5 seconds.

  • :subscribe_to - a list of producers to subscribe to. Each element represents the producer or a tuple with the producer and the subscription options. e.g. [Producer] or [{Producer, max_demand: 10, min_demand: 20}]