BlockingQueue
BlockingQueue is a simple queue implemented as a GenServer. It has a fixed maximum length.
The queue is designed to decouple a producer and a consumer while limiting
the latency between them. When pushing to a full queue, the push operation
blocks, preventing the producer from making progress until the consumer
catches up. Likewise, when calling pop on an empty queue, the call blocks
until there is work to do.
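The blocking behavior described above can be sketched with a small GenServer. This is only an illustration of one common technique (parking callers by returning `{:noreply, state}` and answering later with `GenServer.reply/2`); it is not taken from BlockingQueue's source, which may work differently. The module name `BoundedQueueSketch` and its state layout are hypothetical.

```elixir
defmodule BoundedQueueSketch do
  # Hypothetical module for illustration only; not BlockingQueue's implementation.
  use GenServer

  def start_link(max), do: GenServer.start_link(__MODULE__, max)

  # :infinity call timeouts let a caller stay parked for as long as needed.
  def push(pid, item), do: GenServer.call(pid, {:push, item}, :infinity)
  def pop(pid), do: GenServer.call(pid, :pop, :infinity)

  @impl true
  def init(max) do
    {:ok, %{max: max, size: 0, items: :queue.new(), pushers: :queue.new(), poppers: :queue.new()}}
  end

  @impl true
  def handle_call({:push, item}, from, state) do
    cond do
      # A consumer is already waiting: hand the item over directly.
      not :queue.is_empty(state.poppers) ->
        {{:value, popper}, rest} = :queue.out(state.poppers)
        GenServer.reply(popper, item)
        {:reply, nil, %{state | poppers: rest}}

      # Room left. Any integer compares as less than the atom :infinity,
      # so an unbounded queue always takes this branch.
      state.size < state.max ->
        {:reply, nil, %{state | items: :queue.in(item, state.items), size: state.size + 1}}

      # Full: park the producer by withholding the reply.
      true ->
        {:noreply, %{state | pushers: :queue.in({from, item}, state.pushers)}}
    end
  end

  def handle_call(:pop, from, state) do
    case :queue.out(state.items) do
      {{:value, item}, items} ->
        case :queue.out(state.pushers) do
          # A producer was parked: admit its item and release it.
          {{:value, {pusher, pending}}, pushers} ->
            GenServer.reply(pusher, nil)
            {:reply, item, %{state | items: :queue.in(pending, items), pushers: pushers}}

          {:empty, _} ->
            {:reply, item, %{state | items: items, size: state.size - 1}}
        end

      # Empty: park the consumer until an item arrives.
      {:empty, _} ->
        {:noreply, %{state | poppers: :queue.in(from, state.poppers)}}
    end
  end
end

{:ok, q} = BoundedQueueSketch.start_link(2)
BoundedQueueSketch.push(q, "Hi")
BoundedQueueSketch.pop(q) # returns "Hi"
```

Because the sketch relies on a withheld reply, it needs an `:infinity` call timeout. That is a property of this sketch, not a statement about how BlockingQueue treats its own timeout argument.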
Protocols
The BlockingQueue module implements the Collectable protocol.
Examples
{:ok, pid} = BlockingQueue.start_link(5)
BlockingQueue.push(pid, "Hi")
BlockingQueue.pop(pid) # should return "Hi"
{:ok, pid} = BlockingQueue.start_link(:infinity)
BlockingQueue.push(pid, "Hi")
BlockingQueue.pop(pid) # should return "Hi"
Summary
Functions
Pops the least recently pushed item from the queue. Blocks if the queue is empty until an item is available
Returns a Stream where each element comes from the BlockingQueue
Pushes a new item into the queue. Blocks if the queue is full
Pushes all items in a stream into the blocking queue. Blocks as necessary
Starts a queue process with GenServer.start_link/3
Types
maximum_t :: pos_integer | :infinity
on_start ::
{:ok, pid} |
:ignore |
{:error, {:already_started, pid} | term}
t :: %BlockingQueue{pid: pid}
The %BlockingQueue struct is used with the Collectable protocol.
Examples
input = ["Hello", "World"]
{:ok, pid} = BlockingQueue.start_link(5)
Enum.into(input, %BlockingQueue{pid: pid})
BlockingQueue.pop_stream(pid) |> Enum.take(2) # should return input
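For readers unfamiliar with Collectable, the following is a minimal sketch of how a struct that wraps a pid can take part in `Enum.into/2`. It is an assumption-laden illustration, not BlockingQueue's own implementation: `QueueHandleSketch` is a hypothetical struct, and an Agent holding a list stands in for the queue server.

```elixir
defmodule QueueHandleSketch do
  # Hypothetical struct standing in for %BlockingQueue{pid: pid}.
  defstruct [:pid]
end

defimpl Collectable, for: QueueHandleSketch do
  # into/1 returns the initial accumulator and a collector function.
  def into(handle) do
    collector = fn
      acc, {:cont, item} ->
        # Append each collected element to the Agent's list.
        Agent.update(acc.pid, fn items -> items ++ [item] end)
        acc

      acc, :done ->
        acc

      _acc, :halt ->
        :ok
    end

    {handle, collector}
  end
end

{:ok, agent} = Agent.start_link(fn -> [] end)
Enum.into(["Hello", "World"], %QueueHandleSketch{pid: agent})
Agent.get(agent, fn items -> items end) # returns ["Hello", "World"]
```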
Functions
Specs
pop(pid, integer) :: any
Pops the least recently pushed item from the queue. Blocks if the queue is empty until an item is available.
pid is the process ID of the BlockingQueue server.
timeout (optional) is the timeout value passed to GenServer.call (does not impact how long pop will wait for a message from the queue).
Specs
pop_stream(pid) :: Enumerable.t
Returns a Stream where each element comes from the BlockingQueue.
pid is the process ID of the BlockingQueue server.
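A stream backed by a blocking source can be sketched with `Stream.repeatedly/1`, which calls a zero-arity function each time an element is demanded. Whether pop_stream is built this way is an assumption. In the sketch below, `blocking_pop` is a hypothetical stand-in for a blocking pop, and the current process mailbox plays the role of the queue.

```elixir
# Hypothetical blocking source: waits until an {:item, value} message arrives.
blocking_pop = fn ->
  receive do
    {:item, value} -> value
  end
end

# Each element of the stream is produced by one blocking call.
stream = Stream.repeatedly(blocking_pop)

send(self(), {:item, 1})
send(self(), {:item, 2})

# Enum.take/2 halts after two elements, so no third blocking call is made.
Enum.take(stream, 2) # returns [1, 2]
```

A stream like this never ends on its own, so consumers should bound it with functions such as `Enum.take/2`.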
Specs
push(pid, any, integer) :: nil
Pushes a new item into the queue. Blocks if the queue is full.
pid is the process ID of the BlockingQueue server.
item is the value to be pushed into the queue. This can be anything.
timeout (optional) is the timeout value passed to GenServer.call (does not impact how long push will wait for space in the queue).
Specs
push_stream(Enumerable.t, pid) :: nil
Pushes all items in a stream into the blocking queue. Blocks as necessary.
stream is the stream of values to push into the queue.
pid is the process ID of the BlockingQueue server.
start_link(n, options)
Starts a queue process with GenServer.start_link/3.
n is the maximum queue depth. Pass the atom :infinity to start a queue with no maximum. An infinite queue will never block in push/2 but may block in pop/1.
options are options as described for GenServer.start_link/3 and are optional.