PartitionedBuffer behaviour (PartitionedBuffer v0.3.0)


ETS-based partitioned buffer for high-throughput data processing.

PartitionedBuffer provides the shared infrastructure (routing, partition lookup, lifecycle management) used by the concrete buffer implementations:

  • PartitionedBuffer.Queue
  • PartitionedBuffer.Map

Architecture

                  [buffer_supervisor]
                          |
              +-----------+------------------+
              |           |                  |
          [Registry] [TaskSupervisor] [PartitionSupervisor]
                                             |
                              +--------------+-------------------+
                              |              |                   |
                    [partition.0]    [partition.1]   ...   [partition.N-1]
                          |                |                     |
                    [ETS table pair] [ETS table pair]      [ETS table pair]
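The tree above can be approximated with standard-library modules alone. The following is a minimal sketch, not the library's implementation. It assumes Elixir 1.14+ (for PartitionSupervisor). The module BufferTreeSketch, the child names and the restart strategy are all hypothetical. Each partition is a bare Agent standing in for the real partition process, which would own its ETS table pair.

```elixir
defmodule BufferTreeSketch do
  # Hypothetical stand-in for [buffer_supervisor]; not PartitionedBuffer's code.
  use Supervisor

  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    Supervisor.start_link(__MODULE__, opts, name: name)
  end

  @impl true
  def init(opts) do
    name = Keyword.fetch!(opts, :name)
    partitions = Keyword.get(opts, :partitions, System.schedulers_online())

    children = [
      # [Registry]: lets partitions be looked up by name
      {Registry, keys: :unique, name: :"#{name}_registry"},
      # [TaskSupervisor]: supervises the processing tasks
      {Task.Supervisor, name: :"#{name}_task_supervisor"},
      # [PartitionSupervisor]: starts `partitions` copies of the child spec;
      # each Agent only stands in for a real partition process
      {PartitionSupervisor,
       child_spec: {Agent, fn -> %{} end},
       name: :"#{name}_partition_supervisor",
       partitions: partitions}
    ]

    # :rest_for_one restarts the partitions if a sibling started before them
    # crashes; this is a choice made for the sketch only
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```

Starting it with BufferTreeSketch.start_link(name: :sketch_buffer, partitions: 4) yields a supervisor with three children, the last of which runs four partitions.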

Start options

  • :name (atom/0) - Required. The buffer name (used to identify the buffer).

  • :processor - Required. A callback that processes batches of messages. It is called with a list of accumulated messages and should implement the processing logic (e.g., sending them to an external service or persisting them to a database).

    Can be either:

    • A function of arity 1: fn batch -> ... end or &MyModule.process/1.
    • An MFA tuple {Module, Function, Args}: The batch is prepended to the arguments, e.g., {MyModule, :process, [extra_arg]} will call MyModule.process(batch, extra_arg).
  • :partitions (non_neg_integer/0) - Number of partitions to create. Each partition has its own buffer and processing cycle. Defaults to System.schedulers_online() to match the number of available schedulers. More partitions reduce lock contention but increase per-partition overhead.

  • :processing_interval_ms (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.

  • :processing_timeout_ms (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.

  • :processing_batch_size (pos_integer/0) - Maximum number of messages to include in a single batch sent to the processor callback. Messages are processed in batches of up to this size to optimize memory usage and processor performance. The default value is 10.
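The two :processor forms and the :processing_batch_size limit can be illustrated with a small, self-contained sketch. ProcessorSketch and its functions are hypothetical. They mirror the documented behaviour only: the batch is the sole argument of a 1-arity function, or it is prepended to the MFA arguments. A list of buffered messages is split into batches of at most the configured size.

```elixir
defmodule ProcessorSketch do
  # Hypothetical helper, not part of PartitionedBuffer.

  # A 1-arity function is called with the batch.
  def invoke(fun, batch) when is_function(fun, 1), do: fun.(batch)

  # An MFA tuple is called with the batch prepended to the extra
  # arguments, so {M, :f, [x]} becomes M.f(batch, x).
  def invoke({mod, fun, args}, batch)
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [batch | args])
  end

  # Splits buffered messages into batches of at most batch_size and
  # invokes the processor once per batch, returning each result.
  def process_all(messages, processor, batch_size \\ 10) do
    messages
    |> Enum.chunk_every(batch_size)
    |> Enum.map(&invoke(processor, &1))
  end
end
```

For example, ProcessorSketch.process_all(Enum.to_list(1..25), &length/1) returns [10, 10, 5]: with the default batch size of 10, 25 messages become three batches.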

Runtime options

The following runtime options are shared by PartitionedBuffer.Queue and PartitionedBuffer.Map:

  • :partition_key (term/0) - Determines what value is used as the routing key for partitioning messages.

    Can be one of the following:

    • nil (default): The message itself is used as the routing key. Messages with the same content are routed to the same partition.

    • A function of arity 1: Applied to each message to return the routing key. Allows grouping related messages together (e.g., by user ID or account ID) to keep them in the same partition.

    • An MFA tuple {Module, Function, Args}: The function is applied with the message prepended to the arguments. Useful for delegating routing logic to a module function while keeping configuration declarative.

    • Any static term: Used as the routing key for all messages, giving explicit control over which partition receives them (e.g., :logs, :events, or an identifier).

    Fundamentally, this option determines how messages are distributed across partitions. Use it to keep related messages together (for ordering or state locality) or spread unrelated messages across partitions (for parallelism).

    The default value is nil.
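The four :partition_key forms amount to a single resolution step that turns a message into a routing key. The sketch below follows the documented rules; the module PartitionKeySketch is hypothetical, and the sketch does not show how the library then maps the key to a partition.

```elixir
defmodule PartitionKeySketch do
  # Hypothetical helper mirroring the documented :partition_key rules.

  # nil (default): the message itself is the routing key.
  def routing_key(nil, message), do: message

  # 1-arity function: applied to the message.
  def routing_key(fun, message) when is_function(fun, 1), do: fun.(message)

  # MFA tuple: the message is prepended to the extra arguments.
  def routing_key({mod, fun, args}, message)
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [message | args])
  end

  # Any other term: used as-is for every message.
  def routing_key(static, _message), do: static
end
```

Using & &1.user_id (or {Map, :get, [:user_id]}) gives every message of one user the same key, so they share a partition. With the default nil, distinct messages get distinct keys and spread across partitions.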

Telemetry

PartitionedBuffer emits the following telemetry events.

  • [:partitioned_buffer, :partition, :start] - Dispatched when a partition is started.

    • Measurement: %{system_time: integer}
    • Metadata: %{buffer: atom, partition: atom}
  • [:partitioned_buffer, :partition, :stop] - Dispatched when a partition terminates (gracefully or abnormally).

    • Measurement: %{duration: native_time}
    • Metadata: %{buffer: atom, partition: atom, reason: term}
  • [:partitioned_buffer, :partition, :processing, :start] - Dispatched when a partition begins processing a batch of messages.

    • Measurement: %{system_time: integer}
    • Metadata: %{buffer: atom, partition: atom}
  • [:partitioned_buffer, :partition, :processing, :stop] - Dispatched when a partition completes processing a batch of messages.

    • Measurement: %{duration: native_time, size: non_neg_integer}
    • Metadata: %{buffer: atom, partition: atom}
  • [:partitioned_buffer, :partition, :processing, :exception] - Dispatched when an exception occurs during processing.

    • Measurement: %{duration: native_time}
    • Metadata: %{buffer: atom, partition: atom, kind: atom, reason: term, stacktrace: list}
  • [:partitioned_buffer, :partition, :processing_failed] - Dispatched when a processing task encounters an error and fails.

    • Measurement: %{system_time: integer}
    • Metadata: %{buffer: atom, partition: atom, reason: any}
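A handler for these events could look like the following sketch. TelemetrySketch is hypothetical. It is written as a plain 4-arity function so it runs without dependencies. Durations are taken to be in native time units, as listed above, and are converted to milliseconds.

```elixir
defmodule TelemetrySketch do
  # Hypothetical handler for PartitionedBuffer events. Clauses return a
  # string (or :ignored) so the function can be called directly; a real
  # handler would log the line or record a metric instead.

  def handle_event(
        [:partitioned_buffer, :partition, :processing, :stop],
        %{duration: duration, size: size},
        %{buffer: buffer, partition: partition},
        _config
      ) do
    # Durations are reported in native time units.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "#{buffer}/#{partition} processed #{size} messages in #{ms}ms"
  end

  def handle_event(
        [:partitioned_buffer, :partition, :processing, :exception],
        _measurements,
        %{buffer: buffer, partition: partition, kind: kind, reason: reason},
        _config
      ) do
    "#{buffer}/#{partition} raised #{inspect(kind)}: #{inspect(reason)}"
  end

  # Any other event is ignored.
  def handle_event(_event, _measurements, _metadata, _config), do: :ignored
end
```

With the :telemetry package installed, it could be attached with :telemetry.attach_many("partitioned-buffer-handler", [[:partitioned_buffer, :partition, :processing, :stop], [:partitioned_buffer, :partition, :processing, :exception]], &TelemetrySketch.handle_event/4, nil).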

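Summary

Types

  • buffer() - Buffer name

Callbacks

  • ets_type() - Returns the ETS table type used by this buffer implementation.

Functions

  • buffer_size(buffer) - Returns the buffer size (total number of messages across all partitions).

  • get_partition(buffer, partition_key, object) - Returns the partition based on the given arguments.

  • start_link(opts) - Starts a new buffer.

  • stop(buffer, reason, timeout) - Stops a buffer gracefully.

  • update_options(buffer, opts) - Updates the options for the buffer.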

Types

buffer()

@type buffer() :: atom()

Buffer name

Callbacks

ets_type()

@callback ets_type() :: :ordered_set | :set | :bag | :duplicate_bag

Returns the ETS table type used by this buffer implementation.
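The type returned by this callback decides how entries with equal keys behave. The sketch below uses only :ets. EtsTypeSketch (a local stand-in for the behaviour) and HypotheticalSetImpl are illustrative. They say nothing about which type PartitionedBuffer.Queue or PartitionedBuffer.Map actually returns.

```elixir
defmodule EtsTypeSketch do
  # Local stand-in for the callback; a real implementation module would
  # declare @behaviour PartitionedBuffer instead.
  @callback ets_type() :: :ordered_set | :set | :bag | :duplicate_bag

  # Creates a table of the given type, inserts the objects and returns
  # the number of stored objects.
  def size_after_insert(type, objects) do
    table = :ets.new(:ets_type_sketch, [type, :public])
    true = :ets.insert(table, objects)
    size = :ets.info(table, :size)
    :ets.delete(table)
    size
  end
end

defmodule HypotheticalSetImpl do
  # Hypothetical implementation choosing :set (one object per key).
  @behaviour EtsTypeSketch

  @impl true
  def ets_type, do: :set
end
```

A :set or :ordered_set keeps one object per key. A :bag keeps distinct objects under the same key. A :duplicate_bag also keeps identical copies.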

Functions

buffer_size(buffer)

@spec buffer_size(buffer()) :: non_neg_integer()

Returns the buffer size (total number of messages across all partitions).

Examples

iex> PartitionedBuffer.buffer_size(:my_buffer)
10
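Conceptually, the total is the sum of the entry counts of every partition's tables. A minimal sketch follows. It assumes one plain ETS table per partition, whereas the real partitions hold a table pair, and it does not show how they are looked up. BufferSizeSketch is hypothetical.

```elixir
defmodule BufferSizeSketch do
  # Hypothetical: sums the object count of each partition table.
  def buffer_size(tables) do
    tables
    |> Enum.map(&:ets.info(&1, :size))
    |> Enum.sum()
  end
end
```

Each :ets.info/2 call is a separate read. While writers are active, the sum is therefore an approximate snapshot, not an atomic count across partitions.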

get_partition(buffer, partition_key, object)

@spec get_partition(buffer(), any(), any()) :: atom()

Returns the partition based on the given arguments.
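A common way to turn a routing key into one of N partitions is a stable hash modulo N. The sketch below uses :erlang.phash2/2 and a hypothetical naming scheme (:"<buffer>.partition.<index>", loosely following the diagram). The hash function and names the library actually uses are not documented here. Unlike the real function, it takes an already-resolved routing key and the partition count rather than (buffer, partition_key, object).

```elixir
defmodule PartitionRouteSketch do
  # Hypothetical: maps a routing key to a partition name for a buffer
  # with `partitions` partitions.
  def get_partition(buffer, routing_key, partitions)
      when is_integer(partitions) and partitions > 0 do
    # phash2/2 returns an integer in 0..partitions-1 and is stable for
    # equal terms, so equal keys always land on the same partition.
    index = :erlang.phash2(routing_key, partitions)
    :"#{buffer}.partition.#{index}"
  end
end
```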

start_link(opts)

@spec start_link(keyword()) :: Supervisor.on_start()

Starts a new buffer.

Prefer implementation-specific functions

It is recommended to use PartitionedBuffer.Queue.start_link/1 or PartitionedBuffer.Map.start_link/1 instead, as they automatically set the :module option for you.

Examples

iex> PartitionedBuffer.start_link(
...>   module: PartitionedBuffer.Queue,
...>   name: :my_buffer,
...>   processor: fn batch -> IO.inspect(batch) end
...> )
{:ok, #PID<0.123.0>}

Notice that the :module option must be set to PartitionedBuffer.Queue or PartitionedBuffer.Map.

stop(buffer, reason, timeout)

@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok

Stops a buffer gracefully.

Examples

iex> PartitionedBuffer.stop(:my_buffer)
:ok
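According to the :processing_timeout_ms option, graceful shutdown waits for in-flight processing and forcefully terminates it once the timeout expires. That wait-then-kill pattern can be sketched with Task.Supervisor.async_nolink, Task.yield/2 and Task.shutdown/2. ShutdownSketch is hypothetical and is not the library's stop/3 implementation.

```elixir
defmodule ShutdownSketch do
  # Hypothetical: runs `fun` under a Task.Supervisor and waits up to
  # `timeout_ms` for it, killing the task if it has not finished.
  def run_with_timeout(task_sup, fun, timeout_ms) do
    # async_nolink: a crash in the task does not take the caller down.
    task = Task.Supervisor.async_nolink(task_sup, fun)

    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, reason}
      nil -> {:error, :timeout}
    end
  end
end
```

The Task.yield(...) || Task.shutdown(...) idiom also covers the case where the reply arrives just as the timeout expires: shutdown then returns {:ok, result} rather than discarding it.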

update_options(buffer, opts)

@spec update_options(buffer(), keyword()) :: :ok

Updates the options for the buffer.

Options

  • :processing_interval_ms (pos_integer/0) - How often (in milliseconds) each partition checks its buffer and initiates processing. Messages are processed at this interval if any are buffered. Lower values mean faster processing but more frequent task spawning. The default value is 5000.

  • :processing_timeout_ms (timeout/0) - Maximum time (in milliseconds) for a processing task to complete before being forcefully terminated. Used during graceful shutdown to wait for in-flight processing to complete. See Task.Supervisor.async_nolink/3 for more information. The default value is 60000.

  • :processing_batch_size (pos_integer/0) - Maximum number of messages to include in a single batch sent to the processor callback. Messages are processed in batches of up to this size to optimize memory usage and processor performance. The default value is 10.

Examples

iex> PartitionedBuffer.update_options(:my_buffer, processing_interval_ms: 1000)
:ok

Notice that the options are updated for all partitions of the buffer.
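Each partition checks its buffer on its own timer. Applying new options therefore amounts to updating every partition's state, with the new values taking effect from the next cycle. A minimal GenServer sketch of one partition's timer and option update follows. PartitionTickerSketch and its functions are hypothetical. It only counts cycles where a real partition would hand buffered messages to a processing task.

```elixir
defmodule PartitionTickerSketch do
  # Hypothetical partition timer; not PartitionedBuffer's code.
  use GenServer

  # Mirrors the documented defaults.
  @defaults [processing_interval_ms: 5_000, processing_batch_size: 10]

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def update_options(pid, opts), do: GenServer.call(pid, {:update_options, opts})
  def options(pid), do: GenServer.call(pid, :options)

  # Hypothetical buffer-wide update: apply the same options to every partition.
  def update_all(pids, opts), do: Enum.each(pids, &update_options(&1, opts))

  @impl true
  def init(opts) do
    state = %{opts: Keyword.merge(@defaults, opts), cycles: 0}
    schedule(state.opts)
    {:ok, state}
  end

  @impl true
  def handle_call({:update_options, opts}, _from, state) do
    # The already-scheduled timer keeps the old interval; the new one is
    # used when the next cycle is scheduled.
    {:reply, :ok, %{state | opts: Keyword.merge(state.opts, opts)}}
  end

  def handle_call(:options, _from, state), do: {:reply, state.opts, state}

  @impl true
  def handle_info(:process, state) do
    # A real partition would drain up to :processing_batch_size messages here.
    schedule(state.opts)
    {:noreply, %{state | cycles: state.cycles + 1}}
  end

  defp schedule(opts),
    do: Process.send_after(self(), :process, Keyword.fetch!(opts, :processing_interval_ms))
end
```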