Express v1.3.3 Express.PushRequests.Buffer View Source
GenStage producer. Acts like a buffer for incoming push messages. Default buffer size is 5000 events. This size can be adjusted via config file:
config :express,
buffer: [
max_size: 10_000
]
Spawns number of GenStage consumers on init. Default amount of the consumers is 5. This amount can be changed in config file:
config :express,
buffer: [
consumers_count: 10
]
Link to this section Summary
Functions
Adds a push request to the buffer
Invoked on :producer
stages
Invoked when the server is started
Link to this section Functions
add(Express.PushRequests.PushRequest.t) :: :ok | {:error, any}
Adds a push request to the buffer.
Invoked on :producer
stages.
This callback is invoked on :producer
stages with the demand from
consumers/dispatcher. The producer that implements this callback must either
store the demand, or return the amount of requested events.
Must always be explicitly implemented by :producer
stages.
Examples
def handle_demand(demand, state) do
# We check if we're able to satisfy the demand and fetch
# events if we aren't.
events =
if length(state.events) >= demand do
state.events
else
# fetch_events()
end
# We dispatch only the requested number of events.
{to_dispatch, remaining} = Enum.split(events, demand)
{:noreply, to_dispatch, %{state | events: remaining}}
end
Callback implementation for GenStage.handle_demand/2
.
Invoked when the server is started.
start_link/3
(or start/3
) will block until this callback returns.
args
is the argument term (second argument) passed to start_link/3
(or start/3
).
In case of successful start, this callback must return a tuple where the first element is the stage type, which is one of:
:producer
:consumer
:producer_consumer
(if the stage is acting as both)
For example:
def init(args) do
{:producer, some_state}
end
The returned tuple may also contain 3 or 4 elements. The third
element may be the :hibernate
atom or a set of options defined
below.
Returning :ignore
will cause start_link/3
to return :ignore
and the process will exit normally without entering the loop or
calling terminate/2
.
Returning {:stop, reason}
will cause start_link/3
to return
{:error, reason}
and the process to exit with reason reason
without entering the loop or calling terminate/2
.
Options
This callback may return options. Some options are specific to the chosen stage type while others are shared across all types.
:producer
options
:demand
- when:forward
, the demand is always forwarded to thec:handle_demand/2
callback. When:accumulate
, demand is accumulated until its mode is set to:forward
viademand/2
. This is useful as a synchronization mechanism, where the demand is accumulated until all consumers are subscribed. Defaults to:forward
.
:producer
and :producer_consumer
options
:buffer_size
- the size of the buffer to store events without demand. Can be:infinity
to signal no limit on the buffer size. Check the “Buffer events” section of the module documentation. Defaults to10_000
for:producer
,:infinity
for:producer_consumer
.:buffer_keep
- returns whether the:first
or:last
entries should be kept on the buffer in case the buffer size is exceeded. Defaults to:last
.:dispatcher
- the dispatcher responsible for handling demands. Defaults toGenStage.DemandDispatch
. May be either an atom representing a dispatcher module or a two-element tuple with the dispatcher module and the dispatcher options.
:consumer
and :producer_consumer
options
:subscribe_to
- a list of producers to subscribe to. Each element represents either the producer module or a tuple with the producer module and the subscription options (as defined insync_subscribe/2
).
Callback implementation for GenStage.init/1
.