gen_stage v0.11.0 GenStage behaviour
Stages are computation steps that send and/or receive data from other stages.
When a stage sends data, it acts as a producer. When it receives data, it acts as a consumer. Stages may take both producer and consumer roles at once.
Stage types
In addition to the producer and consumer roles, a stage may be called a “source” if it only produces items, or a “sink” if it only consumes items.
For example, imagine the stages below where A sends data to B that sends data to C:
[A] -> [B] -> [C]
we conclude that:
- A is only a producer (and therefore a source)
- B is both producer and consumer
- C is only a consumer (and therefore a sink)
As we will see in the upcoming Examples section, we must specify the type of the stage when we implement each of them.
To start the flow of events, we subscribe consumers to producers. Once the communication channel between them is established, consumers will ask the producers for events. We typically say the consumer is sending demand upstream. Once demand arrives, the producer will emit items, never emitting more items than the consumer asked for. This provides a back-pressure mechanism.
A consumer may have multiple producers and a producer may have
multiple consumers. When a consumer asks for data, each producer
is handled separately, with its own demand. When a producer
receives demand and sends data to multiple consumers, the demand
is tracked and the events are sent by a dispatcher. This allows
producers to send data using different “strategies”. See
GenStage.Dispatcher
for more information.
Example
Let’s define the simple pipeline below:
[A] -> [B] -> [C]
where A is a producer that will emit items starting from 0, B is a producer-consumer that will receive those items and multiply them by a given number and C will receive those events and print them to the terminal.
Let’s start with A. Since A is a producer, its main
responsibility is to receive demand and generate events.
Those events may be in memory or an external queue system.
For simplicity, let’s implement a simple counter starting
from a given value of counter
received on init/1
:
defmodule A do
  use GenStage

  def start_link(number) do
    GenStage.start_link(A, number)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..counter+demand-1)
    {:noreply, events, counter + demand}
  end
end
B is a producer-consumer. This means it does not explicitly handle the demand because the demand is always forwarded to its producer. Once A receives the demand from B, it will send events to B which will be transformed by B as desired. In our case, B will receive events and multiply them by a number given on initialization and stored as the state:
defmodule B do
  use GenStage

  def start_link(number) do
    GenStage.start_link(B, number)
  end

  def init(number) do
    {:producer_consumer, number}
  end

  def handle_events(events, _from, number) do
    events = Enum.map(events, & &1 * number)
    {:noreply, events, number}
  end
end
C will finally receive those events and print them every second to the terminal:
defmodule C do
  use GenStage

  def start_link() do
    GenStage.start_link(C, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    # Wait for a second.
    :timer.sleep(1000)

    # Inspect the events.
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end
Now we can start and connect them:
{:ok, a} = A.start_link(0) # starting from zero
{:ok, b} = B.start_link(2) # multiply by 2
{:ok, c} = C.start_link() # state does not matter
GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)
Notice we typically subscribe from bottom to top. Since A will start producing items only when B connects to it, we want this subscription to happen when the whole pipeline is ready. After you subscribe all of them, demand will start flowing upstream and events downstream.
When implementing consumers, we often set the :max_demand
and
:min_demand
on subscription. The :max_demand
specifies the
maximum amount of events that must be in flow while the :min_demand
specifies the minimum threshold to trigger for more demand. For
example, if :max_demand
is 1000 and :min_demand
is 500
(the default values), the consumer will ask for 1000 events initially
and ask for more only after it receives at least 500.
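These values are passed as subscription options. As an illustration only, the subscriptions above could have stated the defaults explicitly:

# Same values as the defaults, spelled out for illustration.
GenStage.sync_subscribe(c, to: b, max_demand: 1000, min_demand: 500)
GenStage.sync_subscribe(b, to: a, max_demand: 1000, min_demand: 500)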
In the example above, B is a :producer_consumer
and therefore
acts as a buffer. Getting the proper demand values in B is
important: making the buffer too small may make the whole pipeline
slower, making the buffer too big may unnecessarily consume
memory.
When such values are applied to the stages above, it is easy to see the producer works in batches. The producer A ends up emitting batches of 50 items which will take approximately 50 seconds to be consumed by C, which will then request another batch of 50 items.
init and subscribe_to
In the example above, we have started the processes A, B and C
independently and subscribed them later on. But most often it is
simpler to subscribe a consumer to its producer on its init/1
callback. This way, if the consumer crashes, restarting the consumer
will automatically re-invoke its init/1
callback and resubscribe
it to the producer.
This approach works as long as the producer can be referenced when
the consumer starts—such as by name (for a named process) or by pid
for a running unnamed process. For example, assuming the processes A and B are started as follows:
# Let's call the stage in module A as A
GenStage.start_link(A, 0, name: A)
# Let's call the stage in module B as B
GenStage.start_link(B, 2, name: B)
# No need to name consumers as they won't be subscribed to
GenStage.start_link(C, :ok)
We can now change the init/1
callback for C to the following:
def init(:ok) do
  {:consumer, :the_state_does_not_matter, subscribe_to: [B]}
end
Or, if we want to pass subscription options:

def init(:ok) do
  {:consumer, :the_state_does_not_matter, subscribe_to: [{B, options}]}
end
And we will no longer need to call sync_subscribe/2.
Another advantage of this approach is that it makes it straightforward to leverage concurrency by simply starting multiple consumers that subscribe to the same producer (or producer_consumer). This can be done in the example above by simply calling start_link multiple times:
# Start 4 consumers
GenStage.start_link(C, :ok)
GenStage.start_link(C, :ok)
GenStage.start_link(C, :ok)
GenStage.start_link(C, :ok)
In a supervision tree, this is often done by starting multiple workers:
children = [
  worker(A, [0]),
  worker(B, [2]),
  # Each C worker needs a distinct id since they share the same module.
  worker(C, [], id: :c1),
  worker(C, [], id: :c2),
  worker(C, [], id: :c3),
  worker(C, [], id: :c4)
]

Supervisor.start_link(children, strategy: :one_for_one)
In fact, multiple consumers is often the easiest and simplest way to
leverage concurrency in a GenStage pipeline, especially if events can
be processed out of order. For example, imagine a scenario where you
have a stream of incoming events and you need to access a number of
external services per event. Instead of building complex stages that
route events through those services, one simple mechanism to leverage
concurrency is to start a producer and N consumers and invoke the external
services directly for each event in each consumer. N is typically the
number of cores (as returned by System.schedulers_online/0
) but can
likely be increased if the consumers are mostly waiting on IO.
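As a rough sketch of that approach, reusing the example modules A, B and C from above (the worker ids are arbitrary, they only have to be unique), the number of consumer workers could be derived from System.schedulers_online/0:

# One consumer worker per scheduler.
consumers =
  for i <- 1..System.schedulers_online() do
    worker(C, [], id: {C, i})
  end

children = [worker(A, [0]), worker(B, [2]) | consumers]
Supervisor.start_link(children, strategy: :one_for_one)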
Another alternative to the scenario above is to use a ConsumerSupervisor for consuming the events instead of N consumers. The ConsumerSupervisor will start a separate supervised process per event, in such a way that you have at most max_demand children and the average number of children is (max_demand - min_demand) / 2.
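A sketch of that alternative, loosely following the ConsumerSupervisor documentation; EventSupervisor and EventWorker are hypothetical names, and A is the producer defined earlier:

defmodule EventSupervisor do
  use ConsumerSupervisor

  def start_link() do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # One temporary EventWorker process is started per event.
    children = [worker(EventWorker, [], restart: :temporary)]
    {:ok, children, strategy: :one_for_one, subscribe_to: [{A, max_demand: 50}]}
  end
end

defmodule EventWorker do
  # ConsumerSupervisor invokes start_link/1 with the event as argument.
  def start_link(event) do
    Task.start_link(fn -> IO.inspect(event) end)
  end
end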
Buffering
In many situations, producers may attempt to emit events while no consumers
have yet subscribed. Similarly, consumers may ask producers for events
that are not yet available. In such cases, it is necessary for producers
to buffer events until a consumer is available or buffer the consumer
demand until events arrive, respectively. As we will see next, buffering
events can be done automatically by GenStage
, while buffering the demand
is a case that must be explicitly considered by developers implementing
producers.
Buffering events
Due to the concurrent nature of Elixir software, sometimes a producer
may dispatch events without consumers to send those events to. For example,
imagine a :consumer
B subscribes to :producer
A. Next, the consumer B
sends demand to A, which then starts producing events to satisfy the demand. Now, if the
consumer B crashes, the producer may attempt to dispatch the now produced
events but it no longer has a consumer to send those events to. In such
cases, the producer will automatically buffer the events until another
consumer subscribes.
The buffer can also be used in cases external sources only send events in batches larger than asked for. For example, if you are receiving events from an external source that only sends events in batches of 1000 and the internal demand is smaller than that, the buffer allows you to always emit batches of 1000 events even when the consumer has asked for less.
In all of those cases when an event cannot be sent immediately by
a producer, the event will be automatically stored and sent the next
time consumers ask for events. The size of the buffer is configured
via the :buffer_size
option returned by init/1
and the default
value is 10000. If the buffer_size
is exceeded, an error is logged.
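For instance, a producer such as A above could raise that limit when returning from init/1; the value below is arbitrary:

def init(counter) do
  # Allow up to 50_000 events to be buffered without demand;
  # exceeding this logs an error (the default :buffer_size is 10_000).
  {:producer, counter, buffer_size: 50_000}
end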
Buffering demand
In case consumers send demand and the producer is not yet ready to fill in the demand, producers must buffer the demand until data arrives.
As an example, let’s implement a producer that broadcasts messages to consumers. For producers, we need to consider two scenarios:
- what if events arrive and there are no consumers?
- what if consumers send demand and there are not enough events?
One way to implement such a broadcaster is to simply rely on the internal
buffer available in GenStage
, dispatching events as they arrive, as explained
in the previous section:
defmodule Broadcaster do
  use GenStage

  @doc "Starts the broadcaster."
  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc "Sends an event and returns only after the event is dispatched."
  def sync_notify(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  def init(:ok) do
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, _from, state) do
    {:reply, :ok, [event], state} # Dispatch immediately
  end

  def handle_demand(_demand, state) do
    {:noreply, [], state} # We don't care about the demand
  end
end
By always sending events as soon as they arrive, if there is any demand,
we will serve the existing demand, otherwise the event will be queued in
GenStage
’s internal buffer. In case events are being queued and not being
consumed, a log message will be emitted when we exceed the :buffer_size
configuration.
While the implementation above is enough to solve the constraints above,
a more robust implementation would have tighter control over the events
and demand by tracking this data locally, leaving the GenStage
internal
buffer only for cases where consumers crash without consuming all data.
To handle such cases, we will make the broadcaster state a tuple with two elements: a queue and the pending demand. When events arrive and there are no consumers, we store the event in the queue alongside the process information that broadcasted the event. When consumers send demand and there are not enough events, we increase the pending demand. Once we have both the data and the demand, we acknowledge the process that has sent the event to the broadcaster and finally broadcast the event downstream.
defmodule QueueBroadcaster do
  use GenStage

  @doc "Starts the broadcaster."
  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc "Sends an event and returns only after the event is dispatched."
  def sync_notify(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  ## Callbacks

  def init(:ok) do
    {:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, from, {queue, pending_demand}) do
    queue = :queue.in({from, event}, queue)
    dispatch_events(queue, pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, {from, event}}, queue} ->
        GenStage.reply(from, :ok)
        dispatch_events(queue, demand - 1, [event | events])
      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end
Let’s also implement a consumer that automatically subscribes to the
broadcaster on init/1
. The advantage of doing so on initialization
is that, if the consumer crashes while it is supervised, the subscription
is automatically reestablished when the supervisor restarts it.
defmodule Printer do
  use GenStage

  @doc "Starts the consumer."
  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # Starts a permanent subscription to the broadcaster
    # which will automatically start requesting items.
    {:consumer, :ok, subscribe_to: [QueueBroadcaster]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect {self(), event}
    end
    {:noreply, [], state}
  end
end
With the broadcaster in hand, now let’s start the producer as well as multiple consumers:
# Start the producer
QueueBroadcaster.start_link()
# Start multiple consumers
Printer.start_link()
Printer.start_link()
Printer.start_link()
Printer.start_link()
At this point, all consumers have already sent their demand, which we were not able to fulfill. Now, by calling sync_notify, the event will be broadcast to all consumers at once, as we have buffered the demand in the producer:
QueueBroadcaster.sync_notify(:hello_world)
If we had called QueueBroadcaster.sync_notify(:hello_world)
before any
consumer was available, the event would also be buffered in our own
queue and served only when demand is received.
By having control over the demand and the queue, the broadcaster has full control over how to behave when there are no consumers, when the queue grows too large, and so forth.
Asynchronous work and handle_subscribe
Both producer_consumer and consumer have been designed to do their
work in the handle_events/3
callback. This means that, after
handle_events/3
is invoked, both producer_consumer and consumer
will immediately send demand upstream and ask for more items, as
they assume events have been fully processed by handle_events/3.
Such default behaviour makes producer_consumer and consumer
unfeasible for doing asynchronous work. However, given GenStage
was designed to run with multiple consumers, it is not a problem
to perform synchronous or blocking actions inside handle_events/3
as you can then start multiple consumers in order to maximize both CPU
and IO usage as necessary.
On the other hand, if you must perform some work asynchronously,
GenStage
comes with an option that manually controls how demand
is sent upstream, avoiding the default behaviour where demand is
sent after handle_events/3
. Such can be done by implementing
the handle_subscribe/4
callback and returning {:manual, state}
instead of the default {:automatic, state}
. Once the producer mode
is set to :manual
, developers must use GenStage.ask/3
to send
demand upstream when necessary.
For example, the ConsumerSupervisor module processes events asynchronously by starting a child process per event, and it does so by manually sending demand to producers. The ConsumerSupervisor can be used to distribute work to a limited number of processes, behaving similarly to a pool where a new process is started per event. The minimum number of concurrent children per producer is specified by min_demand and the maximum is given by max_demand. See the ConsumerSupervisor docs for more information.
Setting the demand to :manual
in handle_subscribe/4
is not
only useful for asynchronous work but also for setting up other
mechanisms for back-pressure. As an example, let’s implement a
consumer that is allowed to process a limited number of events
per time interval. Those are often called rate limiters:
defmodule RateLimiter do
  use GenStage

  def init(_) do
    # Our state will keep all producers and their pending demand
    {:consumer, %{}}
  end

  def handle_subscribe(:producer, opts, from, producers) do
    # We will only allow max_demand events every 5000 milliseconds
    pending = opts[:max_demand] || 1000
    interval = opts[:interval] || 5000

    # Register the producer in the state
    producers = Map.put(producers, from, {pending, interval})

    # Ask for the pending events and schedule the next time around
    producers = ask_and_schedule(producers, from)

    # Returns manual as we want control over the demand
    {:manual, producers}
  end

  def handle_cancel(_, from, producers) do
    # Remove the producers from the map on unsubscribe
    {:noreply, [], Map.delete(producers, from)}
  end

  def handle_events(events, from, producers) do
    # Bump the amount of pending events for the given producer
    producers = Map.update!(producers, from, fn {pending, interval} ->
      {pending + length(events), interval}
    end)

    # Consume the events by printing them.
    IO.inspect(events)

    # A producer_consumer would return the processed events here.
    {:noreply, [], producers}
  end

  def handle_info({:ask, from}, producers) do
    # This callback is invoked by the Process.send_after/3 message below.
    {:noreply, [], ask_and_schedule(producers, from)}
  end

  defp ask_and_schedule(producers, from) do
    case producers do
      %{^from => {pending, interval}} ->
        # Ask for any pending events
        GenStage.ask(from, pending)
        # And let's check again after interval
        Process.send_after(self(), {:ask, from}, interval)
        # Finally, reset pending events to 0
        Map.put(producers, from, {0, interval})
      %{} ->
        producers
    end
  end
end
With the RateLimiter
implemented, let’s subscribe it to the
producer we have implemented at the beginning of the module
documentation:
{:ok, a} = GenStage.start_link(A, 0)
{:ok, b} = GenStage.start_link(RateLimiter, :ok)
# Ask for 10 items every 2 seconds
GenStage.sync_subscribe(b, to: a, max_demand: 10, interval: 2000)
Although the rate limiter above is a consumer, it could be made a
producer_consumer by changing init/1
to return a :producer_consumer
and then forwarding the events in handle_events/3
.
Notifications
GenStage
also supports the ability to send notifications to all
consumers. Those notifications are sent as regular messages outside
of the demand-driven protocol but respecting the event ordering.
See sync_notify/3
and async_notify/2
.
Notifications are useful for out-of-band information, for example, to notify consumers the producer has sent all events it had to process or that a new batch of events is starting.
Note the notification system should not be used for broadcasting
events, for such, consider using GenStage.BroadcastDispatcher
.
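As an illustration, a producer could tell its consumers that a batch has finished. The :batch_done message is made up for this sketch; on the consumer side, the notification arrives as a regular message, typically handled in handle_info/2 in the format described above:

# Producer side: notify every consumer that the current batch ended.
GenStage.sync_notify(producer, :batch_done)

# Consumer side: the notification arrives outside the demand protocol.
def handle_info({{_producer_pid, _subscription_tag}, :batch_done}, state) do
  IO.puts("producer finished its current batch")
  {:noreply, [], state}
end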
Callbacks
GenStage
is implemented on top of a GenServer
with two additions.
Besides exposing all of the GenServer
callbacks, it also provides
handle_demand/2
to be implemented by producers and handle_events/3
to be implemented by consumers, as shown above. Furthermore, all the
callback responses have been modified to potentially emit events.
See the callbacks documentation for more information.
By adding use GenStage
to your module, Elixir will automatically
define all callbacks for you except the following:
- init/1 - must be implemented to choose between :producer, :consumer or :producer_consumer
- handle_demand/2 - must be implemented by :producer types
- handle_events/3 - must be implemented by :producer_consumer and :consumer types
Although this module exposes functions similar to the ones found in
the GenServer
API, like call/3
and cast/2
, developers can also
rely directly on GenServer functions such as GenServer.multi_call/4
and GenServer.abcast/3
if they wish to.
Name Registration
GenStage
is bound to the same name registration rules as a GenServer
.
Read more about it in the GenServer
docs.
Message-protocol overview
This section will describe the message-protocol implemented by stages. By documenting these messages, we will allow developers to provide their own stage implementations.
Back-pressure
When data is sent between stages, it is done by a message protocol that provides back-pressure. The first step is for the consumer to subscribe to the producer. Each subscription has a unique reference.
Once subscribed, the consumer may ask the producer for messages for the given subscription. The consumer may demand more items whenever it wants to. A consumer must never receive more data than it has asked for from any given producer stage.
A consumer may have multiple producers, where each demand is
managed individually. A producer may have multiple consumers,
where the demand and events are managed and delivered according
to a GenStage.Dispatcher
implementation.
Producer messages
The producer is responsible for sending events to consumers based on demand.
{:"$gen_producer", from :: {consumer_pid, subscription_tag}, {:subscribe, current, options}}
- sent by the consumer to the producer to start a new subscription.Before sending, the consumer MUST monitor the producer for clean-up purposes in case of crashes. The
subscription_tag
is unique to identify the subscription. It is typically the subscriber monitoring reference although it may be any term.Once sent, the consumer MAY immediately send demand to the producer. The
subscription_tag
is unique to identify the subscription.The
current
field, when not nil, is a two-item tuple containing a subscription that must be cancelled with the given reason before the current one is accepted.Once received, the producer MUST monitor the consumer. However, if the subscription reference is known, it MUST send a
:cancel
message to the consumer instead of monitoring and accepting the subscription.{:"$gen_producer", from :: {pid, subscription_tag}, {:cancel, reason}}
- sent by the consumer to cancel a given subscription.Once received, the producer MUST send a
:cancel
reply to the registered consumer (which may not necessarily be the one received in the tuple above). Keep in mind, however, there is no guarantee such messages can be delivered in case the producer crashes before. If the pair is unknown, the producer MUST send an appropriate cancel reply.{:"$gen_producer", from :: {pid, subscription_tag}, {:ask, count}}
- sent by consumers to ask data in a given subscription.Once received, the producer MUST send data up to the demand. If the pair is unknown, the producer MUST send an appropriate cancel reply.
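As an illustration, a hand-rolled consumer could request more events on an already established subscription by sending this message itself; producer_pid and ref here stand for the producer's pid and the subscription_tag obtained during subscription:

# Request 10 more events on the given subscription, following the
# message format above (roughly what GenStage.ask/3 does for you).
send(producer_pid, {:"$gen_producer", {self(), ref}, {:ask, 10}})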
Consumer messages
The consumer is responsible for starting the subscription and sending demand to producers.
{:"$gen_consumer", from :: {producer_pid, subscription_tag}, {:notification, msg}}
- notifications sent by producers.{:"$gen_consumer", from :: {producer_pid, subscription_tag}, {:cancel, reason}}
- sent by producers to cancel a given subscription.It is used as a confirmation for client cancellations OR whenever the producer wants to cancel some upstream demand.
{:"$gen_consumer", from :: {producer_pid, subscription_tag}, [event]}
- events sent by producers to consumers.subscription_tag
identifies the subscription. The third argument is a non-empty list of events. If the subscription is unknown, the events must be ignored and a cancel message sent to the producer.
Summary
Functions
Asks the given demand to the producer
Asks the producer to send a notification to all consumers asynchronously
Cancels ref with reason and subscribes asynchronously in one step
Asks the consumer to subscribe to the given producer asynchronously
Makes a synchronous call to the stage and waits for its reply
Cancels the given subscription on the producer
Sends an asynchronous request to the stage
Sets the demand mode for a producer
Starts a producer stage from an enumerable (or stream)
Replies to a client
Starts a GenStage process without links (outside of a supervision tree)
Starts a GenStage process linked to the current process
Stops the stage with the given reason
Creates a stream that subscribes to the given producers and emits the appropriate messages
Asks the producer to send a notification to all consumers synchronously
Cancels ref with reason and subscribes synchronously in one step
Asks the consumer to subscribe to the given producer synchronously
Callbacks
The same as GenServer.code_change/3
The same as GenServer.format_status/2
Invoked to handle synchronous call/3 messages
Invoked when a consumer is no longer subscribed to a producer
Invoked to handle asynchronous cast/2 messages
Invoked on :producer stages
Invoked on :producer_consumer and :consumer stages to handle events
Invoked to handle all other messages
Invoked when a consumer subscribes to a producer
Invoked when the server is started
The same as GenServer.terminate/2
Types
The stage reference
Functions
Asks the given demand to the producer.
The demand is a non-negative integer with the amount of events to
ask a producer for. If the demand is 0, it simply returns :ok
without asking for data.
This function must only be used in the rare cases when a consumer
sets a subscription to :manual
mode in the handle_subscribe/4
callback.
It accepts the same options as Process.send/3
.
Asks the producer to send a notification to all consumers asynchronously.
The given message will be delivered in the format {{producer_pid, subscription_tag}, msg}, where msg is the message given to this function.
This call returns :ok regardless of whether the notification has been received by the producer or sent. It is typically called from the producer stage itself.
async_resubscribe(stage, ref :: term, reason :: term, opts :: keyword) :: :ok
Cancels ref with reason and subscribes asynchronously in one step.
See async_subscribe/2 for examples and options.
Asks the consumer to subscribe to the given producer asynchronously.
This call returns :ok regardless of whether the subscription effectively happened or not. It is typically called from a stage’s init/1 callback.
Options
- :cancel - :permanent (default) or :temporary. When permanent, the consumer exits when the producer cancels or exits. In case of exits, the same reason is used to exit the consumer. In case of cancellations, the reason is wrapped in a :cancel tuple.
- :min_demand - the minimum demand for this subscription
- :max_demand - the maximum demand for this subscription
All other options are sent as is to the producer stage.
Makes a synchronous call to the stage
and waits for its reply.
The client sends the given request
to the server and waits until a reply
arrives or a timeout occurs. handle_call/3
will be called on the stage
to handle the request.
stage
can be any of the values described in the “Name registration”
section of the documentation for this module.
Timeouts
timeout
is an integer greater than zero which specifies how many
milliseconds to wait for a reply, or the atom :infinity
to wait
indefinitely. The default value is 5000
. If no reply is received within
the specified time, the function call fails and the caller exits. If the
caller catches the failure and continues running, and the stage is just late
with the reply, it may arrive at any time later into the caller’s message
queue. The caller must in this case be prepared for this and discard any such
garbage messages that are two-element tuples with a reference as the first
element.
Cancels the given subscription on the producer.
Once the producer receives the request, a confirmation may be forwarded to the consumer (although there is no guarantee as the producer may crash for unrelated reasons before). This is an asynchronous request.
It accepts the same options as Process.send/3
.
Sends an asynchronous request to the stage
.
This function always returns :ok
regardless of whether
the destination stage
(or node) exists. Therefore it
is unknown whether the destination stage
successfully
handled the message.
handle_cast/2
will be called on the stage to handle
the request. In case the stage
is on a node which is
not yet connected to the caller one, the call is going to
block until a connection happens.
Sets the demand mode for a producer.
When :forward
, the demand is always forwarded to the handle_demand
callback. When :accumulate
, demand is accumulated until its mode is
set to :forward
. This is useful as a synchronization mechanism, where
the demand is accumulated until all consumers are subscribed. Defaults
to :forward
.
This command is asynchronous.
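A sketch of that synchronization pattern, reusing the counter producer A from the module documentation; consumer1 and consumer2 stand for already-started consumer stages:

{:ok, producer} = GenStage.start_link(A, 0)

# Hold on to incoming demand while the pipeline is being wired up...
GenStage.demand(producer, :accumulate)
GenStage.sync_subscribe(consumer1, to: producer)
GenStage.sync_subscribe(consumer2, to: producer)

# ...and release it once every consumer is subscribed.
GenStage.demand(producer, :forward)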
Starts a producer stage from an enumerable (or stream).
This function will start a stage linked to the current process that will take items from the enumerable when there is demand. Since streams are enumerables, we can also pass streams as arguments (in fact, streams are the most common argument to this function).
The enumerable is consumed in batches, retrieving max_demand
items the first time and then max_demand - min_demand
the
next times. Therefore, for streams that cannot produce items
that fast, it is recommended to pass a lower :max_demand
value as an option.
When the enumerable finishes or halts, a notification is sent
to all consumers in the format of
{{pid, subscription_tag}, {:producer, :halted | :done}}
. If the
stage is meant to terminate when there are no more consumers, we
recommend setting the :consumers
option to :permanent
.
Keep in mind that streams that require the use of the process inbox to work most likely won’t behave as expected with this function since the mailbox is controlled by the stage process itself.
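A minimal sketch combining from_enumerable/2 with stream/1; the :max_demand value is only illustrative:

# Start a stage that emits 1..100 as demand arrives, then consume it lazily.
{:ok, producer} = GenStage.from_enumerable(1..100, max_demand: 10)

GenStage.stream([producer]) |> Enum.take(5)
#=> [1, 2, 3, 4, 5]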
Options
- :link - when false, does not link the stage to the current process. Defaults to true
- :consumers - when :permanent, the stage exits when there are no more consumers. Defaults to :temporary
- :dispatcher - the dispatcher responsible for handling demands. Defaults to GenStage.DemandDispatcher. May be either an atom or a tuple with the dispatcher and the dispatcher options
- :demand - configures the demand to :forward or :accumulate mode. See init/1 and demand/2 for more information.
All other options that would be given for start_link/3
are
also accepted.
Replies to a client.
This function can be used to explicitly send a reply to a client that
called call/3
when the reply cannot be specified in the return value
of handle_call/3
.
client
must be the from
argument (the second argument) accepted by
handle_call/3
callbacks. reply
is an arbitrary term which will be given
back to the client as the return value of the call.
Note that reply/2
can be called from any process, not just the GenServer
that originally received the call (as long as that GenServer
communicated the
from
argument somehow).
This function always returns :ok
.
Examples
def handle_call(:reply_in_one_second, from, state) do
  Process.send_after(self(), {:reply, from}, 1_000)
  {:noreply, [], state}
end

def handle_info({:reply, from}, state) do
  GenStage.reply(from, :one_second_has_passed)
  {:noreply, [], state}
end
Starts a GenStage
process without links (outside of a supervision tree).
See start_link/3
for more information.
Starts a GenStage
process linked to the current process.
This is often used to start the GenStage
as part of a supervision tree.
Once the server is started, the init/1
function of the given module
is
called with args
as its arguments to initialize the stage. To ensure a
synchronized start-up procedure, this function does not return until init/1
has returned.
Note that a GenStage
started with start_link/3
is linked to the
parent process and will exit in case of crashes from the parent. The GenStage
will also exit due to the :normal
reasons in case it is configured to trap
exits in the init/1
callback.
Options
- :name - used for name registration as described in the “Name registration” section of the module documentation
- :timeout - if present, the server is allowed to spend the given amount of milliseconds initializing or it will be terminated and the start function will return {:error, :timeout}
- :debug - if present, the corresponding function in the :sys module is invoked
- :spawn_opt - if present, its value is passed as options to the underlying process as in Process.spawn/4
Return values
If the server is successfully created and initialized, this function returns
{:ok, pid}
, where pid
is the pid of the server. If a process with the
specified server name already exists, this function returns
{:error, {:already_started, pid}}
with the pid of that process.
If the init/1
callback fails with reason
, this function returns
{:error, reason}
. Otherwise, if it returns {:stop, reason}
or :ignore
, the process is terminated and this function returns
{:error, reason}
or :ignore
, respectively.
Stops the stage with the given reason
.
The terminate/2
callback of the given stage
will be invoked before
exiting. This function returns :ok
if the server terminates with the
given reason; if it terminates with another reason, the call exits.
This function keeps OTP semantics regarding error reporting.
If the reason is any other than :normal
, :shutdown
or
{:shutdown, _}
, an error report is logged.
Creates a stream that subscribes to the given producers and emits the appropriate messages.
It expects a list of producers to subscribe to. Each element
represents the producer or a tuple with the producer and the
subscription options as defined in sync_subscribe/2
. Once
all producers are subscribed to, their demand is automatically
set to :forward
mode. See the :demand
and :producers
options below for more information.
GenStage.stream/1
will “hijack” the inbox of the process
enumerating the stream to subscribe and receive messages
from producers. However it guarantees it won’t remove or
leave unwanted messages in the mailbox after enumeration
except if one of the producers come from a remote node.
For more information, read the “Known limitations” section
below.
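A minimal usage sketch, assuming producer is an already-started :producer stage:

# Subscribe with explicit subscription options and lazily consume
# the producer's events in the current process (runs until the
# producer halts or terminates).
[{producer, max_demand: 100, min_demand: 50}]
|> GenStage.stream()
|> Enum.each(&IO.inspect/1)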
Options
- :demand - configures the demand to :forward or :accumulate mode. See init/1 and demand/2 for more information.
- :producers - the processes to set the demand to :forward on subscription. It defaults to the processes being subscribed to. Sometimes the stream is subscribing to a :producer_consumer instead of a :producer; in such cases, you can set this option to either an empty list or the list of actual producers so they receive the proper notification message.
Known limitations
from_enumerable
This module also provides a function called from_enumerable/2
which receives an enumerable (like a stream) and creates a stage
that emits data from the enumerable.
Given both GenStage.from_enumerable/2
and GenStage.stream/1
require the process inbox to send and receive messages, it is
impossible to run a stream/1
inside a from_enumerable/2
as
the stream/1
will never receive the messages it expects.
Remote nodes
While it is possible to stream messages from remote nodes,
it should be done with care. In particular, in case of
disconnections, there is a chance the producer will send
messages after the consumer receives its DOWN messages and
those will remain in the process inbox, violating the
common scenario where GenStage.stream/1
does not pollute
the caller inbox. In such cases, it is recommended to
consume such streams from a separate process which will be
discarded after the stream is consumed.
sync_notify(stage, msg :: term, timeout) :: :ok | {:error, :not_a_producer}
Asks the producer to send a notification to all consumers synchronously.
This call is synchronous and will return after the producer has either sent the notification to all consumers or placed it in a buffer. In other words, it guarantees the producer has handled the message but not that the consumers have received it.
The given message will be delivered in the format
{{producer_pid, subscription_tag}, msg}
, where msg is the message given to this function.
This function will return :ok
as long as the notification request is
sent. It may return {:error, :not_a_producer}
in case the stage is not
a producer.
Cancels ref with reason and subscribes synchronously in one step.
See sync_subscribe/3
for examples and options.
Asks the consumer to subscribe to the given producer synchronously.
This call is synchronous and will return after the called consumer
sends the subscribe message to the producer. It does not, however,
wait for the subscription confirmation. Therefore this function
will return before handle_subscribe
is called in the consumer.
In other words, it guarantees the message was sent, but it does not
guarantee a subscription has effectively been established.
This function will return {:ok, ref}
as long as the subscription
message is sent. It may return {:error, :not_a_consumer}
in case
the stage is not a consumer.
Options
- :cancel - :permanent (default) or :temporary. When permanent, the consumer exits when the producer cancels or exits. In case of exits, the same reason is used to exit the consumer. In case of cancellations, the reason is wrapped in a :cancel tuple.
- :min_demand - the minimum demand for this subscription
- :max_demand - the maximum demand for this subscription
Any other option is sent to the producer stage. This may be used by
dispatchers for custom configuration. For example, if a producer uses
a GenStage.BroadcastDispatcher
, an optional :selector
function
that receives an event and returns a boolean limits this subscription to
receiving only those events where the selector function returns a truthy
value:
GenStage.sync_subscribe(consumer,
  to: producer,
  selector: fn %{key: key} -> String.starts_with?(key, "foo-") end)
All other options are sent as is to the producer stage.
Callbacks
code_change(old_vsn, state :: term, extra :: term) :: {:ok, new_state :: term} | {:error, reason :: term} when old_vsn: term | {:down, term}
The same as GenServer.code_change/3
.
format_status(:normal | :terminate, [pdict :: {term, term} | state :: term, ...]) :: status :: term
The same as GenServer.format_status/2
.
handle_call(request :: term, GenServer.from, state :: term) :: {:reply, reply, [event], new_state} | {:reply, reply, [event], new_state, :hibernate} | {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term, event: term
Invoked to handle synchronous call/3
messages. call/3
will block until a
reply is received (unless the call times out or nodes are disconnected).
request
is the request message sent by a call/3
, from
is a 2-tuple
containing the caller’s PID and a term that uniquely identifies the call, and
state
is the current state of the GenStage
.
Returning {:reply, reply, [events], new_state}
sends the response reply
to the caller after events are dispatched (or buffered) and continues the
loop with new state new_state
. In case you want to deliver the reply before
processing the events, use GenStage.reply/2
and return {:noreply, [event],
state}
(see below).
Returning {:noreply, [event], new_state}
does not send a response to the
caller and processes the given events before continuing the loop with new
state new_state
. The response must be sent with reply/2
.
Hibernating is also supported as an atom to be returned from either
:reply
or :noreply
tuples.
Returning {:stop, reason, reply, new_state}
stops the loop and terminate/2
is called with reason reason
and state new_state
. Then the reply
is sent
as the response to call and the process exits with reason reason
.
Returning {:stop, reason, new_state}
is similar to
{:stop, reason, reply, new_state}
except a reply is not sent.
If this callback is not implemented, the default implementation by
use GenStage
will return {:stop, {:bad_call, request}, state}
.
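For example, the Broadcaster in the module documentation replies to the caller and dispatches the received event in the same return value:

def handle_call({:notify, event}, _from, state) do
  # Reply :ok to the caller and dispatch the event to consumers at once.
  {:reply, :ok, [event], state}
end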
handle_cancel({:cancel | :down, reason :: term}, GenServer.from, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, new_state} when event: term, new_state: term, reason: term
Invoked when a consumer is no longer subscribed to a producer.
It receives the cancellation reason, the from
tuple and the state.
The cancel_reason
will be a {:cancel, _}
tuple if the reason for
cancellation was a GenStage.cancel/2
call. Any other value means
the cancellation reason was due to an EXIT.
If this callback is not implemented, the default implementation by
use GenStage
will return {:noreply, [], state}
.
Return values are the same as handle_cast/2
.
handle_cast(request :: term, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason :: term, new_state} when new_state: term, event: term
Invoked to handle asynchronous cast/2
messages.
request
is the request message sent by a cast/2
and state
is the current
state of the GenStage
.
Returning {:noreply, [event], new_state}
dispatches the events and continues
the loop with new state new_state
.
Returning {:noreply, [event], new_state, :hibernate}
is similar to
{:noreply, [event], new_state}
except the process is hibernated before continuing the
loop.
Returning {:stop, reason, new_state}
stops the loop and terminate/2
is
called with the reason reason
and state new_state
. The process exits with
reason reason
.
If this callback is not implemented, the default implementation by
use GenStage
will return {:stop, {:bad_cast, request}, state}
.
handle_demand(demand :: pos_integer, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, new_state} when new_state: term, reason: term, event: term
Invoked on :producer stages.
Must always be explicitly implemented by :producer
types.
It is invoked with the demand from consumers/dispatcher. The
producer must either store the demand or return the events requested.
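For example, the counter producer A in the module documentation satisfies the incoming demand directly:

def handle_demand(demand, counter) when demand > 0 do
  # Emit exactly `demand` consecutive numbers and advance the counter.
  events = Enum.to_list(counter..counter + demand - 1)
  {:noreply, events, counter + demand}
end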
handle_events([event], GenServer.from, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason, new_state} when new_state: term, reason: term, event: term
Invoked on :producer_consumer and :consumer stages to handle events.
Must always be explicitly implemented by such types.
Return values are the same as handle_cast/2
.
handle_info(msg :: term, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} | {:stop, reason :: term, new_state} when new_state: term, event: term
Invoked to handle all other messages.
msg
is the message and state
is the current state of the GenStage
. When
a timeout occurs the message is :timeout
.
If this callback is not implemented, the default implementation by
use GenStage
will return {:noreply, [], state}
.
Return values are the same as handle_cast/2
.
handle_subscribe(:producer | :consumer, options, to_or_from :: GenServer.from, state :: term) :: {:automatic | :manual, new_state} | {:stop, reason, new_state} when new_state: term, reason: term
Invoked when a consumer subscribes to a producer.
This callback is invoked in both producers and consumers.
For consumers, successful subscriptions must return {:automatic, new_state}
or {:manual, state}
. The default is to return :automatic
, which means
the stage implementation will take care of automatically sending demand to
producers. :manual
must be used when a special behaviour is desired
(for example, ConsumerSupervisor
uses :manual
demand) and demand must
be sent explicitly with ask/2
. The manual subscription must be cancelled
when handle_cancel/3
is called.
For producers, successful subscriptions must always return
{:automatic, new_state}
, the :manual
mode is not supported.
If this callback is not implemented, the default implementation by
use GenStage
will return {:automatic, state}
.
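A minimal sketch of manual mode on the consumer side; the initial demand of 10 is arbitrary (see the RateLimiter example in the module documentation for a complete version):

def handle_subscribe(:producer, _opts, from, state) do
  # We take over demand management, so we must ask explicitly from now on.
  GenStage.ask(from, 10)
  {:manual, state}
end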
Invoked when the server is started.
start_link/3
(or start/3
) will block until it returns. args
is the argument term (second argument) passed to start_link/3
.
In case of successful start, this callback must return a tuple
where the first element is the stage type, which is either
a :producer
, :consumer
or :producer_consumer
if it is
taking both roles.
For example:
def init(args) do
{:producer, some_state}
end
The returned tuple may also contain 3 or 4 elements. The third
element may be the :hibernate
atom or a set of options defined
below.
Returning :ignore
will cause start_link/3
to return :ignore
and the process will exit normally without entering the loop or
calling terminate/2
.
Returning {:stop, reason}
will cause start_link/3
to return
{:error, reason}
and the process to exit with reason reason
without entering the loop or calling terminate/2
.
Options
This callback may return options. Some options are specific to the stage type while others are shared across all types.
:producer options
- :demand - when :forward, the demand is always forwarded to the handle_demand callback. When :accumulate, demand is accumulated until its mode is set to :forward via demand/2. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. Defaults to :forward.
:producer and :producer_consumer options
- :buffer_size - the size of the buffer to store events without demand. Check the “Buffering events” section of the module documentation (defaults to 10000 for :producer, :infinity for :producer_consumer)
- :buffer_keep - determines whether the :first or :last (default) entries should be kept in the buffer in case the buffer size is exceeded
- :dispatcher - the dispatcher responsible for handling demands. Defaults to GenStage.DemandDispatcher. May be either an atom or a tuple with the dispatcher and the dispatcher options
:consumer and :producer_consumer options
- :subscribe_to - a list of producers to subscribe to. Each element represents the producer or a tuple with the producer and the subscription options (as defined in sync_subscribe/2)
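For example, a :producer_consumer such as B in the module documentation could combine these options when returning from init/1; the values are illustrative:

def init(number) do
  {:producer_consumer, number,
   subscribe_to: [{A, max_demand: 100}],
   buffer_size: 5_000}
end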
Dispatcher
When using a :producer
or :producer_consumer
, the dispatcher
may be configured on init as follows:
{:producer, state, dispatcher: GenStage.BroadcastDispatcher}
Some dispatchers may require options to be given on initialization, those can be done with a tuple:
{:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}
terminate(reason, state :: term) :: term when reason: :normal | :shutdown | {:shutdown, term} | term
The same as GenServer.terminate/2
.