ETS-based partitioned buffer for high-throughput data processing.
PartitionedBuffer provides the shared infrastructure (routing, partition
lookup, lifecycle management) used by concrete buffer implementations:
- PartitionedBuffer.Queue - Ordered queue buffer (insertion-time ordered).
- PartitionedBuffer.Map - Key-value map buffer (last-write-wins semantics).
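The semantic difference between the two implementations can be illustrated with plain ETS. This is a hypothetical sketch, not the library's internal table layout: it assumes a queue backed by an :ordered_set keyed by a strictly increasing integer (so reading the table yields insertion order), and a map backed by a :set, where inserting an existing key overwrites the previous value (last write wins).

```elixir
defmodule BufferSemanticsSketch do
  # Hypothetical illustration only; PartitionedBuffer's real tables may differ.

  # Queue-like buffer: :ordered_set keyed by a monotonic unique integer,
  # so :ets.tab2list/1 returns the messages in insertion order.
  def queue_demo(messages) do
    table = :ets.new(:queue_sketch, [:ordered_set, :public])

    Enum.each(messages, fn msg ->
      key = System.unique_integer([:monotonic, :positive])
      :ets.insert(table, {key, msg})
    end)

    result = table |> :ets.tab2list() |> Enum.map(fn {_key, msg} -> msg end)
    :ets.delete(table)
    result
  end

  # Map-like buffer: :set keyed by the caller's key; a second insert with
  # the same key replaces the first one (last write wins).
  def map_demo(pairs) do
    table = :ets.new(:map_sketch, [:set, :public])
    Enum.each(pairs, fn {key, value} -> :ets.insert(table, {key, value}) end)

    result = table |> :ets.tab2list() |> Map.new()
    :ets.delete(table)
    result
  end
end
```

For example, BufferSemanticsSketch.map_demo([{:k, 1}, {:j, 2}, {:k, 3}]) returns %{j: 2, k: 3}: the later write for :k wins.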
Architecture
                        [buffer_supervisor]
                                 |
            +--------------------+---------------------+
            |                    |                     |
       [Registry]        [TaskSupervisor]    [PartitionSupervisor]
                                                       |
                                  +--------------------+--------------------+
                                  |                    |                    |
                            [partition.0]        [partition.1]  ...  [partition.N-1]
                                  |                    |                    |
                          [ETS table pair]     [ETS table pair]     [ETS table pair]
Start options
- :name (atom/0) - Required. The buffer name, used to identify the buffer.
- :processor - Required. A callback that processes batches of messages. It is called with a list of accumulated messages and should handle the processing logic (e.g., sending them to an external service or persisting them to a database). Can be either:
  - A function of arity 1: fn batch -> ... end or &MyModule.process/1.
  - An MFA tuple {Module, Function, Args}: The batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
- :partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.
- :processing_interval_ms (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.
- :processing_timeout_ms (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.
- :processing_batch_size (pos_integer/0) - Maximum number of messages to include in a single batch sent to the processor callback. Messages are processed in batches of up to this size to optimize memory usage and processor performance. The default value is 10.
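As a rough sketch of how :processor and :processing_batch_size fit together, the module below splits buffered messages into chunks of at most the batch size and calls the processor once per chunk, supporting both callback forms described above (the batch is prepended to the MFA arguments). The module and function names are hypothetical; the library itself runs processing inside supervised tasks and may dispatch differently.

```elixir
defmodule ProcessorSketch do
  # Hypothetical helper: invokes a :processor callback in either supported form.
  def invoke(fun, batch) when is_function(fun, 1), do: fun.(batch)

  def invoke({mod, fun, args}, batch) when is_atom(mod) and is_atom(fun) and is_list(args) do
    # The batch is prepended to the extra arguments: mod.fun(batch, arg1, ...)
    apply(mod, fun, [batch | args])
  end

  # Splits messages into batches of at most `batch_size` and calls the
  # processor once per batch, returning the callback results in order.
  def process_all(messages, processor, batch_size)
      when is_integer(batch_size) and batch_size > 0 do
    messages
    |> Enum.chunk_every(batch_size)
    |> Enum.map(&invoke(processor, &1))
  end
end
```

For example, ProcessorSketch.process_all(Enum.to_list(1..25), &length/1, 10) returns [10, 10, 5], and with {Enum, :take, [1]} each batch is passed as the first argument of Enum.take/2.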
Runtime options
The following runtime options are shared by PartitionedBuffer.Queue and
PartitionedBuffer.Map:
- :partition_key (term/0) - Determines the routing key used to assign each message to a partition. Can be one of:
  - nil (default): The message itself is used as the routing key, so messages with identical content are routed to the same partition.
  - A function of arity 1: Applied to each message to return the routing key. This allows grouping related messages (e.g., by user ID or account ID) so they stay in the same partition.
  - An MFA tuple {Module, Function, Args}: The function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping the configuration declarative.
  - Any static term: Used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).

  Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or to spread unrelated messages across partitions (for parallelism). The default value is nil.
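To make the four :partition_key forms concrete, here is a hypothetical sketch that resolves the routing key and maps it to a partition index. It assumes hash-based routing with :erlang.phash2/2, which may not be what PartitionedBuffer actually uses; the property it illustrates is that equal routing keys always land in the same partition.

```elixir
defmodule RoutingSketch do
  # Resolves the routing key for `message` from a :partition_key value:
  # nil -> the message itself; fun/1 -> fun.(message);
  # {m, f, a} -> apply(m, f, [message | a]); anything else -> the term itself.
  def routing_key(message, nil), do: message
  def routing_key(message, fun) when is_function(fun, 1), do: fun.(message)

  def routing_key(message, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [message | args])
  end

  def routing_key(_message, static_term), do: static_term

  # Maps the routing key to an index in 0..(partitions - 1).
  # Assumption: hash-based routing via :erlang.phash2/2.
  def partition_index(message, partition_key, partitions)
      when is_integer(partitions) and partitions > 0 do
    message
    |> routing_key(partition_key)
    |> :erlang.phash2(partitions)
  end
end
```

With partition_key set to & &1.user_id, every message for the same user hashes to the same index, while a static term such as :events sends all messages to a single partition.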
Telemetry
PartitionedBuffer emits the following telemetry events.
- [:partitioned_buffer, :partition, :start] - Dispatched when a partition is started.
  - Measurement: %{system_time: integer}
  - Metadata: %{buffer: atom, partition: atom}
- [:partitioned_buffer, :partition, :stop] - Dispatched when a partition terminates (gracefully or abnormally).
  - Measurement: %{duration: native_time}
  - Metadata: %{buffer: atom, partition: atom, reason: term}
- [:partitioned_buffer, :partition, :processing, :start] - Dispatched when a partition begins processing a batch of messages.
  - Measurement: %{system_time: integer}
  - Metadata: %{buffer: atom, partition: atom}
- [:partitioned_buffer, :partition, :processing, :stop] - Dispatched when a partition completes processing a batch of messages.
  - Measurement: %{duration: native_time, size: non_neg_integer}
  - Metadata: %{buffer: atom, partition: atom}
- [:partitioned_buffer, :partition, :processing, :exception] - Dispatched when an exception occurs during processing.
  - Measurement: %{duration: native_time}
  - Metadata: %{buffer: atom, partition: atom, kind: atom, reason: term, stacktrace: list}
- [:partitioned_buffer, :partition, :processing_failed] - Dispatched when a processing task encounters an error and fails.
  - Measurement: %{system_time: integer}
  - Metadata: %{buffer: atom, partition: atom, reason: any}
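A handler for these events could look like the sketch below. The module name and message format are made up for illustration; in an application you would register it with :telemetry.attach_many/4 from the telemetry package, e.g. :telemetry.attach_many("buffer-logger", [[:partitioned_buffer, :partition, :processing, :stop], [:partitioned_buffer, :partition, :processing, :exception]], &BufferTelemetrySketch.handle_event/4, nil). Because duration measurements are in native time units, the handler converts them with System.convert_time_unit/3.

```elixir
defmodule BufferTelemetrySketch do
  # Hypothetical handler with the 4-arity shape telemetry expects:
  # handle_event(event_name, measurements, metadata, config).
  # It returns a string so it can be exercised without telemetry installed;
  # a real handler would log or record metrics instead.
  def handle_event([:partitioned_buffer, :partition, :processing, :stop], measurements, metadata, _config) do
    # duration is reported in native time units; convert before displaying.
    ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    "#{metadata.buffer}/#{metadata.partition}: processed #{measurements.size} messages in #{ms}ms"
  end

  def handle_event([:partitioned_buffer, :partition, :processing, :exception], _measurements, metadata, _config) do
    "#{metadata.buffer}/#{metadata.partition}: processing raised #{inspect(metadata.kind)}: #{inspect(metadata.reason)}"
  end

  # Any other event is ignored.
  def handle_event(_event, _measurements, _metadata, _config), do: :ignored
end
```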
Summary
Callbacks
- Returns the ETS table type used by this buffer implementation.
Functions
- Returns the buffer size (total number of messages across all partitions).
- Returns the partition based on the given arguments.
- Starts a new buffer.
- Stops a buffer gracefully.
- Updates the options for the buffer.
Types
@type buffer() :: atom()
Buffer name
Callbacks
Functions
@spec buffer_size(buffer()) :: non_neg_integer()
Returns the buffer size (total number of messages across all partitions).
Examples
iex> PartitionedBuffer.buffer_size(:my_buffer)
10
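Since each partition keeps its own ETS storage, a total like the one buffer_size/1 reports can be obtained by summing per-table sizes. This minimal sketch assumes you already hold the ETS table references for every partition (a hypothetical input; how PartitionedBuffer locates its tables is not shown here) and treats tables that no longer exist as empty.

```elixir
defmodule BufferSizeSketch do
  # Hypothetical: sums the number of objects stored in each ETS table.
  def total_size(tables) do
    Enum.reduce(tables, 0, fn table, acc ->
      case :ets.info(table, :size) do
        # The table no longer exists (e.g. its partition was stopped).
        :undefined -> acc
        size -> acc + size
      end
    end)
  end
end
```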
Returns the partition based on the given arguments.
@spec start_link(keyword()) :: Supervisor.on_start()
Starts a new buffer.
Prefer implementation-specific functions
It is recommended to use PartitionedBuffer.Queue.start_link/1 or
PartitionedBuffer.Map.start_link/1 instead, as they automatically
set the :module option for you.
Examples
iex> PartitionedBuffer.start_link(
...>   module: PartitionedBuffer.Queue,
...>   name: :my_buffer,
...>   processor: &MyModule.process/1
...> )
{:ok, #PID<0.123.0>}
Notice that the :module option must be set to PartitionedBuffer.Queue or PartitionedBuffer.Map.
Stops a buffer gracefully.
Examples
iex> PartitionedBuffer.stop(:my_buffer)
:ok
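The :processing_timeout_ms start option bounds how long a graceful stop waits for in-flight processing. The sketch below shows the standard Task.yield/2 plus Task.shutdown/2 pattern on top of Task.Supervisor.async_nolink/3; the module and function names are hypothetical, and the library's own shutdown logic may differ in detail.

```elixir
defmodule GracefulShutdownSketch do
  # Hypothetical: runs `fun` under a Task.Supervisor, waits at most
  # `timeout_ms` for it, and kills it if it has not finished by then.
  def run_with_timeout(task_sup, fun, timeout_ms) do
    task = Task.Supervisor.async_nolink(task_sup, fun)

    # Task.shutdown/2 still returns {:ok, result} if the reply arrived
    # between the yield timing out and the shutdown call.
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end
```

Because the task is not linked to the caller, a crashing processor surfaces as {:error, {:exit, reason}} instead of taking the partition down with it.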
Updates the options for the buffer.
Options
- :processing_interval_ms (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.
- :processing_timeout_ms (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.
- :processing_batch_size (pos_integer/0) - Maximum number of messages to include in a single batch sent to the processor callback. Messages are processed in batches of up to this size to optimize memory usage and processor performance. The default value is 10.
Examples
iex> PartitionedBuffer.update_options(:my_buffer, processing_interval_ms: 1000)
:ok
Notice that the options are updated for all partitions of the buffer.
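Applying an update to every partition can be sketched as a validate-then-merge fan-out. This is a hypothetical model, not the library's implementation: it assumes each partition's options are kept as a keyword list in a map keyed by partition name, rejects keys outside the three documented updatable options with Keyword.validate!/2 (Elixir 1.13+), and merges the accepted values into each partition's current options.

```elixir
defmodule UpdateOptionsSketch do
  # The options documented as updatable at runtime.
  @updatable [:processing_interval_ms, :processing_timeout_ms, :processing_batch_size]

  # Hypothetical: `partition_opts` is %{partition_name => keyword_list}.
  # Raises ArgumentError for keys that are not updatable; otherwise merges
  # the new values into every partition's options (new values win).
  def apply_update(partition_opts, new_opts) do
    new_opts = Keyword.validate!(new_opts, @updatable)

    Map.new(partition_opts, fn {partition, opts} ->
      {partition, Keyword.merge(opts, new_opts)}
    end)
  end
end
```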