SuperCache.Internal.Queue (SuperCache v1.3.0)

Copy Markdown View Source

Internal concurrent queue used by SuperCache buffer streams.

This module implements a lightweight message-passing queue that supports:

  • Multiple producers adding items concurrently.
  • Multiple consumers reading batches of items.
  • Graceful shutdown that notifies waiting readers.

Design

The queue runs as a registered process. It maintains two lists:

  • readers: PIDs waiting for data.
  • data: Buffered items waiting to be consumed.

When data arrives and readers are waiting, the entire buffer is sent to the first reader. When readers arrive and data is available, it is delivered immediately.

Timeouts & Retries

To prevent infinite hangs, get/2 accepts a timeout and a maximum number of retries. If the queue does not respond within the timeout, it retries up to :max_retries times before returning {:error, :timeout}.

Warning

This is an internal module. Do not use it directly in application code. Use SuperCache.Buffer or SuperCache.lazy_put/1 instead.

Summary

Functions

Adds data to the queue.

Starts a new queue process registered under name.

Stops the queue process gracefully.

Functions

add(pid, data)

@spec add(atom() | pid(), any()) :: :ok

Adds data to the queue.

Returns :ok immediately. If the queue process is not alive, logs a warning and returns {:error, :process_down}.

get(pid)

start(name)

@spec start(atom()) :: pid()

Starts a new queue process registered under name.

Returns the PID of the started process. Raises if name is already taken.

stop(pid)

@spec stop(atom() | pid()) :: :ok

Stops the queue process gracefully.

Waiting readers will receive :stop and return []. New readers will also receive :stop. Returns :ok immediately.