gen_stage v1.0.0 ConsumerSupervisor behaviour
A supervisor that starts children as events flow in.
A ConsumerSupervisor can be used as the consumer in a GenStage pipeline.
A new child process will be started per event, where the event is appended
to the arguments in the child specification.
A ConsumerSupervisor can be attached to a producer by returning :subscribe_to from init/1, or explicitly with GenStage.sync_subscribe/3 and GenStage.async_subscribe/2.

Once subscribed, the supervisor will ask the producer for :max_demand events and start child processes as events arrive. As child processes terminate, the supervisor will accumulate demand and request more events once :min_demand is reached. This allows the ConsumerSupervisor to work similarly to a pool, except that a child process is started per event. The minimum number of concurrent children per producer is specified by :min_demand and the maximum is given by :max_demand.
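The explicit subscription path can be sketched as follows. This is a minimal illustration rather than part of the library: BoundedProducer, EventWorker and ManualSubscriber are hypothetical names, the producer is deliberately finite so the script settles, and the :gen_stage dependency is assumed to be available.

```elixir
# Hypothetical producer that emits the integers 1..limit, then nothing more.
defmodule BoundedProducer do
  use GenStage

  def start_link(limit), do: GenStage.start_link(__MODULE__, limit)

  def init(limit), do: {:producer, {1, limit}}

  # All events have been emitted: answer further demand with an empty list.
  def handle_demand(_demand, {next, limit}) when next > limit do
    {:noreply, [], {next, limit}}
  end

  def handle_demand(demand, {next, limit}) do
    last = min(next + demand - 1, limit)
    {:noreply, Enum.to_list(next..last), {last + 1, limit}}
  end
end

# Hypothetical child: called as EventWorker.start_link(reply_to, event),
# because the event is appended to the arguments in the child specification.
defmodule EventWorker do
  def start_link(reply_to, event) do
    Task.start_link(fn -> send(reply_to, {:event, event}) end)
  end
end

defmodule ManualSubscriber do
  use ConsumerSupervisor

  def start_link(reply_to), do: ConsumerSupervisor.start_link(__MODULE__, reply_to)

  def init(reply_to) do
    children = [
      %{id: EventWorker, start: {EventWorker, :start_link, [reply_to]}, restart: :temporary}
    ]

    # No :subscribe_to here: the subscription is made explicitly below.
    ConsumerSupervisor.init(children, strategy: :one_for_one)
  end
end

{:ok, producer} = BoundedProducer.start_link(20)
{:ok, sup} = ManualSubscriber.start_link(self())

# At most 10 children run at once; more events are requested as children exit.
{:ok, _subscription} =
  GenStage.sync_subscribe(sup, to: producer, min_demand: 5, max_demand: 10)
```

Each of the 20 events should arrive in the calling process as an {:event, n} message, sent by a separate short-lived child.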
Example
Let's define a GenStage consumer as a ConsumerSupervisor that subscribes to a producer named Producer and starts a new process for each event received from the producer. Each new process will be started by calling Printer.start_link/1, which simply starts a task that will print the incoming event to the terminal.
    defmodule Consumer do
      use ConsumerSupervisor

      def start_link(arg) do
        ConsumerSupervisor.start_link(__MODULE__, arg)
      end

      def init(_arg) do
        # Note: By default the restart for a child is set to :permanent
        # which is not supported in ConsumerSupervisor. You need to explicitly
        # set the :restart option either to :temporary or :transient.
        children = [%{id: Printer, start: {Printer, :start_link, []}, restart: :transient}]
        opts = [strategy: :one_for_one, subscribe_to: [{Producer, max_demand: 50}]]
        ConsumerSupervisor.init(children, opts)
      end
    end
Then in the Printer module:
    defmodule Printer do
      def start_link(event) do
        # Note: this function must return the format of `{:ok, pid}` and like
        # all children started by a Supervisor, the process must be linked
        # back to the supervisor (if you use `Task.start_link/1` then both
        # these requirements are met automatically)
        Task.start_link(fn ->
          IO.inspect({self(), event})
        end)
      end
    end
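The example refers to a producer named Producer without defining it. One possible shape for it, offered only as an assumption, is a counter that emits consecutive integers on demand and registers itself under its module name so that `subscribe_to: [{Producer, max_demand: 50}]` can find it. The :gen_stage dependency is assumed to be available.

```elixir
# Hypothetical producer for the example above; not part of ConsumerSupervisor.
defmodule Producer do
  use GenStage

  def start_link(initial \\ 0) do
    # Registered by name so that consumers can subscribe to `Producer`.
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  # Emits exactly `demand` consecutive integers, starting at the counter.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end
```

With this in place, calling Producer.start_link(0) followed by Consumer.start_link(:ok) should start one Printer task per integer. Note that this producer never runs out of events, so the output continues until one of the processes is stopped.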
Similar to Supervisor, ConsumerSupervisor also provides start_link/3, which allows developers to start a supervisor with the help of a callback module.
Name Registration
A supervisor is bound to the same name registration rules as a GenServer. Read more about it in the GenServer docs.
Summary
Functions
* count_children(supervisor) - Returns a map containing count values for the supervisor.
* init(children, opts) - Receives a template to initialize and a set of options.
* start_child(supervisor, args) - Starts a child in the consumer supervisor.
* start_link(children, options) - Starts a supervisor with the given children.
* start_link(mod, args, opts \\ []) - Starts a consumer supervisor module with the given args.
* terminate_child(supervisor, pid) - Terminates the given child pid.
* which_children(supervisor) - Returns a list with information about all children.
Callbacks
* init(args) - Callback invoked to start the supervisor and during hot code upgrades.
Types
option()
    option() ::
      {:registry, atom()}
      | {:name, Supervisor.name()}
      | {:strategy, Supervisor.Spec.strategy()}
      | {:max_restarts, non_neg_integer()}
      | {:max_seconds, non_neg_integer()}
      | {:subscribe_to, [GenStage.stage() | {GenStage.stage(), keyword()}]}
Options used by the start* functions.
Functions
count_children(supervisor)
    count_children(Supervisor.supervisor()) :: %{
      specs: non_neg_integer(),
      active: non_neg_integer(),
      supervisors: non_neg_integer(),
      workers: non_neg_integer()
    }
Returns a map containing count values for the supervisor.
The map contains the following keys:
* :specs - always 1, as consumer supervisors have a single specification
* :active - the count of all actively running child processes managed by this supervisor
* :supervisors - the count of all supervisors, whether or not the child process is still alive
* :workers - the count of all workers, whether or not the child process is still alive
init(children, opts)
Receives a template to initialize and a set of options.
This is typically invoked at the end of the init/1 callback of module-based supervisors.
This function returns the child specification and the supervisor flags.
Examples
Using the child specification changes introduced in Elixir 1.5:
    defmodule MyConsumerSupervisor do
      use ConsumerSupervisor

      def start_link(arg) do
        ConsumerSupervisor.start_link(__MODULE__, arg)
      end

      def init(_arg) do
        # :subscribe_to expects a list of producers.
        ConsumerSupervisor.init([MyConsumer], strategy: :one_for_one, subscribe_to: [MyProducer])
      end
    end
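The template above names MyConsumer without showing it. The sketch below is one hypothetical definition. It assumes that a bare module in the children list is expanded through its child_spec/1 function, as a regular Supervisor does, and it sets :restart explicitly because :permanent is not supported.

```elixir
# Hypothetical worker for the `[MyConsumer]` template; names are illustrative.
defmodule MyConsumer do
  def child_spec(_arg) do
    %{
      id: __MODULE__,
      # Empty argument list: the event is appended, so start_link/1 is called.
      start: {__MODULE__, :start_link, []},
      # :permanent (the usual default) is not supported by ConsumerSupervisor.
      restart: :transient
    }
  end

  # Returns {:ok, pid} and links the task to the calling supervisor.
  def start_link(event) do
    Task.start_link(fn -> IO.inspect({self(), event}) end)
  end
end
```

Writing child_spec/1 by hand keeps the start arguments explicit, which matters here because every event is appended to whatever arguments the specification already carries.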
start_child(supervisor, args)
    start_child(Supervisor.supervisor(), [term()]) :: Supervisor.on_start_child()
Starts a child in the consumer supervisor.
The child process will be started by appending the given list of args to the existing function arguments in the child specification.
This child is started separately from any producer and does not count towards the demand of any of them.
If the child process start function returns {:ok, child} or {:ok, child, info}, the pid is added to the supervisor, and the function returns the same value.
If the child process start function returns :ignore, an error tuple, or an erroneous value, or if it fails, the child is discarded and :ignore or {:error, error} is returned, where error is a term containing information about the error.
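As a rough illustration of start_child/2, the script below starts a consumer supervisor with no producer and adds a child by hand. ManualSupervisor is a hypothetical name, the template uses Agent only because it needs no extra modules, and the :gen_stage dependency is assumed to be available.

```elixir
# Hypothetical module: a consumer supervisor that subscribes to no producer.
defmodule ManualSupervisor do
  use ConsumerSupervisor

  def start_link(arg), do: ConsumerSupervisor.start_link(__MODULE__, arg)

  def init(_arg) do
    # The template carries no fixed arguments; start_child/2 supplies them all.
    children = [%{id: Agent, start: {Agent, :start_link, []}, restart: :temporary}]
    ConsumerSupervisor.init(children, strategy: :one_for_one)
  end
end

{:ok, sup} = ManualSupervisor.start_link(:ok)

# The args list is appended to [], so this calls Agent.start_link(fn -> 0 end).
{:ok, agent} = ConsumerSupervisor.start_child(sup, [fn -> 0 end])

Agent.update(agent, &(&1 + 1))
IO.inspect(Agent.get(agent, & &1))
IO.inspect(ConsumerSupervisor.count_children(sup))
```

Because the child was started manually, it is not tied to any subscription and its termination does not generate demand.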
start_link(children, options)
    start_link([Supervisor.Spec.spec()], [option()]) :: Supervisor.on_start()
    start_link(module(), any()) :: Supervisor.on_start()
Starts a supervisor with the given children.
A strategy is required to be given as an option. Furthermore, the :max_restarts, :max_seconds, and :subscribe_to values can be configured as described in the documentation for the init/1 callback.
The options can also be used to register a supervisor name. The supported values are described under the "Name Registration" section in the GenServer module docs.
The child processes specified in children will be started by appending the event to process to the existing function arguments in the child specification.
Note that the consumer supervisor is linked to the parent process and will exit not only on crashes but also if the parent process exits with :normal reason.
start_link(mod, args, opts \\ [])
    start_link(module(), any(), [option()]) :: Supervisor.on_start()
Starts a consumer supervisor module with the given args.
To start the supervisor, the init/1 callback will be invoked in the given module, with args passed to it. The init/1 callback must return a supervision specification, which can be created with the help of init/2.
If the init/1 callback returns :ignore, this function returns :ignore as well and the supervisor terminates with reason :normal.
If it fails or returns an incorrect value, this function returns {:error, term}, where term is a term with information about the error, and the supervisor terminates with reason term.
The :name option can also be given in order to register a supervisor name. The supported values are described under the "Name Registration" section in the GenServer module docs.
terminate_child(supervisor, pid)
    terminate_child(Supervisor.supervisor(), pid()) :: :ok | {:error, :not_found}
Terminates the given child pid.
If successful, the function returns :ok. If there is no such pid, the function returns {:error, :not_found}.
which_children(supervisor)
    which_children(Supervisor.supervisor()) :: [
      {:undefined, pid() | :restarting, :worker | :supervisor, :dynamic | [module()]}
    ]
Returns a list with information about all children.
Note that calling this function when supervising a large number of children under low memory conditions can cause an out of memory exception.
This function returns a list of tuples containing:
* id - as defined in the child specification but is always set to :undefined for consumer supervisors
* child - the pid of the corresponding child process or the atom :restarting if the process is about to be restarted
* type - :worker or :supervisor as defined in the child specification
* modules - as defined in the child specification
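The tuple shape described above, together with terminate_child/2, can be explored with a small script. This is only a sketch: InspectableSupervisor is a hypothetical name, the child is an Agent started by hand rather than by a producer, and the :gen_stage dependency is assumed to be available.

```elixir
# Hypothetical module: a consumer supervisor that subscribes to no producer.
defmodule InspectableSupervisor do
  use ConsumerSupervisor

  def start_link(arg), do: ConsumerSupervisor.start_link(__MODULE__, arg)

  def init(_arg) do
    children = [%{id: Agent, start: {Agent, :start_link, []}, restart: :temporary}]
    ConsumerSupervisor.init(children, strategy: :one_for_one)
  end
end

{:ok, sup} = InspectableSupervisor.start_link(:ok)
{:ok, child} = ConsumerSupervisor.start_child(sup, [fn -> :idle end])

# One tuple per child; the id element is :undefined for consumer supervisors.
children = ConsumerSupervisor.which_children(sup)

# The first call stops the child; the second no longer finds the pid.
first = ConsumerSupervisor.terminate_child(sup, child)
second = ConsumerSupervisor.terminate_child(sup, child)

IO.inspect({children, first, second})
```

Capturing the return values before printing makes it easy to compare them against the documented :ok and {:error, :not_found} results.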
Callbacks
init(args)
    init(args :: term()) :: {:ok, [:supervisor.child_spec()], options :: keyword()} | :ignore
Callback invoked to start the supervisor and during hot code upgrades.
Options
* :strategy - the restart strategy option. Only :one_for_one is supported by consumer supervisors.
* :max_restarts - the maximum amount of restarts allowed in a time frame. Defaults to 3 times.
* :max_seconds - the time frame in which :max_restarts applies in seconds. Defaults to 5 seconds.
* :subscribe_to - a list of producers to subscribe to. Each element represents the producer or a tuple with the producer and the subscription options, for example, [Producer] or [{Producer, max_demand: 20, min_demand: 10}].
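The options above might be combined in an init/1 callback along the following lines. This is a sketch under assumptions: TunedSupervisor and EventPrinter are hypothetical names, the values are arbitrary, and Producer stands for a registered producer that must already be running before the supervisor is started.

```elixir
# Hypothetical child module: prints each event from a linked task.
defmodule EventPrinter do
  def start_link(event) do
    Task.start_link(fn -> IO.inspect({self(), event}) end)
  end
end

defmodule TunedSupervisor do
  use ConsumerSupervisor

  def start_link(arg) do
    ConsumerSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    children = [
      %{id: EventPrinter, start: {EventPrinter, :start_link, []}, restart: :transient}
    ]

    opts = [
      # Only :one_for_one is supported by consumer supervisors.
      strategy: :one_for_one,
      # Allow at most 5 restarts within any 10-second window.
      max_restarts: 5,
      max_seconds: 10,
      # `Producer` is an assumed, already-running producer registered by name.
      subscribe_to: [{Producer, min_demand: 10, max_demand: 20}]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
```

The block only defines the modules; TunedSupervisor.start_link/1 should be called once the producer is available, since the subscription is established during startup.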