Module squeue

This module provides sojourn-time based active queue management with a similar API to OTP's queue module.

This module defines the squeue behaviour.
Required callback functions: init/2, handle_timeout/3, handle_out/3, handle_out_r/3, handle_join/3.

See also: squeue_codel, squeue_naive, squeue_timeout.

Description

A subset of the squeue API is similar to a subset of the queue API. There are two main differences. Firstly, where queue would return {value, Item}, squeue returns {SojournTime, Item}, where SojournTime (non_neg_integer()) is the sojourn time of the item, that is, the length of time the item waited in the queue.

Secondly, items may be dropped by the queue's management algorithm. The dropped items are included in the return value alongside the new queue. They are a list of the form [{SojournTime, Item}], ordered with the item with the greatest SojournTime (i.e. the oldest) at the head.

squeue also provides an optional first argument to some functions in common with queue: Time. This argument is of type integer() and sets the current time of the queue, provided Time is greater than or equal to the queue's previous time. If Time is less than the queue's current time, the function raises the error badarg.
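As an illustration of the Time argument and the {SojournTime, Item} result, here is a minimal sketch. The module name squeue_basic_example is hypothetical, and the sketch assumes the default squeue_naive algorithm drops nothing, so the drop lists are ignored rather than matched.

```erlang
-module(squeue_basic_example).
-export([run/0]).

%% Inserts two items and dequeues the oldest one at a later time.
run() ->
    S0 = squeue:new(),                    % time 0, squeue_naive
    {_Drops1, S1} = squeue:in(a, S0),     % a is inserted at time 0
    {_Drops2, S2} = squeue:in(3, b, S1),  % advance to time 3, insert b
    %% Advance to time 10 and dequeue from the head. Result has the
    %% form {SojournTime, Item} rather than queue's {value, Item}.
    {Result, _Drops3, S3} = squeue:out(10, S2),
    {Result, squeue:time(S3), squeue:len(S3)}.
```

Passing a Time lower than squeue:time(S) to any of these calls raises badarg, so callers would normally feed in a clock that never goes backwards.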

squeue includes 4 queue management algorithms: squeue_naive, squeue_timeout, squeue_codel and squeue_codel_timeout.

A custom queue management algorithm must implement the squeue behaviour. The first callback is init/2:
  -callback init(Time :: integer(), Args :: any()) -> State :: any().

Time is the time of the queue at creation. It is 0 for squeue:new/2 and for squeue:new/3 it is the Time (first) argument. Time can be positive or negative. All other callbacks receive the current time of the queue as the first argument. The current time never decreases, so subsequent calls receive the same or a greater time.

Args is the last argument passed to new/2 or new/3. It can be any term.

The queue management callback module is called before any squeue function alters an internal queue. Before squeue:in/2,3 and squeue:filter/2,3 the callback function is handle_timeout/3:
  -callback handle_timeout(Time :: integer(), Q :: internal_queue(Item),
                           State :: any()) ->
      {Drops :: [{InTime :: integer(), Item}], NQ :: internal_queue(Item),
       NState :: any()}.

Q is the internal representation of the queue. It is a queue with 2-tuple items of the form {InTime, Item}. InTime is the time the item was inserted into the queue. The insertion time may lag behind the current time of the queue. Item is the item itself. The items are ordered so that each item has an InTime less than or equal to the InTime of every item behind it in the queue. NQ is the queue after dropping any items.

State is the state of the callback module, and NState is the new state after applying the queue management.

Drops is a list of dropped items. The items must be ordered so that the dropped item nearest the tail of the queue is at the head of the list. For example:
  {DropQ, NQ} = queue:split(N, Q),
  Drops = lists:reverse(queue:to_list(DropQ)),
  {Drops, NQ, NState}.

squeue will set the sojourn times and reverse the list before returning the drops. All items from Q must either be in Drops or NQ, but not both.
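The conversion from callback drops to returned drops can be pictured with the sketch below. It is an illustration only, not squeue's actual implementation; the module and function names (drops_sketch, to_sojourn/2) are hypothetical.

```erlang
-module(drops_sketch).
-export([to_sojourn/2]).

%% Drops is in callback order: the dropped item nearest the tail of the
%% queue comes first. The fold replaces each InTime with the sojourn
%% time (Time - InTime) and reverses the list in the same pass, so the
%% oldest item ends up at the head.
to_sojourn(Time, Drops) ->
    lists:foldl(fun({InTime, Item}, Acc) ->
                        [{Time - InTime, Item} | Acc]
                end, [], Drops).
```

For example, at time 10 the callback drops [{3, b}, {0, a}] become [{10, a}, {7, b}].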

When an item is dequeued from the head of queue with squeue:out/1,2, the callback function handle_out/3 is called to dequeue the item:
  -callback handle_out(Time :: integer(), Q :: internal_queue(Item),
                      State :: any()) ->
     {empty | {InTime :: integer(), Item},
      Drops :: [{InTime2 :: integer(), Item}],
      NQ :: internal_queue(Item), NState :: any()}.

Time, Q and State represent the current time, the internal queue and the state of the callback module, respectively.

The first element of the returned 4-tuple is empty if the queue is empty after dropping Drops. Otherwise it is {InTime, Item}, the item at the head of the queue after dropping Drops. NQ is the queue after removing the drops and the head of the queue. For example:
  case queue:out(Q) of
      {empty, NQ} ->
          {empty, Drops, NQ, NState};
      {{value, {InTime, Item}}, NQ} ->
          {{InTime, Item}, Drops, NQ, NState}
  end.
Similarly, when an item is dequeued from the tail of the queue with squeue:out_r/1,2, the callback function handle_out_r/3 is called:
  -callback handle_out_r(Time :: integer(), Q :: internal_queue(Item),
                         State :: any()) ->
      {empty | {InTime :: integer(), Item},
       Drops :: [{InTime2 :: integer(), Item}],
       NQ :: internal_queue(Item), NState :: any()}.

This callback is the same as handle_out/3, except the item is dequeued from the tail.

Before joining a queue to the tail of the queue with squeue:join/2, the callback function handle_join/3 is called:
  -callback handle_join(Time :: integer(), Q :: internal_queue(Item :: any()),
                        State :: any()) ->
     NState :: any().
This callback can update the state but not drop items or change the internal queue.
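Putting the five callbacks together, a custom management algorithm might look like the sketch below. It drops items whose sojourn time exceeds a limit. The module name max_sojourn_queue and the meaning of Args (a maximum sojourn time) are assumptions made for this illustration; the sketch is not one of the bundled algorithms and uses only OTP's queue module internally.

```erlang
-module(max_sojourn_queue).
-behaviour(squeue). % assumes squeue is available on the code path

-export([init/2, handle_timeout/3, handle_out/3, handle_out_r/3,
         handle_join/3]).

%% Args is assumed to be the maximum sojourn time; it is the whole state.
init(_Time, Max) when is_integer(Max), Max >= 0 ->
    Max.

handle_timeout(Time, Q, Max) ->
    {Drops, NQ} = drop_expired(Time - Max, Q, []),
    {Drops, NQ, Max}.

handle_out(Time, Q, Max) ->
    {Drops, Q1} = drop_expired(Time - Max, Q, []),
    case queue:out(Q1) of
        {empty, NQ} ->
            {empty, Drops, NQ, Max};
        {{value, {InTime, Item}}, NQ} ->
            {{InTime, Item}, Drops, NQ, Max}
    end.

handle_out_r(Time, Q, Max) ->
    {Drops, Q1} = drop_expired(Time - Max, Q, []),
    case queue:out_r(Q1) of
        {empty, NQ} ->
            {empty, Drops, NQ, Max};
        {{value, {InTime, Item}}, NQ} ->
            {{InTime, Item}, Drops, NQ, Max}
    end.

%% Joining does not change the state in this sketch.
handle_join(_Time, _Q, Max) ->
    Max.

%% Removes items from the head while their InTime is before MinInTime.
%% Prepending each removed entry leaves the dropped item nearest the
%% tail at the head of the list, which is the order squeue expects.
drop_expired(MinInTime, Q, Drops) ->
    case queue:peek(Q) of
        {value, {InTime, _Item} = Entry} when InTime < MinInTime ->
            drop_expired(MinInTime, queue:drop(Q), [Entry | Drops]);
        _ ->
            {Drops, Q}
    end.
```

Such a module would be started with squeue:new(max_sojourn_queue, 5) or squeue:new(Time, max_sojourn_queue, 5). Because the internal queue is ordered by InTime, scanning from the head and stopping at the first unexpired item is sufficient.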

Data Types

internal_queue()

internal_queue(_Any) = queue()

squeue()

squeue(Item) = #squeue{queue = internal_queue(Item)}

squeue()

squeue() = squeue(any())

Function Index

drop/1      Equivalent to drop(time(S), S).
drop/2      Advances the queue, S, to time Time and drops items, Drops, from the queue.
drop_r/1    Equivalent to drop_r(time(S), S).
drop_r/2    Advances the queue, S, to time Time and drops items, Drops, from the queue.
filter/2    Equivalent to filter(time(S), Filter, S).
filter/3    Advances the queue, S, to time Time and applies a filter fun, Filter, to all items in the queue.
in/2        Equivalent to in(time(S), time(S), Item, S).
in/3        Equivalent to in(Time, Time, Item, S).
in/4        Advances the queue, S, to time Time and then inserts the item, Item, with time InTime at the tail of queue, S.
is_queue/1  Tests if a term, Term, is an squeue queue. Returns true if it is, otherwise false.
join/2      Joins two queues, S1 and S2, into one queue, NS, with the items in S1 at the head and the items in S2 at the tail.
len/1       Returns the length of the queue, S.
new/0       Equivalent to new(0, squeue_naive, undefined).
new/1       Equivalent to new(Time, squeue_naive, undefined).
new/2       Equivalent to new(0, Module, Args).
new/3       Returns an empty queue, S, with the Module management algorithm started with arguments Args and time of Time.
out/1       Equivalent to out(time(S), S).
out/2       Advances the queue, S, to time Time and removes the item, Item, from the head of queue, S.
out_r/1     Equivalent to out_r(time(S), S).
out_r/2     Advances the queue, S, to time Time and removes the item, Item, from the tail of queue, S.
time/1      Returns the current time, Time, of the queue, S.
time/2      Advances the queue, S, to time Time, without applying active queue management.
timeout/1   Equivalent to timeout(time(S), S).
timeout/2   Advances the queue, S, to time Time.
to_list/1   Returns a list of items, List, in the queue, S.

Function Details

drop/1

drop(S) -> {Drops, NS}

Equivalent to drop(time(S), S).

drop/2

drop(Time, S) -> {Drops, NS}

Advances the queue, S, to time Time and drops items, Drops, from the queue. Returns {Drops, NS}, where Drops is the list of dropped items and their sojourn times, and NS is the resulting queue without the dropped items. If S is empty, raises an empty error.

If the active queue management callback module does not drop any items, the item at the head of the queue is dropped.

This function is different from queue:drop/1, as the return value is a 2-tuple with the dropped items, Drops, and the new queue, NS, instead of just the new queue.

This function raises the error badarg if Time is less than the current time of the queue.

drop_r/1

drop_r(S) -> {Drops, NS}

Equivalent to drop_r(time(S), S).

drop_r/2

drop_r(Time, S) -> {Drops, NS}

Advances the queue, S, to time Time and drops items, Drops, from the queue. Returns {Drops, NS}, where Drops is the list of dropped items and their sojourn times, and NS is the resulting queue without the dropped items. If S is empty, raises an empty error.

If the active queue management callback module does not drop any items, the item at the tail of the queue is dropped.

This function is different from queue:drop_r/1, as the return value is a 2-tuple with the dropped items, Drops, and the new queue, NS, instead of just the new queue. Also, the dropped item or items are not necessarily from the tail of the queue.

This function raises the error badarg if Time is less than the current time of the queue.

filter/2

filter(Filter, S) -> {Drops, NS}

Equivalent to filter(time(S), Filter, S).

filter/3

filter(Time, Filter, S) -> {Drops, NS}

Advances the queue, S, to time Time and applies a filter fun, Filter, to all items in the queue. Returns a tuple containing the dropped items and their sojourn times, Drops, and the new queue, NS.

If Filter(Item) returns true, the item appears in the new queue.

If Filter(Item) returns false, the item does not appear in the new queue or the dropped items.

If Filter(Item) returns a list of items, these items appear in the new queue in place of Item, and all of them have the start time of the original item, Item.

This function raises the error badarg if Time is less than the current time of the queue.
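The three possible return values of Filter can be combined in a single fun, as in the sketch below. The module name squeue_filter_example is hypothetical, and the sketch assumes the default squeue_naive algorithm drops nothing, so Drops is ignored.

```erlang
-module(squeue_filter_example).
-export([run/0]).

run() ->
    S0 = squeue:new(),
    {_, S1} = squeue:in(1, S0),
    {_, S2} = squeue:in(2, S1),
    {_, S3} = squeue:in(3, S2),
    Filter = fun(2) -> false;      % removed, and not reported in Drops
                (3) -> [3, 30];    % replaced by two items sharing 3's start time
                (_) -> true        % kept unchanged
             end,
    {_Drops, S4} = squeue:filter(Filter, S3),
    squeue:to_list(S4).
```

Items removed by the filter are distinct from items dropped by the management algorithm: only the latter appear in Drops.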

in/2

in(Item, S) -> {Drops, NS}

Equivalent to in(time(S), time(S), Item, S).

in/3

in(Time, Item, S) -> {Drops, NS}

Equivalent to in(Time, Time, Item, S).

in/4

in(Time, InTime, Item, S) -> {Drops, NS}

Advances the queue, S, to time Time and then inserts the item, Item, with time InTime at the tail of queue, S. Returns a tuple containing the dropped items and their sojourn times, Drops, and resulting queue, NS.

If InTime is less than the last time an item was inserted at the tail of the queue, the item is inserted with the same time as the last item. This time may be before the current time of the queue.

This function raises the error badarg if InTime is greater than Time. This function raises the error badarg if Time is less than the current time of the queue.
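A caller that cannot guarantee its clock never goes backwards may want to guard against the badarg error. The following is a minimal sketch; the module name squeue_in_example and the returned error tuple are hypothetical choices for this illustration.

```erlang
-module(squeue_in_example).
-export([safe_in/3]).

%% Wraps squeue:in/3 and converts the badarg raised for a stale Time
%% into an error tuple instead of crashing the caller.
safe_in(Time, Item, S) ->
    try squeue:in(Time, Item, S) of
        {Drops, NS} ->
            {ok, Drops, NS}
    catch
        error:badarg ->
            {error, stale_time}
    end.
```

With S = squeue:new(10), the call safe_in(5, item, S) takes the error branch, because 5 is less than the queue's current time.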

is_queue/1

is_queue(Term) -> Bool

Tests if a term, Term, is an squeue queue. Returns true if it is, otherwise false.

join/2

join(S1, S2) -> NS

Joins two queues, S1 and S2, into one queue, NS, with the items in S1 at the head and the items in S2 at the tail.

This function raises the error badarg if any item in queue S1 was added after any item in queue S2.

This function raises the error badarg if the current time of the queues, S1 and S2, are not the same.
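Since both queues must share the same current time, one way to prepare a join is to advance the older queue with time/2 first. The sketch below uses a hypothetical module name, squeue_join_example, and the default squeue_naive algorithm for both queues.

```erlang
-module(squeue_join_example).
-export([run/0]).

run() ->
    {_, S1} = squeue:in(a, squeue:new()),    % queue at time 0 holding a
    {_, S2} = squeue:in(b, squeue:new(5)),   % queue at time 5 holding b
    %% Bring S1 up to S2's time without applying queue management.
    S1Aligned = squeue:time(squeue:time(S2), S1),
    %% a was added before b, so the ordering requirement is met.
    NS = squeue:join(S1Aligned, S2),
    squeue:to_list(NS).
```

Swapping the arguments would put an item added at time 5 ahead of one added at time 0, which is the situation that raises badarg.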

len/1

len(S) -> Len

Returns the length of the queue, S.

new/0

new() -> S

Equivalent to new(0, squeue_naive, undefined).

new/1

new(Time) -> S

Equivalent to new(Time, squeue_naive, undefined).

new/2

new(Module, Args) -> S

Equivalent to new(0, Module, Args).

new/3

new(Time, Module, Args) -> S

Returns an empty queue, S, with the Module management algorithm started with arguments Args and time of Time.

out/1

out(S) -> {Result, Drops, NS}

Equivalent to out(time(S), S).

out/2

out(Time, S) -> {Result, Drops, NS}

Advances the queue, S, to time Time and removes the item, Item, from the head of queue, S. Returns {{SojournTime, Item}, Drops, NS}, where SojournTime is the length of time Item spent in the queue, Drops is the list of dropped items and their sojourn times, and NS is the resulting queue without the removed and dropped items. If the queue is empty after dropping items, {empty, Drops, NS} is returned.

This function is slightly different from queue:out/1, as the sojourn time is included in the result in the place of the atom value and items can be dropped.

This function raises the error badarg if Time is less than the current time of the queue.

out_r/1

out_r(S) -> {Result, Drops, NS}

Equivalent to out_r(time(S), S).

out_r/2

out_r(Time, S) -> {Result, Drops, NS}

Advances the queue, S, to time Time and removes the item, Item, from the tail of queue, S. Returns {{SojournTime, Item}, Drops, NS}, where SojournTime is the length of time Item spent in the queue, Drops is the list of dropped items and their sojourn times, and NS is the resulting queue without the removed and dropped items. If the queue is empty after dropping items, {empty, Drops, NS} is returned.

This function is slightly different from queue:out_r/1, as the sojourn time is included in the result in the place of the atom value and items can be dropped.

This function raises the error badarg if Time is less than the current time of the queue.

time/1

time(S) -> Time

Returns the current time, Time, of the queue, S.

time/2

time(Time, S) -> NS

Advances the queue, S, to time Time, without applying active queue management. Returns the new queue, NS.

This function raises the error badarg if Time is less than the current time of the queue.

timeout/1

timeout(S) -> {Drops, NS}

Equivalent to timeout(time(S), S).

timeout/2

timeout(Time, S) -> {Drops, NS}

Advances the queue, S, to time Time. Returns a tuple containing the dropped items and their sojourn times, Drops, and resulting queue, NS.

This function raises the error badarg if Time is less than the current time of the queue.

to_list/1

to_list(S) -> List

Returns a list of items, List, in the queue, S.

The order of items in List matches their order in the queue, S, so that the item at the head of the queue is at the head of the list.


Generated by EDoc, May 1 2015, 11:53:45.