# `PhoenixMicro.Consumer`
[🔗](https://github.com/iamkanishka/phoenix_micro/blob/v1.0.0/lib/phoenix_micro/consumer.ex#L1)

DSL for defining topic consumers with concurrency control, retry logic, and middleware.

## Example

    defmodule MyApp.Payments.CreatedConsumer do
      use PhoenixMicro.Consumer

      topic "payments.created"
      concurrency 10
      retry max_attempts: 5, base_delay: 500, max_delay: 30_000

      middleware [
        PhoenixMicro.Middleware.Logger,
        PhoenixMicro.Middleware.Metrics
      ]

      @impl PhoenixMicro.Consumer
      def handle(%PhoenixMicro.Message{} = message, _context) do
        %{"amount" => amount, "currency" => currency} = message.payload
        # ... process ...
        :ok
      end

      # Optional: override per-message error handling
      @impl PhoenixMicro.Consumer
      def handle_error(message, error, _context) do
        Logger.error("Payment processing failed", error: inspect(error))
        {:retry, message}
      end
    end

## Behaviour callbacks

Consumers must implement `handle/2`. `handle_error/3` is optional.

The return value of `handle/2` controls the ack/nack flow:
- `:ok`              — Ack the message.
- `{:ok, _result}`   — Ack.
- `{:error, reason}` — Nack; triggers retry if attempts remain, DLQ otherwise.
- `{:retry, message}`— Explicit retry request.
- `:nack`            — Nack without retry.

# `context`

```elixir
@type context() :: %{
  transport: atom(),
  topic: String.t(),
  attempt: pos_integer(),
  transport_mod: module(),
  message: PhoenixMicro.Message.t()
}
```

# `handle`

```elixir
@callback handle(PhoenixMicro.Message.t(), context()) ::
  :ok | {:ok, term()} | {:error, term()} | :nack
```

# `handle_error`
*optional* 

```elixir
@callback handle_error(PhoenixMicro.Message.t(), term(), context()) ::
  {:retry, PhoenixMicro.Message.t()} | :nack | :ok
```

# `ack`

```elixir
@spec ack(PhoenixMicro.Message.t(), context()) :: :ok
```

Manually acknowledges a message from within a handler.

Use this when you need to ack before the handler returns — for example,
when kicking off an async process and you want to release the broker slot
immediately.

    def handle(message, context) do
      PhoenixMicro.Consumer.ack(message, context)
      Task.start(fn -> do_slow_work(message.payload) end)
      :ok
    end

# `batch_size`
*macro* 

Number of messages Broadway will accumulate before calling `handle_batch/4`.
Set to `1` (default) to disable batching and process messages individually.

# `batch_timeout`
*macro* 

Maximum time in milliseconds to wait before flushing an incomplete batch.
Only relevant when `batch_size > 1`. Default: 1000ms.

# `concurrency`
*macro* 

# `config`

```elixir
@spec config(module()) :: map()
```

Returns the consumer configuration map for the given module.

# `dead_letter_topic`
*macro* 

# `dispatch`

```elixir
@spec dispatch(module(), PhoenixMicro.Message.t(), context()) ::
  :ok | {:error, term()}
```

Invokes the consumer's handle/2 through its middleware chain.

# `middleware`
*macro* 

# `nack`

```elixir
@spec nack(PhoenixMicro.Message.t(), context(), term()) :: :ok
```

Manually negatively-acknowledges a message from within a handler.

Use when you detect a permanent failure and want to route to DLQ
immediately without going through the retry count.

    def handle(message, context) do
      if invalid_payload?(message.payload) do
        PhoenixMicro.Consumer.nack(message, context, :invalid_payload)
        :nack
      else
        process(message.payload)
        :ok
      end
    end

# `pipeline`
*macro* 

Processing mode. `:broadway` (default) uses a full Broadway pipeline with
backpressure and demand-driven scheduling. `:simple` uses the legacy
`Task.start` dispatch — useful when Broadway is not a dependency.

# `queue_group`
*macro* 

# `retry`
*macro* 

# `start_link`

```elixir
@spec start_link(
  module(),
  keyword()
) :: GenServer.on_start()
```

Starts a supervised consumer worker for the given consumer module.
Called by `PhoenixMicro.Supervisor.ConsumerManager`.

# `topic`
*macro* 

# `transport`
*macro* 

---

*Consult [api-reference.md](api-reference.md) for complete listing*
