SuperWorker.Supervisor.MapQueue (SuperWorker v0.3.6)

A simple bounded queue backed by a map, giving fast, effectively constant-time lookups by message ID.

This queue is designed for managing messages in chain workers where:

  • Fast lookups by message ID are required
  • Queue size limits need to be enforced
  • Messages can be removed in any order (not strictly FIFO)

Examples

iex> alias SuperWorker.Supervisor.MapQueue
iex> queue = MapQueue.new(:my_queue)
iex> {:ok, queue, msg_id} = MapQueue.add(queue, "hello")
iex> {:ok, "hello"} = MapQueue.get(queue, msg_id)
iex> false = MapQueue.is_empty?(queue)
iex> 1 = MapQueue.size(queue)

Summary

Functions

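  • add(queue, msg) - Adds a message to the queue.
  • all_messages(map_queue) - Returns all messages currently in the queue as a list.
  • clear(queue) - Clears all messages from the queue.
  • get(queue, id) - Retrieves a message from the queue by ID without removing it.
  • has_message?(map_queue, id) - Checks if a message with the given ID exists in the queue.
  • is_empty?(map_queue) - Checks if the queue is empty.
  • is_full?(map_queue) - Checks if the queue is full.
  • message_ids(map_queue) - Returns all message IDs currently in the queue.
  • new(id, opts \\ []) - Creates a new queue with the given ID.
  • remaining_capacity(map_queue) - Returns the remaining capacity of the queue.
  • remove(queue, id) - Removes a message from the queue by ID.
  • size(map_queue) - Returns the current number of messages in the queue.
  • update_queue_length(queue, new_length) - Updates the queue length limit.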

Types

message()

@type message() :: any()

msg_id()

@type msg_id() :: non_neg_integer()

t()

@type t() :: %SuperWorker.Supervisor.MapQueue{
  id: any(),
  last_msg_id: msg_id(),
  last_send: non_neg_integer(),
  msgs: %{required(msg_id()) => message()},
  queue_length: pos_integer()
}

Functions

add(queue, msg)

@spec add(t(), message()) :: {:ok, t(), msg_id()} | {:error, :queue_full}

Adds a message to the queue.

Returns {:ok, updated_queue, message_id} if successful, or {:error, :queue_full} if the queue is at capacity.

Examples

iex> queue = MapQueue.new(:test)
iex> {:ok, queue, 1} = MapQueue.add(queue, "message")
iex> {:ok, queue, 2} = MapQueue.add(queue, "another")
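
The contract described above (sequential IDs and a capacity check before insertion) can be sketched with a plain struct and a map. This is a minimal stand-in written for illustration, not the SuperWorker source: the module name AddSketch is hypothetical, the field names are borrowed from t(), and starting IDs at 1 is inferred from the example above.

```elixir
defmodule AddSketch do
  # Hypothetical stand-in; field names mirror t() but this is not the library code.
  defstruct id: nil, last_msg_id: 0, msgs: %{}, queue_length: 50

  def new(id, opts \\ []) do
    %__MODULE__{id: id, queue_length: Keyword.get(opts, :queue_length, 50)}
  end

  # Refuse the message once the map already holds `queue_length` entries.
  def add(%__MODULE__{msgs: msgs, queue_length: limit}, _msg)
      when map_size(msgs) >= limit do
    {:error, :queue_full}
  end

  # Otherwise allocate the next ID and store the message under it.
  def add(%__MODULE__{} = queue, msg) do
    msg_id = queue.last_msg_id + 1
    {:ok, %{queue | last_msg_id: msg_id, msgs: Map.put(queue.msgs, msg_id, msg)}, msg_id}
  end
end

queue = AddSketch.new(:demo, queue_length: 1)
{:ok, queue, 1} = AddSketch.add(queue, "first")
{:error, :queue_full} = AddSketch.add(queue, "second")
```

Because the error tuple does not carry the queue, a caller that hits :queue_full keeps using the queue value it already holds.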

all_messages(map_queue)

@spec all_messages(t()) :: [message()]

Returns all messages currently in the queue as a list.

Examples

iex> queue = MapQueue.new(:test)
iex> {:ok, queue, _} = MapQueue.add(queue, "msg1")
iex> {:ok, queue, _} = MapQueue.add(queue, "msg2")
iex> messages = MapQueue.all_messages(queue)
iex> "msg1" in messages and "msg2" in messages
true

clear(queue)

@spec clear(t()) :: t()

Clears all messages from the queue.

Returns the queue with an empty message map.

Examples

iex> queue = MapQueue.new(:test)
iex> {:ok, queue, _} = MapQueue.add(queue, "msg1")
iex> {:ok, queue, _} = MapQueue.add(queue, "msg2")
iex> queue = MapQueue.clear(queue)
iex> true = MapQueue.is_empty?(queue)

get(queue, id)

@spec get(t(), msg_id()) :: {:ok, message()} | {:error, {:not_found, msg_id()}}

Retrieves a message from the queue by ID without removing it.

Returns {:ok, message} if found, or {:error, {:not_found, id}} otherwise.

Examples

iex> queue = MapQueue.new(:test)
iex> {:ok, queue, msg_id} = MapQueue.add(queue, "hello")
iex> {:ok, "hello"} = MapQueue.get(queue, msg_id)
iex> {:error, {:not_found, 999}} = MapQueue.get(queue, 999)
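
The tagged return values above map naturally onto Map.fetch/2. The following is a sketch under that assumption, not a statement of how the library is written; LookupSketch is a hypothetical module, and msgs stands for a plain map shaped like the msgs field of t().

```elixir
defmodule LookupSketch do
  # Hypothetical helper operating on a plain map of msg_id => message.
  def get(msgs, id) when is_map(msgs) do
    case Map.fetch(msgs, id) do
      {:ok, msg} -> {:ok, msg}
      :error -> {:error, {:not_found, id}}
    end
  end
end

msgs = %{1 => "hello", 2 => nil}
{:ok, "hello"} = LookupSketch.get(msgs, 1)
# A stored nil is still reported as found, which Map.get/2 alone could not tell apart.
{:ok, nil} = LookupSketch.get(msgs, 2)
{:error, {:not_found, 999}} = LookupSketch.get(msgs, 999)
```

Since message() is any(), a nil message is legal, which is one reason a tagged tuple is more useful here than a bare value.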

has_message?(map_queue, id)

@spec has_message?(t(), msg_id()) :: boolean()

Checks if a message with the given ID exists in the queue.

Examples

iex> queue = MapQueue.new(:test)
iex> {:ok, queue, msg_id} = MapQueue.add(queue, "msg")
iex> true = MapQueue.has_message?(queue, msg_id)
iex> false = MapQueue.has_message?(queue, 999)

is_empty?(map_queue)

@spec is_empty?(t()) :: boolean()

Checks if the queue is empty.

Returns true if the queue contains no messages.

Examples

iex> queue = MapQueue.new(:test)
iex> true = MapQueue.is_empty?(queue)
iex> {:ok, queue, _} = MapQueue.add(queue, "msg")
iex> false = MapQueue.is_empty?(queue)

is_full?(map_queue)

@spec is_full?(t()) :: boolean()

Checks if the queue is full.

Returns true if the queue has reached its maximum capacity.

Examples

iex> queue = MapQueue.new(:test, queue_length: 2)
iex> {:ok, queue, _} = MapQueue.add(queue, "msg1")
iex> {:ok, queue, _} = MapQueue.add(queue, "msg2")
iex> true = MapQueue.is_full?(queue)

message_ids(map_queue)

@spec message_ids(t()) :: [msg_id()]

Returns all message IDs currently in the queue.

Examples

iex> queue = MapQueue.new(:test)
iex> {:ok, queue, id1} = MapQueue.add(queue, "msg1")
iex> {:ok, queue, id2} = MapQueue.add(queue, "msg2")
iex> ids = MapQueue.message_ids(queue)
iex> id1 in ids and id2 in ids
true
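
The example above only checks membership, which suggests that the order of the returned list is not part of the contract. If insertion order matters, and assuming IDs grow with each add (as the add examples show 1, then 2), sorting by ID recovers it. OrderSketch is a hypothetical helper over a plain map shaped like the msgs field, not a library function.

```elixir
defmodule OrderSketch do
  # Hypothetical helper: returns messages sorted by ascending message ID.
  def in_id_order(msgs) when is_map(msgs) do
    msgs
    |> Enum.sort_by(fn {id, _msg} -> id end)
    |> Enum.map(fn {_id, msg} -> msg end)
  end
end

["a", "b", "c"] = OrderSketch.in_id_order(%{3 => "c", 1 => "a", 2 => "b"})
```

With the documented API, the same effect could be had by sorting the result of message_ids/1 and calling get/2 for each ID.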

new(id, opts \\ [])

@spec new(
  any(),
  keyword()
) :: t()

Creates a new queue with the given ID.

Options

  • :queue_length - Maximum number of messages in the queue (default: 50)

Examples

iex> MapQueue.new(:my_queue)
%MapQueue{id: :my_queue, queue_length: 50, msgs: %{}}

iex> MapQueue.new(:my_queue, queue_length: 100)
%MapQueue{id: :my_queue, queue_length: 100, msgs: %{}}

remaining_capacity(map_queue)

@spec remaining_capacity(t()) :: non_neg_integer()

Returns the remaining capacity of the queue.

Examples

iex> queue = MapQueue.new(:test, queue_length: 10)
iex> 10 = MapQueue.remaining_capacity(queue)
iex> {:ok, queue, _} = MapQueue.add(queue, "msg")
iex> 9 = MapQueue.remaining_capacity(queue)

remove(queue, id)

@spec remove(t(), msg_id()) :: {:ok, t()}

Removes a message from the queue by ID.

Returns {:ok, updated_queue} regardless of whether the message existed.

Examples

iex> queue = MapQueue.new(:test)
iex> {:ok, queue, msg_id} = MapQueue.add(queue, "hello")
iex> {:ok, queue} = MapQueue.remove(queue, msg_id)
iex> {:error, {:not_found, _}} = MapQueue.get(queue, msg_id)
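
Returning {:ok, updated_queue} whether or not the ID existed matches how Map.delete/2 behaves on a missing key. A minimal sketch of that idea, with RemoveSketch as a hypothetical helper over a plain map rather than the library's own code:

```elixir
defmodule RemoveSketch do
  # Hypothetical helper: Map.delete/2 returns the map unchanged when the key is absent.
  def remove(msgs, id) when is_map(msgs) do
    {:ok, Map.delete(msgs, id)}
  end
end

msgs = %{1 => "hello"}
{:ok, after_hit} = RemoveSketch.remove(msgs, 1)
{:ok, after_miss} = RemoveSketch.remove(msgs, 999)
true = after_hit == %{}
true = after_miss == msgs
```

Callers that need to know whether anything was actually removed can call has_message?/2 before remove/2.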

size(map_queue)

@spec size(t()) :: non_neg_integer()

Returns the current number of messages in the queue.

Examples

iex> queue = MapQueue.new(:test)
iex> 0 = MapQueue.size(queue)
iex> {:ok, queue, _} = MapQueue.add(queue, "msg")
iex> 1 = MapQueue.size(queue)

update_queue_length(queue, new_length)

@spec update_queue_length(t(), pos_integer()) :: t()

Updates the queue length limit.

Note: This does not remove existing messages if the new limit is smaller than the current size.

Examples

iex> queue = MapQueue.new(:test, queue_length: 10)
iex> queue = MapQueue.update_queue_length(queue, 20)
iex> 20 = queue.queue_length
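
The note above means a queue can temporarily hold more messages than its limit. How is_full?/1 and remaining_capacity/1 behave in that state is not stated on this page. One plausible reading, given that remaining_capacity/1 is specified to return a non_neg_integer(), is sketched below; CapacitySketch and its functions are hypothetical and work on a bare size and limit.

```elixir
defmodule CapacitySketch do
  # Hypothetical: treat "at or over the limit" as full.
  def full?(size, limit), do: size >= limit

  # Hypothetical: clamp at zero so the result never goes negative when size > limit.
  def remaining(size, limit), do: max(limit - size, 0)
end

# Three messages stored, then the limit is lowered to two.
true = CapacitySketch.full?(3, 2)
0 = CapacitySketch.remaining(3, 2)
# Normal case: one message stored in a queue of ten.
9 = CapacitySketch.remaining(1, 10)
```

Under this reading, new messages would be rejected until enough existing ones are removed to bring the size below the new limit.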