This module defines the sbroker behaviour.
Required callback functions: init/1.
This module provides a process match making service. A process joins one of
two queues and is matched with a process in the other queue. The queues are
managed using an sbroker_queue callback module per queue so that a different
strategy can be used for each queue. Processes that die while in a queue are
automatically removed to prevent matching with a process that is no longer
alive. A broker also uses an sbroker_meter callback module to monitor the
queue and processing delays of the broker.
There are two functions to join a queue: ask/1 and ask_r/1. Processes
that call ask/1 are matched against processes that call ask_r/1. If no
match is immediately available a process is queued in the relevant queue
until a match becomes available. If queue management is used processes may be
dropped without a match.
Processes calling ask/1 try to match with/dequeue a process in the ask_r
queue. If no process exists they are queued in the ask queue and await a
process to call ask_r/1.
Similarly processes calling ask_r/1 try to match with/dequeue a process
in the ask queue. If no process exists they are queued in the ask_r queue
and await a process to call ask/1.
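The matching described above can be sketched as follows. This is a minimal illustration rather than part of sbroker: the module name match_sketch and the function demo/0 are hypothetical, the queue specifications reuse a callback module named elsewhere in this document with an empty argument map, and the sketch assumes the sbroker application is compiled, on the code path and can be started.

```erlang
-module(match_sketch).
-behaviour(sbroker).

-export([demo/0]).
-export([init/1]).

%% Starts an unnamed broker, sends one request with ask_r/1 from a helper
%% process and matches it by calling ask/1 from the calling process.
demo() ->
    {ok, _Apps} = application:ensure_all_started(sbroker),
    {ok, Broker} = sbroker:start_link(?MODULE, [], []),
    Parent = self(),
    %% The helper is either matched at once or waits in the ask_r queue.
    Worker = spawn_link(fun() ->
        Parent ! {self(), sbroker:ask_r(Broker)}
    end),
    %% ask/1 and ask_r/1 send self() as the value, so each side of the
    %% match receives the pid of the other side.
    {go, _Ref, Worker, _RelativeTime, _SojournTime} = sbroker:ask(Broker),
    receive
        {Worker, {go, _, Parent, _, _}} -> ok
    end.

%% Both queues use the same specification and no meters are configured.
init([]) ->
    QueueSpec = {sbroker_timeout_queue, #{}},
    {ok, {QueueSpec, QueueSpec, []}}.
```

Calling match_sketch:demo() should return ok once both processes have received their go tuple. If the queue drops a request first, the pattern match fails.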
A broker requires a callback module. The callback module implements one
callback, init/1, with a single argument Args. init/1 should return
{ok, {AskQueueSpec, AskRQueueSpec, [MeterSpec]}} or ignore.
AskQueueSpec is the queue specification for the ask queue,
AskRQueueSpec is the queue specification for the ask_r queue and
MeterSpec is a meter specification. There can be any number of meters but a
meter module can only be included once. In the case of ignore the broker is
not started and start_link returns ignore. As the callback modules are
defined in the init/1 callback a broker supports the dynamic modules
supervisor child specification.
Both queue and meter specifications take the form: {Module, Args}. Module
is the callback module and Args are its arguments.
For example:
-module(sbroker_example).
-behaviour(sbroker).
-export([start_link/0]).
-export([ask/0]).
-export([ask_r/0]).
-export([init/1]).
start_link() ->
    sbroker:start_link({local, ?MODULE}, ?MODULE, [], []).
ask() ->
    sbroker:ask(?MODULE).
ask_r() ->
    sbroker:ask_r(?MODULE).
init([]) ->
    AskQueueSpec = {sbroker_codel_queue, #{}},
    AskRQueueSpec = {sbroker_timeout_queue, #{}},
    MeterSpec = {sbroker_overload_meter, #{alarm => {overload, ?MODULE}}},
    {ok, {AskQueueSpec, AskRQueueSpec, [MeterSpec]}}.
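Because the callback modules are chosen in init/1, a supervisor can start such a broker with a dynamic modules child specification. The following is a minimal sketch under assumptions: the supervisor module sbroker_example_sup is hypothetical, the restart and shutdown values are arbitrary, and the child is the sbroker_example module from the example above.

```erlang
-module(sbroker_example_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    SupFlags = #{strategy => one_for_one, intensity => 3, period => 10},
    Broker = #{id => sbroker_example,
               start => {sbroker_example, start_link, []},
               restart => permanent,
               shutdown => 5000,
               type => worker,
               %% The queue and meter modules are returned by init/1, so
               %% the set of callback modules is not known statically.
               modules => dynamic},
    {ok, {SupFlags, [Broker]}}.
```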
broker() = pid() | atom() | {atom(), node()} | {global, any()} | {via, module(), any()}
debug_option() = trace | log | {log, pos_integer()} | statistics | {log_to_file, file:filename()} | {install, {function(), any()}}
handler_spec() = {module(), any()}
name() = {local, atom()} | {global, any()} | {via, module(), any()}
start_option() = {debug, debug_option()} | {timeout, timeout()} | {spawn_opt, [proc_lib:spawn_option()]} | {read_time_after, non_neg_integer() | infinity}
start_return() = {ok, pid()} | ignore | {error, any()}
| ask/1 | Equivalent to ask(Broker, self()). |
| ask/2 | Send a match request, with value ReqValue, to try to match with a process calling ask_r/2 on the broker, Broker. |
| ask_r/1 | Equivalent to ask_r(Broker, self()). |
| ask_r/2 | Tries to match with a process calling ask/2 on the same broker. |
| async_ask/1 | Equivalent to async_ask(Broker, self()). |
| async_ask/2 | Monitors the broker and sends an asynchronous request to match with a process calling ask_r/2. |
| async_ask/3 | Sends an asynchronous request to match with a process calling ask_r/2. |
| async_ask_r/1 | Equivalent to async_ask_r(Broker, self()). |
| async_ask_r/2 | Monitors the broker and sends an asynchronous request to match with a process calling ask/2. |
| async_ask_r/3 | Sends an asynchronous request to match with a process calling ask/2. |
| await/2 | Await the response to an asynchronous request identified by Tag. |
| cancel/2 | Equivalent to cancel(Broker, Tag, infinity). |
| cancel/3 | Cancels an asynchronous request. |
| change_config/1 | Equivalent to change_config(Broker, infinity). |
| change_config/2 | Change the configuration of the broker. |
| dirty_cancel/2 | Cancels an asynchronous request. |
| dynamic_ask/1 | Equivalent to dynamic_ask(Broker, self()). |
| dynamic_ask/2 | Tries to match with a process calling ask_r/2 on the same broker. |
| dynamic_ask_r/1 | Equivalent to dynamic_ask_r(Broker, self()). |
| dynamic_ask_r/2 | Tries to match with a process calling ask/2 on the same broker. |
| len/1 | Equivalent to len(Broker, infinity). |
| len/2 | Get the length of the ask queue in the broker, Broker. |
| len_r/1 | Equivalent to len_r(Broker, infinity). |
| len_r/2 | Get the length of the ask_r queue in the broker, Broker. |
| nb_ask/1 | Equivalent to nb_ask(Broker, self()). |
| nb_ask/2 | Tries to match with a process calling ask_r/2 on the same broker but does not enqueue the request if there is no immediate match. |
| nb_ask_r/1 | Equivalent to nb_ask_r(Broker, self()). |
| nb_ask_r/2 | Tries to match with a process calling ask/2 on the same broker but does not enqueue the request if there is no immediate match. |
| start_link/3 | Starts a broker with callback module Module and argument Args, and broker options Opts. |
| start_link/4 | Starts a broker with name Name, callback module Module and argument Args, and broker options Opts. |
ask(Broker) -> Go | Drop
Equivalent to ask(Broker, self()).
ask(Broker, ReqValue) -> Go | Drop
Send a match request, with value ReqValue, to try to match with a
process calling ask_r/2 on the broker, Broker.
Returns {go, Ref, Value, RelativeTime, SojournTime} on a successful
match or {drop, SojournTime}.
Ref is the transaction reference, which is a reference(). Value is the
value of the matched request sent by the counterparty process. RelativeTime
is the approximate time difference (in the broker's time unit) between when
the request was sent and the matching request was sent. SojournTime is the
approximate time spent in both the broker's message queue and internal queue.
RelativeTime represents the SojournTime without the overhead of the
broker. The value measures the level of queue congestion without being
affected by the load of the broker.
If RelativeTime is positive, the request was enqueued in the internal
queue awaiting a match with another request sent approximately RelativeTime
after this request was sent. Therefore SojournTime minus RelativeTime
is the latency, or overhead, of the broker.
If RelativeTime is negative, the request dequeued a request in the internal
queue that was sent approximately the absolute value of RelativeTime before
this request was sent. Therefore SojournTime is the latency, or overhead, of the broker.
If RelativeTime is 0, the request was matched with a request sent at
approximately the same time. Therefore SojournTime is the latency, or
overhead, of the broker.
The sojourn time for the matched process can be approximated by SojournTime
minus RelativeTime.
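The three RelativeTime cases can be condensed into a small helper. This is a sketch only: the module sbroker_time_sketch and its function names are hypothetical, and it simply applies the arithmetic described above to a go tuple.

```erlang
-module(sbroker_time_sketch).
-export([broker_overhead/1, counterparty_sojourn/1]).

%% Latency added by the broker itself. When RelativeTime is positive the
%% request waited for its match, so the wait is subtracted; otherwise the
%% whole SojournTime is broker overhead.
broker_overhead({go, _Ref, _Value, RelativeTime, SojournTime})
  when RelativeTime > 0 ->
    SojournTime - RelativeTime;
broker_overhead({go, _Ref, _Value, _RelativeTime, SojournTime}) ->
    SojournTime.

%% Approximate sojourn time of the matched (counterparty) process.
counterparty_sojourn({go, _Ref, _Value, RelativeTime, SojournTime}) ->
    SojournTime - RelativeTime.
```

For a reply with RelativeTime 70 and SojournTime 100 both functions return 30. For RelativeTime -40 and SojournTime 10 the overhead is 10 and the counterparty's approximate sojourn time is 50.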
If the request is dropped when using via module sprotector, returns
{drop, 0} and does not send the request.
ask_r(Broker) -> Go | Drop
Equivalent to ask_r(Broker, self()).
ask_r(Broker, ReqValue) -> Go | Drop
Tries to match with a process calling ask/2 on the same broker.
See also: ask/2.
async_ask(Broker) -> {await, Tag, Process} | {drop, 0}
Equivalent to async_ask(Broker, self()).
async_ask(Broker, ReqValue) -> {await, Tag, Process} | {drop, 0}
Monitors the broker and sends an asynchronous request to match with a
process calling ask_r/2. Returns {await, Tag, Process} or {drop, 0}.
Tag is a monitor reference() that uniquely identifies the reply
containing the result of the request. Process is the pid() of the
monitored broker or {atom(), node()} if the broker is registered locally
on another node. To cancel the request call cancel(Process, Tag).
The reply is of the form {Tag, {go, Ref, Value, RelativeTime, SojournTime}}
or {Tag, {drop, SojournTime}}.
Ref is the transaction reference, which is a reference(). Value is the
value of the matched process. RelativeTime is the time spent waiting for a
match after discounting time spent waiting for the broker to handle requests.
SojournTime is the time spent in the broker's message queue.
Multiple asynchronous requests can be made from a single process to a
broker and no guarantee is made of the order of replies. A process making
multiple requests can reuse the monitor reference for subsequent requests to
the same broker process (Process) using async_ask/3.
If the request is dropped when using via module sprotector, returns
{drop, 0} and does not send the request.
See also: async_ask/3, cancel/2.
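A minimal sketch of an asynchronous request followed by await/2, as documented on this page. The module async_ask_sketch and the function request/3 are hypothetical names, and the sketch assumes a broker is already running.

```erlang
-module(async_ask_sketch).
-export([request/3]).

%% Sends an asynchronous ask request, then waits up to Timeout
%% milliseconds for the reply. await/2 exits if no reply arrives in time.
request(Broker, ReqValue, Timeout) ->
    case sbroker:async_ask(Broker, ReqValue) of
        {await, Tag, _Process} ->
            %% Other work could be done here before collecting the reply.
            sbroker:await(Tag, Timeout);
        {drop, 0} = Drop ->
            %% Dropped before the request was sent.
            Drop
    end.
```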
async_ask(Broker, ReqValue, To) -> {await, Tag, Process} | {drop, 0}
Sends an asynchronous request to match with a process calling ask_r/2.
Returns {await, Tag, Process}.
To is a tuple containing the process, pid(), to send the reply to and
Tag, any(), that identifies the reply containing the result of the
request. Process is the pid() of the broker or {atom(), node()} if the
broker is registered locally on a different node. To cancel all requests
identified by Tag on broker Process call cancel(Process, Tag).
The reply is of the form {Tag, {go, Ref, Value, RelativeTime, SojournTime}}
or {Tag, {drop, SojournTime}}.
Ref is the transaction reference, which is a reference(). Value is the
value of the matched process. RelativeTime is the time spent waiting for a
match after discounting time spent waiting for the broker to handle requests.
SojournTime is the time spent in the broker's message queue.
Multiple asynchronous requests can be made from a single process to a broker and no guarantee is made of the order of replies. If the broker exits or is on a disconnected node there is no guarantee of a reply and so the caller should take appropriate steps to handle this scenario.
If the request is dropped when using via module sprotector, returns
{drop, 0} and does not send the request.
See also: cancel/2.
async_ask_r(Broker) -> {await, Tag, Process} | {drop, 0}
Equivalent to async_ask_r(Broker, self()).
async_ask_r(Broker, ReqValue) -> {await, Tag, Process} | {drop, 0}
Monitors the broker and sends an asynchronous request to match with a
process calling ask/2.
See also: async_ask/2, cancel/2.
async_ask_r(Broker, ReqValue, To) -> {await, Tag, Process} | {drop, 0}
Sends an asynchronous request to match with a process calling ask/2.
See also: async_ask/3, cancel/2.
await(Tag, Timeout) -> Go | Drop
Await the response to an asynchronous request identified by Tag.
Exits if a response is not received after Timeout milliseconds.
Exits if a DOWN message is received with the reference Tag.
See also: async_ask/2, async_ask_r/2.
cancel(Broker, Tag) -> Count | false
Equivalent to cancel(Broker, Tag, infinity).
cancel(Broker, Tag, Timeout) -> Count | false
Cancels an asynchronous request. Returns the number of cancelled
requests or false if no requests exist. In the latter case a caller may wish
to check its message queue for an existing reply.
See also: async_ask/1, async_ask_r/1.
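The following sketch shows one way to give up on an asynchronous request after a deadline and then check the message queue when cancel/2 returns false. The module cancel_sketch and its function names are hypothetical, and the error terms are illustrative choices rather than values defined by sbroker.

```erlang
-module(cancel_sketch).
-export([ask_with_deadline/3, flush_reply/1]).

ask_with_deadline(Broker, ReqValue, Timeout) ->
    case sbroker:async_ask(Broker, ReqValue) of
        {await, Tag, Process} ->
            receive
                {Tag, Reply} ->
                    demonitor(Tag, [flush]),
                    Reply;
                {'DOWN', Tag, _Type, _Object, Reason} ->
                    exit({broker_down, Reason})
            after Timeout ->
                give_up(Process, Tag)
            end;
        {drop, 0} = Drop ->
            Drop
    end.

%% Cancels the request. If nothing was cancelled, a reply may already be
%% waiting in the message queue, so it is collected instead of discarded.
give_up(Process, Tag) ->
    Result = sbroker:cancel(Process, Tag),
    demonitor(Tag, [flush]),
    case Result of
        false ->
            case flush_reply(Tag) of
                {replied, Reply} -> Reply;
                none -> {error, timeout}
            end;
        Count when is_integer(Count) ->
            {error, timeout}
    end.

%% Returns a reply for Tag if one is already in the message queue.
flush_reply(Tag) ->
    receive
        {Tag, Reply} -> {replied, Reply}
    after 0 ->
        none
    end.
```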
change_config(Broker) -> ok | {error, Reason}
Equivalent to change_config(Broker, infinity).
change_config(Broker, Timeout) -> ok | {error, Reason}
Change the configuration of the broker. Returns ok on success and
{error, Reason} on failure, where Reason is the reason for failure.
The broker calls the init/1 callback to get the new configuration. If init/1
returns ignore the config does not change.
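Since the broker calls init/1 again on change_config, a callback module can read its specifications from a mutable source. The sketch below is an assumption-laden illustration: the module config_sketch, the application name sbroker_config_sketch and the key ask_queue are all hypothetical, and the default specification reuses a queue module named elsewhere in this document.

```erlang
-module(config_sketch).
-behaviour(sbroker).

-export([start_link/0, reload/1]).
-export([init/1]).

start_link() ->
    sbroker:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Stores a new ask queue specification, then asks the broker to call
%% init/1 again and apply the result.
reload(AskQueueSpec) ->
    ok = application:set_env(sbroker_config_sketch, ask_queue, AskQueueSpec),
    sbroker:change_config(?MODULE).

%% Reads the ask queue specification from the application environment and
%% falls back to a default when none has been stored.
init([]) ->
    Default = {sbroker_timeout_queue, #{}},
    AskQueueSpec = application:get_env(sbroker_config_sketch, ask_queue, Default),
    {ok, {AskQueueSpec, Default, []}}.
```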
dirty_cancel(Broker, Tag) -> ok
Cancels an asynchronous request.
Returns ok without waiting for the broker to cancel requests.
See also: cancel/3.
dynamic_ask(Broker) -> Go | Await | Drop
Equivalent to dynamic_ask(Broker, self()).
dynamic_ask(Broker, ReqValue) -> Go | Await | Drop
Tries to match with a process calling ask_r/2 on the same broker. If
no immediate match is available the request is converted to an async_ask/2.
Returns {go, Ref, Value, RelativeTime, SojournTime} on a successful match
or {await, Tag, BrokerPid}.
Ref is the transaction reference, which is a reference(). Value is the
value of the matched process. RelativeTime is the time spent waiting for a
match after discounting time spent waiting for the broker to handle requests.
SojournTime is the time spent in the broker's message queue. Tag is a
monitor reference and BrokerPid the pid() of the broker, as returned by
async_ask/2.
If the request is dropped when using via module sprotector, returns
{drop, 0} and does not send the request.
See also: async_ask/2, nb_ask/2.
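A caller of dynamic_ask has three outcomes to handle. The sketch below maps them onto simpler terms. The module dynamic_ask_sketch, its function names and the returned atoms are hypothetical and chosen only for illustration.

```erlang
-module(dynamic_ask_sketch).
-export([checkout/1, classify/1]).

%% Sends a dynamic ask request and classifies the result.
checkout(Broker) ->
    classify(sbroker:dynamic_ask(Broker)).

%% Matched immediately.
classify({go, Ref, Value, _RelativeTime, _SojournTime}) ->
    {matched, Ref, Value};
%% No immediate match: the request is queued and the reply will arrive
%% later as a message tagged with Tag, as for async_ask/2.
classify({await, Tag, BrokerPid}) ->
    {pending, Tag, BrokerPid};
%% Dropped without a match.
classify({drop, _SojournTime}) ->
    dropped.
```

In the pending case the caller can keep working and later collect the reply with await/2 or withdraw the request with cancel/2.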
dynamic_ask_r(Broker) -> Go | Await | Drop
Equivalent to dynamic_ask_r(Broker, self()).
dynamic_ask_r(Broker, ReqValue) -> Go | Await | Drop
Tries to match with a process calling ask/2 on the same broker. If
no immediate match is available the request is converted to an async_ask_r/2.
See also: dynamic_ask/2.
len(Broker) -> Length
Equivalent to len(Broker, infinity).
len(Broker, Timeout) -> Length
Get the length of the ask queue in the broker, Broker.
len_r(Broker) -> Length
Equivalent to len_r(Broker, infinity).
len_r(Broker, Timeout) -> Length
Get the length of the ask_r queue in the broker, Broker.
nb_ask(Broker) -> Go | Drop
Equivalent to nb_ask(Broker, self()).
nb_ask(Broker, ReqValue) -> Go | Drop
Tries to match with a process calling ask_r/2 on the same broker but
does not enqueue the request if there is no immediate match. Returns
{go, Ref, Value, RelativeTime, SojournTime} on a successful match or
{drop, SojournTime}.
Ref is the transaction reference, which is a reference(). Value is the
value of the matched process. RelativeTime is the time spent waiting for a
match after discounting time spent waiting for the broker to handle requests.
SojournTime is the time spent in the broker's message queue.
If the request is dropped when using via module sprotector, returns
{drop, 0} and does not send the request.
See also: ask/2.
nb_ask_r(Broker) -> Go | Drop
Equivalent to nb_ask_r(Broker, self()).
nb_ask_r(Broker, ReqValue) -> Go | Drop
Tries to match with a process calling ask/2 on the same broker but
does not enqueue the request if there is no immediate match.
See also: nb_ask/2.
start_link(Module, Args, Opts) -> StartReturn
Starts a broker with callback module Module and argument Args, and
broker options Opts.
Opts is a proplist and supports the debug, timeout and spawn_opt options used
by gen_server and gen_fsm. read_time_after sets the number of requests after
which a cached time is considered stale and the time is read again. Its value is
non_neg_integer() or infinity and defaults to 16.
See also: gen_server:start_link/3.
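As an illustration of the options above, the following sketch builds an option list and passes it to start_link/3. The module start_opts_sketch and the specific values are hypothetical. Only the option names come from this document.

```erlang
-module(start_opts_sketch).
-export([options/0, start_link/2]).

%% Example broker options: a 5 second start timeout, a spawn option for
%% the broker process, and a cached time that is read again after 32
%% requests instead of the default 16.
options() ->
    [{timeout, 5000},
     {spawn_opt, [{fullsweep_after, 10}]},
     {read_time_after, 32}].

%% Starts an unnamed broker for the given callback module and argument.
start_link(Module, Args) ->
    sbroker:start_link(Module, Args, options()).
```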
start_link(Name, Module, Args, Opts) -> StartReturn
Starts a broker with name Name, callback module Module and argument
Args, and broker options Opts.
See also: start_link/3.
Generated by EDoc, Oct 10 2016, 18:50:59.