CubQ (cubq v0.3.0)

CubQ is a queue abstraction on top of CubDB. It implements persistent local (double-ended) queue and stack semantics.

Usage

CubQ is given a CubDB process and a queue identifier upon start:

{:ok, db} = CubDB.start_link(data_dir: "my/data/directory")
{:ok, pid} = CubQ.start_link(db: db, queue: :my_queue_id)

Queues

Queue semantics are implemented by the enqueue/2 and dequeue/1 functions:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.enqueue(pid, :two)
#=> :ok

CubQ.dequeue(pid)
#=> {:ok, :one}

CubQ.dequeue(pid)
#=> {:ok, :two}

# When there are no more items in the queue, `dequeue/1` returns `nil`:

CubQ.dequeue(pid)
#=> nil

Note that items can be any Elixir (or Erlang) term:

CubQ.enqueue(pid, %SomeStruct{foo: "bar"})
#=> :ok

CubQ.dequeue(pid)
#=> {:ok, %SomeStruct{foo: "bar"}}

The queue is actually double-ended, so items can be prepended too:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.prepend(pid, :zero)
#=> :ok

CubQ.dequeue(pid)
#=> {:ok, :zero}

Stacks

Stack semantics are implemented by the push/2 and pop/1 functions:

CubQ.push(pid, :one)
#=> :ok

CubQ.push(pid, :two)
#=> :ok

CubQ.pop(pid)
#=> {:ok, :two}

CubQ.pop(pid)
#=> {:ok, :one}

# When there are no more items in the stack, `pop/1` returns `nil`:

CubQ.pop(pid)
#=> nil

Mixing things up

As the underlying data structure used for stacks and queues is the same, queue and stack semantics can be mixed on the same queue.
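A minimal sketch of mixing the two styles on one queue, using only the functions documented on this page. It assumes `:cubdb` and `:cubq` are available as dependencies; the data directory and the queue identifier `:mixed_example` are hypothetical names chosen for illustration.

```elixir
# Assumes :cubdb and :cubq are project dependencies.
# The data directory is a throwaway path chosen for this example.
data_dir = Path.join(System.tmp_dir!(), "cubq_mixed_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :mixed_example)

# The queue is persistent, so clear anything left over from earlier runs.
:ok = CubQ.delete_all(pid)

:ok = CubQ.enqueue(pid, :one)
:ok = CubQ.enqueue(pid, :two)
:ok = CubQ.enqueue(pid, :three)

# Queue semantics: take from the beginning.
{:ok, :one} = CubQ.dequeue(pid)

# Stack semantics on the same queue: take from the end.
{:ok, :three} = CubQ.pop(pid)

# Only :two is left, so it is both the first and the last item.
{:ok, :two} = CubQ.peek_first(pid)
{:ok, :two} = CubQ.peek_last(pid)
```

Each pattern match doubles as an assertion, so the script fails loudly if a call returns something unexpected.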

Summary

Functions

ack/2

Positively acknowledges the successful consumption of an item taken with dequeue_ack/2 or pop_ack/2.

child_spec/1

Returns a specification to start this module under a supervisor.

delete_all/2

Deletes all items from the queue.

dequeue/1

Removes an item from the beginning of the queue and returns it.

dequeue_ack/2

Removes an item from the beginning of the queue and returns it, expecting a manual confirmation of its successful processing with ack/2. If ack/2 is not called before timeout, the item is put back at the beginning of the queue.

enqueue/2

Inserts an item at the end of the queue.

nack/2

Negatively acknowledges an item taken with dequeue_ack/2 or pop_ack/2, causing it to be put back in the queue.

peek_first/1

Returns the item at the beginning of the queue without removing it.

peek_last/1

Returns the item at the end of the queue without removing it.

pop/1

Removes an item from the end of the queue and returns it.

pop_ack/2

Removes an item from the end of the queue and returns it, expecting a manual confirmation of its successful processing with ack/2. If ack/2 is not called before timeout, the item is put back at the end of the queue.

prepend/2

Inserts an item at the beginning of the queue.

start/1

Starts a CubQ process without link.

start_link/1

Starts a CubQ process linked to the calling process.

Types

ack_id()

(opaque) ack_id()

option()

option() :: {:db, GenServer.server()} | {:queue, term()}

Functions

ack(pid, ack_id)

ack(GenServer.server(), ack_id()) :: :ok | {:error, term()}

Positively acknowledges the successful consumption of an item taken with dequeue_ack/2 or pop_ack/2.

See the documentation for dequeue_ack/2 for more details.

append(pid, item)

append(GenServer.server(), item()) :: :ok | {:error, term()}

Same as enqueue/2.

child_spec/1

Returns a specification to start this module under a supervisor.

See Supervisor.

delete_all(pid, batch_size \\ 100)

delete_all(GenServer.server(), pos_integer()) :: :ok | {:error, term()}

Deletes all items from the queue.

The items are deleted in batches, and the size of the batch can be specified as the optional second argument.
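As a small illustration of the optional batch size, the sketch below fills a queue and then clears it 50 items at a time. The data directory and queue identifier are hypothetical, and `:cubdb` and `:cubq` are assumed to be available as dependencies.

```elixir
# Assumes :cubdb and :cubq are project dependencies.
# The data directory is a throwaway path chosen for this example.
data_dir = Path.join(System.tmp_dir!(), "cubq_delete_all_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :delete_all_example)

# Fill the queue with 250 items.
Enum.each(1..250, fn n -> :ok = CubQ.enqueue(pid, n) end)

# Delete everything, 50 items per batch instead of the default 100.
:ok = CubQ.delete_all(pid, 50)

# The queue is now empty.
nil = CubQ.dequeue(pid)
```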

dequeue(pid)

dequeue(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}

Removes an item from the beginning of the queue and returns it.

It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.

Example:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.enqueue(pid, :two)
#=> :ok

CubQ.dequeue(pid)
#=> {:ok, :one}

CubQ.dequeue(pid)
#=> {:ok, :two}

dequeue_ack(pid, timeout \\ 5000)

dequeue_ack(GenServer.server(), timeout()) ::
  {:ok, item(), ack_id()} | nil | {:error, term()}

Removes an item from the beginning of the queue and returns it, expecting a manual confirmation of its successful processing with ack/2. If ack/2 is not called before timeout, the item is put back at the beginning of the queue.

The dequeue_ack/2 function is useful when implementing "at least once" semantics, especially when more than one consumer takes items from the same queue, to ensure that each item is successfully consumed before being discarded.

The problem is the following: if a consumer takes an item with dequeue/1 and then crashes before processing it, the item is lost. With dequeue_ack/2, the item is not immediately removed, but is instead atomically transferred to a staging storage. If the consumer successfully processes the item, it can call ack/2 (acknowledgement) to confirm the success, after which the item is discarded. If ack/2 is not called within the timeout (expressed in milliseconds, 5000 by default), the item is automatically put back in the queue, so it can be dequeued again by a consumer. If the consumer wants to put the item back in the queue immediately, it can also call nack/2 (negative acknowledgement) explicitly.

The return value of dequeue_ack/2 on success is a 3-tuple: {:ok, item, ack_id}. The ack_id is the argument that must be passed to ack/2 or nack/2 to confirm the successful (or unsuccessful) consumption of the item.

Note that dequeue_ack/2 performs its operation in an atomic and durable way, so even if the CubQ process crashes, after restarting it will still re-insert the items pending acknowledgement in the queue after the timeout elapses. After restarting though, the timeouts are also restarted, so the effective time before the item goes back to the queue can be larger than the original timeout.

In case of timeout or negative acknowledgement, the item is put back at the start of the queue. Global ordering therefore cannot be guaranteed when items are put back, but a returned item is available to be dequeued again immediately.

Example:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.enqueue(pid, :two)
#=> :ok

{:ok, item, ack_id} = CubQ.dequeue_ack(pid, 3000)
#=> {:ok, :one, ack_id}

# More items can now be taken from the queue
CubQ.dequeue(pid)
#=> {:ok, :two}

# If 3 seconds elapse without `ack` being called, or if `nack` is called,
# the item `:one` is put back in the queue, so it can be dequeued
# again:
CubQ.nack(pid, ack_id)
#=> :ok

{:ok, item, ack_id} = CubQ.dequeue_ack(pid, 3000)
#=> {:ok, :one, ack_id}

# When successful consumption is confirmed by calling `ack`, the item
# is finally discarded and won't be put back in the queue anymore:
CubQ.ack(pid, ack_id)
#=> :ok
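
The ack/nack flow above can be wrapped in a small consumer helper. The following is a sketch under stated assumptions, not part of CubQ: the `AckConsumer` module, its `consume_one/3` function, and the `:empty` and `{:failed, exception}` return values are hypothetical names introduced for illustration. It acknowledges the item when the handler returns and negatively acknowledges it when the handler raises. If the consumer process dies outright, nothing here runs, and the item is only put back once the timeout elapses.

```elixir
defmodule AckConsumer do
  # Hypothetical helper for illustration; it is not part of CubQ.
  # Takes one item with dequeue_ack/2, runs `handler` on it, then calls
  # ack/2 if the handler returned, or nack/2 if the handler raised.
  def consume_one(queue, handler, timeout \\ 5000) do
    case CubQ.dequeue_ack(queue, timeout) do
      {:ok, item, ack_id} ->
        outcome =
          try do
            {:ok, handler.(item)}
          rescue
            exception -> {:failed, exception}
          end

        case outcome do
          {:ok, _result} -> :ok = CubQ.ack(queue, ack_id)
          {:failed, _exception} -> :ok = CubQ.nack(queue, ack_id)
        end

        outcome

      nil ->
        :empty

      {:error, _cause} = error ->
        error
    end
  end
end

# Assumes :cubdb and :cubq are project dependencies.
# The data directory is a throwaway path chosen for this example.
data_dir = Path.join(System.tmp_dir!(), "cubq_ack_consumer_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :ack_consumer_example)
:ok = CubQ.delete_all(pid)

:ok = CubQ.enqueue(pid, 21)

# The handler raises, so the item is nacked and goes back in the queue.
{:failed, %RuntimeError{}} = AckConsumer.consume_one(pid, fn _item -> raise "boom" end)

# The same item is available again; this time it is processed and acked.
{:ok, 42} = AckConsumer.consume_one(pid, fn n -> n * 2 end)
```

Calling ack/2 and nack/2 outside the `try` block keeps a failure of the acknowledgement itself from being mistaken for a handler failure.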

enqueue(pid, item)

enqueue(GenServer.server(), item()) :: :ok | {:error, term()}

Inserts an item at the end of the queue.

The item can be any Elixir (or Erlang) term.

Example:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.enqueue(pid, :two)
#=> :ok

CubQ.dequeue(pid)
#=> {:ok, :one}

nack(pid, ack_id)

nack(GenServer.server(), ack_id()) :: :ok | {:error, term()}

Negatively acknowledges an item taken with dequeue_ack/2 or pop_ack/2, causing it to be put back in the queue.

See the documentation for dequeue_ack/2 for more details.

peek_first(pid)

peek_first(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}

Returns the item at the beginning of the queue without removing it.

It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.

Example:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.enqueue(pid, :two)
#=> :ok

CubQ.peek_first(pid)
#=> {:ok, :one}

CubQ.dequeue(pid)
#=> {:ok, :one}

peek_last(pid)

peek_last(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}

Returns the item at the end of the queue without removing it.

It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.

Example:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.enqueue(pid, :two)
#=> :ok

CubQ.peek_last(pid)
#=> {:ok, :two}

CubQ.pop(pid)
#=> {:ok, :two}

pop(pid)

pop(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}

Removes an item from the end of the queue and returns it.

It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.

Example:

CubQ.push(pid, :one)
#=> :ok

CubQ.push(pid, :two)
#=> :ok

CubQ.pop(pid)
#=> {:ok, :two}

CubQ.pop(pid)
#=> {:ok, :one}

pop_ack(pid, timeout \\ 5000)

pop_ack(GenServer.server(), timeout()) ::
  {:ok, item(), ack_id()} | nil | {:error, term()}

Removes an item from the end of the queue and returns it, expecting a manual confirmation of its successful processing with ack/2. If ack/2 is not called before timeout, the item is put back at the end of the queue.

See the documentation for dequeue_ack/2 for more details: the pop_ack/2 function works in the same way as dequeue_ack/2, but with stack semantics instead of queue semantics.
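
For comparison with the dequeue_ack/2 example, here is the same flow sketched with stack semantics. The data directory and queue identifier are hypothetical, and `:cubdb` and `:cubq` are assumed to be available as dependencies.

```elixir
# Assumes :cubdb and :cubq are project dependencies.
# The data directory is a throwaway path chosen for this example.
data_dir = Path.join(System.tmp_dir!(), "cubq_pop_ack_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :pop_ack_example)
:ok = CubQ.delete_all(pid)

:ok = CubQ.push(pid, :one)
:ok = CubQ.push(pid, :two)

# Take the most recently pushed item, pending acknowledgement.
{:ok, :two, ack_id} = CubQ.pop_ack(pid, 3000)

# A negative acknowledgement puts :two back at the end of the queue,
# so it is the next item to be popped again.
:ok = CubQ.nack(pid, ack_id)
{:ok, :two, ack_id} = CubQ.pop_ack(pid, 3000)

# Once acknowledged, :two is discarded for good.
:ok = CubQ.ack(pid, ack_id)

{:ok, :one} = CubQ.pop(pid)
```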

prepend(pid, item)

prepend(GenServer.server(), item()) :: :ok | {:error, term()}

Inserts an item at the beginning of the queue.

The item can be any Elixir (or Erlang) term.

Example:

CubQ.enqueue(pid, :one)
#=> :ok

CubQ.prepend(pid, :zero)
#=> :ok

CubQ.dequeue(pid)
#=> {:ok, :zero}

CubQ.dequeue(pid)
#=> {:ok, :one}

push(pid, item)

push(GenServer.server(), item()) :: :ok | {:error, term()}

Same as enqueue/2.

Normally used together with pop/1 for stack semantics.

start/1

Starts a CubQ process without link.

The argument is a keyword list of options, see start_link/1 for details.

start_link/1

Starts a CubQ process linked to the calling process.

The argument is a keyword list of options:

  • db (required): the pid (or name) of the CubDB process for storing the queue

  • queue (required): the identifier of the queue. It can be any Elixir term, but typically one would use an atom, like :my_queue

GenServer options like name and timeout can also be given, and are forwarded to GenServer.start_link/3 as the third argument.
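
Because CubQ exposes a child specification and accepts a name for the CubDB process, both can be started under a supervisor. The sketch below is one possible arrangement, not a prescribed setup: the module name `MyApp.QueueSupervisor` and the process names `MyApp.DB` and `MyApp.JobsQueue` are hypothetical, and it assumes that CubDB also accepts a `name` option and can be started from a `{CubDB, options}` child tuple.

```elixir
defmodule MyApp.QueueSupervisor do
  # Hypothetical supervisor for illustration; all names are assumptions.
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    data_dir = Keyword.fetch!(opts, :data_dir)

    children = [
      # Assumption: CubDB accepts a :name option alongside :data_dir.
      {CubDB, data_dir: data_dir, name: MyApp.DB},
      # CubQ refers to the CubDB process by name instead of by pid.
      {CubQ, db: MyApp.DB, queue: :jobs, name: MyApp.JobsQueue}
    ]

    # With :rest_for_one, a restart of CubDB also restarts CubQ,
    # which is started after it.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

# Assumes :cubdb and :cubq are project dependencies.
data_dir = Path.join(System.tmp_dir!(), "cubq_supervised_example")
{:ok, _sup} = MyApp.QueueSupervisor.start_link(data_dir: data_dir)

# The queue can now be addressed by its registered name.
:ok = CubQ.delete_all(MyApp.JobsQueue)
:ok = CubQ.enqueue(MyApp.JobsQueue, :job)
{:ok, :job} = CubQ.dequeue(MyApp.JobsQueue)
```

Registering both processes by name means callers do not need to hold on to pids, which change whenever a process is restarted.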