ETS-based partitioned queue buffer for high-throughput data processing.
PartitionedBuffer.Queue buffers arbitrary data in insertion order and
periodically processes it using a configurable processor function.
It partitions writes to reduce lock contention under high write throughput,
and uses double-buffering so that the buffer keeps accepting writes while a batch is being processed.
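To make the two ideas above concrete, here is a minimal sketch of partitioned ETS tables combined with a double-buffer swap. It is an illustration under stated assumptions, not the library's actual implementation: the module name `DoubleBufferSketch`, the fixed partition count, the routing by `:erlang.phash2/2` on the item, and the table layout are all hypothetical.

```elixir
defmodule DoubleBufferSketch do
  # Hypothetical illustration; PartitionedBuffer's real internals may differ.
  @partitions 4

  # Creates two sets of partition tables (:a and :b) plus a small table
  # that records which set currently receives writes.
  def new do
    sets =
      for set <- [:a, :b], into: %{} do
        tables =
          for _ <- 1..@partitions do
            :ets.new(:partition, [:ordered_set, :public, write_concurrency: true])
          end

        {set, List.to_tuple(tables)}
      end

    meta = :ets.new(:meta, [:set, :public, read_concurrency: true])
    :ets.insert(meta, {:active, :a})
    %{sets: sets, meta: meta}
  end

  # Writes go to one partition of the active set. The monotonic unique
  # integer key keeps items in insertion order within a partition.
  def push(buf, item) do
    [{:active, set}] = :ets.lookup(buf.meta, :active)
    table = buf.sets |> Map.fetch!(set) |> elem(:erlang.phash2(item, @partitions))
    :ets.insert(table, {System.unique_integer([:monotonic]), item})
    :ok
  end

  # Total number of buffered items across both sets and all partitions.
  def size(buf) do
    for {_set, tables} <- buf.sets, table <- Tuple.to_list(tables), reduce: 0 do
      acc -> acc + :ets.info(table, :size)
    end
  end

  # Swaps the active set first, so new writes land in the other set,
  # then drains the previously active set one partition at a time.
  # A production version would also have to handle writers that looked up
  # the old set just before the swap; this sketch ignores that race.
  def flush(buf, processor) do
    [{:active, current}] = :ets.lookup(buf.meta, :active)
    next = if current == :a, do: :b, else: :a
    :ets.insert(buf.meta, {:active, next})

    buf.sets
    |> Map.fetch!(current)
    |> Tuple.to_list()
    |> Enum.each(fn table ->
      batch = table |> :ets.tab2list() |> Enum.sort() |> Enum.map(&elem(&1, 1))
      :ets.delete_all_objects(table)
      if batch != [], do: processor.(batch)
    end)
  end
end
```

In this sketch each partition is drained as its own batch, so ordering is only guaranteed within a partition, not across the whole buffer.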
Examples
Standalone Usage
# Start a queue buffer with a custom processor
iex> {:ok, _sup_pid} =
...>   PartitionedBuffer.Queue.start_link(
...>     name: :my_buffer,
...>     processor: fn batch -> IO.inspect(batch) end
...>   )
# Push a single item into the buffer
iex> PartitionedBuffer.Queue.push(:my_buffer, "item1")
:ok
# Push a batch of items
iex> PartitionedBuffer.Queue.push(:my_buffer, ["item2", "item3"])
:ok
# Check buffer size
iex> PartitionedBuffer.Queue.size(:my_buffer)
3
# Stop the buffer gracefully (processes remaining items)
iex> PartitionedBuffer.Queue.stop(:my_buffer)
:ok
Adding to a Supervision Tree
children = [
  {PartitionedBuffer.Queue,
   name: :my_buffer,
   processor: &MyApp.EventProcessor.process_batch/1}
]
Supervisor.start_link(children, strategy: :one_for_one)
Processor
The processor function receives a list of values (the items pushed to the buffer):
fn batch ->
  # batch is [value1, value2, ...]
  Enum.each(batch, fn value -> process(value) end)
end
Options
See PartitionedBuffer for start and runtime options.
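The `processor` start option accepts any one-arity function. As a sketch of what the `MyApp.EventProcessor.process_batch/1` capture used in the supervision tree example could point to, the module below processes each value and isolates failures so that one bad item does not abort the rest of the batch. The second argument, the default handler, and the `:ok` return value are assumptions made for illustration; this page does not say what the library expects the processor to return or how it treats a crash.

```elixir
defmodule MyApp.EventProcessor do
  # Hypothetical processor module; only the name and arity come from the
  # examples above, everything else is an illustrative assumption.

  # Receives the list of values that were pushed to the buffer.
  # `handler` defaults to a placeholder so that process_batch/1 exists
  # and can be passed as &MyApp.EventProcessor.process_batch/1.
  def process_batch(batch, handler \\ &default_handle/1) when is_list(batch) do
    Enum.each(batch, fn value ->
      try do
        handler.(value)
      rescue
        # Only exceptions are rescued here; throws and exits still propagate.
        error ->
          IO.puts(:stderr, "failed to process #{inspect(value)}: #{Exception.message(error)}")
      end
    end)
  end

  # Placeholder for application-specific work.
  defp default_handle(value), do: IO.inspect(value, label: "processed")
end
```

Whether to swallow per-item errors like this or let the processor crash depends on how the application wants failures surfaced.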
Types
@type buffer() :: PartitionedBuffer.buffer()
Proxy type for a buffer.
@type item() :: any()
Any term that will be buffered and processed.
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns the queue buffer child spec.
Pushes an item or a batch of items into the buffer.
Parameters
buffer - The buffer name (atom).
item_or_batch - A single item or a list of items to push.
opts - Optional runtime options.
Options
See PartitionedBuffer for runtime options.
Examples
# Simple push with default routing
push(:my_buffer, "item1")
push(:my_buffer, ["item2", "item3"])
# Custom partition routing using function
push(:my_buffer, user_event, partition_key: &(&1.user_id))
# Custom partition routing using MFA tuple (item prepended to args)
push(:my_buffer, event, partition_key: {MyApp.Router, :get_partition, []})
# Custom partition routing with fixed key (all items to same partition)
push(:my_buffer, log_entry, partition_key: :logs)
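The three `partition_key` shapes shown above (function, MFA tuple, fixed key) could be resolved along the following lines. This is a hedged sketch, not the library's routing code: the module `PartitionKeySketch`, its function names, and the use of `:erlang.phash2/2` to map the resolved key onto a partition index are assumptions. The MFA case uses `{Map, :get, [:user_id]}` only to show the item being prepended to the argument list.

```elixir
defmodule PartitionKeySketch do
  # Hypothetical helper; the library's real routing logic may differ.

  # A one-arity function is called with the item.
  def resolve(item, fun) when is_function(fun, 1), do: fun.(item)

  # An MFA tuple is applied with the item prepended to the arguments.
  # Note: a fixed key shaped like {atom, atom, list} would also match here.
  def resolve(item, {mod, fun, args}) when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [item | args])
  end

  # Any other term is used as a fixed key, so every item routes the same way.
  def resolve(_item, fixed), do: fixed

  # Maps the resolved key onto one of `partitions` slots (0-based).
  def partition(item, partition_key, partitions) do
    item |> resolve(partition_key) |> :erlang.phash2(partitions)
  end
end

event = %{user_id: 7}
PartitionKeySketch.resolve(event, &(&1.user_id))
PartitionKeySketch.resolve(event, {Map, :get, [:user_id]})
PartitionKeySketch.resolve(event, :logs)
```

Under these assumptions, items that resolve to the same key always land in the same partition, which is what keeps their relative order stable.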
@spec size(buffer()) :: non_neg_integer()
Returns the queue size (total number of items across all partitions).
Examples
size(:my_buffer)
@spec start_link(keyword()) :: Supervisor.on_start()
Starts a new queue buffer.
Options
See PartitionedBuffer for start options.
Examples
PartitionedBuffer.Queue.start_link(name: :my_queue_buffer)
Stops a queue buffer gracefully.
Examples
PartitionedBuffer.Queue.stop(:my_queue_buffer)
Updates the options for the queue buffer.
Options
See PartitionedBuffer.update_options/2 for the updatable options.
Examples
# Update the processing interval to 100ms
update_options(:my_buffer, processing_interval_ms: 100)