# `Redis.Consumer`
[🔗](https://github.com/joshrotenberg/redis_ex/blob/v0.7.1/lib/redis/consumer.ex#L1)

A GenServer for consuming Redis Streams via consumer groups.

Wraps `XREADGROUP` in a polling loop with automatic acknowledgement,
pending message recovery via `XAUTOCLAIM`, and consumer group creation.
The consumer group and stream are created automatically on startup if
they don't already exist.
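
For orientation, the Redis commands behind this loop can be sketched as plain command lists. This is a sketch of the stream protocol rather than the library's internals: the module `StreamProtocolSketch` is hypothetical, and the start ID passed to `XGROUP CREATE` and the exact arguments the consumer sends are assumptions.

```elixir
defmodule StreamProtocolSketch do
  # Hypothetical helper module, not part of the library.

  # Create the consumer group; MKSTREAM also creates the stream if missing.
  # The default start ID "$" (only entries added from now on) is an assumption.
  def xgroup_create(stream, group, start_id \\ "$") do
    ["XGROUP", "CREATE", stream, group, start_id, "MKSTREAM"]
  end

  # Read up to `count` entries, blocking for at most `block_ms` milliseconds.
  # The ">" ID asks for entries not yet delivered to any consumer in the group.
  def xreadgroup(stream, group, consumer, count, block_ms) do
    [
      "XREADGROUP", "GROUP", group, consumer,
      "COUNT", Integer.to_string(count),
      "BLOCK", Integer.to_string(block_ms),
      "STREAMS", stream, ">"
    ]
  end

  # Acknowledge processed entries so they leave the group's pending list.
  def xack(stream, group, ids) when is_list(ids) do
    ["XACK", stream, group | ids]
  end
end
```

Each list could be sent with `Redis.command(conn, command)`. Note that `XGROUP CREATE` replies with a `BUSYGROUP` error when the group already exists, so code that creates groups on startup typically treats that reply as success.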

## Defining a Handler

Implement the `Redis.Consumer.Handler` behaviour to process messages:

    defmodule MyApp.OrderHandler do
      @behaviour Redis.Consumer.Handler

      # Logger.info/1 is a macro, so Logger must be required first.
      require Logger

      @impl true
      def handle_messages(messages, _metadata) do
        for [stream, entries] <- messages, [id, fields] <- entries do
          # fields is a flat list: ["field1", "value1", "field2", "value2", ...]
          order = fields_to_map(fields)
          process_order(order)
          Logger.info("Processed order #{id} from #{stream}")
        end

        :ok
      end

      defp fields_to_map(fields) do
        fields
        |> Enum.chunk_every(2)
        |> Map.new(fn [k, v] -> {k, v} end)
      end

      # Placeholder: replace with your application logic.
      defp process_order(_order), do: :ok
    end

## Return Values

The handler's return value controls which messages are acknowledged:

  * `:ok` - all messages are acknowledged automatically
  * `{:ok, ids}` - only the specified message IDs are acknowledged
  * `{:error, reason}` - no messages are acknowledged; they stay pending
    and will be redelivered to this or another consumer via `XAUTOCLAIM`
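
A minimal sketch of the `{:ok, ids}` form, assuming a handler that only wants to acknowledge entries it could process. The module name `PartialAckHandler` and the validation rule (a positive integer `"qty"` field) are hypothetical; a real handler would also declare `@behaviour Redis.Consumer.Handler`, which is left out here so the sketch runs on its own.

```elixir
defmodule PartialAckHandler do
  # Hypothetical handler, shown without the @behaviour declaration so it
  # compiles standalone.

  # Returns {:ok, ids} with only the IDs of entries that passed validation.
  # IDs left out of the list are not acknowledged.
  def handle_messages(messages, _metadata) do
    acked =
      for [_stream, entries] <- messages,
          [id, fields] <- entries,
          valid?(fields),
          do: id

    {:ok, acked}
  end

  # Hypothetical rule: the entry needs a "qty" field holding a positive integer.
  defp valid?(fields) do
    map =
      fields
      |> Enum.chunk_every(2)
      |> Map.new(fn [k, v] -> {k, v} end)

    match?({qty, ""} when qty > 0, Integer.parse(Map.get(map, "qty", "")))
  end
end
```

With this shape, a malformed entry does not block acknowledgement of the well-formed entries delivered in the same batch.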

## Starting a Consumer

    {:ok, consumer} = Redis.Consumer.start_link(
      conn: conn,
      stream: "orders",
      group: "processors",
      consumer: "proc-1",
      handler: MyApp.OrderHandler
    )

## Supervision

Add consumers to your supervision tree alongside the connection:

    children = [
      {Redis.Connection, port: 6379, name: :redis},
      {Redis.Consumer,
       conn: :redis,
       stream: "orders",
       group: "processors",
       consumer: "proc-1",
       handler: MyApp.OrderHandler,
       name: :order_consumer}
    ]

Scale out by adding more consumers with different `:consumer` names.
Redis delivers each new entry to only one consumer in the group, so the
work is spread across them.
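
One way to generate several consumers is to build explicit child specs, each with its own `:consumer` name, registered `:name`, and supervisor child `id`. This is a sketch under assumptions: `ConsumerPoolSketch` is a hypothetical module, and it assumes `Redis.Consumer.start_link/1` takes the keyword list shown above. A supervisor requires unique child ids, which is why the specs are written as maps here.

```elixir
defmodule ConsumerPoolSketch do
  # Hypothetical helper, not part of the library.

  # Builds `count` child specs that differ only in consumer name,
  # registered process name, and child id.
  def child_specs(count) when is_integer(count) and count > 0 do
    for n <- 1..count do
      name = :"order_consumer_#{n}"

      opts = [
        conn: :redis,
        stream: "orders",
        group: "processors",
        consumer: "proc-#{n}",
        handler: MyApp.OrderHandler,
        name: name
      ]

      # A plain child spec map; the id must be unique within the supervisor.
      %{id: name, start: {Redis.Consumer, :start_link, [opts]}}
    end
  end
end
```

The resulting list can be appended to `children` after the `Redis.Connection` entry, for example `children = [{Redis.Connection, port: 6379, name: :redis} | ConsumerPoolSketch.child_specs(3)]`.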

## Producing Messages

Any connection can write to the stream:

    Redis.command(conn, ["XADD", "orders", "*",
      "user_id", "123",
      "item", "widget",
      "qty", "5"
    ])

Or use the command builder:

    alias Redis.Commands.Stream
    Redis.command(conn, Stream.xadd("orders", "*", [
      {"user_id", "123"},
      {"item", "widget"},
      {"qty", "5"}
    ]))
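
When the payload already lives in a map or keyword list, the raw command can be assembled by flattening it into the field/value sequence that `XADD` expects. The sketch below is a hypothetical helper (`XaddSketch` is not part of the library) and assumes every key and value can be converted with `to_string/1`.

```elixir
defmodule XaddSketch do
  # Hypothetical helper, not part of the library.

  # Flattens {key, value} pairs into ["key", "value", ...] and prefixes the
  # XADD command with an auto-generated ID ("*").
  def xadd(stream, fields) when is_map(fields) or is_list(fields) do
    flat = Enum.flat_map(fields, fn {k, v} -> [to_string(k), to_string(v)] end)
    ["XADD", stream, "*" | flat]
  end
end
```

The result can be passed to `Redis.command(conn, command)`. A keyword list keeps the field order; a map does not guarantee one, which is harmless for Redis but worth knowing when reading entries back.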

## Options

  * `:conn` (required) - Redis connection (pid or registered name)
  * `:stream` (required) - stream key to consume from
  * `:group` (required) - consumer group name
  * `:consumer` (required) - consumer name within the group
  * `:handler` (required) - module implementing `Redis.Consumer.Handler`
  * `:count` - max entries per XREADGROUP call (default: 10)
  * `:block` - block timeout in ms waiting for new messages (default: 5000)
  * `:claim_interval` - ms between XAUTOCLAIM runs (default: 30_000)
  * `:claim_min_idle` - min idle time in ms for XAUTOCLAIM (default: 60_000)
  * `:name` - GenServer name registration
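
To make the required/optional split concrete, the following sketch checks a keyword list for the required keys and fills in the documented defaults. It is not the library's own validation: `ConsumerOptionsSketch` and its return shapes are hypothetical, and only the option names and default values come from the list above.

```elixir
defmodule ConsumerOptionsSketch do
  # Hypothetical helper, not part of the library.

  @required [:conn, :stream, :group, :consumer, :handler]

  # Defaults copied from the option list above.
  @defaults [count: 10, block: 5000, claim_interval: 30_000, claim_min_idle: 60_000]

  # Returns {:ok, opts_with_defaults} or {:error, {:missing_options, keys}}.
  def build(opts) when is_list(opts) do
    case Enum.reject(@required, &Keyword.has_key?(opts, &1)) do
      [] -> {:ok, Keyword.merge(@defaults, opts)}
      missing -> {:error, {:missing_options, missing}}
    end
  end
end
```

Checking options before they reach the supervision tree surfaces a missing `:handler` or `:group` as a clear error value rather than a crash during startup.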

## Message Recovery

On startup and periodically (every `:claim_interval` ms), the consumer
runs `XAUTOCLAIM` to reclaim messages that were delivered to other consumers
in the group but not acknowledged within `:claim_min_idle` ms. This handles
consumer crashes and restarts gracefully -- if a consumer dies mid-processing,
another consumer (or the restarted one) will pick up its pending messages.
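
The recovery step corresponds to a Redis command of the following shape. This is a sketch of the protocol, assuming a start cursor of `"0-0"` and an explicit `COUNT`; the arguments the library actually sends may differ, and `AutoclaimSketch` is a hypothetical module.

```elixir
defmodule AutoclaimSketch do
  # Hypothetical helper, not part of the library.

  # Claims entries idle for at least `min_idle_ms` on behalf of `consumer`,
  # scanning the group's pending list from `cursor`.
  def xautoclaim(stream, group, consumer, min_idle_ms, cursor \\ "0-0", count \\ 10) do
    [
      "XAUTOCLAIM", stream, group, consumer,
      Integer.to_string(min_idle_ms), cursor,
      "COUNT", Integer.to_string(count)
    ]
  end

  # The reply starts with the cursor for the next call, followed by the
  # claimed entries. A cursor of "0-0" means the whole pending list was scanned.
  def next_step(["0-0", entries | _rest]), do: {:done, entries}
  def next_step([cursor, entries | _rest]), do: {:continue, cursor, entries}
end
```

Claimed entries have the same `[id, fields]` shape as entries read with `XREADGROUP`, so they can be handed to the same handler.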

# `child_spec`

Returns a specification to start this module under a supervisor.

See `Supervisor`.

# `info`

```elixir
@spec info(GenServer.server()) :: map()
```

Returns a map describing the consumer: its stream, group, consumer name, and handler.

# `start_link`

# `stop`

```elixir
@spec stop(GenServer.server()) :: :ok
```

Stops the consumer gracefully.

