cubq v0.3.0
CubQ
CubQ is a queue abstraction on top of CubDB. It implements persistent local (double-ended) queue and stack semantics.
Usage
CubQ is given a CubDB process and a queue identifier upon start:
{:ok, db} = CubDB.start_link(data_dir: "my/data/directory")
{:ok, pid} = CubQ.start_link(db: db, queue: :my_queue_id)
Queues
Queue semantics are implemented by the enqueue/2 and dequeue/1 functions:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.enqueue(pid, :two)
#=> :ok
CubQ.dequeue(pid)
#=> {:ok, :one}
CubQ.dequeue(pid)
#=> {:ok, :two}
# When there are no more items in the queue, `dequeue/1` returns `nil`:
CubQ.dequeue(pid)
#=> nil
Note that items can be any Elixir (or Erlang) term:
CubQ.enqueue(pid, %SomeStruct{foo: "bar"})
#=> :ok
CubQ.dequeue(pid)
#=> {:ok, %SomeStruct{foo: "bar"}}
The queue is actually double-ended, so items can be prepended too:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.prepend(pid, :zero)
#=> :ok
CubQ.dequeue(pid)
#=> {:ok, :zero}
Stacks
Stack semantics are implemented by the push/2 and pop/1 functions:
CubQ.push(pid, :one)
#=> :ok
CubQ.push(pid, :two)
#=> :ok
CubQ.pop(pid)
#=> {:ok, :two}
CubQ.pop(pid)
#=> {:ok, :one}
# When there are no more items in the stack, `pop/1` returns `nil`:
CubQ.pop(pid)
#=> nil
Mixing things up
As the underlying data structure used for stacks and queues is the same, queue and stack semantics can be mixed on the same queue.
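As a minimal sketch of mixing the two, the snippet below inserts three items and then takes them out from both ends. It assumes the cubq package (and its cubdb dependency) is available in the project; the temporary data directory and the queue name :mixed_example are placeholders chosen for this illustration.

```elixir
# Assumption: :cubq and :cubdb are available, e.g. as Mix dependencies.
# The data directory and queue name below are placeholders.
data_dir = Path.join(System.tmp_dir!(), "cubq_mixed_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :mixed_example)

# Start from an empty queue, in case the directory was used before.
:ok = CubQ.delete_all(pid)

:ok = CubQ.enqueue(pid, :one)
:ok = CubQ.enqueue(pid, :two)
# push/2 is the same as enqueue/2: it inserts at the end.
:ok = CubQ.push(pid, :three)

# Stack semantics: pop/1 takes from the end.
newest = CubQ.pop(pid)
# Queue semantics: dequeue/1 takes from the beginning.
oldest = CubQ.dequeue(pid)
remaining = CubQ.dequeue(pid)
empty = CubQ.dequeue(pid)
```

Given the semantics documented here, newest should be {:ok, :three}, oldest should be {:ok, :one}, remaining should be {:ok, :two}, and empty should be nil.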
Summary
Functions
ack(pid, ack_id): Positively acknowledges the successful consumption of an item taken with dequeue_ack/2 or pop_ack/2.
append(pid, item): Same as enqueue/2.
child_spec(init_arg): Returns a specification to start this module under a supervisor.
delete_all(pid, batch_size \\ 100): Deletes all items from the queue.
dequeue(pid): Removes an item from the beginning of the queue and returns it.
enqueue(pid, item): Inserts an item at the end of the queue.
nack(pid, ack_id): Negatively acknowledges an item taken with dequeue_ack/2 or pop_ack/2, causing it to be put back in the queue.
peek_first(pid): Returns the item at the beginning of the queue without removing it.
peek_last(pid): Returns the item at the end of the queue without removing it.
pop(pid): Removes an item from the end of the queue and returns it.
prepend(pid, item): Inserts an item at the beginning of the queue.
push(pid, item): Same as enqueue/2.
start(options): Starts a CubQ process without link.
start_link(options): Starts a CubQ process linked to the calling process.
Types
Functions
ack(pid, ack_id)
ack(GenServer.server(), ack_id()) :: :ok | {:error, term()}
Positively acknowledges the successful consumption of an item taken with dequeue_ack/2 or pop_ack/2.
See the documentation for dequeue_ack/2 for more details.
append(pid, item)
append(GenServer.server(), item()) :: :ok | {:error, term()}
Same as enqueue/2.
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
delete_all(pid, batch_size \\ 100)
delete_all(GenServer.server(), pos_integer()) :: :ok | {:error, term()}
Deletes all items from the queue.
The items are deleted in batches, and the size of the batch can be specified as the optional second argument.
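The following sketch shows the optional batch size being passed explicitly. It assumes the cubq package is available; the data directory, the queue name, and the batch size of 50 are arbitrary values picked for the example.

```elixir
# Assumption: :cubq and :cubdb are available, e.g. as Mix dependencies.
# The data directory and queue name below are placeholders.
data_dir = Path.join(System.tmp_dir!(), "cubq_delete_all_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :delete_all_example)

# Fill the queue with a few hundred items.
Enum.each(1..250, fn n -> :ok = CubQ.enqueue(pid, n) end)

# Delete everything, 50 items per batch instead of the default 100.
deleted = CubQ.delete_all(pid, 50)

# The queue should now be empty, so peeking finds nothing.
after_delete = CubQ.peek_first(pid)
```

Here deleted should be :ok and after_delete should be nil, since the queue no longer holds any item.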
dequeue(pid)
dequeue(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}
Removes an item from the beginning of the queue and returns it.
It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.
Example:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.enqueue(pid, :two)
#=> :ok
CubQ.dequeue(pid)
#=> {:ok, :one}
CubQ.dequeue(pid)
#=> {:ok, :two}
dequeue_ack(pid, timeout \\ 5000)
dequeue_ack(GenServer.server(), timeout()) :: {:ok, item(), ack_id()} | nil | {:error, term()}
Removes an item from the beginning of the queue and returns it, expecting a manual confirmation of its successful processing with ack/2. If ack/2 is not called before timeout, the item is put back at the beginning of the queue.
The dequeue_ack/2 function is useful when implementing "at least once" semantics, especially when more than one consumer takes items from the same queue, to ensure that each item is successfully consumed before being discarded.
The problem is the following: if a consumer takes an item with dequeue/1 and then crashes before processing it, the item is lost. With dequeue_ack/2, the item is not removed immediately, but is instead atomically transferred to a staging storage. If the consumer successfully processes the item, it can call ack/2 (acknowledgement) to confirm the success, after which the item is discarded. If ack/2 is not called within the timeout (expressed in milliseconds, 5000 by default), the item is automatically put back in the queue, so it can be dequeued again by a consumer. If the consumer wants to put the item back in the queue immediately, it can also call nack/2 (negative acknowledgement) explicitly.
On success, the return value of dequeue_ack/2 is a 3-tuple: {:ok, item, ack_id}. The ack_id is the argument that must be passed to ack/2 or nack/2 to confirm the successful (or unsuccessful) consumption of the item.
Note that dequeue_ack/2 performs its operation in an atomic and durable way, so even if the CubQ process crashes, after restarting it will still re-insert the items pending acknowledgement in the queue after the timeout elapses. After a restart, though, the timeouts are also restarted, so the effective time before the item goes back to the queue can be longer than the original timeout.
In case of timeout or negative acknowledgement, the item is put back at the beginning of the queue. Global ordering therefore cannot be guaranteed when items are put back, but such items are ready to be dequeued again immediately.
Example:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.enqueue(pid, :two)
#=> :ok
{:ok, item, ack_id} = CubQ.dequeue_ack(pid, 3000)
#=> {:ok, :one, ack_id}
# More items can be now taken from the queue
CubQ.dequeue(pid)
#=> {:ok, :two}
# If 3 seconds elapse without `ack` being called, or `nack` is called,
# the item `:one` would be put back to the queue, so it can be dequeued
# again:
CubQ.nack(pid, ack_id)
#=> :ok
{:ok, item, ack_id} = CubQ.dequeue_ack(pid, 3000)
#=> {:ok, :one, ack_id}
# When successful consumption is confirmed by calling `ack`, the item
# is finally discarded and won't be put back in the queue anymore:
CubQ.ack(pid, ack_id)
#=> :ok
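The acknowledgement flow described above could be wrapped in a small consumer helper, sketched below. The module AckConsumerSketch and its consume_one/3 function are hypothetical names invented for this illustration and are not part of CubQ; only dequeue_ack/2, ack/2 and nack/2 come from the library. The sketch only handles raised exceptions: if the consumer process itself dies, the item goes back to the queue once the timeout elapses.

```elixir
defmodule AckConsumerSketch do
  # Hypothetical helper for illustration; it is not part of CubQ.

  # Takes one item with dequeue_ack/2, runs `handler` on it, then calls
  # ack/2 on success or nack/2 if the handler raises.
  def consume_one(queue, handler, timeout \\ 5000) do
    case CubQ.dequeue_ack(queue, timeout) do
      {:ok, item, ack_id} -> settle(queue, ack_id, run(handler, item))
      nil -> :empty
      {:error, _cause} = error -> error
    end
  end

  # Runs the handler, turning a raised exception into an error tuple.
  defp run(handler, item) do
    {:ok, handler.(item)}
  rescue
    exception -> {:error, exception}
  end

  # Success: confirm consumption so the item is discarded.
  defp settle(queue, ack_id, {:ok, _result} = success) do
    with :ok <- CubQ.ack(queue, ack_id), do: success
  end

  # Failure: put the item back in the queue immediately.
  defp settle(queue, ack_id, {:error, _exception} = failure) do
    with :ok <- CubQ.nack(queue, ack_id), do: failure
  end
end

# Assumption: :cubq and :cubdb are available; directory and queue name
# are placeholders.
data_dir = Path.join(System.tmp_dir!(), "cubq_ack_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :ack_example)
:ok = CubQ.delete_all(pid)

:ok = CubQ.enqueue(pid, 21)
# The first attempt raises, so the item is nacked and goes back.
failed = AckConsumerSketch.consume_one(pid, fn _item -> raise "boom" end)
# The second attempt succeeds, so the item is acked and discarded.
retried = AckConsumerSketch.consume_one(pid, fn n -> n * 2 end)
# Nothing is left to consume.
drained = AckConsumerSketch.consume_one(pid, fn n -> n end)
```

In this run, failed should be an {:error, exception} tuple, retried should be {:ok, 42}, and drained should be :empty. Keeping the handler call separate from the ack/nack call means a failing ack/2 is reported as its own error rather than being mistaken for a handler failure.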
enqueue(pid, item)
enqueue(GenServer.server(), item()) :: :ok | {:error, term()}
Inserts an item at the end of the queue.
The item can be any Elixir (or Erlang) term.
Example:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.enqueue(pid, :two)
#=> :ok
CubQ.dequeue(pid)
#=> {:ok, :one}
nack(pid, ack_id)
nack(GenServer.server(), ack_id()) :: :ok | {:error, term()}
Negatively acknowledges an item taken with dequeue_ack/2 or pop_ack/2, causing it to be put back in the queue.
See the documentation for dequeue_ack/2 for more details.
peek_first(pid)
peek_first(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}
Returns the item at the beginning of the queue without removing it.
It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.
Example:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.enqueue(pid, :two)
#=> :ok
CubQ.peek_first(pid)
#=> {:ok, :one}
CubQ.dequeue(pid)
#=> {:ok, :one}
peek_last(pid)
peek_last(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}
Returns the item at the end of the queue without removing it.
It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.
Example:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.enqueue(pid, :two)
#=> :ok
CubQ.peek_last(pid)
#=> {:ok, :two}
CubQ.pop(pid)
#=> {:ok, :two}
pop(pid)
pop(GenServer.server()) :: {:ok, item()} | nil | {:error, term()}
Removes an item from the end of the queue and returns it.
It returns {:ok, item} if there are items in the queue, nil if the queue is empty, and {:error, cause} on error.
Example:
CubQ.push(pid, :one)
#=> :ok
CubQ.push(pid, :two)
#=> :ok
CubQ.pop(pid)
#=> {:ok, :two}
CubQ.pop(pid)
#=> {:ok, :one}
pop_ack(pid, timeout \\ 5000)
pop_ack(GenServer.server(), timeout()) :: {:ok, item(), ack_id()} | nil | {:error, term()}
Removes an item from the end of the queue and returns it, expecting a manual confirmation of its successful processing with ack/2. If ack/2 is not called before timeout, the item is put back at the end of the queue.
See the documentation for dequeue_ack/2 for more details: the pop_ack/2 function works in the same way as dequeue_ack/2, but with stack semantics instead of queue semantics.
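A short sketch of the stack-side flow, mirroring the dequeue_ack/2 example, may help. It assumes the cubq package is available; the data directory and the queue name :pop_ack_example are placeholders for this illustration.

```elixir
# Assumption: :cubq and :cubdb are available, e.g. as Mix dependencies.
# The data directory and queue name below are placeholders.
data_dir = Path.join(System.tmp_dir!(), "cubq_pop_ack_example")
{:ok, db} = CubDB.start_link(data_dir: data_dir)
{:ok, pid} = CubQ.start_link(db: db, queue: :pop_ack_example)
:ok = CubQ.delete_all(pid)

:ok = CubQ.push(pid, :one)
:ok = CubQ.push(pid, :two)

# Take the item at the end, pending acknowledgement for up to 3 seconds.
{:ok, first_try, ack_id} = CubQ.pop_ack(pid, 3000)

# Negative acknowledgement: the item is put back at the end.
:ok = CubQ.nack(pid, ack_id)

# Popping again should therefore return the same item.
{:ok, second_try, ack_id} = CubQ.pop_ack(pid, 3000)

# Positive acknowledgement: the item is discarded for good.
:ok = CubQ.ack(pid, ack_id)

# Only the older item is left on the stack.
rest = CubQ.pop(pid)
```

Following the documented behavior, first_try and second_try should both be :two, and rest should be {:ok, :one}.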
prepend(pid, item)
prepend(GenServer.server(), item()) :: :ok | {:error, term()}
Inserts an item at the beginning of the queue.
The item can be any Elixir (or Erlang) term.
Example:
CubQ.enqueue(pid, :one)
#=> :ok
CubQ.prepend(pid, :zero)
#=> :ok
CubQ.dequeue(pid)
#=> {:ok, :zero}
CubQ.dequeue(pid)
#=> {:ok, :one}
push(pid, item)
push(GenServer.server(), item()) :: :ok | {:error, term()}
Same as enqueue/2.
Normally used together with pop/1 for stack semantics.
start(options)
start([option() | GenServer.option()]) :: GenServer.on_start()
Starts a CubQ process without link.
The argument is a keyword list of options, see start_link/1 for details.
start_link(options)
start_link([option() | GenServer.option()]) :: GenServer.on_start()
Starts a CubQ process linked to the calling process.
The argument is a keyword list of options:
- db (required): the pid (or name) of the CubDB process for storing the queue
- queue (required): the identifier of the queue. It can be any Elixir term, but typically one would use an atom, like :my_queue
GenServer options like name and timeout can also be given, and are forwarded to GenServer.start_link/3 as the third argument.
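Because the db option accepts a name and GenServer options such as name are forwarded, both processes can in principle be started under a supervisor. The sketch below is one possible arrangement, not a prescribed setup: the module QueueSupervisorSketch and the names SketchDB and SketchQueue are hypothetical, and it assumes that CubDB also accepts a name option and can be used as a child spec in the same way.

```elixir
defmodule QueueSupervisorSketch do
  # Hypothetical supervisor for illustration; not part of CubQ or CubDB.
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    data_dir = Keyword.fetch!(opts, :data_dir)

    children = [
      # Assumption: CubDB forwards :name to GenServer, like CubQ does.
      {CubDB, data_dir: data_dir, name: SketchDB},
      # CubQ refers to the database by its registered name.
      {CubQ, db: SketchDB, queue: :my_queue, name: SketchQueue}
    ]

    # If the database restarts, restart the queue process after it too.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

# Placeholder data directory for this sketch.
data_dir = Path.join(System.tmp_dir!(), "cubq_supervised_example")
{:ok, _sup} = QueueSupervisorSketch.start_link(data_dir: data_dir)

# The queue can now be addressed by name instead of by pid.
:ok = CubQ.delete_all(SketchQueue)
:ok = CubQ.enqueue(SketchQueue, :job)
taken = CubQ.dequeue(SketchQueue)
```

With this arrangement taken should be {:ok, :job}. The :rest_for_one strategy is a design choice made for the sketch, on the reasoning that a CubQ process should not outlive the CubDB process it stores its items in.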