# `PartitionedBuffer.Queue`
[🔗](https://github.com/appcues/partitioned_buffer/blob/v0.4.2/lib/partitioned_buffer/queue.ex#L1)

ETS-based partitioned queue buffer for high-throughput data processing.

`PartitionedBuffer.Queue` buffers arbitrary data in insertion order and
periodically processes it using a configurable processor function.
It partitions writes across several ETS tables to reduce lock contention
under high write throughput, and uses double-buffering so that new items
are still accepted while a batch is being processed.
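
To make the double-buffering idea concrete, here is a minimal sketch of the general technique, not the library's implementation. `DoubleBufferSketch` and all of its functions are hypothetical names; it assumes two ETS tables and an `:atomics` slot that selects which one writers currently use.

```elixir
defmodule DoubleBufferSketch do
  # Hypothetical illustration only; not part of PartitionedBuffer.

  # Two tables plus a one-slot atomic that holds the active index (0 or 1).
  def new do
    tables = {new_table(), new_table()}
    active = :atomics.new(1, signed: false)
    %{tables: tables, active: active}
  end

  # Writers always insert into whichever table is active right now.
  def push(%{tables: tables, active: active}, item) do
    table = elem(tables, :atomics.get(active, 1))
    true = :ets.insert(table, {{System.monotonic_time(), make_ref()}, item})
    :ok
  end

  # Swap the active table first, then drain the one that was active.
  # Only the keys that were read are deleted, so an item written late
  # to the old table stays there until that table is drained again.
  def flush(%{tables: tables, active: active}, processor) do
    current = :atomics.get(active, 1)
    :atomics.put(active, 1, 1 - current)

    full = elem(tables, current)
    entries = :ets.tab2list(full)
    Enum.each(entries, fn {key, _value} -> :ets.delete(full, key) end)

    processor.(Enum.map(entries, fn {_key, value} -> value end))
  end

  defp new_table, do: :ets.new(:buffer, [:ordered_set, :public])
end
```

Because the swap happens before the drain, `push/2` never has to wait for the processor to finish.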

## Data Flow

```text
push(buffer, items)
       |
       v
+-------------------+
| Partition Routing |
| phash2(item, N)   |
+-------------------+
   |         |         |
   v         v         v
+-------+ +-------+ +-------+     ETS :ordered_set
| P 0   | | P 1   | | P N-1 |     Key: {monotonic_time, ref}
+-------+ +-------+ +-------+     Val: item
   |         |         |
   v         v         v
+--------------------------------------+
| processor(batch)                     |
| batch = [val1, val2, ...]            |
+--------------------------------------+
```

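Items are routed to one of `N` partitions via `phash2` and stored in
`:ordered_set` ETS tables keyed by `{monotonic_time, ref}`. The timestamp
keeps entries in insertion order within each partition, and the reference
keeps keys unique when two timestamps collide. Each partition is
periodically flushed to the processor as a batch of values.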
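
The routing and storage steps above can be sketched with plain ETS calls. This is an illustration under stated assumptions rather than the library's code: `RoutingSketch` and its functions are hypothetical, and the partition tables are kept in a tuple for simplicity.

```elixir
defmodule RoutingSketch do
  # Hypothetical helper; not part of PartitionedBuffer.

  # Create one :ordered_set table per partition and return them as a tuple.
  def new(partitions) when is_integer(partitions) and partitions > 0 do
    1..partitions
    |> Enum.map(fn _ -> :ets.new(:partition, [:ordered_set, :public]) end)
    |> List.to_tuple()
  end

  # Pick a partition with phash2 and store the item under a
  # {monotonic_time, ref} key. Returns the chosen partition index.
  def push(tables, item) do
    index = :erlang.phash2(item, tuple_size(tables))
    key = {System.monotonic_time(), make_ref()}
    true = :ets.insert(elem(tables, index), {key, item})
    index
  end

  # Return only the values stored in one partition.
  def values(tables, index) do
    tables
    |> elem(index)
    |> :ets.select([{{:_, :"$1"}, [], [:"$1"]}])
  end
end
```

Equal items always hash to the same partition, so with default routing duplicates of one term end up in the same table.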

## Examples

### Standalone Usage

    # Start a queue buffer with a custom processor
    iex> {:ok, _sup_pid} =
    ...>   PartitionedBuffer.Queue.start_link(
    ...>     name: :my_buffer,
    ...>     processor: fn batch -> IO.inspect(batch) end
    ...>   )

    # Push a single item into the buffer
    iex> PartitionedBuffer.Queue.push(:my_buffer, "item1")
    :ok

    # Push a batch of items
    iex> PartitionedBuffer.Queue.push(:my_buffer, ["item2", "item3"])
    :ok

    # Check buffer size
    iex> PartitionedBuffer.Queue.size(:my_buffer)
    3

    # Stop the buffer gracefully (processes remaining items)
    iex> PartitionedBuffer.Queue.stop(:my_buffer)
    :ok

### Adding to a Supervision Tree

    children = [
      {PartitionedBuffer.Queue,
       name: :my_buffer,
       processor: &MyApp.EventProcessor.process_batch/1}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)

## Processor

The processor function receives a list of values
(the items pushed to the buffer):

    fn batch ->
      # batch is [value1, value2, ...]
      Enum.each(batch, fn value -> process(value) end)
    end
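
A processor can also be a named function. The sketch below is one possible shape, assuming you want a failing value to be logged instead of aborting the rest of the batch. `EventProcessorSketch` and the `handler` argument are hypothetical, and the return value the buffer expects from a processor is not specified here, so `:ok` is an assumption.

```elixir
defmodule EventProcessorSketch do
  require Logger

  # Hypothetical processor; not part of PartitionedBuffer.
  # Applies `handler` to every value and logs failures instead of
  # letting one bad value abort the rest of the batch.
  def process_batch(batch, handler) when is_list(batch) and is_function(handler, 1) do
    Enum.each(batch, fn value ->
      try do
        handler.(value)
      rescue
        error ->
          Logger.error("failed to process #{inspect(value)}: " <> Exception.message(error))
      end
    end)

    :ok
  end
end
```

It could be passed as `processor: &EventProcessorSketch.process_batch(&1, handler)`, where `handler` is whatever per-item function your application defines.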

## Options

See `PartitionedBuffer` for
[start](`m:PartitionedBuffer#module-start-options`) and
[runtime](`m:PartitionedBuffer#module-runtime-options`) options.

# `buffer`

```elixir
@type buffer() :: PartitionedBuffer.buffer()
```

Proxy type for a buffer; an alias for `PartitionedBuffer.buffer()`.

# `item`

```elixir
@type item() :: any()
```

Any term that will be buffered and processed.

# `child_spec`

```elixir
@spec child_spec(keyword()) :: Supervisor.child_spec()
```

Returns the queue buffer child spec.

# `push`

```elixir
@spec push(buffer(), item() | [item()], keyword()) :: :ok
```

Pushes an item or a batch of items into the buffer.

## Parameters

  * `buffer` - The buffer name (atom).
  * `item_or_batch` - A single item or a list of items to push.
  * `opts` - Optional runtime options.

## Options

See `PartitionedBuffer` for
[runtime](`m:PartitionedBuffer#module-runtime-options`) options.

## Examples

    # Simple push with default routing
    push(:my_buffer, "item1")
    push(:my_buffer, ["item2", "item3"])

    # Custom partition routing using function
    push(:my_buffer, user_event, partition_key: &(&1.user_id))

    # Custom partition routing using MFA tuple (item prepended to args)
    push(:my_buffer, event, partition_key: {MyApp.Router, :get_partition, []})

    # Custom partition routing with fixed key (all items to same partition)
    push(:my_buffer, log_entry, partition_key: :logs)
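
One way to picture the three `partition_key` forms is the resolver below. It is a sketch under assumptions, not the library's option handling: `PartitionKeySketch` is a hypothetical module, and it assumes the resolved key is hashed with `phash2` to choose a partition, which the documentation above does not state for custom keys.

```elixir
defmodule PartitionKeySketch do
  # Hypothetical resolver; the real option handling may differ.

  # A 1-arity function is applied to the item.
  def resolve(item, fun) when is_function(fun, 1), do: fun.(item)

  # An MFA tuple is called with the item prepended to the arguments.
  def resolve(item, {mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, [item | args])
  end

  # Any other term is used as a fixed key.
  def resolve(_item, key), do: key

  # Assumption: the resolved key is hashed into the partition range.
  def partition(item, partition_key, partitions) do
    item
    |> resolve(partition_key)
    |> :erlang.phash2(partitions)
  end
end
```

With a fixed key such as `:logs`, every item resolves to the same key and therefore to the same partition, which trades write parallelism for a single ordered stream.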

# `size`

```elixir
@spec size(buffer()) :: non_neg_integer()
```

Returns the queue size (total number of items across all partitions).

## Examples

    size(:my_buffer)

# `start_link`

```elixir
@spec start_link(keyword()) :: Supervisor.on_start()
```

Starts a new queue buffer.

## Options

See `PartitionedBuffer` for
[start](`m:PartitionedBuffer#module-start-options`) options.

## Examples

    PartitionedBuffer.Queue.start_link(name: :my_queue_buffer)

# `stop`

```elixir
@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok
```

Stops a queue buffer gracefully.

## Examples

    PartitionedBuffer.Queue.stop(:my_queue_buffer)

# `update_options`

```elixir
@spec update_options(
  buffer(),
  keyword()
) :: :ok
```

Updates the options for the queue buffer.

## Options

See `PartitionedBuffer.update_options/2` for the updatable options.

## Examples

    # Update the processing interval to 100ms
    update_options(:my_buffer, processing_interval_ms: 100)

