# `PhoenixMicro.Pipeline`
[🔗](https://github.com/iamkanishka/phoenix_micro/blob/v1.0.0/lib/phoenix_micro/pipeline.ex#L1)

A Broadway-backed message-processing pipeline for `PhoenixMicro.Consumer`.

## Why Broadway instead of bare Tasks?

The previous `Consumer.Worker` dispatched messages using `Task.start/1`:
a fire-and-forget approach that offers no backpressure. Under load, the
broker can push messages faster than handlers can process them, causing:

- Unbounded process growth (OOM risk).
- No visibility into queue depth.
- All in-flight messages lost on node restart.

Broadway solves this with a **demand-driven pull model**:

```
Transport broker
      │   push (unbounded)
      ▼
BroadwayProducer  ←── demand ── Processor pool (N workers)
      │                              │
      │   pull (demand-gated)        ▼
      └─────────────────────── your handle/2
```

Processors only request more messages when they have capacity, creating
true end-to-end backpressure from handler back to broker.

## Pipeline topology

    BroadwayProducer (1 process)
          │
          ▼  (up to :concurrency messages in flight)
    Processor pool  ←── runs middleware + consumer.handle/2
          │
          ▼  (optional — only when :batch_size > 1)
    Batcher pool    ←── runs consumer.handle_batch/3 if defined

## Usage

`PhoenixMicro.Pipeline` is started automatically for every registered
consumer by `ConsumerManager`. You do not start it directly.

You configure it via the Consumer DSL:

    defmodule MyApp.PaymentsConsumer do
      use PhoenixMicro.Consumer

      topic       "payments.created"
      concurrency 10        # processor pool size
      batch_size  50        # group messages before handle_batch/3
      batch_timeout 1_000   # flush batch after this many ms even if not full

      def handle(message, _ctx), do: :ok

      # Optional batch handler — only called when batch_size > 1
      def handle_batch(_batch_name, messages, _batch_info, _ctx) do
        Enum.map(messages, fn msg -> Broadway.Message.ack(msg) end)
      end
    end

## Telemetry

In addition to the standard `[:phoenix_micro, :message, *]` events,
the pipeline emits:

- `[:phoenix_micro, :pipeline, :demand]`   — processor demand updates
- `[:phoenix_micro, :pipeline, :enqueued]` — message entered buffer
- `[:phoenix_micro, :pipeline, :buffer_full]` — buffer at max_demand cap
- `[:broadway, :processor, :message, :start/stop/exception]` — Broadway's own spans

# `child_spec`

```elixir
@spec child_spec({module(), keyword()}) :: Supervisor.child_spec()
```

Returns a child spec suitable for `DynamicSupervisor`.

# `start_link`

```elixir
@spec start_link(
  module(),
  keyword()
) :: GenServer.on_start()
```

Starts a supervised Broadway pipeline for the given consumer module.

Called automatically by `PhoenixMicro.Supervisor.ConsumerManager`.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
