ETS-based partitioned buffer for high-throughput data processing.
PartitionedBuffer provides the shared infrastructure (routing, partition
lookup, lifecycle management) used by concrete buffer implementations:
- PartitionedBuffer.Queue - Ordered queue buffer (insertion-time ordered).
- PartitionedBuffer.Map - Key-value map buffer (last-write-wins semantics).
Architecture
        [buffer_supervisor]
                 |
     +-----------+------------------+
     |           |                  |
 [Registry] [TaskSupervisor] [PartitionSupervisor]
                                    |
                     +--------------+-------------------+
                     |              |                   |
               [partition.0]  [partition.1]  ...  [partition.N-1]
                     |              |                   |
             [ETS table pair] [ETS table pair]  [ETS table pair]
Start options
- :name (atom/0) - Required. The buffer name (used to identify the buffer).
- :processor - Required. A callback that processes batches of messages. Called with a list of accumulated messages and should handle the processing logic (e.g., send to external service, persist to database, etc.). Can be either:
  - A function of arity 1: fn batch -> ... end or &MyModule.process/1.
  - An MFA tuple {Module, Function, Args}: The batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
- :partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.
- :processing_interval_ms (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.
- :processing_timeout_ms (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.
- :processing_batch_size - Controls how buffered data is passed to the processor callback. Can be either:
  - A positive integer (default 10): Messages are read from the ETS table in batches of up to this size using ets:select with continuations. The processor is called once per batch. This optimizes memory usage for large tables.
  - :table: The ETS table name (an atom) is passed directly to the processor instead of reading and batching the data. The processor has full control over how it reads and processes the table.
    After the processor returns, the buffer deletes the table and reclaims the name. If you want to keep the table for later processing, call :ets.rename(table, new_name) before returning to free the original name for the buffer to reuse. You can then optionally call :ets.give_away/3 to transfer the renamed table to another process.
  The default value is 10.
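The two :processor forms can be illustrated without starting a buffer. The following is a minimal sketch of the documented calling convention only; ProcessorSketch, invoke/2, and persist/2 are hypothetical names introduced for illustration and are not part of PartitionedBuffer.

```elixir
defmodule ProcessorSketch do
  # Hypothetical helper that mirrors the documented calling convention.
  # This is an illustration, not PartitionedBuffer's internal code.

  # Arity-1 function: called with the batch as its only argument.
  def invoke(fun, batch) when is_function(fun, 1), do: fun.(batch)

  # MFA tuple: the batch is prepended to the extra arguments.
  def invoke({mod, fun, args}, batch)
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [batch | args])
  end

  # Example processor taking one extra argument (hypothetical).
  def persist(batch, destination), do: {destination, length(batch)}
end

batch = [:a, :b, :c]

# Function form: the processor receives the batch.
fun_result = ProcessorSketch.invoke(fn msgs -> length(msgs) end, batch)

# MFA form: calls ProcessorSketch.persist(batch, :db).
mfa_result = ProcessorSketch.invoke({ProcessorSketch, :persist, [:db]}, batch)

IO.inspect(fun_result)
IO.inspect(mfa_result)
```

The MFA form keeps the configuration free of anonymous functions, which can be convenient when options are defined in a config file.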
Runtime options
The following runtime options are shared by PartitionedBuffer.Queue and
PartitionedBuffer.Map:
- :partition_key (term/0) - Determines what value is used as the routing key for partitioning messages. Can be one of four values:
  - nil (default): The message itself is used as the routing key. Messages with the same content are routed to the same partition.
  - A function of arity 1: Applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.
  - An MFA tuple {Module, Function, Args}: The function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.
  - Any static term: Used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).
  Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or spread unrelated messages across partitions (for parallelism).
  The default value is nil.
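The four :partition_key forms described above can be sketched as a small resolver. This is an illustration under stated assumptions: RoutingSketch and its functions are hypothetical, and mapping the routing key to a partition with :erlang.phash2/2 is an assumption, since this document does not describe the hashing the library actually uses.

```elixir
defmodule RoutingSketch do
  # Hypothetical illustration of the four :partition_key forms.
  # Not PartitionedBuffer's internal routing code.

  # nil: the message itself is the routing key.
  def routing_key(nil, message), do: message

  # Arity-1 function: applied to the message.
  def routing_key(fun, message) when is_function(fun, 1), do: fun.(message)

  # MFA tuple: the message is prepended to the arguments.
  def routing_key({mod, fun, args}, message)
      when is_atom(mod) and is_atom(fun) and is_list(args),
      do: apply(mod, fun, [message | args])

  # Any other term: used as a static routing key for all messages.
  def routing_key(static, _message), do: static

  # ASSUMPTION: hash the routing key into the range 0..partitions-1.
  def partition_index(partition_key, message, partitions) do
    partition_key
    |> routing_key(message)
    |> :erlang.phash2(partitions)
  end

  # Example key extractor for the MFA form (hypothetical).
  def user_id(message, field), do: Map.fetch!(message, field)
end

m1 = %{user_id: 42, body: "hello"}
m2 = %{user_id: 42, body: "bye"}
by_user = fn message -> message.user_id end

# Same user, different content: both resolve to the same partition.
p1 = RoutingSketch.partition_index(by_user, m1, 8)
p2 = RoutingSketch.partition_index(by_user, m2, 8)

# The MFA form resolves to the same routing key as the function form.
p3 = RoutingSketch.partition_index({RoutingSketch, :user_id, [:user_id]}, m1, 8)

# A static term sends every message to one partition.
p4 = RoutingSketch.partition_index(:logs, m1, 8)
p5 = RoutingSketch.partition_index(:logs, m2, 8)
```

Under the default (nil), m1 and m2 would be hashed by their full content and could land in different partitions, which is why a key function is the usual choice when per-user ordering matters.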
Telemetry
PartitionedBuffer emits the following telemetry events.
- [:partitioned_buffer, :partition, :start] - Dispatched when a partition is started.
  - Measurement: %{system_time: integer}
  - Metadata: %{buffer: atom, partition: atom}
- [:partitioned_buffer, :partition, :stop] - Dispatched when a partition terminates (gracefully or abnormally).
  - Measurement: %{duration: native_time}
  - Metadata: %{buffer: atom, partition: atom, reason: term}
- [:partitioned_buffer, :partition, :processing, :start] - Dispatched when a partition begins processing a batch of messages.
  - Measurement: %{system_time: integer}
  - Metadata: %{buffer: atom, partition: atom}
- [:partitioned_buffer, :partition, :processing, :stop] - Dispatched when a partition completes processing a batch of messages.
  - Measurement: %{duration: native_time, size: non_neg_integer}
  - Metadata: %{buffer: atom, partition: atom}
- [:partitioned_buffer, :partition, :processing, :exception] - Dispatched when an exception occurs during processing.
  - Measurement: %{duration: native_time}
  - Metadata: %{buffer: atom, partition: atom, kind: atom, reason: term, stacktrace: list}
- [:partitioned_buffer, :partition, :processing_failed] - Dispatched when a processing task encounters an error and fails.
  - Measurement: %{system_time: integer}
  - Metadata: %{buffer: atom, partition: atom, reason: any}
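A handler for the processing :stop event might look like the sketch below. TelemetrySketch is a hypothetical module, and the partition name :"my_buffer.0" is made up for the example. The handler is called directly with hand-built data so the snippet runs without the :telemetry dependency; it returns a string only to make the result easy to inspect, since telemetry handlers are normally run for their side effects.

```elixir
defmodule TelemetrySketch do
  # Hypothetical handler using the 4-argument shape of :telemetry handlers:
  # (event_name, measurements, metadata, config).

  def handle_event(
        [:partitioned_buffer, :partition, :processing, :stop],
        measurements,
        metadata,
        _config
      ) do
    # Durations are reported in native time units; convert for display.
    ms = System.convert_time_unit(measurements.duration, :native, :millisecond)

    "#{metadata.buffer}/#{metadata.partition}: " <>
      "processed #{measurements.size} messages in #{ms}ms"
  end

  # Ignore any other event.
  def handle_event(_event, _measurements, _metadata, _config), do: :ignored
end

# Hand-built event data matching the documented measurement and metadata shapes.
duration = System.convert_time_unit(25, :millisecond, :native)

line =
  TelemetrySketch.handle_event(
    [:partitioned_buffer, :partition, :processing, :stop],
    %{duration: duration, size: 10},
    %{buffer: :my_buffer, partition: :"my_buffer.0"},
    nil
  )

other = TelemetrySketch.handle_event([:some, :other, :event], %{}, %{}, nil)

IO.puts(line)
```

In an application that depends on :telemetry, a handler like this would typically be registered with :telemetry.attach/4, passing a unique handler id, the event name, &TelemetrySketch.handle_event/4, and a config term.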
Types
@type buffer() :: atom()
Buffer name
Callbacks
Returns the ETS table type used by this buffer implementation.
Functions
@spec buffer_size(buffer()) :: non_neg_integer()
Returns the buffer size (total number of messages across all partitions).
Examples
iex> PartitionedBuffer.buffer_size(:my_buffer)
10
Returns the partition based on the given arguments.
@spec start_link(keyword()) :: Supervisor.on_start()
Starts a new buffer.
Prefer implementation-specific functions
It is recommended to use PartitionedBuffer.Queue.start_link/1 or
PartitionedBuffer.Map.start_link/1 instead, as they automatically
set the :module option for you.
Examples
iex> PartitionedBuffer.start_link(
...> module: PartitionedBuffer.Queue,
...> name: :my_buffer
...> )
{:ok, #PID<0.123.0>}
Notice that the :module option must be set to PartitionedBuffer.Queue or PartitionedBuffer.Map.
Stops a buffer gracefully.
Examples
iex> PartitionedBuffer.stop(:my_buffer)
:ok
Updates the options for the buffer.
Options
- :processing_interval_ms (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.
- :processing_timeout_ms (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.
- :processing_batch_size - Controls how buffered data is passed to the processor callback. Can be either:
  - A positive integer (default 10): Messages are read from the ETS table in batches of up to this size using ets:select with continuations. The processor is called once per batch. This optimizes memory usage for large tables.
  - :table: The ETS table name (an atom) is passed directly to the processor instead of reading and batching the data. The processor has full control over how it reads and processes the table.
    After the processor returns, the buffer deletes the table and reclaims the name. If you want to keep the table for later processing, call :ets.rename(table, new_name) before returning to free the original name for the buffer to reuse. You can then optionally call :ets.give_away/3 to transfer the renamed table to another process.
  The default value is 10.
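A processor written for the :table mode might read the table in chunks and then rename it to keep it, as in the sketch below. TableProcessorSketch, the table names, and the {id, message} row layout are assumptions made for illustration; the layout of the buffer's real tables is not described here, so a stand-in table is created by hand.

```elixir
defmodule TableProcessorSketch do
  # Hypothetical processor for processing_batch_size: :table.
  # Written for the MFA form: the table name is prepended to the arguments.

  def process(table, chunk_size, keep_as) do
    # Match spec that returns every object in the table unchanged.
    match_all = [{:"$1", [], [:"$1"]}]

    # Read in chunks of at most chunk_size using a continuation.
    count =
      table
      |> :ets.select(match_all, chunk_size)
      |> consume(0)

    # Rename before returning so the original name is free for reuse
    # and the data survives for later processing.
    ^keep_as = :ets.rename(table, keep_as)
    count
  end

  defp consume(:"$end_of_table", acc), do: acc

  defp consume({rows, continuation}, acc) do
    # A real processor would handle rows here; this sketch only counts them.
    consume(:ets.select(continuation), acc + length(rows))
  end
end

# Stand-in for a partition table (ASSUMPTION: name, type, and row layout).
table = :ets.new(:sketch_partition_table, [:named_table, :public, :ordered_set])
for i <- 1..25, do: :ets.insert(table, {i, "message #{i}"})

count = TableProcessorSketch.process(table, 10, :sketch_kept_table)

old_info = :ets.info(:sketch_partition_table)
kept_size = :ets.info(:sketch_kept_table, :size)
```

After the rename, the original name no longer refers to a table, while the renamed table still holds all rows and could be handed to another process with :ets.give_away/3.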
Examples
iex> PartitionedBuffer.update_options(:my_buffer, processing_interval_ms: 1000)
:ok
Notice that the options are updated for all partitions of the buffer.