cbuf v0.7.1 Cbuf.Queue

Cbuf.Queue implements the Cbuf behaviour with Erlang’s built-in queue as its implementation.

For examples of typical use, see the documentation for new/1, insert/2, peek/1, and delete/1.

Operations that must interact with the actual data of the buffer (new/1, insert/2, peek/1, pop/1, delete/1, to_list/1, member?/2), perform as well as Queue does. This is good, as Queue is fast.

Queues, like the other normal Elixir/Erlang datastructures, typically exist in process memory. For example, it is recommended to use this module behind a GenServer in order to share its usage across the system. Using Cbuf.Queue in this way means that updates to the buffer will be at the mercy of the process’ garbage collection. For large data sets with a lot of process GC, your application may benefit from a different approach. For this reason, there is also Cbuf.ETS that is drop-in API compatible. I recommend defaulting to using Cbuf.Queue with a GenServer and benchmarking your application.

An example partially-complete GenServer might look something like the following:

defmodule EventTracker do
  use GenServer
  alias Cbuf.Queue, as: Cbuf

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(_args) do
    {:ok, nil}
  end

  ###########
  ### API ###
  ###########

  def new(n) do
    GenServer.call(__MODULE__, {:new, n})
  end

  def insert(val) do
    GenServer.call(__MODULE__, {:insert, val})
  end

  def insert_async(val) do
    GenServer.cast(__MODULE__, {:insert, val})
  end

  def peek do
    GenServer.call(__MODULE__, :peek)
  end

  def pop do
    GenServer.call(__MODULE__, :pop)
  end

  #################
  ### CALLBACKS ###
  #################

  ## handle_call

  def handle_call({:new, n}, _from, _state) do
    buf = Cbuf.new(n)

    {:reply, :ok, buf}
  end

  def handle_call({:insert, val}, _from, buf) do
    new_buf = Cbuf.insert(buf, val)

    {:reply, :ok, new_buf}
  end

  def handle_call(:peek, _from, buf) do
    val = Cbuf.peek(buf)

    {:reply, val, buf}
  end

  def handle_call(:pop, _from, buf) do
    {val, new_buf} = Cbuf.pop(buf)

    {:reply, val, new_buf}
  end

  ## handle_cast

  def handle_cast({:insert, val}, buf) do
    new_buf = Cbuf.insert(buf, val)

    {:noreply, new_buf}
  end
end

Link to this section Summary

Functions

Returns the count of the non-empty values in the buffer

Return a new buffer with the oldest item in the buffer removed

Whether or not the buffer is empty. This value corresponds to when the buffer has a count of zero, not its size

Insert a value into a circular buffer. Values are inserted such that when the buffer is full, the oldest items are overwritten first

Queries buf for the presence of val

Create a new circular buffer of a given size

See the oldest value in the buffer. Works in constant time

Return the oldest value in the buffer, and a new buffer with that value removed

Calculate the allocated size for the buffer. This is maximum addressable size of the buffer, not how many values it currently contains. For the number of values in the current buffer, see count/1

Convert a circular buffer to a list. The list is ordered by age, oldest to newest. This operation takes linear time

Link to this section Types

Link to this opaque t() (opaque)
t()

Link to this section Functions

Link to this function count(buf)
count(t()) :: non_neg_integer()

Returns the count of the non-empty values in the buffer.

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.insert("hi") |> Cbuf.Queue.count()
1

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.count()
0

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.insert(nil) |> Cbuf.Queue.count()
1

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.insert("hi") |> Cbuf.Queue.delete() |> Cbuf.Queue.count()
0

iex> buf = Enum.reduce(1..13, Cbuf.Queue.new(5), &Cbuf.Queue.insert(&2, &1))
iex> Cbuf.Queue.delete(buf) |> Cbuf.Queue.count()
4

iex> Cbuf.Queue.new(3) |> Cbuf.Queue.delete() |> Cbuf.Queue.delete() |> Cbuf.Queue.count()
0
Link to this function delete(buf)
delete(t()) :: t()

Return a new buffer with the oldest item in the buffer removed.

iex> buf = Enum.reduce(1..20, Cbuf.Queue.new(3), fn(val, acc) -> Cbuf.Queue.insert(acc, val) end)
iex> buf = Cbuf.Queue.delete(buf)
iex> Cbuf.Queue.peek(buf)
19

iex> buf = Enum.reduce(1..6, Cbuf.Queue.new(5), fn(val, acc) -> Cbuf.Queue.insert(acc, val) end)
iex> buf = Cbuf.Queue.delete(buf)
iex> Cbuf.Queue.peek(buf)
3

iex> buf = Enum.reduce(1..6, Cbuf.Queue.new(5), fn(val, acc) -> Cbuf.Queue.insert(acc, val) end)
iex> Cbuf.Queue.delete(buf) |> Cbuf.Queue.count()
4

iex> buf = Cbuf.Queue.new(5)
iex> buf = Cbuf.Queue.insert(buf, "ok")
iex> Cbuf.Queue.delete(buf)
#Cbuf<[]>

iex> buf = Cbuf.Queue.new(5)
iex> Cbuf.Queue.delete(buf)
#Cbuf<[]>
Link to this function empty?(buf)
empty?(t()) :: boolean()

Whether or not the buffer is empty. This value corresponds to when the buffer has a count of zero, not its size.

iex> buf = Cbuf.Queue.new(5)
iex> Cbuf.Queue.empty?(buf)
true

iex> buf = Cbuf.Queue.new(5) |> Cbuf.Queue.insert("hi")
iex> Cbuf.Queue.empty?(buf)
false

iex> buf = Cbuf.Queue.new(5) |> Cbuf.Queue.insert("hi") |> Cbuf.Queue.delete()
iex> Cbuf.Queue.empty?(buf)
true
Link to this function insert(buf, val)
insert(t(), term()) :: t()

Insert a value into a circular buffer. Values are inserted such that when the buffer is full, the oldest items are overwritten first.

iex> buf = Cbuf.Queue.new(5)
iex> buf |> Cbuf.Queue.insert("a") |> Cbuf.Queue.insert("b")
#Cbuf<["a", "b"]>

iex> buf = Cbuf.Queue.new(3)
iex> Enum.reduce(1..20, buf, fn(val, acc) -> Cbuf.Queue.insert(acc, val) end)
#Cbuf<[18, 19, 20]>

iex> buf = Cbuf.Queue.new(1)
iex> Enum.reduce(1..20, buf, fn(val, acc) -> Cbuf.Queue.insert(acc, val) end)
#Cbuf<[20]>
Link to this function member?(buf, val)
member?(t(), term()) :: boolean()

Queries buf for the presence of val.

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.insert("hello") |> Cbuf.Queue.member?("hello")
true

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.insert("hello") |> Cbuf.Queue.member?("nope")
false
Link to this function new(size)
new(pos_integer()) :: t()

Create a new circular buffer of a given size.

iex> Cbuf.Queue.new(5)
#Cbuf<[]>
Link to this function peek(buf)
peek(t()) :: term() | nil

See the oldest value in the buffer. Works in constant time.

iex> buf = Enum.reduce(1..20, Cbuf.Queue.new(3), fn(val, acc) -> Cbuf.Queue.insert(acc, val) end)
iex> Cbuf.Queue.peek(buf)
18

iex> buf = Cbuf.Queue.new(20) |> Cbuf.Queue.insert("ok") |> Cbuf.Queue.insert("fine")
iex> Cbuf.Queue.peek(buf)
"ok"

iex> Cbuf.Queue.new(3) |> Cbuf.Queue.peek()
nil
Link to this function pop(buf)
pop(t()) :: {term() | nil, t()}

Return the oldest value in the buffer, and a new buffer with that value removed.

iex> buf = Enum.reduce(1..20, Cbuf.Queue.new(3), fn(val, acc) -> Cbuf.Queue.insert(acc, val) end)
iex> {val, buf} = Cbuf.Queue.pop(buf)
iex> {val, Cbuf.Queue.to_list(buf)} # Elixir has trouble inspecting a nested struct, see https://hexdocs.pm/ex_unit/ExUnit.DocTest.html#module-opaque-types
{18, [19, 20]}

iex> {val, buf} = Cbuf.Queue.new(1) |> Cbuf.Queue.insert("hi") |> Cbuf.Queue.pop()
iex> {val, Cbuf.Queue.to_list(buf)}
{"hi", []}

Calculate the allocated size for the buffer. This is maximum addressable size of the buffer, not how many values it currently contains. For the number of values in the current buffer, see count/1

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.size()
5
Link to this function to_list(buf)
to_list(t()) :: [term()] | []

Convert a circular buffer to a list. The list is ordered by age, oldest to newest. This operation takes linear time.

iex> buf = Cbuf.Queue.new(5)
iex> buf |> Cbuf.Queue.insert("a") |> Cbuf.Queue.insert("b") |> Cbuf.Queue.to_list()
["a", "b"]

iex> buf = Cbuf.Queue.new(3)
iex> Enum.reduce(1..20, buf, fn(val, acc) -> Cbuf.Queue.insert(acc, val) end) |> Cbuf.Queue.to_list()
[18, 19, 20]

iex> Cbuf.Queue.new(5) |> Cbuf.Queue.to_list()
[]