A GenServer for consuming Redis Streams via consumer groups.
Wraps XREADGROUP in a polling loop with automatic acknowledgement,
pending message recovery via XAUTOCLAIM, and consumer group creation.
The consumer group and stream are created automatically on startup if
they don't already exist.
Defining a Handler
Implement the Redis.Consumer.Handler behaviour to process messages:
defmodule MyApp.OrderHandler do
  @behaviour Redis.Consumer.Handler

  require Logger

  @impl true
  def handle_messages(messages, _metadata) do
    for [stream, entries] <- messages, [id, fields] <- entries do
      # fields is a flat list: ["field1", "value1", "field2", "value2", ...]
      order = fields_to_map(fields)
      process_order(order)
      Logger.info("Processed order #{id} from #{stream}")
    end

    :ok
  end

  defp fields_to_map(fields) do
    fields
    |> Enum.chunk_every(2)
    |> Map.new(fn [k, v] -> {k, v} end)
  end

  # Placeholder: replace with the application's own order processing.
  defp process_order(_order), do: :ok
end

Return Values
The handler can control acknowledgement:
:ok - all messages are acknowledged automatically
{:ok, ids} - only the specified message IDs are acknowledged
{:error, reason} - no messages are acknowledged; they will be redelivered to this or another consumer via XAUTOCLAIM
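
The {:ok, ids} return value can be used for partial acknowledgement. The following is a minimal sketch, not code from the library: the module name PartialAckHandler and the rule that an entry needs a "qty" field are assumptions made for illustration. The behaviour declaration is left as a comment so that the snippet compiles on its own.

```elixir
defmodule PartialAckHandler do
  # Hypothetical handler sketch. In a real project, add
  # `@behaviour Redis.Consumer.Handler` and `@impl true`.

  def handle_messages(messages, _metadata) do
    # Collect only the IDs of entries that were processed successfully.
    acked =
      for [_stream, entries] <- messages,
          [id, fields] <- entries,
          process(fields) == :ok,
          do: id

    {:ok, acked}
  end

  # Hypothetical business rule: an entry is valid only if it has a "qty" field.
  defp process(fields) do
    map = fields |> Enum.chunk_every(2) |> Map.new(fn [k, v] -> {k, v} end)
    if Map.has_key?(map, "qty"), do: :ok, else: {:error, :missing_qty}
  end
end
```

Entries whose IDs are left out of the returned list stay pending, so they remain eligible for the XAUTOCLAIM-based recovery described under Message Recovery.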
Starting a Consumer
{:ok, consumer} = Redis.Consumer.start_link(
  conn: conn,
  stream: "orders",
  group: "processors",
  consumer: "proc-1",
  handler: MyApp.OrderHandler
)

Supervision
Add consumers to your supervision tree alongside the connection:
children = [
  {Redis.Connection, port: 6379, name: :redis},
  {Redis.Consumer,
   conn: :redis,
   stream: "orders",
   group: "processors",
   consumer: "proc-1",
   handler: MyApp.OrderHandler,
   name: :order_consumer}
]

Scale out by adding more consumers with different :consumer names.
Redis delivers each new entry to only one consumer in the group, so adding consumers spreads the load across them.
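
One way to scale out is to generate the child specs in a loop. The sketch below rests on two assumptions: that Redis.Consumer uses the default GenServer child spec, whose :id is the module name (so several consumers under one supervisor would need distinct ids), and that start_link/1 takes the keyword list shown above. The module name ConsumerPoolSketch and the proc-N / order_consumer_N naming scheme are hypothetical.

```elixir
defmodule ConsumerPoolSketch do
  # Builds child specs for `n` consumers in the same group.
  def child_specs(n) do
    for i <- 1..n do
      opts = [
        conn: :redis,
        stream: "orders",
        group: "processors",
        consumer: "proc-#{i}",
        handler: MyApp.OrderHandler,
        name: :"order_consumer_#{i}"
      ]

      # Explicit map spec so that each child gets a unique :id.
      %{id: {:order_consumer, i}, start: {Redis.Consumer, :start_link, [opts]}}
    end
  end
end
```

The resulting list can be appended to the children list after the Redis.Connection entry, for example with children ++ ConsumerPoolSketch.child_specs(3).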
Producing Messages
Any connection can write to the stream:
Redis.command(conn, ["XADD", "orders", "*",
  "user_id", "123",
  "item", "widget",
  "qty", "5"
])

Or use the command builder:
alias Redis.Commands.Stream
Redis.command(conn, Stream.xadd("orders", "*", [
  {"user_id", "123"},
  {"item", "widget"},
  {"qty", "5"}
]))

Options
:conn (required) - Redis connection (pid or registered name)
:stream (required) - stream key to consume from
:group (required) - consumer group name
:consumer (required) - consumer name within the group
:handler (required) - module implementing Redis.Consumer.Handler
:count - max entries per XREADGROUP call (default: 10)
:block - block timeout in ms waiting for new messages (default: 5000)
:claim_interval - ms between XAUTOCLAIM runs (default: 30_000)
:claim_min_idle - min idle time in ms for XAUTOCLAIM (default: 60_000)
:name - GenServer name registration
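
To make the required keys and defaults concrete, here is a small sketch of how a caller might check an option list before passing it to start_link. ConsumerOptsSketch is a hypothetical helper, not part of the library, and the library's own validation may behave differently; it only mirrors the keys and defaults listed above.

```elixir
defmodule ConsumerOptsSketch do
  # Hypothetical helper: mirrors the documented required keys and defaults.
  @required [:conn, :stream, :group, :consumer, :handler]
  @defaults [count: 10, block: 5_000, claim_interval: 30_000, claim_min_idle: 60_000]

  def normalize(opts) do
    case Enum.reject(@required, &Keyword.has_key?(opts, &1)) do
      # All required keys present: caller-supplied values override defaults.
      [] -> {:ok, Keyword.merge(@defaults, opts)}
      missing -> {:error, {:missing_options, missing}}
    end
  end
end
```

For example, passing count: 50 together with the five required options yields a list in which :count is 50 and :block keeps its default of 5_000.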
Message Recovery
On startup and periodically (every :claim_interval ms), the consumer
runs XAUTOCLAIM to reclaim messages that were delivered to other consumers
in the group but not acknowledged within :claim_min_idle ms. This handles
consumer crashes and restarts gracefully -- if a consumer dies mid-processing,
another consumer (or the restarted one) will pick up its pending messages.
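
For reference, the recovery step corresponds roughly to a raw XAUTOCLAIM call. The sketch below only builds the command as a list of strings, following the Redis syntax XAUTOCLAIM key group consumer min-idle-time start [COUNT count]. The module name AutoclaimSketch is hypothetical, and the exact arguments the consumer sends internally are an assumption.

```elixir
defmodule AutoclaimSketch do
  # Builds the raw XAUTOCLAIM command as a list of strings.
  # `start` is the scan cursor; "0-0" starts from the beginning of the
  # pending entries list.
  def command(stream, group, consumer, min_idle_ms, start \\ "0-0", count \\ 10) do
    [
      "XAUTOCLAIM", stream, group, consumer,
      Integer.to_string(min_idle_ms), start,
      "COUNT", Integer.to_string(count)
    ]
  end
end
```

The result could be sent manually with Redis.command(conn, AutoclaimSketch.command("orders", "processors", "proc-1", 60_000)), which claims up to 10 entries that have been idle for at least 60 seconds.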
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec info(GenServer.server()) :: map()
Returns consumer info: stream, group, consumer name, handler.
@spec stop(GenServer.server()) :: :ok
Stops the consumer gracefully.