Behaviours: gen_server.
Stages are data-exchange steps that send and/or receive data from other stages.
When a stage sends data, it acts as a producer. When it receives data, it acts as a consumer. Stages may take both producer and consumer roles at once.
## Stage types
Besides taking both producer and consumer roles, a stage may be called a "source" if it only produces items, or a "sink" if it only consumes items.
For example, imagine the stages below where A sends data to B that sends data to C:
[A] -> [B] -> [C]
we conclude that:
* A is only a producer (and therefore a source)
* B is both producer and consumer
* C is only a consumer (and therefore a sink)
As we will see in the upcoming Examples section, we must specify the type of the stage when we implement each of them.
To start the flow of events, we subscribe consumers to producers. Once the communication channel between them is established, consumers will ask the producers for events. We typically say the consumer is sending demand upstream. Once demand arrives, the producer will emit items, never emitting more items than the consumer asked for. This provides a back-pressure mechanism.
A consumer may have multiple producers and a producer may have multiple consumers. When a consumer asks for data, each producer is handled separately, with its own demand. When a producer receives demand and sends data to multiple consumers, the demand is tracked and the events are sent by a dispatcher. This allows producers to send data using different "strategies". See gen_stage_dispatcher for more information.
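The demand-driven flow described above can be sketched as a small producer. This is only an illustration under stated assumptions, not the library's reference example: it assumes the Erlang callbacks mirror Elixir's GenStage, so that init/1 returns {producer, State} and handle_demand/2 returns {noreply, Events, NewState}. The module name counter_producer is hypothetical, and the behaviour may require further callbacks.

```erlang
-module(counter_producer).
-behaviour(gen_stage).

-export([start_link/1]).
-export([init/1, handle_demand/2]).

%% Starts the producer with an initial counter value.
start_link(Initial) when is_integer(Initial) ->
    gen_stage:start_link(?MODULE, Initial).

%% Assumption: returning {producer, State} declares this stage a source.
init(Initial) ->
    {producer, Initial}.

%% Emits exactly Demand consecutive integers, never more than was asked
%% for, which is what gives consumers back-pressure.
%% Assumption: {noreply, Events, NewState} is the expected return shape.
handle_demand(Demand, Counter) when Demand > 0 ->
    Events = lists:seq(Counter, Counter + Demand - 1),
    {noreply, Events, Counter + Demand}.
```

Because the producer only reacts to demand, a slow consumer simply asks less often and the counter advances no faster than the consumer can keep up.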
Many developers tend to create layers of stages, such as A, B and C, for achieving concurrency. If all you want is concurrency, starting multiple instances of the same stage is enough. Layers in gen_stage must be created when there is a need for back-pressure or to route the data in different ways.
For example, if you need the data to go over multiple steps but without a need for back-pressure or without a need to break the data apart, do not design it as such:
[Producer] -> [Step 1] -> [Step 2] -> [Step 3]
Instead it is better to design it as:
                 [Consumer]
                /
[Producer]-<-[Consumer]
                \
                 [Consumer]
where "Consumer" are multiple processes running the same code that subscribe to the same "Producer".
debug() = trace | log | statistics | {log_to_file, iolist() | binary()}
from() = {pid(), subscription_tag()}
name() = {local, atom()} | {global, term()} | {via, module(), term()}
on_start() = {ok, pid()} | ignore | {error, {already_started, pid()} | term()}
option() = {debug, [debug()]} | {timeout, timeout()} | {spawn_opt, [term()]} | {hibernate_after, timeout()}
options() = [option()]
server() = pid() | atom() | {global, term()} | {via, module(), term()} | {atom(), node()}
stage() = pid() | atom() | {global, term()} | {via, module(), term()} | {atom(), node()}
subscription_option() = {cancel, permanent | transient | temporary} | {to, server()} | {min_demand, integer()} | {max_demand, integer()} | {atom(), term()}
subscription_options() = [subscription_option()]
subscription_tag() = reference()
| ask/2 | Asks the given demand to the producer. |
| ask/3 | |
| async_info/2 | Asynchronously queues an info message that is delivered after all currently buffered events. |
| async_resubscribe/4 | Cancels SubscriptionTag with Reason and resubscribes to the same stage with the given options. |
| async_subscribe/2 | Asks the consumer to subscribe to the given producer asynchronously. |
| call/2 | Makes a synchronous call to the stage and waits for its reply. |
| call/3 | |
| cancel/2 | Cancels the given subscription on the producer. |
| cancel/3 | |
| cast/2 | Sends an asynchronous request to the stage. |
| code_change/3 | |
| consumer_receive/4 | |
| consumer_subscribe/4 | |
| demand/1 | Returns the demand mode for a producer. |
| demand/2 | Sets the demand mode for a producer. |
| estimate_buffered_count/1 | Returns the estimated number of buffered items for a producer. |
| estimate_buffered_count/2 | |
| handle_call/3 | |
| handle_cast/2 | |
| handle_info/2 | |
| init/1 | |
| reply/2 | Replies to a client. |
| start/2 | Starts a gen_stage process without links (outside of a supervision tree). |
| start/3 | |
| start/4 | |
| start_link/2 | Starts a gen_stage process linked to the current process. |
| start_link/3 | |
| start_link/4 | |
| stop/1 | Stops the stage with the given reason. |
| stop/3 | |
| sync_info/2 | Queues an info message that is delivered after all currently buffered events. |
| sync_info/3 | |
| sync_resubscribe/4 | Cancels SubscriptionTag with Reason and resubscribes to the same stage with the given options. |
| sync_resubscribe/5 | |
| sync_subscribe/2 | Asks the consumer to subscribe to the given producer synchronously. |
| sync_subscribe/3 | |
| terminate/2 | |
ask(ProducerSubscription::from(), Demand::non_neg_integer()) -> ok | noconnect | nosuspend
Asks the given demand to the producer.
ProducerSubscription is the subscription this demand will be asked on; this
term could be for example stored in the stage when received in
handle_subscribe/4.
If Demand is 0, this function simply returns ok
without asking for data.
This function must only be used when a consumer
sets a subscription to manual mode in the handle_subscribe/4
callback.
ask/3 accepts the same options as erlang:send/3 as its third argument, and both
arities return the same value as erlang:send/3.
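A manual-mode consumer might look like the sketch below. It is an illustration only and rests on assumptions carried over from Elixir's GenStage: that handle_subscribe/4 receives (producer, Opts, From, State) and may return {manual, State}, and that handle_events/3 returns {noreply, Events, State}. The module name manual_consumer is hypothetical.

```erlang
-module(manual_consumer).
-behaviour(gen_stage).

-export([init/1, handle_subscribe/4, handle_events/3]).

%% The state is the number of events to request in the first batch.
init(BatchSize) ->
    {consumer, BatchSize}.

%% Assumption: returning {manual, State} puts the subscription in manual
%% mode, so no demand is sent unless this module calls gen_stage:ask/2.
handle_subscribe(producer, _Opts, From, BatchSize) ->
    gen_stage:ask(From, BatchSize),
    {manual, BatchSize}.

%% After handling a batch, ask for as many events as were just consumed,
%% keeping the amount of outstanding demand constant.
handle_events(Events, From, BatchSize) ->
    lists:foreach(fun(E) -> io:format("got ~p~n", [E]) end, Events),
    gen_stage:ask(From, length(Events)),
    {noreply, [], BatchSize}.
```

The From term passed to handle_subscribe/4 and handle_events/3 is the ProducerSubscription that ask/2 expects, so it does not need to be stored separately in this simple case.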
ask(X1::from(), Demand::non_neg_integer(), Opts::[noconnect | nosuspend]) -> ok | noconnect | nosuspend
async_info(Stage::stage(), Msg::term()) -> ok
Asynchronously queues an info message that is delivered after all currently buffered events.
If the stage is a consumer, it does not have buffered events, so the message is queued immediately.
This call returns ok regardless of whether the info has been successfully
queued. It is typically called from the stage itself.
async_resubscribe(Stage::stage(), SubscriptionTag::subscription_tag(), Reason::term(), Opts::subscription_options()) -> ok
Cancels SubscriptionTag with Reason and resubscribes
to the same stage with the given options.
This is useful when you need to update the options under which you are currently subscribed to a producer.
This function is async, which means it always returns
ok once the request is dispatched but without waiting
for its completion.
Options
This function accepts the same options as sync_subscribe/2.
async_subscribe(Stage::stage(), Opts::subscription_options()) -> ok
Asks the consumer to subscribe to the given producer asynchronously.
This function is async, which means it always returns
ok once the request is dispatched but without waiting
for its completion. This particular function is usually
called from a stage's init/1 callback.
Options
This function accepts the same options as sync_subscribe/2.
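As a sketch of the init/1 usage mentioned above, a consumer could subscribe itself to a producer while it starts. The module name printer_consumer is hypothetical, and the callback return shapes ({consumer, State} and {noreply, Events, State}) are assumed to mirror Elixir's GenStage rather than taken from this page.

```erlang
-module(printer_consumer).
-behaviour(gen_stage).

-export([start_link/1]).
-export([init/1, handle_events/3]).

%% Producer is any stage() value identifying an already running producer.
start_link(Producer) ->
    gen_stage:start_link(?MODULE, Producer).

init(Producer) ->
    %% Returns ok as soon as the request is dispatched; the subscription
    %% itself is established after init/1 has returned.
    ok = gen_stage:async_subscribe(self(), [{to, Producer}, {max_demand, 10}]),
    {consumer, Producer}.

%% Consumers emit no events of their own, hence the empty list.
handle_events(Events, _From, State) ->
    io:format("received ~p~n", [Events]),
    {noreply, [], State}.
```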
call(Stage::stage(), Request::term()) -> term()
Makes a synchronous call to the stage and waits for its reply.
The client sends the given request to the stage and waits until a
reply arrives or a timeout occurs. handle_call/3 will be called on
the stage to handle the request.
Stage can be any of the values described in the "Name registration"
section of the documentation for this module.
Timeouts
Timeout is an integer greater than zero which specifies how many
milliseconds to wait for a reply, or the atom infinity to wait
indefinitely. The default value is 5000. If no reply is received
within the specified time, the function call fails and the caller
exits. If the caller catches the failure and continues running, and
the stage is just late with the reply, such reply may arrive at any
time later into the caller's message queue. The caller must in this
case be prepared for this and discard any such garbage messages that
are two-element tuples with a reference as the first element.
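One way to handle the timeout behaviour described above is sketched below. The helper module stage_call is hypothetical, and the sketch assumes the exit reason has the shape {timeout, _}, as it does for gen_server:call/3; that detail is not confirmed by this page.

```erlang
-module(stage_call).
-export([safe_call/3, flush_late_replies/0]).

%% Wraps gen_stage:call/3 so that a timeout becomes a return value
%% instead of an exit in the caller.
%% Assumption: the exit reason looks like {timeout, _}.
safe_call(Stage, Request, Timeout) ->
    try gen_stage:call(Stage, Request, Timeout) of
        Reply -> {ok, Reply}
    catch
        exit:{timeout, _} ->
            {error, timeout}
    end.

%% Discards late replies already sitting in the caller's mailbox, that is,
%% two-element tuples whose first element is a reference.
%% Returns the number of messages discarded.
flush_late_replies() ->
    flush_late_replies(0).

flush_late_replies(Count) ->
    receive
        {Ref, _Reply} when is_reference(Ref) ->
            flush_late_replies(Count + 1)
    after 0 ->
        Count
    end.
```

Note that flush_late_replies/0 drops every message of that shape, so it is only appropriate in a process that does not expect other reference-tagged tuples.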
call(Stage::stage(), Request::term(), Timeout::timeout()) -> term()
cancel(ProducerSubscription::from(), Reason::term()) -> ok | noconnect | nosuspend
Cancels the given subscription on the producer.
The second argument is the cancellation reason. Once the
producer receives the request, a confirmation may be
forwarded to the consumer (although there is no guarantee
as the producer may crash for unrelated reasons before).
The consumer will react to the cancellation according to
the cancel option given when subscribing. For example:
gen_stage:cancel({Pid, Subscription}, shutdown)
will cause the consumer to crash if the cancel given
when subscribing is permanent (the default) but it
won't cause a crash in other modes. See the options in
sync_subscribe/3 for more information.
The cancel operation is an asynchronous request. The
third argument takes the same options as erlang:send/3,
allowing you to pass noconnect or nosuspend, which
is useful when working across nodes. This function returns
the same value as erlang:send/3.
cancel(ProducerSubscription::from(), Reason::term(), Opts::[noconnect | nosuspend]) -> ok | noconnect | nosuspend
cast(Stage::stage(), Request::term()) -> ok
Sends an asynchronous request to the stage.
This function always returns ok regardless of whether
the destination stage (or node) exists. Therefore it
is unknown whether the destination stage successfully
handled the message.
handle_cast/2 will be called on the stage to handle
the request. If the stage is on a node that is
not yet connected to the caller's node, the call
blocks until a connection is established.
code_change(OldVsn, Stage, Extra) -> any()
consumer_receive(From, X2, Events, Stage) -> any()
consumer_subscribe(Cancel, To, Opts, Stage) -> any()
demand(Stage::stage()) -> forward | accumulate
Returns the demand mode for a producer.
It is either forward or accumulate. See demand/2.
demand(Stage::stage(), Mode::forward | accumulate) -> ok
Sets the demand mode for a producer.
When forward, the demand is always forwarded to the handle_demand
callback. When accumulate, demand is accumulated until its mode is
set to forward. This is useful as a synchronization mechanism, where
the demand is accumulated until all consumers are subscribed. Defaults
to forward.
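The synchronization pattern described above could be sketched as follows, using only demand/2 and sync_subscribe/2 from this module. The helper module sync_start is hypothetical, and both the producer and the consumers are assumed to be already running.

```erlang
-module(sync_start).
-export([start/2]).

%% Producer is a running producer stage; Consumers is a list of running
%% consumer stages that should all be subscribed before events flow.
start(Producer, Consumers) ->
    %% Hold back demand so early subscribers do not drain the producer.
    ok = gen_stage:demand(Producer, accumulate),
    lists:foreach(
      fun(Consumer) ->
              {ok, _Tag} = gen_stage:sync_subscribe(Consumer, [{to, Producer}])
      end,
      Consumers),
    %% Every subscribe message has been sent; release accumulated demand.
    ok = gen_stage:demand(Producer, forward).
```

Since sync_subscribe/2 returns once the subscribe message is sent and does not wait for confirmation, this sketch ensures the requests were issued before switching to forward, not that every subscription has been confirmed.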
estimate_buffered_count(Stage) -> any()
Returns the estimated number of buffered items for a producer.
estimate_buffered_count(Stage::stage(), Timeout::non_neg_integer()) -> non_neg_integer()
handle_call(Msg, From, Stage) -> any()
handle_cast(Msg, Stage) -> any()
handle_info(Msg, Stage) -> any()
init(X1) -> any()
reply(X1, Reply) -> any()
Replies to a client.
This function can be used to explicitly send a reply to a client that
called call/3 when the reply cannot be specified in the return value
of handle_call/3.
The first argument must be the From argument (the second argument) accepted by
handle_call/3 callbacks. Reply is an arbitrary term which will be
given back to the client as the return value of the call.
Note that reply/2 can be called from any process, not just the gen_stage
that originally received the call (as long as that gen_stage communicated the
From argument somehow).
This function always returns ok.
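A deferred reply might be sketched as below. This is an assumption-laden illustration: it assumes handle_call/3 may return {noreply, Events, State} to postpone the reply, as in Elixir's GenStage, and the module name deferred_reply and the {compute, N} request are hypothetical.

```erlang
-module(deferred_reply).
-export([handle_call/3]).

%% Hands the work to a helper process so the stage is not blocked, then
%% lets that process answer the caller with gen_stage:reply/2.
%% Assumption: {noreply, Events, State} means "no reply yet".
handle_call({compute, N}, From, State) ->
    spawn(fun() ->
                  Result = N * N,
                  gen_stage:reply(From, Result)
          end),
    {noreply, [], State}.
```

The helper process is not linked to the stage, so if it crashes the caller simply runs into its call timeout.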
start(Mod::module(), Args::term()) -> on_start()
Starts a gen_stage process without links (outside of a supervision tree).
See start_link/3 for more information.
start(Mod::module(), Args::term(), Options::options()) -> on_start()
start(Name::name(), Mod::module(), Args::term(), Options::options()) -> on_start()
start_link(Mod::module(), Args::term()) -> on_start()
Starts a gen_stage process linked to the current process.
This is often used to start the gen_stage as part of a supervision tree.
Once the server is started, the init/1 function of the given Mod is
called with Args as its arguments to initialize the stage. To ensure a
synchronized start-up procedure, this function does not return until init/1
has returned.
Note that a gen_stage started with start_link/3 is linked to the
parent process and will exit in case of crashes from the parent. The gen_stage
will also exit due to the normal reason in case it is configured to trap
exits in the init/1 callback.
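Starting a stage under a supervisor could look like the following sketch, which uses start_link/4 with a local name. The supervisor module stage_sup and the callback module my_producer are hypothetical; the child specification itself is standard OTP.

```erlang
-module(stage_sup).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    SupFlags = #{strategy => one_for_one, intensity => 3, period => 10},
    %% my_producer is a hypothetical module implementing the gen_stage
    %% callbacks; it is registered locally under the same name.
    Producer = #{id => my_producer,
                 start => {gen_stage, start_link,
                           [{local, my_producer}, my_producer, [], []]}},
    {ok, {SupFlags, [Producer]}}.
```

Registering the producer under a name lets consumers pass {to, my_producer} when subscribing, so they can find it again after a restart.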
start_link(Mod::module(), Args::term(), Options::options()) -> on_start()
start_link(Name::name(), Mod::module(), Args::term(), Options::options()) -> on_start()
stop(Stage) -> any()
Stops the stage with the given reason.
The terminate/2 callback of the given stage will be invoked before
exiting. This function returns ok if the server terminates with the
given reason; if it terminates with another reason, the call exits.
This function keeps OTP semantics regarding error reporting.
If the reason is anything other than normal, shutdown or
{shutdown, _}, an error report is logged.
stop(Stage::stage(), Reason::term(), Timeout::timeout()) -> ok
sync_info(Stage::stage(), Msg::term()) -> ok
Queues an info message that is delivered after all currently buffered events.
This call is synchronous and will return after the stage has queued
the info message. The message will be eventually handled by the
handle_info/2 callback.
If the stage is a consumer, it does not have buffered events, so the message is queued immediately.
This function will return ok if the info message is successfully queued.
sync_info(Stage::stage(), Msg::term(), Timeout::timeout()) -> ok
sync_resubscribe(Stage::stage(), SubscriptionTag::subscription_tag(), Reason::term(), Opts::subscription_options()) -> {ok, subscription_tag()} | {error, not_a_consumer} | {error, {bad_opts, iolist() | binary()}}
Cancels SubscriptionTag with Reason and resubscribes
to the same stage with the given options.
This is useful when you need to update the options under which you are currently subscribed to a producer.
This function is sync, which means it will wait until the subscription message is sent to the producer, although it won't wait for the subscription confirmation.
See sync_subscribe/2 for options and more information.
sync_resubscribe(Stage::stage(), SubscriptionTag::subscription_tag(), Reason::term(), Opts::subscription_options(), Timeout::timeout()) -> {ok, subscription_tag()} | {error, not_a_consumer} | {error, {bad_opts, iolist() | binary()}}
sync_subscribe(Stage::stage(), Opts::subscription_options()) -> {ok, subscription_tag()} | {error, not_a_consumer} | {error, {bad_opts, iolist() | binary()}}
Asks the consumer to subscribe to the given producer synchronously.
This call is synchronous and will return after the called consumer
sends the subscribe message to the producer. It does not, however,
wait for the subscription confirmation. Therefore this function
will return before handle_subscribe/4 is called in the consumer.
In other words, it guarantees the message was sent, but it does not
guarantee a subscription has effectively been established.
This function will return {ok, SubscriptionTag} as long as the
subscription message is sent. It will return {error, not_a_consumer}
when the stage is not a consumer. SubscriptionTag is the second element
of the two-element tuple that will be passed to handle_subscribe/4.
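The return values above can be handled as in the following sketch, which subscribes several consumers to the same producer. The helper module fan_out and the demand values are hypothetical; the option names come from subscription_option().

```erlang
-module(fan_out).
-export([subscribe_all/2]).

%% Subscribes each consumer to the same producer and collects the results.
%% Returns {Tags, Errors}: the subscription tags of successful requests and
%% {Consumer, Reason} pairs for the ones that failed, both in input order.
subscribe_all(Producer, Consumers) ->
    Opts = [{to, Producer}, {min_demand, 5}, {max_demand, 10}],
    lists:foldr(
      fun(Consumer, {Tags, Errors}) ->
              case gen_stage:sync_subscribe(Consumer, Opts) of
                  {ok, Tag} ->
                      {[Tag | Tags], Errors};
                  {error, Reason} ->
                      {Tags, [{Consumer, Reason} | Errors]}
              end
      end,
      {[], []},
      Consumers).
```

An {ok, Tag} result only means the subscribe message was sent, so a caller that needs confirmation still has to observe it in the consumer's handle_subscribe/4.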
sync_subscribe(Stage::stage(), Opts::subscription_options(), Timeout::timeout()) -> {ok, subscription_tag()} | {error, not_a_consumer} | {error, {bad_opts, iolist() | binary()}}
terminate(Reason, Stage) -> any()
Generated by EDoc