BlockingQueue

BlockingQueue is a simple queue implemented as a GenServer. It has a fixed maximum length.

The queue is designed to decouple a producer and a consumer while still limiting the latency between them. Pushing to a full queue blocks, which prevents the producer from making progress until the consumer catches up. Likewise, calling pop on an empty queue blocks until there is work to do.
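The back-pressure described above can be sketched with one producer and one consumer sharing a small queue. This is a minimal sketch, assuming the library is available on Hex under the package name :blocking_queue and that Mix.install/1 (Elixir 1.12 or later) is used to fetch it; the queue size and item count are arbitrary.

```elixir
# Assumption: the package is published on Hex as :blocking_queue.
Mix.install([:blocking_queue])

# A queue that holds at most two items.
{:ok, queue} = BlockingQueue.start_link(2)

# Producer: pushes ten items. push/2 blocks whenever the queue is full,
# so the producer can never get more than two items ahead of the consumer.
producer =
  Task.async(fn ->
    Enum.each(1..10, fn i -> BlockingQueue.push(queue, i) end)
  end)

# Consumer: pops ten items. pop/1 blocks whenever the queue is empty.
consumed = for _ <- 1..10, do: BlockingQueue.pop(queue)

Task.await(producer)
IO.inspect(consumed)
```

With a single producer and a single consumer, the items are expected to come out in the order they were pushed.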

Protocols

The BlockingQueue module implements the Collectable protocol.

Examples

{:ok, pid} = BlockingQueue.start_link(5)
BlockingQueue.push(pid, "Hi")
BlockingQueue.pop(pid) # should return "Hi"

{:ok, pid} = BlockingQueue.start_link(:infinity)
BlockingQueue.push(pid, "Hi")
BlockingQueue.pop(pid) # should return "Hi"

Summary

Types

t()

The %BlockingQueue struct is used with the Collectable protocol.

Functions

pop(pid, timeout \\ 5000)

Pops the least recently pushed item from the queue. Blocks if the queue is empty until an item is available.

pop_stream(pid)

Returns a Stream where each element comes from the BlockingQueue.

push(pid, item, timeout \\ 5000)

Pushes a new item into the queue. Blocks if the queue is full.

push_stream(stream, pid)

Pushes all items in a stream into the blocking queue. Blocks as necessary.

start_link(n, options \\ [])

Starts a queue process with GenServer.start_link/3.

Types

maximum_t :: pos_integer | :infinity
on_start ::
  {:ok, pid} |
  :ignore |
  {:error, {:already_started, pid} | term}
t :: %BlockingQueue{pid: pid}

The %BlockingQueue struct is used with the Collectable protocol.

Examples

input = ["Hello", "World"]
{:ok, pid} = BlockingQueue.start_link(5)
Enum.into(input, %BlockingQueue{pid: pid})
BlockingQueue.pop_stream(pid) |> Enum.take(2)  # should return input

Functions

pop(pid, timeout \\ 5000)

Specs

pop(pid, integer) :: any

Pops the least recently pushed item from the queue. Blocks if the queue is empty until an item is available.

pid is the process ID of the BlockingQueue server. timeout (optional) is the timeout value, in milliseconds, passed to GenServer.call; it does not affect how long pop will wait for an item from the queue.
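As an illustration of the blocking behaviour, the sketch below pops from an empty queue while a delayed producer supplies the item. It assumes the library is available on Hex as :blocking_queue; the 100 ms delay, the 10_000 ms timeout, and the :late_item value are arbitrary choices for the example.

```elixir
# Assumption: the package is published on Hex as :blocking_queue.
Mix.install([:blocking_queue])

{:ok, queue} = BlockingQueue.start_link(5)

# Hypothetical producer that delivers its item only after a short delay.
Task.start(fn ->
  Process.sleep(100)
  BlockingQueue.push(queue, :late_item)
end)

# The queue is empty at this point, so pop/2 blocks until the producer pushes.
# The second argument is the GenServer.call timeout in milliseconds.
item = BlockingQueue.pop(queue, 10_000)
IO.inspect(item)
```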

pop_stream(pid)

Specs

pop_stream(pid) :: Enumerable.t

Returns a Stream where each element comes from the BlockingQueue.

pid is the process ID of the BlockingQueue server.
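Because the returned stream pops lazily, it composes with the Stream and Enum functions. The sketch below assumes the library is available on Hex as :blocking_queue. It bounds the stream with Enum.take/2, since popping past the last queued item would block.

```elixir
# Assumption: the package is published on Hex as :blocking_queue.
Mix.install([:blocking_queue])

{:ok, queue} = BlockingQueue.start_link(5)
Enum.each(["a", "b", "c"], &BlockingQueue.push(queue, &1))

# Items are popped one at a time as the pipeline demands them.
upcased =
  queue
  |> BlockingQueue.pop_stream()
  |> Stream.map(&String.upcase/1)
  |> Enum.take(3)

IO.inspect(upcased)
```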

push(pid, item, timeout \\ 5000)

Specs

push(pid, any, integer) :: nil

Pushes a new item into the queue. Blocks if the queue is full.

pid is the process ID of the BlockingQueue server. item is the value to be pushed into the queue; it can be any term. timeout (optional) is the timeout value, in milliseconds, passed to GenServer.call; it does not affect how long push will wait for space in the queue.
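To see a push block on a full queue, the second push can be run in a separate task and observed from the outside. This is a minimal sketch, assuming the library is available on Hex as :blocking_queue; the queue size of 1 and the 200 ms observation window are arbitrary.

```elixir
# Assumption: the package is published on Hex as :blocking_queue.
Mix.install([:blocking_queue])

{:ok, queue} = BlockingQueue.start_link(1)

# The queue has room for one item, so this push returns without blocking.
BlockingQueue.push(queue, :first)

# The queue is now full, so a second push should block. Run it in a task.
blocked = Task.async(fn -> BlockingQueue.push(queue, :second) end)

# Task.yield/2 returns nil while the task has not finished.
still_blocked = Task.yield(blocked, 200) == nil

# Popping frees a slot, which lets the blocked push complete.
first = BlockingQueue.pop(queue)
Task.await(blocked)
second = BlockingQueue.pop(queue)

IO.inspect({still_blocked, first, second})
```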

push_stream(stream, pid)

Specs

push_stream(Enumerable.t, pid) :: nil

Pushes all items in a stream into the blocking queue. Blocks as necessary.

stream is the stream of values to push into the queue. pid is the process ID of the BlockingQueue server.
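Taking the stream as the first argument lets push_stream sit at the end of a pipeline. The sketch below feeds a range through a small queue into a consumer built on pop_stream. It assumes the library is available on Hex as :blocking_queue. The producer runs in its own task because the range holds more items than the queue can hold at once.

```elixir
# Assumption: the package is published on Hex as :blocking_queue.
Mix.install([:blocking_queue])

{:ok, queue} = BlockingQueue.start_link(3)

# Producer side: push twenty items through a queue that holds only three.
producer = Task.async(fn -> 1..20 |> BlockingQueue.push_stream(queue) end)

# Consumer side: pop lazily, transform, and stop after twenty items.
squares =
  queue
  |> BlockingQueue.pop_stream()
  |> Stream.map(fn n -> n * n end)
  |> Enum.take(20)

Task.await(producer)
IO.inspect(squares)
```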

start_link(n, options \\ [])

Specs

start_link(maximum_t, [any]) :: on_start

Starts a queue process with GenServer.start_link/3.

n is the maximum queue depth. Pass the atom :infinity to start a queue with no maximum. An infinite queue will never block in push/2 but may block in pop/1.

options (optional) are the options described for GenServer.start_link/3.
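The two forms of n and the options argument can be sketched as follows. This assumes the library is available on Hex as :blocking_queue; the registered name :jobs is hypothetical, and :name is one of the standard GenServer.start_link/3 options.

```elixir
# Assumption: the package is published on Hex as :blocking_queue.
Mix.install([:blocking_queue])

# Bounded queue, registered under a hypothetical name through the :name option.
{:ok, bounded} = BlockingQueue.start_link(2, name: :jobs)
same_process = Process.whereis(:jobs) == bounded

# Unbounded queue: pushes should never block, however many items are queued.
{:ok, unbounded} = BlockingQueue.start_link(:infinity)
Enum.each(1..1_000, &BlockingQueue.push(unbounded, &1))
oldest = BlockingQueue.pop(unbounded)

IO.inspect({same_process, oldest})
```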