sregulator behaviour (sbroker v1.1.1)
This module provides a job regulator for controlling the level of concurrency of processes carrying out a task. A process requests permission to run and is queued until it is allowed to begin. Once the task is complete the process informs the regulator that is is done. Alternatively the process can ask if it can continue running and gains priority over any queued processes. The queue is managed using an sbroker_queue callback module, and the level of concurrency by an sregulator_valve callback module. The message queue delay and processing delay are monitorred by an sbroker_meter.
The main function to ask to begin is ask/1, which blocks until the request is accepted or the queue drops the request. done/3 is then called after the task has finished or continue/2 to continue.
A regulator requires a callback module to be configured, in a similar way to a supervisor's children are specified. The callback modules implements one callback, init/1, with single argument Args. init/1 should return {ok, {QueueSpec, ValveSpec, [MeterSpec]}} or ignore. QueueSpec is the sbroker_queue specification, ValveSpec is the sregulator_valve specification and MeterSpec is a sbroker_meter specification. There can be any number of meters but a meter module can only be included once. All three take the same format: {Module, Args}, where Module is the callback module and Args the arguments term for the module. In the case of ignore the regulator is not started and start_link returns ignore. As the callback modules are defined in the init/1 callback a regulator supports the dynamic modules supervisor child specification.
For example:
-module(sregulator_example).
-behaviour(sregulator).
-export([start_link/0]).
-export([ask/0]).
-export([continue/1]).
-export([done/1]).
-export([init/1]).
start_link() ->
sregulator:start_link({local, ?MODULE}, ?MODULE, [], []).
ask() ->
case sregulator:ask(?MODULE) of
{go, Ref, _, _, _} -> {ok, Ref};
{drop, _} -> {error, dropped}
end.
continue(Ref) ->
case sregulator:continue(?MODULE, Ref) of
{go, Ref, _, _, _} -> {ok, Ref};
{done, _} -> {error, dropped};
{not_found, _} -> {error, not_found}
end.
done(Ref) ->
sregulator:done(?MODULE, Ref).
init([]) ->
QueueSpec = {sbroker_codel_queue, #{}},
ValveSpec = {sregulator_open_valve, #{}},
MeterSpec = {sbroker_overload_meter, #{alarm => {overload, ?MODULE}}},
{ok, {QueueSpec, ValveSpec, [MeterSpec]}}.
Link to this section Summary
Functions
Send a run request to the regulator, Regulator.
Monitor the regulator and send an asynchronous run request. Returns {await, Tag, Process}.
Send an asynchronous run request using tag, Tag. Returns {await, Tag, Process}.
Await the response to an asynchronous request idenitifed by Tag.
Equivalent to cancel(Regulator, Tag, infinity).
Cancel an asynchronous request.
Update the valve in the regulator without waiting for the regulator to handle the update.
Equivalent to change_config(Regulator, infinity).
Change the configuration of the regulator. Returns ok on success and {error, Reason} on failure, where Reason is the reason for failure.
Send a request to continue running using an existing lock reference, Ref. The request is not queued.
Cancels an asynchronous request.
Asynchronously inform the regulator the process has finished running and should release the lock, Ref.
Equivalent to done(Regulator, Ref, infinity).
Inform the regulator the process has finished running and release the lock, Ref.
Send a run request to the regulator, Regulator. If not immediately allowed to run the request is converted to an async_ask/1.
Equivalent to len(Regulator, infinity).
Regulator.Send a run request to the regulator, Regulator, but do not enqueue the request if not immediately allowed to run.
Equivalent to size(Regulator, infinity).
Regulator.Starts a regulator with callback module Module and argument Args, and regulator options Opts.
Name, callback module Module and argument Args, and regulator options Opts.See also: start_link/3.
Equivalent to update(Regulator, Value, infinity).
Synchronously update the valve in the regulator.
Link to this section Types
debug_option/0
Specs
debug_option() ::
trace | log |
{log, pos_integer()} |
statistics |
{log_to_file, file:filename()} |
{install, {fun(), any()}}.
handler_spec/0
Specs
handler_spec() :: {module(), any()}.
name/0
Specs
name() :: {local, atom()} | {global, any()} | {via, module(), any()}.
regulator/0
Specs
regulator() :: pid() | atom() | {atom(), node()} | {global, any()} | {via, module(), any()}.
start_option/0
Specs
start_option() ::
{debug, debug_option()} |
{timeout, timeout()} |
{spawn_opt, [proc_lib:spawn_option()]} |
{read_time_after, non_neg_integer() | infinity}.
start_return/0
Specs
start_return() :: {ok, pid()} | ignore | {error, any()}.
Link to this section Callbacks
init/1
Specs
init(Args :: any()) ->
{ok,
{QueueSpec :: handler_spec(), ValveSpec :: handler_spec(), [MeterSpec :: handler_spec()]}} |
ignore.
Link to this section Functions
ask(Regulator)
Specs
ask(Regulator) -> Go | Drop
when
Regulator :: regulator(),
Go :: {go, Ref, Pid, RelativeTime, SojournTime},
Ref :: reference(),
Pid :: pid(),
RelativeTime :: integer(),
SojournTime :: non_neg_integer(),
Drop :: {drop, SojournTime}.
Send a run request to the regulator, Regulator.
Returns {go, Ref, RegulatorPid, RelativeTIme, SojournTime} on successfully being allowed to run or {drop, SojournTime}.
Ref is the lock reference, which is a reference(). RegulatorPid is the pid() of the regulator process. RelativeTime is the time difference between when the request was sent and the message that opened the regulator's valve was sent. SojournTime is the approximate time spent in both the regulator's message queue and internal queue.
RelativeTime represents the SojournTime without the overhead of the regulator. The value measures the queue congestion without being effected by the load of the regulator or node.
If RelativeTime is positive, the request was enqueued in the internal queue awaiting a message to open the value that was sent approximately RelativeTime ater this request was sent. Therefore SojournTime minus RelativeTime is the latency, or overhead, of the regulator.
If RelativeTime is negative, the regulator's valve was opened by a message sent abs(RelativeTime) before this request. Therefore SojournTime is the latency, or overhead, of the regulator.
RelativeTime is 0, the request was sent at approximately the same as the message that open the regulator's valve.
async_ask(Regulator)
Specs
async_ask(Regulator) -> {await, Tag, Process} | {drop, 0}
when
Regulator :: regulator(), Tag :: reference(), Process :: pid() | {atom(), node()}.
Monitor the regulator and send an asynchronous run request. Returns {await, Tag, Process}.
Tag is a monitor reference() that uniquely identifies the reply containing the result of the request. Process is the pid() of the regulator or {atom(), node()} if the regulator is registered locally on a different node.
The reply is of the form {Tag, Msg} where Msg is either {go, Ref, RegulatorPid, RelativeTime, SojournTime} or {drop, SojournTime}.
Ref is the lock reference, which is a reference(). RegulatorPid is the pid() of the regulator process. RelativeTime is the time difference between when the request was sent and the message that opened the regulator's valve was sent. SojournTime is the approximate time spent in both the regulator's message queue and internal queue.
Multiple asynchronous requests can be made from a single process to a regulator and no guarantee is made of the order of replies. A process making multiple requests can reuse the monitor reference for subsequent requests to the same regulator process (Process) using async_ask/2.
via module sprotector returns {drop, 0} and does not send the request.See also: async_ask/2, cancel/2.
async_ask(Regulator, To)
Specs
async_ask(Regulator, To) -> {await, Tag, Process} | {drop, 0}
when
Regulator :: regulator(),
To :: {Pid, Tag},
Pid :: pid(),
Tag :: any(),
Process :: pid() | {atom(), node()}.
Send an asynchronous run request using tag, Tag. Returns {await, Tag, Process}.
To is a tuple containing the process, pid(), to send the reply to and Tag, any(), that idenitifes the reply containing the result of the request. Process is the pid() of the regulator or {atom(), node()} if the regulator is registered locally on a different node.
async_ask/1.See also: async_ask/1, cancel/2.
await(Tag, Timeout)
Specs
await(Tag, Timeout) -> Go | Drop
when
Tag :: any(),
Timeout :: timeout(),
Go :: {go, Ref, Value, RelativeTime, SojournTime},
Ref :: reference(),
Value :: any(),
RelativeTime :: integer(),
SojournTime :: non_neg_integer(),
Drop :: {drop, SojournTime}.
Await the response to an asynchronous request idenitifed by Tag.
Exits if a response is not received after Timeout milliseconds.
DOWN message is received with reference Tag.See also: async_ask/1, async_ask/2.
cancel(Regulator, Tag)
Specs
cancel(Regulator, Tag) -> Count | false
when Regulator :: regulator(), Tag :: any(), Count :: pos_integer().
Equivalent to cancel(Regulator, Tag, infinity).
cancel(Regulator, Tag, Timeout)
Specs
cancel(Regulator, Tag, Timeout) -> Count | false
when
Regulator :: regulator(), Tag :: any(), Timeout :: timeout(), Count :: pos_integer().
Cancel an asynchronous request.
Returns the number of cancelled requests orfalse if no requests exist with tag Tag. In the later case a caller may wish to check is message queue for an existing reply.See also: async_ask/1, async_ask/2.
cast(Regulator, Value)
Specs
cast(Regulator, Value) -> ok when Regulator :: regulator(), Value :: integer().
Update the valve in the regulator without waiting for the regulator to handle the update.
Value is an integer().
ok.
change_config(Regulator)
Specs
change_config(Regulator) -> ok | {error, Reason} when Regulator :: regulator(), Reason :: any().
Equivalent to change_config(Regulator, infinity).
change_config(Regulator, Timeout)
Specs
change_config(Regulator, Timeout) -> ok | {error, Reason}
when Regulator :: regulator(), Timeout :: timeout(), Reason :: any().
Change the configuration of the regulator. Returns ok on success and {error, Reason} on failure, where Reason is the reason for failure.
init/1 callback to get the new configuration. If init/1 returns ignore the config does not change.
continue(Regulator, Ref)
Specs
continue(Regulator, Ref) -> Go | Stop | NotFound | Drop
when
Regulator :: regulator(),
Ref :: reference(),
Go :: {go, Ref, Pid, RelativeTime, SojournTime},
Pid :: pid(),
RelativeTime :: integer(),
SojournTime :: non_neg_integer(),
Stop :: {stop, SojournTime},
NotFound :: {not_found, SojournTime},
Drop :: {drop, 0}.
Send a request to continue running using an existing lock reference, Ref. The request is not queued.
Returns {go, Ref, RegulatorPid, RelativeTime, SojournTime} on successfully being allowed to run, {stop, SojournTime} when the process should stop running or {not_found, SojournTime} when the lock reference does not exist on the regulator.
Ref is the lock reference, which is a reference(). RegulatorPid is the pid() of the regulator process. RelativeTime is the time difference between when the request was sent and the message that opened the regulator's valve was sent. SojournTime is the approximate time spent in the regulator's message queue.
via module sprotector returns {drop, 0} and does not send the request. In this situation the Ref is still a valid lock on the regulator.See also: ask/1.
dirty_cancel(Regulator, Tag)
Specs
dirty_cancel(Regulator, Tag) -> ok when Regulator :: regulator(), Tag :: any().
Cancels an asynchronous request.
Returnsok without waiting for the regulator to cancel requests.See also: cancel/3.
dirty_done(Regulator, Ref)
Specs
dirty_done(Regulator, Ref) -> ok when Regulator :: regulator(), Ref :: reference().
Asynchronously inform the regulator the process has finished running and should release the lock, Ref.
ok without waiting for the regulator to release the lock.See also: done/3.
done(Regulator, Ref)
Specs
done(Regulator, Ref) -> Stop | NotFound
when
Regulator :: regulator(),
Ref :: reference(),
Stop :: {stop, SojournTime},
SojournTime :: non_neg_integer(),
NotFound :: {not_found, SojournTime}.
Equivalent to done(Regulator, Ref, infinity).
done(Regulator, Ref, Timeout)
Specs
done(Regulator, Ref, Timeout) -> Stop | NotFound
when
Regulator :: regulator(),
Ref :: reference(),
Timeout :: timeout(),
Stop :: {stop, SojournTime},
SojournTime :: non_neg_integer(),
NotFound :: {not_found, SojournTime}.
Inform the regulator the process has finished running and release the lock, Ref.
Returns {stop, SojournTime} if the regulator acknowledged the process has stopped running or {not_found, SojournTime} if the lock reference, Ref, does not exist on the regulator.
SojournTime is the time the request spent in the regulator's message queue.See also: ask/1.
dynamic_ask(Regulator)
Specs
dynamic_ask(Regulator) -> Go | Await | Drop
when
Regulator :: regulator(),
Go :: {go, Ref, Pid, RelativeTime, SojournTime},
Ref :: reference(),
Pid :: pid(),
RelativeTime :: 0 | neg_integer(),
SojournTime :: non_neg_integer(),
Await :: {await, Tag, Pid},
Tag :: reference(),
Drop :: {drop, 0}.
Send a run request to the regulator, Regulator. If not immediately allowed to run the request is converted to an async_ask/1.
Returns {go, Ref, RegulatorPid, RelativeTime, SojournTime} on successfully being allowed to run or {await, Tag, RegulatorPid}.
Ref is the lock reference, which is a reference(). RegulatorPid is the pid() of the regulator process. RelativeTime is the time difference between when the request was sent and the message that opened the regulator's valve was sent. SojournTime is the approximate time spent in the regulator's message queue. Tag is a monitor reference, as returned by async_ask/1.
via module sprotector returns {drop, 0} and does not send the request.See also: async_ask/1, nb_ask/1.
len(Regulator)
Specs
len(Regulator) -> Length when Regulator :: regulator(), Length :: non_neg_integer().
Equivalent to len(Regulator, infinity).
len(Regulator, Timeout)
Specs
len(Regulator, Timeout) -> Length
when Regulator :: regulator(), Timeout :: timeout(), Length :: non_neg_integer().
Regulator.
nb_ask(Regulator)
Specs
nb_ask(Regulator) -> Go | Drop
when
Regulator :: regulator(),
Go :: {go, Ref, Pid, RelativeTime, SojournTime},
Ref :: reference(),
Pid :: pid(),
RelativeTime :: 0 | neg_integer(),
SojournTime :: non_neg_integer(),
Drop :: {drop, SojournTime}.
Send a run request to the regulator, Regulator, but do not enqueue the request if not immediately allowed to run.
Returns {go, Ref, RegulatorPid, RelativeTime, SojournTime} on successfully being allowed to run or {drop, SojournTime}.
Ref is the lock reference, which is a reference(). RegulatorPid is the pid() of the regulator process. RelativeTime is the time difference between when the request was sent and the message that opened the regulator's valve was sent. SojournTime is the approximate time spent in the regulator's message queue.
via module sprotector returns {drop, 0} and does not send the request.See also: ask/1.
size(Regulator)
Specs
size(Regulator) -> Size when Regulator :: regulator(), Size :: non_neg_integer().
Equivalent to size(Regulator, infinity).
size(Regulator, Timeout)
Specs
size(Regulator, Timeout) -> Size
when Regulator :: regulator(), Timeout :: timeout(), Size :: non_neg_integer().
Regulator.
start_link(Module, Args, Opts)
Specs
start_link(Module, Args, Opts) -> StartReturn
when
Module :: module(),
Args :: any(),
Opts :: [start_option()],
StartReturn :: start_return().
Starts a regulator with callback module Module and argument Args, and regulator options Opts.
Opts is a proplist and supports debug, timeout and spawn_opt used by gen_server and gen_fsm. read_time_after sets the number of requests when a cached time is stale and the time is read again. Its value is non_neg_integer() or infinity and defaults to 16.See also: gen_server:start_link/3.
start_link(Name, Module, Args, Opts)
Specs
start_link(Name, Module, Args, Opts) -> StartReturn
when
Name :: name(),
Module :: module(),
Args :: any(),
Opts :: [start_option()],
StartReturn :: start_return().
Name, callback module Module and argument Args, and regulator options Opts.See also: start_link/3.
timeout(Regulator)
Specs
timeout(Regulator) -> ok when Regulator :: regulator().
update(Regulator, Value)
Specs
update(Regulator, Value) -> ok when Regulator :: regulator(), Value :: integer().
Equivalent to update(Regulator, Value, infinity).
update(Regulator, Value, Timeout)
Specs
update(Regulator, Value, Timeout) -> ok
when Regulator :: regulator(), Value :: integer(), Timeout :: timeout().
Synchronously update the valve in the regulator.
Value is an integer() and Timeout is the timout, timeout(), to wait in milliseconds for the regulator to reply to the update.
ok.