Redis.Consumer (Redis v0.7.1)

Copy Markdown View Source

A GenServer for consuming Redis Streams via consumer groups.

Wraps XREADGROUP in a polling loop with automatic acknowledgement, pending message recovery via XAUTOCLAIM, and consumer group creation. The consumer group and stream are created automatically on startup if they don't already exist.

Defining a Handler

Implement the Redis.Consumer.Handler behaviour to process messages:

defmodule MyApp.OrderHandler do
  @behaviour Redis.Consumer.Handler

  @impl true
  def handle_messages(messages, metadata) do
    for [stream, entries] <- messages, [id, fields] <- entries do
      # fields is a flat list: ["field1", "value1", "field2", "value2", ...]
      order = fields_to_map(fields)
      process_order(order)
      Logger.info("Processed order #{id} from #{stream}")
    end

    :ok
  end

  defp fields_to_map(fields) do
    fields
    |> Enum.chunk_every(2)
    |> Map.new(fn [k, v] -> {k, v} end)
  end

  defp process_order(order), do: # ...
end

Return Values

The handler can control acknowledgement:

  • :ok - all messages are acknowledged automatically
  • {:ok, ids} - only the specified message IDs are acknowledged
  • {:error, reason} - no messages are acknowledged; they will be redelivered to this or another consumer via XAUTOCLAIM

Starting a Consumer

{:ok, consumer} = Redis.Consumer.start_link(
  conn: conn,
  stream: "orders",
  group: "processors",
  consumer: "proc-1",
  handler: MyApp.OrderHandler
)

Supervision

Add consumers to your supervision tree alongside the connection:

children = [
  {Redis.Connection, port: 6379, name: :redis},
  {Redis.Consumer,
   conn: :redis,
   stream: "orders",
   group: "processors",
   consumer: "proc-1",
   handler: MyApp.OrderHandler,
   name: :order_consumer}
]

Scale out by adding more consumers with different :consumer names. Redis distributes unacknowledged messages across consumers in the group.

Producing Messages

Any connection can write to the stream:

Redis.command(conn, ["XADD", "orders", "*",
  "user_id", "123",
  "item", "widget",
  "qty", "5"
])

Or use the command builder:

alias Redis.Commands.Stream
Redis.command(conn, Stream.xadd("orders", "*", [
  {"user_id", "123"},
  {"item", "widget"},
  {"qty", "5"}
]))

Options

  • :conn (required) - Redis connection (pid or registered name)
  • :stream (required) - stream key to consume from
  • :group (required) - consumer group name
  • :consumer (required) - consumer name within the group
  • :handler (required) - module implementing Redis.Consumer.Handler
  • :count - max entries per XREADGROUP call (default: 10)
  • :block - block timeout in ms waiting for new messages (default: 5000)
  • :claim_interval - ms between XAUTOCLAIM runs (default: 30_000)
  • :claim_min_idle - min idle time in ms for XAUTOCLAIM (default: 60_000)
  • :name - GenServer name registration

Message Recovery

On startup and periodically (every :claim_interval ms), the consumer runs XAUTOCLAIM to reclaim messages that were delivered to other consumers in the group but not acknowledged within :claim_min_idle ms. This handles consumer crashes and restarts gracefully -- if a consumer dies mid-processing, another consumer (or the restarted one) will pick up its pending messages.

Summary

Functions

Returns a specification to start this module under a supervisor.

Returns consumer info: stream, group, consumer name, handler.

Stops the consumer gracefully.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

info(consumer)

@spec info(GenServer.server()) :: map()

Returns consumer info: stream, group, consumer name, handler.

start_link(opts)

stop(consumer)

@spec stop(GenServer.server()) :: :ok

Stops the consumer gracefully.