Behaviours: gen_fsm.
This module defines the sregulator behaviour.
Required callback functions: init/1.
This module provides a load regulation service. A process joins the queue
waiting to begin work. The level of concurrency is controlled by the sojourn
time of another queue, such as a queue for an sbroker. Usually this
means that the level of concurrency is decreased when a queue is slow (i.e.
workers in excess) and increased when a queue is fast (i.e. workers in
demand).
When the number of active workers is below the minimum concurrency limit,
workers are dequeued until the minimum level is reached. When the number of
active workers is at (or above) the maximum, workers are never dequeued. When
the number of workers is greater than or equal to the minimum and less than
the maximum, a worker can be dequeued based on the sojourn time of a queue in
an sbroker, or similar process.
Before commencing work a worker calls sregulator:ask/1. If the regulator
returns a go tuple, i.e. {go, Ref, RegulatorPid, Relative, SojournTime},
the worker may continue. Ref, the lock reference, and RegulatorPid, the
pid of the regulator, are used for future communication with the regulator.
RelativeTime is undefined if the regulator grants the lock due to the
level of concurrency going below the minimum limit. Otherwise RelativeTime
is the time (in native time units) spent waiting for the regulator's valve
to dequeue the request after discounting time spent waiting for the regulator
to handle requests. SojournTime is the time spent in both the regulator's
message queue and internal queue, in native time units.
The regulator may also return a drop tuple, i.e. {drop, SojournTime}.
This means that work can not begin as a lock was not acquired.
update/4 to report and update the result of a successful
enqueuing attempt against a sbroker (or another sregulator). For example:
{go, Ref, Regulator, _, _} = sregulator:ask(Regulator),
{go, _, _, RelativeTime, _} = sbroker:ask(Broker),
{continue, Ref, _, _} = sregulator:update(Regulator, Ref, RelativeTime, 500).
Or drop/2 in the case of a failure:
{go, Ref, Regulator, _, _} = sregulator:ask(Regulator),
{drop, _} = sbroker:ask(Broker),
case sregulator:drop(Regulator, Ref) of
{dropped, _} -> dropped;
{retry, _} -> retry
end.
If drop/2 returns {dropped, SojournTime} the work lock is lost and the
process should stop working. However if it returns {retry, SojournTime} the
process should continue because dropping it would bring the number of active
workers below the concurrency limit. The regulator will always favour active
workers over queued workers, so a drop request can return a retry tuple
even when workers are queued waiting for a lock.
The lock reference can be released using done(Regulator, Ref) or will be
automatically released when a worker exits.
A regulator requires a callback module. The callback modules implements one
callback, init/1, with single argument Args. init/1 should return
{ok, {QueueSpec, ValveSpec, Interval}) or ignore. QueueSpec is the
queue specification for the queue and Valve is the valve specification for
the queue. Interval is the interval in milliseconds that the queue is
polled. This ensures that the active queue management strategy is applied
even if no processes are enqueued/dequeued. In the case of ignore the
regulator is not started and start_link returns ignore.
A queue specification takes the following form:
{Module, Args, Out, Size, Drop}. Module is the squeue callback module
and Args are its arguments. The queue is created using
squeue:new(Time, Module, Args), where Time is the current time in
native time units. Out defines the method of dequeuing, it is either the
atom out (dequeue items from the head, i.e. FIFO), or the atom out_r
(dequeue items from the tail, i.e. LIFO). Size is the maximum size of the
queue, it is either a non_neg_integer() or infinity. Drop defines the
strategy to take when the maximum size, Size, of the queue is exceeded. It
is either the atom drop (drop from the head of the queue, i.e. head drop)
or drop_r (drop from the tail of the queue, i.e. tail drop).
A valve specification takes the following form:
{Module, Args, Min, Max}. Module is the svalve callback module and
Args are its arguments. The valve is created using
svalve:new(Time, Module, Args), where Time is the current time in
native time units. Min is the minimum desired level of concurrency, a
non_neg_integer(). Max is the maximum desired level of concurrency and is
a non_neg_integer() or infinity. The maximum must be greater than or
equal to the minimum.
For example:
-module(sregulator_example).
-behaviour(sregulator).
-export([start_link/0]).
-export([ask/0]).
-export([done/1]).
-export([update/2]).
-export([init/1]).
start_link() ->
sregulator:start_link({local, ?MODULE}, ?MODULE, []).
ask() ->
sregulator:ask(?MODULE).
done(Ref) ->
sregulator:done(?MODULE, Ref).
update(Ref, RelativeTime) ->
sregulator:update(?MODULE, Ref, RelativeTime, 500).
drop(Ref) ->
sregulator:drop(?MODULE, Ref).
init([]) ->
Timeout = sbroker_time:milli_seconds_to_native(5000),
Target = sbroker_time:milli_seconds_to_native(5),
ValveInternal = sbroker_time:milli_second_to_native(100),
QueueSpec = {squeue_timeout, Timeout, out_r, infinity, drop},
ValveSpec = {svalve_codel_r, {Target, ValveInternal}, 8, 64},
RegInterval = 200,
{ok, {QueueSpec, ValveSpec, RegInterval}}.
name() = {local, atom()} | {global, any()} | {via, module(), any()}
queue_spec() = {module(), any(), out | out_r, non_neg_integer() | infinity, drop | drop_r}
regulator() = pid() | atom() | {atom(), node()} | {global, any()} | {via, module(), any()}
start_return() = {ok, pid()} | {error, any()}
valve_spec() = {module(), any(), non_neg_integer(), non_neg_integer() | infinity}
| ask/1 | Tries to gain a work lock with the regulator. |
| async_ask/1 | Monitors the regulator and sends an asynchronous request to gain a work lock. |
| async_ask/2 | Sends an asynchronous request to gain a work lock with the regulator. |
| async_update/3 | Asynchronously update the regulator, Regulator, with a relative
sojourn time, RelativeTime, in native time units. |
| async_update/4 | Asynchronous update the regulator, Regulator, with a relative sojourn
time, RelativeTime, in native time units. |
| await/2 | Await the response to an asynchronous request identified by Tag. |
| cancel/3 | Cancels an asynchronous request. |
| change_config/2 | Change the configuration of the regulator. |
| done/3 | Release lock, Ref, on regulator Regulator. |
| drop/2 | Signal a drop. |
| ensure_dropped/3 | Signal a drop and release the lock. |
| len/2 | Get the length of the queue in the regulator, Regulator. |
| nb_ask/1 | Tries to gain a work lock with the regulator but does not enqueue the request if a lock is not immediately available. |
| size/2 | Get the number of active process using the regulator, Regulator. |
| start_link/2 | Starts a regulator with callback module Module and argument Args. |
| start_link/3 | Starts a regulator with name Name, callback module Module and
argument Args. |
| update/4 | Update the regulator, Regulator, with a relative sojourn time,
RelativeTime, in native time units. |
ask(Regulator) -> Go | Drop
Tries to gain a work lock with the regulator. Returns
{go, Ref, Pid, RelativeTime, SojournTime} on successfully gaining a lock or
{drop, SojournTime}.
Ref is the lock reference, which is a reference(). Pid is the pid()
of the regulator. RelativeTime is the time (in native time units) spent
waiting for the svalve signal to dequeue the request after discounting time
spent waiting for the regulator to handle requests. If the regulator dequeues
the request due to going below the minimum concurrency limit the
RelativeTime is undefined. SojournTime is the time spent in the message
queue and the internal queue, in native time units.
RelativeTime represents the SojournTime without some of the overhead of
the regulator. The value measures the level of queue congestion in the
without being effected by the load of the regulator.
RelativeTime is an integer, the request was enqueued in the internal
queue awaiting a signal request to dequeue it sent approximately
RelativeTime after this request was sent. Therefore SojournTime minus
RelativeTime is the latency, or overhead, of the regulator in native time
units.
async_ask(Regulator) -> {await, Tag, Pid}
Monitors the regulator and sends an asynchronous request to gain a work
lock. Returns {await, Tag, Process}.
Tag is a monitor reference() that uniquely identifies the reply
containing the result of the request. Process, is the pid (pid()) of the
monitored regulator. To cancel the request call cancel(Process, Tag).
The reply is of the form {Tag, {go, Ref, Pid, RelativeTime, SojournTime}
or {Tag, {drop, SojournTime}}.
Process) using async_ask/2.
See also: async_ask/2, cancel/2.
async_ask(Regulator, Tag) -> {await, Tag, Pid}
Sends an asynchronous request to gain a work lock with the regulator.
Returns {await, Tag, Process}.
Tag is a any() that identifies the reply containing the result of the
request. Process, is the pid (pid()) of the regulator. To cancel all
requests identified by Tag on regulator Process call
cancel(Process, Tag).
The reply is of the form {Tag, {go, Ref, Pid, RelativeTime, SojournTime}
or {Tag, {drop, SojournTime}}.
See also: cancel/2.
async_update(Regulator, Ref, RelativeTime) -> {await, Ref2, Pid}
Asynchronously update the regulator, Regulator, with a relative
sojourn time, RelativeTime, in native time units. Returns
{await, Tag, Pid}, where Tag is a monitor reference of the regulator
and Pid is the pid of the regulator. A reply is sent by the regulator
idenitified by the Tag.
The reply if of the form {Tag, {continue, Ref, Pid, SojournTime}} or
{Tag, {not_found, SojournTime}}. SojournTime is the time between sending
the update request and the regulator sending a reply.
async_update(Regulator, Ref, RelativeTime, Tag) -> {await, Tag, Pid}
Asynchronous update the regulator, Regulator, with a relative sojourn
time, RelativeTime, in native time units. Returns {await, Tag, Pid},
where Pid is the pid of the regulator.
The reply if of the form {Tag, {continue, Ref, Pid, SojournTime}} or
{Tag, {not_found, SojournTime}}. SojournTime is the time between sending
the update request and the regulator sending a reply.
async_update/3 except the regulator is
not monitored and the supplied tag, Tag, is used in the reply. If the
regulator exits there is no guarantee of a reply.
await(Tag, Timeout) -> Go | Drop | Continue | NotFound
Await the response to an asynchronous request identified by Tag.
Exits if a response is not received after Timeout milliseconds.
DOWN message is received with the reference Tag.
See also: async_ask/2, async_ask_r/2.
cancel(Regulator, Tag, Timeout) -> ok | {error, not_found}
Cancels an asynchronous request. Returns the number of cancelled
requests or false if no requests exist. In the later case a caller may wish
to check its message queue for an existing reply.
See also: async_ask/1, async_ask/2.
change_config(Regulator, Timeout) -> ok | {error, Reason}
Change the configuration of the regulator. Returns ok on success and
{error, Reason} on failure, where Reason, is the reason for failure.
init/1 callback to get the new configuration. If
init/1 returns ignore the config does not change.
See also: start_link/2.
done(Regulator, Ref, Timeout) -> ok | {error, not_found}
Release lock, Ref, on regulator Regulator. Returns ok on success
and {error, not_found} if the lock reference does not exist on the
regulator.
See also: ask/1.
drop(Regulator, Ref) -> Response
Signal a drop. Returns ok if the lock is released, {error, retry} if
the lock is maintained and {error, not_found} if the lock does not exist on
the regulator.
drop/2 so that
the concurrency level decreases when the remote server is unavailable.
See also: ask/1.
ensure_dropped(Regulator, Ref, Timeout) -> ok | {error, not_found}
Signal a drop and release the lock. Returns ok if the lock is
released and {error, not_found} if the lock does not exist on the
regulator.
drop/2 the lock is always released if it exists.
len(Regulator, Timeout) -> Length
Get the length of the queue in the regulator, Regulator.
nb_ask(Regulator) -> Go | Retry
Tries to gain a work lock with the regulator but does not enqueue the
request if a lock is not immediately available. Returns
{go, Ref, Pid, undefined, SojournTime} on a successfully gaining a lock or
{retry, SojournTime}.
Ref is the lock reference, which is a reference(). Pid is the pid()
of the regulator. undefined reflects the fact that request is successful
because the regulator is below its minimum concurrency limit. SojournTime
is the time spent in the message queue of the regulator, in native time
units.
See also: ask/1.
size(Regulator, Timeout) -> Size
Get the number of active process using the regulator, Regulator.
start_link(Module, Args) -> StartReturn
Starts a regulator with callback module Module and argument Args.
start_link(Name, Module, Args) -> StartReturn
Starts a regulator with name Name, callback module Module and
argument Args.
See also: start_link/2.
update(Regulator, Ref, RelativeTime, Timeout) -> Continue | NotFound
Update the regulator, Regulator, with a relative sojourn time,
RelativeTime, in native time units. Ref is the lock reference and
Timeout is the time to wait in milliseconds for a reply. Returns
{continue, Ref, Pid, SojournTime} if the lock reference, Ref, exists on
the regulator. Pid is the pid of the regulator and SojournTime is the
time spent waiting for the regulator to handle the request. If the lock
reference, Ref, does not exist returns {not_found, SojournTime}.
RelativeTime, is used as the sojourn time in a
a svalve:sojourn/4 or svalve:sojourn_r/4 call on the valve in the
regulator.
Generated by EDoc, May 1 2015, 11:53:46.