Module sregulator

This module provides a load regulation service.

Behaviours: gen_fsm.

This module defines the sregulator behaviour.
Required callback functions: init/1.

Description

This module provides a load regulation service. A process joins the queue waiting to begin work. The level of concurrency is controlled by the sojourn time of another queue, such as a queue for an sbroker. Usually this means that the level of concurrency is decreased when a queue is slow (i.e. workers in excess) and increased when a queue is fast (i.e. workers in demand).

When the number of active workers is below the minimum concurrency limit, workers are dequeued until the minimum level is reached. When the number of active workers is at (or above) the maximum, workers are never dequeued. When the number of workers is greater than or equal to the minimum and less than the maximum, a worker can be dequeued based on the sojourn time of a queue in an sbroker, or similar process.

Before commencing work a worker calls sregulator:ask/1. If the regulator returns a go tuple, i.e. {go, Ref, RegulatorPid, Relative, SojournTime}, the worker may continue. Ref, the lock reference, and RegulatorPid, the pid of the regulator, are used for future communication with the regulator. RelativeTime is undefined if the regulator grants the lock due to the level of concurrency going below the minimum limit. Otherwise RelativeTime is the time (in native time units) spent waiting for the regulator's valve to dequeue the request after discounting time spent waiting for the regulator to handle requests. SojournTime is the time spent in both the regulator's message queue and internal queue, in native time units.

The regulator may also return a drop tuple, i.e. {drop, SojournTime}. This means that work can not begin as a lock was not acquired.

The worker uses update/4 to report and update the result of a successful enqueuing attempt against a sbroker (or another sregulator). For example:
  {go, Ref, Regulator, _, _} = sregulator:ask(Regulator),
  {go, _, _, RelativeTime, _} = sbroker:ask(Broker),
  {continue, Ref, _, _} = sregulator:update(Regulator, Ref, RelativeTime, 500).
Or drop/2 in the case of a failure:
  {go, Ref, Regulator, _, _} = sregulator:ask(Regulator),
  {drop, _} = sbroker:ask(Broker),
  case sregulator:drop(Regulator, Ref) of
      {dropped, _} -> dropped;
      {retry, _} -> retry
   end.

If drop/2 returns {dropped, SojournTime} the work lock is lost and the process should stop working. However if it returns {retry, SojournTime} the process should continue because dropping it would bring the number of active workers below the concurrency limit. The regulator will always favour active workers over queued workers, so a drop request can return a retry tuple even when workers are queued waiting for a lock.

The lock reference can be released using done(Regulator, Ref) or will be automatically released when a worker exits.

A regulator requires a callback module. The callback modules implements one callback, init/1, with single argument Args. init/1 should return {ok, {QueueSpec, ValveSpec, Interval}) or ignore. QueueSpec is the queue specification for the queue and Valve is the valve specification for the queue. Interval is the interval in milliseconds that the queue is polled. This ensures that the active queue management strategy is applied even if no processes are enqueued/dequeued. In the case of ignore the regulator is not started and start_link returns ignore.

A queue specification takes the following form: {Module, Args, Out, Size, Drop}. Module is the squeue callback module and Args are its arguments. The queue is created using squeue:new(Time, Module, Args), where Time is the current time in native time units. Out defines the method of dequeuing, it is either the atom out (dequeue items from the head, i.e. FIFO), or the atom out_r (dequeue items from the tail, i.e. LIFO). Size is the maximum size of the queue, it is either a non_neg_integer() or infinity. Drop defines the strategy to take when the maximum size, Size, of the queue is exceeded. It is either the atom drop (drop from the head of the queue, i.e. head drop) or drop_r (drop from the tail of the queue, i.e. tail drop).

A valve specification takes the following form: {Module, Args, Min, Max}. Module is the svalve callback module and Args are its arguments. The valve is created using svalve:new(Time, Module, Args), where Time is the current time in native time units. Min is the minimum desired level of concurrency, a non_neg_integer(). Max is the maximum desired level of concurrency and is a non_neg_integer() or infinity. The maximum must be greater than or equal to the minimum.

For example:

  -module(sregulator_example).
 
  -behaviour(sregulator).
 
  -export([start_link/0]).
  -export([ask/0]).
  -export([done/1]).
  -export([update/2]).
  -export([init/1]).
 
  start_link() ->
      sregulator:start_link({local, ?MODULE}, ?MODULE, []).
 
  ask() ->
      sregulator:ask(?MODULE).
 
  done(Ref) ->
      sregulator:done(?MODULE, Ref).
 
  update(Ref, RelativeTime) ->
      sregulator:update(?MODULE, Ref, RelativeTime, 500).
 
  drop(Ref) ->
      sregulator:drop(?MODULE, Ref).
 
  init([]) ->
      Timeout = sbroker_time:milli_seconds_to_native(5000),
      Target = sbroker_time:milli_seconds_to_native(5),
      ValveInternal = sbroker_time:milli_second_to_native(100),
      QueueSpec = {squeue_timeout, Timeout, out_r, infinity, drop},
      ValveSpec = {svalve_codel_r, {Target, ValveInternal}, 8, 64},
      RegInterval = 200,
      {ok, {QueueSpec, ValveSpec, RegInterval}}.

Data Types

name()

name() = {local, atom()} | {global, any()} | {via, module(), any()}

queue_spec()

queue_spec() = {module(), any(), out | out_r, non_neg_integer() | infinity, drop | drop_r}

regulator()

regulator() = pid() | atom() | {atom(), node()} | {global, any()} | {via, module(), any()}

start_return()

start_return() = {ok, pid()} | {error, any()}

valve_spec()

valve_spec() = {module(), any(), non_neg_integer(), non_neg_integer() | infinity}

Function Index

ask/1Tries to gain a work lock with the regulator.
async_ask/1Monitors the regulator and sends an asynchronous request to gain a work lock.
async_ask/2Sends an asynchronous request to gain a work lock with the regulator.
async_update/3Asynchronously update the regulator, Regulator, with a relative sojourn time, RelativeTime, in native time units.
async_update/4Asynchronous update the regulator, Regulator, with a relative sojourn time, RelativeTime, in native time units.
await/2Await the response to an asynchronous request identified by Tag.
cancel/3Cancels an asynchronous request.
change_config/2Change the configuration of the regulator.
done/3Release lock, Ref, on regulator Regulator.
drop/2Signal a drop.
ensure_dropped/3Signal a drop and release the lock.
len/2Get the length of the queue in the regulator, Regulator.
nb_ask/1Tries to gain a work lock with the regulator but does not enqueue the request if a lock is not immediately available.
size/2Get the number of active process using the regulator, Regulator.
start_link/2Starts a regulator with callback module Module and argument Args.
start_link/3Starts a regulator with name Name, callback module Module and argument Args.
update/4Update the regulator, Regulator, with a relative sojourn time, RelativeTime, in native time units.

Function Details

ask/1

ask(Regulator) -> Go | Drop

Tries to gain a work lock with the regulator. Returns {go, Ref, Pid, RelativeTime, SojournTime} on successfully gaining a lock or {drop, SojournTime}.

Ref is the lock reference, which is a reference(). Pid is the pid() of the regulator. RelativeTime is the time (in native time units) spent waiting for the svalve signal to dequeue the request after discounting time spent waiting for the regulator to handle requests. If the regulator dequeues the request due to going below the minimum concurrency limit the RelativeTime is undefined. SojournTime is the time spent in the message queue and the internal queue, in native time units.

RelativeTime represents the SojournTime without some of the overhead of the regulator. The value measures the level of queue congestion in the without being effected by the load of the regulator.

If RelativeTime is an integer, the request was enqueued in the internal queue awaiting a signal request to dequeue it sent approximately RelativeTime after this request was sent. Therefore SojournTime minus RelativeTime is the latency, or overhead, of the regulator in native time units.

async_ask/1

async_ask(Regulator) -> {await, Tag, Pid}

Monitors the regulator and sends an asynchronous request to gain a work lock. Returns {await, Tag, Process}.

Tag is a monitor reference() that uniquely identifies the reply containing the result of the request. Process, is the pid (pid()) of the monitored regulator. To cancel the request call cancel(Process, Tag).

The reply is of the form {Tag, {go, Ref, Pid, RelativeTime, SojournTime} or {Tag, {drop, SojournTime}}.

Multiple asynchronous requests can be made from a single process to a regulator and no guarantee is made of the order of replies. A process making multiple requests can reuse the monitor reference for subsequent requests to the same regulator process (Process) using async_ask/2.

See also: async_ask/2, cancel/2.

async_ask/2

async_ask(Regulator, Tag) -> {await, Tag, Pid}

Sends an asynchronous request to gain a work lock with the regulator. Returns {await, Tag, Process}.

Tag is a any() that identifies the reply containing the result of the request. Process, is the pid (pid()) of the regulator. To cancel all requests identified by Tag on regulator Process call cancel(Process, Tag).

The reply is of the form {Tag, {go, Ref, Pid, RelativeTime, SojournTime} or {Tag, {drop, SojournTime}}.

Multiple asynchronous requests can be made from a single process to a regulator and no guarantee is made of the order of replies. If the regulator exits or is on a disconnected node there is no guarantee of a reply and so the caller should take appropriate steps to handle this scenario.

See also: cancel/2.

async_update/3

async_update(Regulator, Ref, RelativeTime) -> {await, Ref2, Pid}

Asynchronously update the regulator, Regulator, with a relative sojourn time, RelativeTime, in native time units. Returns {await, Tag, Pid}, where Tag is a monitor reference of the regulator and Pid is the pid of the regulator. A reply is sent by the regulator idenitified by the Tag.

The reply if of the form {Tag, {continue, Ref, Pid, SojournTime}} or {Tag, {not_found, SojournTime}}. SojournTime is the time between sending the update request and the regulator sending a reply.

This function is intended to allow back pressure while not blocking the worker. An update request is assumed to occur at the moment a dequeue event occurs. If the regulator is overloaded and has not replied to a previous asynchronous update request a worker should skip updating the regulator rather than delay sending the update.

async_update/4

async_update(Regulator, Ref, RelativeTime, Tag) -> {await, Tag, Pid}

Asynchronous update the regulator, Regulator, with a relative sojourn time, RelativeTime, in native time units. Returns {await, Tag, Pid}, where Pid is the pid of the regulator.

The reply if of the form {Tag, {continue, Ref, Pid, SojournTime}} or {Tag, {not_found, SojournTime}}. SojournTime is the time between sending the update request and the regulator sending a reply.

This function behaves the same as async_update/3 except the regulator is not monitored and the supplied tag, Tag, is used in the reply. If the regulator exits there is no guarantee of a reply.

await/2

await(Tag, Timeout) -> Go | Drop | Continue | NotFound

Await the response to an asynchronous request identified by Tag.

Exits if a response is not received after Timeout milliseconds.

Exits if a DOWN message is received with the reference Tag.

See also: async_ask/2, async_ask_r/2.

cancel/3

cancel(Regulator, Tag, Timeout) -> ok | {error, not_found}

Cancels an asynchronous request. Returns the number of cancelled requests or false if no requests exist. In the later case a caller may wish to check its message queue for an existing reply.

See also: async_ask/1, async_ask/2.

change_config/2

change_config(Regulator, Timeout) -> ok | {error, Reason}

Change the configuration of the regulator. Returns ok on success and {error, Reason} on failure, where Reason, is the reason for failure.

Regulator calls the init/1 callback to get the new configuration. If init/1 returns ignore the config does not change.

See also: start_link/2.

done/3

done(Regulator, Ref, Timeout) -> ok | {error, not_found}

Release lock, Ref, on regulator Regulator. Returns ok on success and {error, not_found} if the lock reference does not exist on the regulator.

See also: ask/1.

drop/2

drop(Regulator, Ref) -> Response

Signal a drop. Returns ok if the lock is released, {error, retry} if the lock is maintained and {error, not_found} if the lock does not exist on the regulator.

This function can be used to signal to the regulator that an event has occured that should shrink the level of concurrency. For example a connection process that fails to connect to a remote server may call drop/2 so that the concurrency level decreases when the remote server is unavailable.

See also: ask/1.

ensure_dropped/3

ensure_dropped(Regulator, Ref, Timeout) -> ok | {error, not_found}

Signal a drop and release the lock. Returns ok if the lock is released and {error, not_found} if the lock does not exist on the regulator.

Unlike drop/2 the lock is always released if it exists.

See also: ask/1, drop/2.

len/2

len(Regulator, Timeout) -> Length

Get the length of the queue in the regulator, Regulator.

nb_ask/1

nb_ask(Regulator) -> Go | Retry

Tries to gain a work lock with the regulator but does not enqueue the request if a lock is not immediately available. Returns {go, Ref, Pid, undefined, SojournTime} on a successfully gaining a lock or {retry, SojournTime}.

Ref is the lock reference, which is a reference(). Pid is the pid() of the regulator. undefined reflects the fact that request is successful because the regulator is below its minimum concurrency limit. SojournTime is the time spent in the message queue of the regulator, in native time units.

See also: ask/1.

size/2

size(Regulator, Timeout) -> Size

Get the number of active process using the regulator, Regulator.

start_link/2

start_link(Module, Args) -> StartReturn

Starts a regulator with callback module Module and argument Args.

start_link/3

start_link(Name, Module, Args) -> StartReturn

Starts a regulator with name Name, callback module Module and argument Args.

See also: start_link/2.

update/4

update(Regulator, Ref, RelativeTime, Timeout) -> Continue | NotFound

Update the regulator, Regulator, with a relative sojourn time, RelativeTime, in native time units. Ref is the lock reference and Timeout is the time to wait in milliseconds for a reply. Returns {continue, Ref, Pid, SojournTime} if the lock reference, Ref, exists on the regulator. Pid is the pid of the regulator and SojournTime is the time spent waiting for the regulator to handle the request. If the lock reference, Ref, does not exist returns {not_found, SojournTime}.

The relative sojourn time, RelativeTime, is used as the sojourn time in a a svalve:sojourn/4 or svalve:sojourn_r/4 call on the valve in the regulator.


Generated by EDoc, May 1 2015, 11:53:46.