Module svalve

This module provides sojourn-time based active queue management using squeue, with a feedback loop that dequeues items based on the sojourn times of items in another queue.

This module defines the svalve behaviour.
Required callback functions: init/2, handle_sojourn/4, handle_sojourn_r/4, handle_sojourn_closed/4, handle_dropped/3, handle_dropped_r/3, handle_dropped_closed/3.

See also: squeue, svalve_codel_r, svalve_naive, svalve_timeout.

Description

new/0 and new/1 create a new svalve with the default feedback loop svalve_naive using an empty squeue managed by squeue_naive. svalve_naive will never trigger a dequeue due to a signal. new/2 and new/3 create a new svalve with a custom feedback loop, such as svalve_codel_r, using an empty squeue managed by squeue_naive.
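
The four constructors can be compared with a short sketch. The module name svalve_new_example and the function valves/0 are hypothetical. Only svalve_naive with the argument undefined is used, because that is the combination named in the description of new/0.

```erlang
-module(svalve_new_example).
-export([valves/0]).

%% Build one valve with each constructor and return their start times.
valves() ->
    V0 = svalve:new(),                            % time 0, svalve_naive
    V1 = svalve:new(10),                          % time 10, svalve_naive
    V2 = svalve:new(svalve_naive, undefined),     % time 0, explicit module
    V3 = svalve:new(10, svalve_naive, undefined), % time 10, explicit module
    [svalve:time(V) || V <- [V0, V1, V2, V3]].
```

Going by the equivalences listed for new/0, new/1 and new/2, the first and third valves should start at time 0 and the second and fourth at time 10.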

The underlying squeue queue can be retrieved and replaced with squeue/1 and squeue/2. sojourn/2,3,4 signal the feedback loop to dequeue from the head of the queue. dropped/1,2,3 signal a drop and can also result in a dequeue from the head. Alternatively sojourn_r/2,3,4 and dropped_r/1,2,3 are used to dequeue from the tail.

To close the valve use close/1, which prevents sojourn/2,3,4, sojourn_r/2,3,4, dropped/1,2,3 and dropped_r/1,2,3 from dequeuing items. To open the valve use open/1, which allows items to be dequeued by the feedback loop.
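
A minimal sketch of closing and reopening a valve follows. The module name svalve_close_example and the function run/0 are hypothetical. The default svalve_naive feedback loop is used, so the result of the signal sent after open/1 is returned as it is rather than assumed.

```erlang
-module(svalve_close_example).
-export([run/0]).

run() ->
    V0 = svalve:new(),
    {_Drops0, V1} = svalve:in(job, V0),
    V2 = svalve:close(V1),
    %% Closed valve: the signal cannot dequeue, so job stays queued.
    {closed, _Drops1, V3} = svalve:sojourn(5, V2),
    1 = svalve:len(V3),
    V4 = svalve:open(V3),
    %% Open valve: the result now depends on the feedback loop.
    {Result, _Drops2, _V5} = svalve:sojourn(5, V4),
    Result.
```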

All other functions are equivalent to those of the same name and arity in squeue.

svalve includes 3 feedback loop algorithms: svalve_naive, svalve_timeout and svalve_codel_r.

A custom feedback loop algorithm must implement the svalve behaviour. The first callback is init/2:
  -callback init(Time :: integer(), Args :: any()) -> State :: any().

Time is the time of the feedback loop, which may be less than the time of the squeue. It is 0 for new/2 and the Time argument for new/3. Time can be positive or negative. All other callbacks receive the current time of the feedback loop as the first argument. It never decreases, so subsequent calls will use the same or a greater time.

Args is the last argument passed to new/2 or new/3. It can be any term.

The feedback loop is called for every signal sent to the queue and should either dequeue an item with squeue:out/1 or squeue:out_r/1, or apply active queue management with squeue:timeout/1. Because the squeue may be at a different time from the feedback loop, the variants of these functions that take a time argument should not be used.

When the valve is open and a sojourn is signalled to dequeue from the head using sojourn/2,3,4 the callback function is handle_sojourn/4:
  -callback handle_sojourn(Time :: integer(),
                           SojournTime :: non_neg_integer(),
                           S :: squeue:squeue(Item), State :: any()) ->
      {empty | closed | {ItemSojournTime :: non_neg_integer(), Item},
       Drops :: [{DropSojournTime :: non_neg_integer(), Item}],
       NS :: squeue:squeue(Item), NState :: any()}.

SojournTime is the time an item spent in another queue. S is the internal squeue and NS is that squeue after dropping items and dequeuing (or not) an item.

closed means the feedback loop is closed. empty means the feedback loop is open and would have dequeued an item if the queue were not empty. {ItemSojournTime, Item} is the item and its sojourn time from the head of the queue as returned by squeue:out/1.

State is the state of the callback module, and NState is the new state after handling the signal.

Drops is a list of dropped items returned by squeue:out/1 or squeue:timeout/1.

Similarly when the valve is open and a sojourn is signalled to dequeue from the tail using sojourn_r/2,3,4 the callback function is handle_sojourn_r/4:
  -callback handle_sojourn_r(Time :: integer(),
                             SojournTime :: non_neg_integer(),
                             S :: squeue:squeue(Item), State :: any()) ->
      {empty | closed | {ItemSojournTime :: non_neg_integer(), Item},
       Drops :: [{DropSojournTime :: non_neg_integer(), Item}],
       NS :: squeue:squeue(Item), NState :: any()}.

This callback should behave the same as handle_sojourn/4, except that it uses squeue:out_r/1 instead of squeue:out/1.

When the valve is closed and a sojourn is signalled using either sojourn/2,3,4 or sojourn_r/2,3,4 the callback function is handle_sojourn_closed/4:
  -callback handle_sojourn_closed(Time :: integer(),
                                  SojournTime :: non_neg_integer(),
                                  S :: squeue:squeue(Item), State :: any()) ->
      {closed, Drops :: [{DropSojournTime :: non_neg_integer(), Item}],
       NS :: squeue:squeue(Item), NState :: any()}.

As the valve is closed no items should be dequeued and active queue management should be applied using squeue:timeout/1.

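Similarly, when a drop is signalled using dropped/1,2,3 or dropped_r/1,2,3, the callback functions are handle_dropped/3, handle_dropped_r/3 and handle_dropped_closed/3. They behave like their sojourn counterparts, except there is no SojournTime argument:
  -callback handle_dropped(Time :: integer(),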
                           S :: squeue:squeue(Item), State :: any()) ->
      {empty | closed | {ItemSojournTime :: non_neg_integer(), Item},
       Drops :: [{DropSojournTime :: non_neg_integer(), Item}],
       NS :: squeue:squeue(Item), NState :: any()}.
  -callback handle_dropped_r(Time :: integer(),
                             S :: squeue:squeue(Item), State :: any()) ->
      {empty | closed | {ItemSojournTime :: non_neg_integer(), Item},
       Drops :: [{DropSojournTime :: non_neg_integer(), Item}],
       NS :: squeue:squeue(Item), NState :: any()}.
  -callback handle_dropped_closed(Time :: integer(),
                                  S :: squeue:squeue(Item), State :: any()) ->
      {closed, Drops :: [{DropSojournTime :: non_neg_integer(), Item}],
       NS :: squeue:squeue(Item), NState :: any()}.
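
The seven callbacks can be sketched together as one small module. This is an illustration only: the module name svalve_threshold_example and its policy (dequeue when the signalled sojourn time reaches a fixed threshold, and dequeue on every drop signal) are assumptions made for the example, not one of the algorithms shipped with svalve. It relies on squeue:out/1 and squeue:out_r/1 returning {Result, Drops, NS} and on squeue:timeout/1 returning {Drops, NS}, as the callback descriptions above imply.

```erlang
-module(svalve_threshold_example).
-behaviour(svalve).

-export([init/2, handle_sojourn/4, handle_sojourn_r/4,
         handle_sojourn_closed/4, handle_dropped/3, handle_dropped_r/3,
         handle_dropped_closed/3]).

%% The state is the threshold passed as Args to svalve:new/2,3.
init(_Time, Threshold) when is_integer(Threshold), Threshold >= 0 ->
    Threshold.

%% Open valve, head: dequeue once the signalled sojourn time is high enough.
handle_sojourn(_Time, SojournTime, S, Threshold)
  when SojournTime >= Threshold ->
    {Result, Drops, NS} = squeue:out(S),
    {Result, Drops, NS, Threshold};
handle_sojourn(Time, SojournTime, S, Threshold) ->
    handle_sojourn_closed(Time, SojournTime, S, Threshold).

%% Open valve, tail: same policy, but dequeue with squeue:out_r/1.
handle_sojourn_r(_Time, SojournTime, S, Threshold)
  when SojournTime >= Threshold ->
    {Result, Drops, NS} = squeue:out_r(S),
    {Result, Drops, NS, Threshold};
handle_sojourn_r(Time, SojournTime, S, Threshold) ->
    handle_sojourn_closed(Time, SojournTime, S, Threshold).

%% Closed valve: never dequeue, only apply active queue management.
handle_sojourn_closed(_Time, _SojournTime, S, Threshold) ->
    {Drops, NS} = squeue:timeout(S),
    {closed, Drops, NS, Threshold}.

%% Assumed policy: a drop in the other queue always triggers a dequeue.
handle_dropped(_Time, S, Threshold) ->
    {Result, Drops, NS} = squeue:out(S),
    {Result, Drops, NS, Threshold}.

handle_dropped_r(_Time, S, Threshold) ->
    {Result, Drops, NS} = squeue:out_r(S),
    {Result, Drops, NS, Threshold}.

handle_dropped_closed(_Time, S, Threshold) ->
    {Drops, NS} = squeue:timeout(S),
    {closed, Drops, NS, Threshold}.
```

A valve using this sketch could be created with svalve:new(svalve_threshold_example, 10). A call to sojourn/2 with a sojourn time below 10 would then return closed, and one at or above 10 would dequeue from the head.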

Data Types

svalve()

svalve(Item) = #svalve{squeue = squeue:squeue(Item)}

svalve()

svalve() = svalve(any())

Function Index

close/1      Disable the feedback loop from dequeuing items from queue, V.
drop/1       Equivalent to drop(time(V), V).
drop/2       Advances the queue, V, to time Time and drops items, Drops, from the queue.
drop_r/1     Equivalent to drop_r(time(V), V).
drop_r/2     Advances the queue, V, to time Time and drops items, Drops, from the queue.
dropped/1    Equivalent to dropped(time(V), time(V), V).
dropped/2    Equivalent to dropped(Time, Time, V).
dropped/3    Advance time of the queue, V, to Time and signal the dropping of an item, at time DropTime, to trigger a dequeue from the head of the queue, V.
dropped_r/1  Equivalent to dropped_r(time(V), time(V), V).
dropped_r/2  Equivalent to dropped_r(Time, Time, V).
dropped_r/3  Advance time of the queue, V, to Time and signal the dropping of an item, at time DropTime, to trigger a dequeue from the tail of the queue, V.
filter/2     Equivalent to filter(time(V), Filter, V).
filter/3     Advances the queue, V, to time Time and applies a filter fun, Filter, to all items in the queue.
in/2         Equivalent to in(time(V), time(V), Item, V).
in/3         Equivalent to in(Time, Time, Item, V).
in/4         Advances the queue, V, to time Time and inserts the item, Item, with time InTime at the tail of queue, V.
is_queue/1   Tests if a term, Term, is an svalve queue; returns true if it is, otherwise false.
join/2       Joins two queues, V1 and V2, into one queue, NV, with the items in V1 at the head and the items in V2 at the tail.
len/1        Returns the length of the queue, V.
new/0        Equivalent to new(0, svalve_naive, undefined).
new/1        Equivalent to new(Time, svalve_naive, undefined).
new/2        Equivalent to new(0, Module, Args).
new/3        Returns an empty queue, V, with the Module feedback loop started with arguments Args, using an squeue managed by squeue_naive and time of Time.
open/1       Enable the feedback loop to dequeue items from queue, V.
out/1        Equivalent to out(time(V), V).
out/2        Advances the queue, V, to time Time and removes the item, Item, from the head of queue, V.
out_r/1      Equivalent to out_r(time(V), V).
out_r/2      Advances the queue, V, to time Time and removes the item, Item, from the tail of queue, V.
sojourn/2    Equivalent to sojourn(time(V), time(V), SojournTime, V).
sojourn/3    Equivalent to sojourn(Time, Time, SojournTime, V).
sojourn/4    Advance time of the queue, V, to Time and signal the sojourn time, SojournTime, at time OutTime of another queue to trigger a dequeue from the head of the queue, V.
sojourn_r/2  Equivalent to sojourn_r(time(V), time(V), SojournTime, V).
sojourn_r/3  Equivalent to sojourn_r(Time, Time, SojournTime, V).
sojourn_r/4  Advance time of the queue, V, to Time and signal the sojourn time, SojournTime, at time OutTime of another queue to trigger a dequeue from the tail of the queue, V.
squeue/1     Get the internal squeue inside the queue, V.
squeue/2     Replace the internal squeue inside the queue, V.
time/1       Returns the current time, Time, of the queue, V.
time/2       Advances the queue, V, to time Time, without applying active queue management.
timeout/1    Equivalent to timeout(time(V), V).
timeout/2    Advances the queue, V, to time Time.
to_list/1    Returns a list of items, List, in the queue, V.

Function Details

close/1

close(V) -> NV

Disable the feedback loop from dequeuing items from queue, V.

drop/1

drop(V) -> {Drops, NV}

Equivalent to drop(time(V), V).

drop/2

drop(Time, V) -> {Drops, NV}

Advances the queue, V, to time Time and drops items, Drops, from the queue.

See also: squeue:drop/2.

drop_r/1

drop_r(V) -> {Drops, NV}

Equivalent to drop_r(time(V), V).

drop_r/2

drop_r(Time, V) -> {Drops, NV}

Advances the queue, V, to time Time and drops items, Drops, from the queue.

See also: squeue:drop_r/2.

dropped/1

dropped(V) -> {Result, Drops, NV}

Equivalent to dropped(time(V), time(V), V).

dropped/2

dropped(Time, V) -> {Result, Drops, NV}

Equivalent to dropped(Time, Time, V).

dropped/3

dropped(Time, DropTime, V) -> {Result, Drops, NV}

Advance time of the queue, V, to Time and signal the dropping of an item, at time DropTime, to trigger a dequeue from the head of the queue, V. Returns the same values as sojourn/4.

If DropTime is less than the last time a sojourn time or drop was signalled to the feedback loop, the drop is signalled with the same time as the last signal. This time may be before the current time of the queue.

This function raises the error badarg if DropTime is greater than Time, or if Time is less than the current time of the queue.

See also: sojourn/4.

dropped_r/1

dropped_r(V) -> {Result, Drops, NV}

Equivalent to dropped_r(time(V), time(V), V).

dropped_r/2

dropped_r(Time, V) -> {Result, Drops, NV}

Equivalent to dropped_r(Time, Time, V).

dropped_r/3

dropped_r(Time, DropTime, V) -> {Result, Drops, NV}

Advance time of the queue, V, to Time and signal the dropping of an item, at time DropTime, to trigger a dequeue from the tail of the queue, V.

Behaves the same as dropped/3, except that it dequeues from the tail.

See also: dropped/3.

filter/2

filter(Filter, V) -> {Drops, NV}

Equivalent to filter(time(V), Filter, V).

filter/3

filter(Time, Filter, V) -> {Drops, NV}

Advances the queue, V, to time Time and applies a filter fun, Filter, to all items in the queue.

See also: squeue:filter/3.

in/2

in(Item, V) -> {Drops, NV}

Equivalent to in(time(V), time(V), Item, V).

in/3

in(Time, Item, V) -> {Drops, NV}

Equivalent to in(Time, Time, Item, V).

in/4

in(Time, InTime, Item, V) -> {Drops, NV}

Advances the queue, V, to time Time and inserts the item, Item, with time InTime at the tail of queue, V.

See also: squeue:in/4.

is_queue/1

is_queue(Term) -> Bool

Tests if a term, Term, is an svalve queue; returns true if it is, otherwise false.

See also: squeue:is_queue/1.

join/2

join(V1, V2) -> NV

Joins two queues, V1 and V2, into one queue, NV, with the items in V1 at the head and the items in V2 at the tail.

See also: squeue:join/2.

len/1

len(V) -> Len

Returns the length of the queue, V.

See also: squeue:len/1.

new/0

new() -> V

Equivalent to new(0, svalve_naive, undefined).

new/1

new(Time) -> V

Equivalent to new(Time, svalve_naive, undefined).

new/2

new(Module, Args) -> V

Equivalent to new(0, Module, Args).

new/3

new(Time, Module, Args) -> V

Returns an empty queue, V, with the Module feedback loop started with arguments Args, using an squeue managed by squeue_naive and time of Time.

See also: squeue:new/3.

open/1

open(V) -> NV

Enable the feedback loop to dequeue items from queue, V.

out/1

out(V) -> {Result, Drops, NV}

Equivalent to out(time(V), V).

out/2

out(Time, V) -> {Result, Drops, NV}

Advances the queue, V, to time Time and removes the item, Item, from the head of queue, V.

See also: squeue:out/2.
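
The explicit-time variants of in and out can be combined as in the sketch below. The module name svalve_time_example and the function run/0 are hypothetical. The times 1, 3 and 7 are arbitrary values chosen so that the in time is not after the time of the insert and the dequeue is not before it.

```erlang
-module(svalve_time_example).
-export([run/0]).

run() ->
    V0 = svalve:new(),
    %% Advance the queue to time 3 and insert job with an in time of 1.
    {_Drops0, V1} = svalve:in(3, 1, job, V0),
    %% Advance the queue to time 7 and dequeue from the head.
    {Result, _Drops1, V2} = svalve:out(7, V1),
    {Result, svalve:time(V2)}.
```

Result is expected to have the form {ItemSojournTime, job}, where ItemSojournTime reflects how long job spent in the queue, and the valve should end at time 7.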

out_r/1

out_r(V) -> {Result, Drops, NV}

Equivalent to out_r(time(V), V).

out_r/2

out_r(Time, V) -> {Result, Drops, NV}

Advances the queue, V, to time Time and removes the item, Item, from the tail of queue, V.

See also: squeue:out_r/2.

sojourn/2

sojourn(SojournTime, V) -> {Result, Drops, NV}

Equivalent to sojourn(time(V), time(V), SojournTime, V).

sojourn/3

sojourn(Time, SojournTime, V) -> {Result, Drops, NV}

Equivalent to sojourn(Time, Time, SojournTime, V).

sojourn/4

sojourn(Time, OutTime, SojournTime, V) -> {Result, Drops, NV}

Advance time of the queue, V, to Time and signal the sojourn time, SojournTime, at time OutTime of another queue to trigger a dequeue from the head of the queue, V. Returns {Result, Drops, NV}, where Result is closed if the feedback loop is closed, empty if the feedback loop is open but the queue is empty, and {ItemSojournTime, Item} if Item is dequeued from the head of the queue. ItemSojournTime is the length of time Item spent in the queue. Drops is the list of dropped items and their sojourn times, and NV is the resulting queue without the removed and dropped items.

If OutTime is less than the last time a sojourn time or drop was signalled to the feedback loop, the sojourn time is signalled with the same time as the last signal. This time may be before the current time of the queue.

This function raises the error badarg if OutTime is greater than Time, or if Time is less than the current time of the queue.
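
The two error conditions can be exercised with a short sketch. The module name svalve_badarg_example and the function run/0 are hypothetical, and the times are arbitrary values chosen to violate one rule each.

```erlang
-module(svalve_badarg_example).
-export([run/0]).

run() ->
    V = svalve:new(10),
    %% OutTime (12) is greater than Time (11).
    A = try svalve:sojourn(11, 12, 1, V) of
            _ -> ok
        catch
            error:badarg -> badarg
        end,
    %% Time (5) is less than the current time of the queue (10).
    B = try svalve:sojourn(5, 5, 1, V) of
            _ -> ok
        catch
            error:badarg -> badarg
        end,
    {A, B}.
```

Under the rules stated above, both calls should raise badarg, so run/0 would return {badarg, badarg}.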

sojourn_r/2

sojourn_r(SojournTime, V) -> {Result, Drops, NV}

Equivalent to sojourn_r(time(V), time(V), SojournTime, V).

sojourn_r/3

sojourn_r(Time, SojournTime, V) -> {Result, Drops, NV}

Equivalent to sojourn_r(Time, Time, SojournTime, V).

sojourn_r/4

sojourn_r(Time, OutTime, SojournTime, V) -> {Result, Drops, NV}

Advance time of the queue, V, to Time and signal the sojourn time, SojournTime, at time OutTime of another queue to trigger a dequeue from the tail of the queue, V.

Behaves the same as sojourn/4, except that it dequeues from the tail.

See also: sojourn/4.

squeue/1

squeue(V) -> S

Get the internal squeue inside the queue, V.

squeue/2

squeue(S, V) -> NV

Replace the internal squeue inside the queue, V.

This function raises the error badarg if the current time of the new squeue, S, is not the same as the current time of the queue, V.
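
A minimal sketch of retrieving and replacing the internal squeue follows. The module name svalve_squeue_example and the function run/0 are hypothetical. It assumes squeue:in/2 returns {Drops, NS} and squeue:time/2 returns the advanced squeue, matching the svalve functions of the same name and arity.

```erlang
-module(svalve_squeue_example).
-export([run/0]).

run() ->
    V0 = svalve:new(),
    S0 = svalve:squeue(V0),
    %% Modify the squeue directly, then put it back. The times still match.
    {_Drops, S1} = squeue:in(job, S0),
    V1 = svalve:squeue(S1, V0),
    Len = svalve:len(V1),
    %% Advance only the squeue so that its time differs from the valve's.
    S2 = squeue:time(5, S1),
    Mismatch = try svalve:squeue(S2, V1) of
                   _ -> ok
               catch
                   error:badarg -> badarg
               end,
    {Len, Mismatch}.
```

The first replacement should succeed and leave one item in the valve. The second should raise badarg because the squeue is at time 5 while the valve is still at time 0.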

time/1

time(V) -> Time

Returns the current time, Time, of the queue, V.

See also: squeue:time/1.

time/2

time(Time, V) -> NV

Advances the queue, V, to time Time, without applying active queue management.

See also: squeue:time/2.

timeout/1

timeout(V) -> {Drops, NV}

Equivalent to timeout(time(V), V).

timeout/2

timeout(Time, V) -> {Drops, NV}

Advances the queue, V, to time Time.

See also: squeue:timeout/2.

to_list/1

to_list(V) -> List

Returns a list of items, List, in the queue, V.

See also: squeue:to_list/1.


Generated by EDoc, May 1 2015, 11:53:45.