Behaviours: gen_fsm.
This module defines the sbroker behaviour.
Required callback functions: init/1.
This module provides a process match making service. A process joins one of
two queues and is matcheid with a process in the other queue. The queues can
be actively managed using an squeue callback module, and passively managed
using head or tail drop. A different strategy can be used for both queues.
Processes that die while in a queue are automatically removed to prevent
matching with a process that is nolonger alive.
There are two functions to join a queue: ask/1 and ask_r/1. Processes
that call ask/1 are matched against processes that call ask_r/1. If no
match is immediately avaliable a process is queued in the relevant queue
until a match becomes avaliable. If queue management is used processes may be
dropped without a match.
Processes calling ask/1 try to match with/dequeue a process in the ask_r
queue. If no process exists they are queued in the ask queue and await a
process to call ask_r/1.
Similarly processes calling ask_r/1 try to match with/dequeue a process
in the ask queue. If no process exists they are queued in the ask_r queue
and await a process to call ask/1.
A broker requires a callback module. The callback modules implements one
callback, init/1, with single argument Args. init/1 should return
{ok, {AskQueueSpec, AskRQueueSpec, Interval}) or ignore. AskQueuSpec is
the queue specification for the ask queue and AskRQueueSpec is the queue
specification for the ask_r queue. Interval is the interval in
milliseconds that the active queue is polled. This ensures that the active
queue management strategy is applied even if no processes are
enqueued/dequeued. In the case of ignore the broker is not started and
start_link returns ignore.
A queue specifcation takes the following form:
{Module, Args, Out, Size, Drop}. Module is the squeue callback module
and Args are its arguments. The queue is created using
squeue:new(Time, Module, Args), where Time is chte current time in
native time units. Out defines the method of dequeuing, it is either the
atom out (dequeue items from the head, i.e. FIFO), or the atom out_r
(dequeue items from the tail, i.e. LIFO). Size is the maximum size of the
queue, it is either a non_neg_integer() or infinity. Drop defines the
strategy to take when the maximum size, Size, of the queue is exceeded. It
is either the atom drop (drop from the head of the queue, i.e. head drop)
or drop_r (drop from the tail of the queue, i.e. tail drop).
For example:
-module(sbroker_example).
-behaviour(sbroker).
-export([start_link/0]).
-export([ask/0]).
-export([ask_r/1]).
-export([init/1]).
start_link() ->
sbroker:start_link({local, ?MODULE}, ?MODULE, []).
ask() ->
sbroker:ask(?MODULE).
ask_r() ->
sbroker:ask_r(?MODULE).
init([]) ->
AskQueueSpec = {squeue_codel, {5, 100}, out, 64, drop},
AskRQueueSpec = {squeue_timeout, 5000, out_r, infinity, drop},
Interval = 200,
{ok, {AskQueueSpec, AskRQueueSpec, Interval}}.
broker() = pid() | atom() | {atom(), node()} | {global, any()} | {via, module(), any()}
name() = {local, atom()} | {global, any()} | {via, module(), any()}
queue_spec() = {module(), any(), out | out_r, non_neg_integer() | infinity, drop | drop_r}
start_return() = {ok, pid()} | ignore | {error, any()}
| ask/1 | Tries to match with a process calling ask_r/1 on the same broker. |
| ask_r/1 | Tries to match with a process calling ask/1 on the same broker. |
| async_ask/1 | Monitors the broker and sends an asynchronous request to match with a
process calling ask_r/1. |
| async_ask/2 | Sends an asynchronous request to match with a process calling ask_r/1. |
| async_ask_r/1 | Monitors the broker and sends an asynchronous request to match with a
process calling ask/1. |
| async_ask_r/2 | Sends an asynchronous request to match with a process calling ask/1. |
| await/2 | Await the response to an asynchronous request identified by Tag. |
| cancel/3 | Cancels an asynchronous request. |
| change_config/2 | Change the configuration of the broker. |
| len/2 | Get the length of the ask queue in the broker, Broker. |
| len_r/2 | Get the length of the ask_r queue in the broker, Broker. |
| nb_ask/1 | Tries to match with a process calling ask_r/1 on the same broker but
does not enqueue the request if no immediate match. |
| nb_ask_r/1 | Tries to match with a process calling ask/1 on the same broker but
does not enqueue the request if no immediate match. |
| start_link/2 | Starts a broker with callback module Module and argument Args. |
| start_link/3 | Starts a broker with name Name, callback module Module and argument
Args. |
ask(Broker) -> Go | Drop
Tries to match with a process calling ask_r/1 on the same broker.
Returns {go, Ref, Pid, RelativeTime, SojournTime} on a successful match
or {drop, SojournTime}.
Ref is the transaction reference, which is a reference(). Pid is the
matched process. RelativeTime is the time (in native time units) spent
waiting for a match after discounting time spent waiting for the broker to
handle requests. SojournTime is the time spent in both the broker's message
queue and internal queue, in native time units.
RelativeTime represents the SojournTime without the overhead of the
broker. The value measures the level of queue congestion without being
effected by the load of the broker.
If RelativeTime is positive, the request was enqueued in the internal
queue awaiting a match with another request sent approximately RelativeTime
after this request was sent. Therefore SojournTime minus RelativeTime
is the latency, or overhead, of the broker in native time units.
If RelativeTime is negative, the request dequeued a request in the internal
queue that was sent approximately RelativeTime before this request was
sent. Therefore SojournTime is the latency, or overhead, of the broker in
native time units.
If RelativeTime is 0, the request was matched with a request sent at
approximately the same time. Therefore SojournTime is the latency, or
overhead, of the broker in native time units.
Pid (in native time units) can be approximated by
SojournTime minus RelativeTime.
ask_r(Broker) -> Go | Drop
Tries to match with a process calling ask/1 on the same broker.
See also: ask/1.
async_ask(Broker) -> {await, Tag, Pid}
Monitors the broker and sends an asynchronous request to match with a
process calling ask_r/1. Returns {await, Tag, Pid}.
Tag is a monitor reference() that uniquely identifies the reply
containing the result of the request. Pid, is the pid (pid()) of the
monitored broker. To cancel the request call cancel(Pid, Tag).
The reply is of the form {Tag, {go, Ref, Pid, RelativeTime, SojournTime}
or {Tag, {drop, SojournTime}}.
Process) using async_ask/2.
See also: async_ask/2, cancel/2.
async_ask(Broker, Tag) -> {await, Tag, Pid}
Sends an asynchronous request to match with a process calling ask_r/1.
Returns {await, Tag, Pid}.
Tag is a any() that identifies the reply containing the result of the
request. Pid, is the pid (pid()) of the broker. To cancel all requests
identified by Tag on broker Pid call cancel(Pid, Tag).
The reply is of the form {Tag, {go, Ref, Pid, RelativeTime, SojournTime} or
{Tag, {drop, SojournTime}}.
See also: cancel/2.
async_ask_r(Broker) -> {await, Tag, Pid}
Monitors the broker and sends an asynchronous request to match with a
process calling ask/1.
See also: async_ask/1, cancel/2.
async_ask_r(Broker, Tag) -> {await, Tag, Pid}
Sends an asynchronous request to match with a process calling ask/1.
See also: async_ask/2, cancel/2.
await(Tag, Timeout) -> Go | Drop
Await the response to an asynchronous request identified by Tag.
Exits if a response is not received after Timeout milliseconds.
DOWN message is received with the reference Tag.
See also: async_ask/2, async_ask_r/2.
cancel(Broker, Tag, Timeout) -> Count | false
Cancels an asynchronous request. Returns the number of cancelled
requests or false if no requests exist. In the later case a caller may wish
to check its message queue for an existing reply.
See also: async_ask/1, async_ask_r/1.
change_config(Broker, Timeout) -> ok | {error, Reason}
Change the configuration of the broker. Returns ok on success and
{error, Reason} on failure, where Reason, is the reason for failure.
init/1 callback to get the new configuration. If init/1
returns ignore the config does not change.
len(Broker, Timeout) -> Length
Get the length of the ask queue in the broker, Broker.
len_r(Broker, Timeout) -> Length
Get the length of the ask_r queue in the broker, Broker.
nb_ask(Broker) -> Go | Retry
Tries to match with a process calling ask_r/1 on the same broker but
does not enqueue the request if no immediate match. Returns
{go, Ref, Pid, RelativeTime, SojournTime} on a successful match or
{retry, SojournTime}.
Ref is the transaction reference, which is a reference(). Pid is the
matched process. RelativeTime is the time (in native time units) spent
waiting for a match after discounting time spent waiting for the broker to
handle requests. SojournTime is the time spent in the broker's message
queue in native time units.
See also: ask/1.
nb_ask_r(Broker) -> Go | Retry
Tries to match with a process calling ask/1 on the same broker but
does not enqueue the request if no immediate match.
See also: nb_ask/1.
start_link(Module, Args) -> StartReturn
Starts a broker with callback module Module and argument Args.
start_link(Name, Module, Args) -> StartReturn
Starts a broker with name Name, callback module Module and argument
Args.
Generated by EDoc, May 1 2015, 11:53:46.