PartitionedBuffer.Queue (PartitionedBuffer v0.3.0)


ETS-based partitioned queue buffer for high-throughput data processing.

PartitionedBuffer.Queue buffers arbitrary data in insertion order and periodically processes it with a configurable processor function. Writes are spread across partitions to reduce lock contention under high-throughput load, and double-buffering lets new writes continue while a batch is being processed, so processing causes no downtime.
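To make the double-buffering idea concrete, here is a minimal single-partition sketch. It is an illustration under stated assumptions, not PartitionedBuffer's actual implementation: the module name DoubleBufferSketch, the two-table layout, and the use of :atomics to track the active table are all hypothetical.

```elixir
defmodule DoubleBufferSketch do
  # Hypothetical illustration of double-buffering with ETS.
  # This is NOT how PartitionedBuffer is implemented internally.

  # Creates two ETS tables plus an atomic index (0 or 1) naming the active one.
  def new do
    tables = {
      :ets.new(:buf_a, [:ordered_set, :public]),
      :ets.new(:buf_b, [:ordered_set, :public])
    }

    %{tables: tables, active: :atomics.new(1, signed: false)}
  end

  # Writers always insert into the currently active table.
  def push(%{tables: tables, active: active}, item) do
    table = elem(tables, :atomics.get(active, 1))
    # A strictly increasing key preserves insertion order within the table.
    :ets.insert(table, {System.unique_integer([:monotonic]), item})
    :ok
  end

  # Swaps the active table, then drains the previously active one.
  # New pushes land in the other table while the processor runs.
  # Caveat: a writer that read the index just before the swap could still
  # insert into the drained table; a real implementation must handle that.
  def flush(%{tables: tables, active: active}, processor) do
    current = :atomics.get(active, 1)
    :atomics.put(active, 1, 1 - current)
    drained = elem(tables, current)

    batch =
      drained
      |> :ets.tab2list()
      |> Enum.sort()
      |> Enum.map(fn {_key, item} -> item end)

    :ets.delete_all_objects(drained)
    processor.(batch)
  end
end
```

Calling DoubleBufferSketch.flush/2 hands the processor only the items pushed before the swap; anything pushed afterwards is picked up by the next flush.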

Examples

Standalone Usage

# Start a queue buffer with a custom processor
iex> {:ok, _sup_pid} =
...>   PartitionedBuffer.Queue.start_link(
...>     name: :my_buffer,
...>     processor: fn batch -> IO.inspect(batch) end
...>   )

# Push a single item into the buffer
iex> PartitionedBuffer.Queue.push(:my_buffer, "item1")
:ok

# Push a batch of items
iex> PartitionedBuffer.Queue.push(:my_buffer, ["item2", "item3"])
:ok

# Check buffer size
iex> PartitionedBuffer.Queue.size(:my_buffer)
3

# Stop the buffer gracefully (processes remaining items)
iex> PartitionedBuffer.Queue.stop(:my_buffer)
:ok

Adding to a Supervision Tree

children = [
  {PartitionedBuffer.Queue,
   name: :my_buffer,
   processor: &MyApp.EventProcessor.process_batch/1}
]

Supervisor.start_link(children, strategy: :one_for_one)

Processor

The processor function receives a list of values (the items pushed to the buffer):

fn batch ->
  # batch is [value1, value2, ...]
  Enum.each(batch, fn value -> process(value) end)
end
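For a named processor such as the MyApp.EventProcessor.process_batch/1 capture used in the supervision tree example, a module along these lines may serve as a starting point. The body is a hypothetical sketch: the %{type: ...} event shape, the summarize/1 helper, and the :ok return value are assumptions, since this page does not specify what the processor must return.

```elixir
defmodule MyApp.EventProcessor do
  # Hypothetical processor module; the event shape and return value
  # are assumptions, not requirements stated by PartitionedBuffer.
  require Logger

  # Receives the list of items that were pushed to the buffer.
  def process_batch(batch) when is_list(batch) do
    counts = summarize(batch)
    Logger.info("processed #{length(batch)} item(s): #{inspect(counts)}")
    :ok
  end

  # Counts items per event type; items without a :type key count as :unknown.
  def summarize(batch) do
    Enum.frequencies_by(batch, &event_type/1)
  end

  defp event_type(%{type: type}), do: type
  defp event_type(_other), do: :unknown
end
```

Keeping the pure part (summarize/1) separate from the side effects makes the processor easy to test without starting a buffer.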

Options

See PartitionedBuffer for start and runtime options.

Types

buffer()

@type buffer() :: PartitionedBuffer.buffer()

Proxy type for PartitionedBuffer.buffer().

item()

@type item() :: any()

Any term that will be buffered and processed.

Functions

child_spec(opts)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns the queue buffer child spec.

push(buffer, item_or_batch, opts \\ [])

@spec push(buffer(), item() | [item()], keyword()) :: :ok

Pushes an item or a batch of items into the buffer.

Parameters

  • buffer - The buffer name (atom).
  • item_or_batch - A single item or a list of items to push.
  • opts - Optional runtime options.

Options

See PartitionedBuffer for runtime options.

Examples

# Simple push with default routing
push(:my_buffer, "item1")
push(:my_buffer, ["item2", "item3"])

# Custom partition routing using function
push(:my_buffer, user_event, partition_key: &(&1.user_id))

# Custom partition routing using MFA tuple (item prepended to args)
push(:my_buffer, event, partition_key: {MyApp.Router, :get_partition, []})

# Custom partition routing with fixed key (all items to same partition)
push(:my_buffer, log_entry, partition_key: :logs)
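The three partition_key forms above (function, MFA tuple, fixed key) can be pictured with the following sketch. It is a hypothetical helper, not part of PartitionedBuffer: the module name PartitionKeySketch, its function names, and the use of :erlang.phash2/2 to map a key to a partition index are assumptions made for illustration.

```elixir
defmodule PartitionKeySketch do
  # Hypothetical helper; PartitionedBuffer's real routing logic may differ.

  # Function form: the key is computed from the item.
  def resolve_key(item, fun) when is_function(fun, 1), do: fun.(item)

  # MFA form: the item is prepended to the argument list.
  def resolve_key(item, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [item | args])
  end

  # Fixed key: every item resolves to the same key.
  def resolve_key(_item, fixed_key), do: fixed_key

  # Maps a key to a partition index in 0..partitions-1 (assumed hashing scheme).
  def partition_index(key, partitions) when is_integer(partitions) and partitions > 0 do
    :erlang.phash2(key, partitions)
  end
end
```

Under this scheme, equal keys always map to the same partition, which is why a fixed key such as :logs sends every item to one partition.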

size(buffer)

@spec size(buffer()) :: non_neg_integer()

Returns the queue size (total number of items across all partitions).

Examples

size(:my_buffer)

start_link(opts \\ [])

@spec start_link(keyword()) :: Supervisor.on_start()

Starts a new queue buffer.

Options

See PartitionedBuffer for start options.

Examples

PartitionedBuffer.Queue.start_link(name: :my_queue_buffer)

stop(buffer, reason \\ :normal, timeout \\ :infinity)

@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok

Stops a queue buffer gracefully, processing any remaining items before it shuts down.

Examples

PartitionedBuffer.Queue.stop(:my_queue_buffer)

update_options(buffer, opts)

@spec update_options(buffer(), keyword()) :: :ok

Updates the options for the queue buffer.

Options

See PartitionedBuffer.update_options/2 for the updatable options.

Examples

# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval_ms: 100)