# `PartitionedBuffer`
[🔗](https://github.com/appcues/partitioned_buffer/blob/v0.4.2/lib/partitioned_buffer.ex#L1)

ETS-based partitioned buffer for high-throughput data processing.

`PartitionedBuffer` provides the shared infrastructure (routing, partition
lookup, lifecycle management) used by concrete buffer implementations:

  * `PartitionedBuffer.Queue` - Ordered queue buffer (insertion-time ordered).
  * `PartitionedBuffer.Map` - Key-value map buffer (last-write-wins semantics).

## Architecture

```asciidoc
                  [buffer_supervisor]
                          |
              +-----------+------------------+
              |           |                  |
          [Registry] [TaskSupervisor] [PartitionSupervisor]
                                             |
                              +--------------+-------------------+
                              |              |                   |
                    [partition.0]    [partition.1]   ...   [partition.N-1]
                          |                |                     |
                    [ETS table pair] [ETS table pair]      [ETS table pair]
```

## Start options

* `:name` (`t:atom/0`) - Required. The buffer name (used to identify the buffer).

* `:processor` - Required. A callback that processes batches of messages. Called with a list
  of accumulated messages and should handle the processing logic (e.g., send
  to external service, persist to database, etc.).

  Can be either:

    * A function of arity 1: `fn batch -> ... end` or `&MyModule.process/1`.
    * An MFA tuple `{Module, Function, Args}`: The batch is prepended to
      the arguments, e.g., `{MyModule, :process, [extra_arg]}` will call
      `MyModule.process(batch, extra_arg)`.

* `:partitions` (`t:non_neg_integer/0`) - Number of partitions to create. Each partition has its own buffer and
  processing cycle. Defaults to `System.schedulers_online()` to match the
  number of available schedulers. More partitions reduce lock contention
  but increase per-partition overhead.

* `:processing_interval_ms` (`t:pos_integer/0`) - How often (in milliseconds) each partition checks its buffer and initiates
  processing. Messages are processed at this interval if any are buffered.
  Lower values mean faster processing but more frequent task spawning. The default value is `5000`.

* `:processing_timeout_ms` (`t:timeout/0`) - Maximum time (in milliseconds) for a processing task to complete before
  being forcefully terminated. Used during graceful shutdown to wait for
  in-flight processing to complete. See `Task.Supervisor.async_nolink/3`
  for more information. The default value is `60000`.

* `:processing_batch_size` - Controls how buffered data is passed to the processor callback.

  Can be either:

    * A positive integer (default `10`): Messages are read from the ETS
      table in batches of up to this size using `ets:select` with
      continuations. The processor is called once per batch. This
      optimizes memory usage for large tables.

    * `:table`: The ETS table name (an atom) is passed directly to the
      processor instead of reading and batching the data. The processor
      has full control over how it reads and processes the table.

      After the processor returns, the buffer deletes the table and
      reclaims the name. If you want to **keep the table** for later
      processing, call `:ets.rename(table, new_name)` before returning
      to free the original name for the buffer to reuse. You can then
      optionally call `:ets.give_away/3` to transfer the renamed table
      to another process.

  The default value is `10`.

## Runtime options

The following runtime options are shared by `PartitionedBuffer.Queue` and
`PartitionedBuffer.Map`:

* `:partition_key` (`t:term/0`) - Determines what value is used as the routing key for partitioning
  messages.

  Can be one of four values:

    * `nil` (default): The message itself is used as the routing key.
      Messages with the same content are routed to the same partition.

    * A function of arity 1: Applied to each message to return the routing
      key. Allows grouping related messages together (e.g., by user ID or
      account ID) to keep them in the same partition.

    * An MFA tuple `{Module, Function, Args}`: The function is applied with
      the message prepended to the arguments. Useful for delegating routing
      logic to a module function while keeping configuration declarative.

    * Any static term: Used as the routing key for all messages, giving
      explicit control over which partition receives them (e.g., `:logs`,
      `:events`, or an identifier).

  Fundamentally, this option determines how messages are distributed across
  partitions. Use it to keep related messages together (for ordering or
  state locality) or spread unrelated messages across partitions
  (for parallelism).

  The default value is `nil`.

## Telemetry

`PartitionedBuffer` emits the following telemetry events.

  * `[:partitioned_buffer, :partition, :start]` - Dispatched when a partition
    is started.

    * Measurement: `%{system_time: integer}`
    * Metadata: `%{buffer: atom, partition: atom}`

  * `[:partitioned_buffer, :partition, :stop]` - Dispatched when a partition
    terminates (gracefully or abnormally).

    * Measurement: `%{duration: native_time}`
    * Metadata: `%{buffer: atom, partition: atom, reason: term}`

  * `[:partitioned_buffer, :partition, :processing, :start]` - Dispatched
    when a partition begins processing a batch of messages.

    * Measurement: `%{system_time: integer}`
    * Metadata: `%{buffer: atom, partition: atom}`

  * `[:partitioned_buffer, :partition, :processing, :stop]` - Dispatched
    when a partition completes processing a batch of messages.

    * Measurement: `%{duration: native_time, size: non_neg_integer}`
    * Metadata: `%{buffer: atom, partition: atom}`

  * `[:partitioned_buffer, :partition, :processing, :exception]` - Dispatched
    when an exception occurs during processing.

    * Measurement: `%{duration: native_time}`
    * Metadata:

    ```
    %{
      buffer: atom,
      partition: atom,
      kind: atom,
      reason: term,
      stacktrace: list
    }
    ```

  * `[:partitioned_buffer, :partition, :processing_failed]` - Dispatched
    when a processing task encounters an error and fails.

    * Measurement: `%{system_time: integer}`
    * Metadata: `%{buffer: atom, partition: atom, reason: any}`

# `buffer`

```elixir
@type buffer() :: atom()
```

Buffer name

# `ets_type`

```elixir
@callback ets_type() :: :ordered_set | :set | :bag | :duplicate_bag
```

Returns the ETS table type used by this buffer implementation.

# `buffer_size`

```elixir
@spec buffer_size(buffer()) :: non_neg_integer()
```

Returns the buffer size (total number of messages across all partitions).

## Examples

    iex> PartitionedBuffer.buffer_size(:my_buffer)
    10

# `get_partition`

```elixir
@spec get_partition(buffer(), any(), any()) :: atom()
```

Returns the partition based on the given arguments.

# `start_link`

```elixir
@spec start_link(keyword()) :: Supervisor.on_start()
```

Starts a new buffer.

> #### Prefer implementation-specific functions {: .tip}
>
> It is recommended to use `PartitionedBuffer.Queue.start_link/1` or
> `PartitionedBuffer.Map.start_link/1` instead, as they automatically
> set the `:module` option for you.

## Examples

    iex> PartitionedBuffer.start_link(
    ...>   module: PartitionedBuffer.Queue,
    ...>   name: :my_buffer
    ...> )
    {:ok, #PID<0.123.0>}

> Notice that the `:module` option must be set to `PartitionedBuffer.Queue` or
> `PartitionedBuffer.Map`.

# `stop`

```elixir
@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok
```

Stops a buffer gracefully.

## Examples

    iex> PartitionedBuffer.stop(:my_buffer)
    :ok

# `update_options`

```elixir
@spec update_options(
  buffer(),
  keyword()
) :: :ok
```

Updates the options for the buffer.

## Options

* `:processing_interval_ms` (`t:pos_integer/0`) - How often (in milliseconds) each partition checks its buffer and initiates
  processing. Messages are processed at this interval if any are buffered.
  Lower values mean faster processing but more frequent task spawning. The default value is `5000`.

* `:processing_timeout_ms` (`t:timeout/0`) - Maximum time (in milliseconds) for a processing task to complete before
  being forcefully terminated. Used during graceful shutdown to wait for
  in-flight processing to complete. See `Task.Supervisor.async_nolink/3`
  for more information. The default value is `60000`.

* `:processing_batch_size` - Controls how buffered data is passed to the processor callback.

  Can be either:

    * A positive integer (default `10`): Messages are read from the ETS
      table in batches of up to this size using `ets:select` with
      continuations. The processor is called once per batch. This
      optimizes memory usage for large tables.

    * `:table`: The ETS table name (an atom) is passed directly to the
      processor instead of reading and batching the data. The processor
      has full control over how it reads and processes the table.

      After the processor returns, the buffer deletes the table and
      reclaims the name. If you want to **keep the table** for later
      processing, call `:ets.rename(table, new_name)` before returning
      to free the original name for the buffer to reuse. You can then
      optionally call `:ets.give_away/3` to transfer the renamed table
      to another process.

  The default value is `10`.

## Examples

    iex> PartitionedBuffer.update_options(:my_buffer, processing_interval_ms: 1000)
    :ok

> Notice that the options are updated for all partitions of the buffer.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
