sbroker and sregulator.
Behaviours: sbroker_handlers.
This module defines the sbroker_queue behaviour.
Required callback functions: init/3, handle_in/5, handle_out/2, handle_timeout/2, handle_cancel/3, handle_info/3, code_change/4, config_change/3, len/1, send_time/1, terminate/2.
Behaviour for implementing queues for sbroker and sregulator.
sbroker_queue behaviour. The first
callback is init/3, which starts the queue:
-callback init(InternalQueue :: internal_queue(), Time :: integer(),
Args :: any()) ->
{State :: any(), TimeoutTime :: integer() | infinity}.
InternalQueue is the internal queue of requests, it is a queue:queue()
with items of the form {SendTime, From, Value, Reference}. SendTime is
the approximate time the request was sent in native time units and is
always less than or equal to Time.From is the a 2-tuple containing the
senders pid and a response tag. SendTime and From can be used with
drop/3 to drop a request. Value is any term, Reference is the monitor
reference of the sender.
Time is the time, in native time units, of the queue at creation. Some
other callbacks will receive the current time of the queue as the second last
argument. It is monotically increasing, so subsequent calls will have the
same or a greater time.
Args is the arguments for the queue. It can be any term.
State is the state of the queue and used in the next call.
TimeoutTime represents the next time a queue wishes to call
handle_timeout/2 to drop items. If a message is not received the timeout
should occur at or after TimeoutTime. The time must be greater than or
equal to Time. If a queue does not require a timeout then TimeoutTime
should be infinity. The value may be ignored or unavailable in other
callbacks if the queue is empty.
handle_in/6:
-callback handle_in(SendTime :: integer(),
From :: {Sender :: pid(), Tag :: any()}, Value :: any(),
Time :: integer(), State :: any()) ->
{NState :: any(), TimeoutTime :: integer() | infinity}.
The variables are equivalent to those in init/3, with NState being the
new state.
handle_out/2:
-callback handle_out(Time :: integer(), State :: any()) ->
{SendTime :: integer(), From :: {Sender :: pid(), Tag :: any()},
Value:: any(), Ref :: reference, NState :: any(),
TimeoutTime :: integer() | infinity} |
{empty, NState :: any()}.
The variables are equivalent to those in init/3, with NState being the
new state. This callback either returns a single request, added in the
InternalQueue from init/3 or enqueued with handle_in/6. If the queue is
empty an empty tuple is returned.
handle_timeout/2:
-callback handle_timeout(Time :: integer(), State :: any()) ->
{NState :: any(), TimeoutTime :: integer() | infinity}.
The variables are equivalent to those in init/3, with NState being the
new state.
handle_cancel/3:
-callback handle_cancel(Tag :: any(), Time :: integer(), State :: any()) ->
{Reply :: false | pos_integer(), NState :: any(),
TimeoutTime :: integer() | infinity}.
Tag is a response tag, which is part of the From tuple passed via
InternalQueue in init/3 or directly in handle_in/6. There may be
multiple requests with the same tag and all should be removed.
If no requests are cancelled the Reply is false, otherwise it is the
number of cancelled requests.
The other variables are equivalent to those in init/3, with NState being
the new state.
handle_info/3:
-callback handle_info(Msg :: any(), Time :: integer(), State :: any()) ->
{NState :: any(), TimeoutTime :: integer() | infinity}.
Msg is the message, and may be intended for another queue.
The other variables are equivalent to those in init/3, with NState being
the new state.
code_change/4:
-callback code_change(OldVsn :: any(), Time :: integer(), State :: any(),
Extra :: any()) ->
{NState :: any(), TimeoutTime :: integer() | infinity}.
On an upgrade OldVsn is version the state was created with and on an
downgrade is the same form except {down, OldVsn}. OldVsn is defined by
the vsn attribute(s) of the old version of the callback module. If no such
attribute is defined, the version is the checksum of the BEAM file. Extra
is from {advanced, Extra} in the update instructions.
The other variables are equivalent to those in init/3, with NState being
the new state.
config_change/4:
-callback config_change(Args :: any(), Time :: integer(), State :: any()) ->
{NState :: any(), TimeoutTime :: integer() | infinity}.
The variables are equivalent to those in init/3, with NState being the
new state.
len/1:
-callback len(State :: any()) -> Len :: non_neg_integer().
State is the current state of the queue and Len is the number of queued
requests. This callback must be idempotent and so not drop any requests.
send_time/1:
-callback send_time(State :: any()) -> SendTime :: integer() | empty.
State is the current state of the queue and SendTime is the send time of
the oldest request, if not requests then empty. This callback must be
idempotent and so not drop any requests.
terminate/2:
-callback terminate(Reason :: sbroker_handlers:reason(), State :: any()) ->
InternalQueue :: internal_queue().
Reason is stop if the queue is being shutdown, change if the queue is
being replaced by another queue, {bad_return_value, Return} if a previous
callback returned an invalid term or {Class, Reason, Stack} if a previous
callback raised an exception.
State is the current state of the queue.
InternalQueue is the same as init/3 and is passed to the next queue if
Reason is change.
terminate/2 should do any clean up required.
internal_queue() = queue:queue({integer(), {pid(), any()}, any(), reference()})
| drop/3 | Drop a request from From, sent at SendTime from the queue. |
drop(From, SendTime, Time) -> ok
Drop a request from From, sent at SendTime from the queue.
drop/3 when dropping a request from a queue.
Generated by EDoc