# `Redis.Consumer.Handler`
[🔗](https://github.com/joshrotenberg/redis_ex/blob/v0.7.1/lib/redis/consumer/handler.ex#L1)

Behaviour for handling messages from a Redis Streams consumer group.

## Example

    defmodule MyApp.NotificationHandler do
      @behaviour Redis.Consumer.Handler
      require Logger

      @impl true
      def handle_messages(messages, metadata) do
        for [stream, entries] <- messages, [id, fields] <- entries do
          fields_map = fields |> Enum.chunk_every(2) |> Map.new(fn [k, v] -> {k, v} end)

          case send_notification(fields_map) do
            :ok -> Logger.info("Sent notification #{id}")
            {:error, reason} -> Logger.error("Failed #{id}: #{inspect(reason)}")
          end
        end

        :ok
      end

      defp send_notification(fields), do: # ...
    end

## Selective Acknowledgement

Return `{:ok, ids}` to acknowledge only specific messages. Unacknowledged
messages remain pending and will be redelivered via XAUTOCLAIM:

    def handle_messages(messages, _metadata) do
      {succeeded, _failed} =
        messages
        |> Enum.flat_map(fn [_stream, entries] -> entries end)
        |> Enum.split_with(fn [_id, fields] -> process(fields) == :ok end)

      {:ok, Enum.map(succeeded, fn [id, _] -> id end)}
    end

## Return Values

  * `:ok` - all messages processed successfully, acknowledge all
  * `{:ok, ids}` - selectively acknowledge only the given message IDs
  * `{:error, reason}` - processing failed, messages will NOT be acknowledged
    and will be redelivered on the next XAUTOCLAIM cycle

## Metadata

The `metadata` map contains:

  * `:stream` - the stream key
  * `:group` - the consumer group name
  * `:claimed` - `true` when messages were recovered via XAUTOCLAIM
    (absent for normal deliveries)

# `entry`

```elixir
@type entry() :: {message_id(), [field()]}
```

# `field`

```elixir
@type field() :: String.t()
```

# `message_id`

```elixir
@type message_id() :: String.t()
```

# `metadata`

```elixir
@type metadata() :: map()
```

# `stream_messages`

```elixir
@type stream_messages() :: {String.t(), [entry()]}
```

# `handle_messages`

```elixir
@callback handle_messages([stream_messages()], metadata()) ::
  :ok | {:ok, [message_id()]} | {:error, term()}
```

Called when messages are received from the stream.

`messages` is a list of `{stream_key, entries}` tuples where each entry
is `{message_id, [field1, value1, field2, value2, ...]}`.

`metadata` contains `:stream`, `:group`, and optionally `:claimed` (true
when messages were recovered via XAUTOCLAIM).

---

*Consult [api-reference.md](api-reference.md) for complete listing*
