Redis.Cluster (Redis v0.7.1)


Redis Cluster client.

Maintains topology awareness, routes commands to the correct node based on hash slot, and handles MOVED/ASK redirects transparently.

Usage

{:ok, cluster} = Redis.Cluster.start_link(
  nodes: [{"127.0.0.1", 7000}, {"127.0.0.1", 7001}],
  password: "secret"
)

{:ok, "OK"} = Redis.Cluster.command(cluster, ["SET", "mykey", "myvalue"])
{:ok, "myvalue"} = Redis.Cluster.command(cluster, ["GET", "mykey"])

Options

  • :nodes - list of seed nodes as {host, port} tuples or "host:port" strings
  • :password - Redis password (applied to all nodes)
  • :name - GenServer name registration
  • :timeout - command timeout in milliseconds (default: 5_000)
  • :max_redirects - max MOVED/ASK redirects before failing (default: 5)
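
To make the role of :max_redirects concrete, here is a minimal sketch of bounded redirect handling. It is not this library's implementation: the RedirectSketch module, the send_fun callback, and the :too_many_redirects atom are all hypothetical names chosen for illustration. Only the "MOVED <slot> <host>:<port>" and "ASK <slot> <host>:<port>" reply shapes come from the Redis Cluster protocol. The sketch also skips the ASKING command that a real client sends before retrying after an ASK reply.

```elixir
defmodule RedirectSketch do
  # Parses a redirect error such as "MOVED 3999 127.0.0.1:6381".
  # Assumes a plain host:port address (no IPv6 literals).
  def parse_redirect(message) when is_binary(message) do
    with [kind, slot, address] when kind in ["MOVED", "ASK"] <- String.split(message, " "),
         {slot, ""} <- Integer.parse(slot),
         [host, port] <- String.split(address, ":"),
         {port, ""} <- Integer.parse(port) do
      {:redirect, kind, slot, host, port}
    else
      _ -> :not_a_redirect
    end
  end

  # send_fun.(node, command) is a hypothetical transport callback that
  # returns {:ok, reply} or {:error, message}.
  def follow(node, command, send_fun, max_redirects \\ 5) do
    case send_fun.(node, command) do
      {:error, message} when is_binary(message) ->
        case parse_redirect(message) do
          {:redirect, _kind, _slot, host, port} when max_redirects > 0 ->
            # Retry against the node named in the redirect, with one fewer attempt left.
            follow({host, port}, command, send_fun, max_redirects - 1)

          {:redirect, _kind, _slot, _host, _port} ->
            # Hypothetical error atom: the redirect budget is exhausted.
            {:error, :too_many_redirects}

          :not_a_redirect ->
            {:error, message}
        end

      other ->
        other
    end
  end
end
```

With the default budget of 5, a command may be redirected up to five times; a sixth redirect reply ends the attempt with an error instead of looping forever.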

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

command(cluster, args, opts \\ [])

@spec command(GenServer.server(), [String.t()], keyword()) ::
  {:ok, term()} | {:error, term()}

Sends a command, routed to the correct node by key hash slot.
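
For readers unfamiliar with hash slots, the sketch below shows how a key maps to one of the 16384 slots under the Redis Cluster specification: CRC16 (XMODEM variant) of the key, modulo 16384, hashing only the hash-tag content when the key contains a non-empty {...} section. The HashSlotSketch module is a hypothetical illustration and is not taken from this library's source.

```elixir
defmodule HashSlotSketch do
  import Bitwise

  @slots 16_384

  # Slot for a key: CRC16 of the hashed part of the key, modulo 16384.
  def slot(key) when is_binary(key) do
    key |> hashed_part() |> crc16() |> rem(@slots)
  end

  # If the key has a "{" followed later by "}" with at least one byte
  # in between, only that content is hashed; otherwise the whole key is.
  def hashed_part(key) do
    with [_before, rest] <- :binary.split(key, "{"),
         [tag, _after] when tag != "" <- :binary.split(rest, "}") do
      tag
    else
      _ -> key
    end
  end

  # CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection.
  def crc16(data) when is_binary(data) do
    for <<byte <- data>>, reduce: 0 do
      crc -> shift_bits(bxor(crc, byte <<< 8), 8)
    end
  end

  defp shift_bits(crc, 0), do: crc

  defp shift_bits(crc, remaining) do
    shifted = (crc <<< 1) &&& 0xFFFF
    next = if (crc &&& 0x8000) != 0, do: bxor(shifted, 0x1021), else: shifted
    shift_bits(next, remaining - 1)
  end
end
```

Because only the tag is hashed, HashSlotSketch.slot("{user:1}.name") and HashSlotSketch.slot("{user:1}.email") are equal, which is what lets related keys be used together in one transaction.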

info(cluster)

@spec info(GenServer.server()) :: map()

Returns cluster info: nodes, slot coverage.

pipeline(cluster, commands, opts \\ [])

@spec pipeline(GenServer.server(), [[String.t()]], keyword()) ::
  {:ok, [term()]} | {:error, term()}

Sends a pipeline of commands. If all commands target the same hash slot, they are sent as a single pipeline. Otherwise, commands are automatically split by slot, sent in parallel to the correct nodes, and results are reassembled in the original command order.
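
The split-and-reassemble behaviour described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: PipelineSplitSketch, slot_fun (maps a command to its slot) and send_fun (sends one group of commands and returns one reply per command) are hypothetical names. The key idea is to tag each command with its original index before grouping, so replies can be sorted back into input order.

```elixir
defmodule PipelineSplitSketch do
  # slot_fun.(command) returns the slot for a command.
  # send_fun.(slot, commands) returns a list of replies, one per command.
  # Both are hypothetical callbacks supplied by the caller.
  def run(commands, slot_fun, send_fun) do
    results =
      commands
      |> Enum.with_index()
      |> Enum.group_by(fn {command, _index} -> slot_fun.(command) end)
      |> Task.async_stream(fn {slot, indexed} ->
        # Send each slot group concurrently, keeping the original indexes.
        {group, indexes} = Enum.unzip(indexed)
        Enum.zip(indexes, send_fun.(slot, group))
      end)
      |> Enum.flat_map(fn {:ok, pairs} -> pairs end)
      |> Enum.sort_by(fn {index, _reply} -> index end)
      |> Enum.map(fn {_index, reply} -> reply end)

    {:ok, results}
  end
end
```

A real client would more likely group by owning node than by individual slot, and would need error handling for failed groups; both are left out here to keep the ordering logic visible.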

refresh(cluster)

@spec refresh(GenServer.server()) :: :ok | {:error, term()}

Forces a topology refresh.

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

stop(cluster)

@spec stop(GenServer.server()) :: :ok

transaction(cluster, commands, opts \\ [])

@spec transaction(GenServer.server(), [[String.t()]], keyword()) ::
  {:ok, [term()]} | {:error, term()}

Executes a MULTI/EXEC transaction in cluster mode.

All commands must target the same hash slot. Returns {:error, :cross_slot} if commands span multiple slots. Use hash tags (e.g. {user}.name), where only the text inside the braces is hashed, to ensure related keys land in the same slot.

Redis.Cluster.transaction(cluster, [
  ["SET", "{user:1}.name", "Alice"],
  ["SET", "{user:1}.email", "alice@example.com"]
])
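
The same-slot requirement can be pictured as a check that runs before MULTI is sent. The sketch below is hypothetical and not the library's code: CrossSlotSketch and slot_fun (maps a key to a slot) are illustrative names, and it assumes the key is the second element of every command, which holds for SET and GET but not for every Redis command.

```elixir
defmodule CrossSlotSketch do
  # slot_fun.(key) is a hypothetical callback returning the slot for a key.
  # Assumes each command has the shape [name, key | rest].
  def check(commands, slot_fun) do
    slots =
      commands
      |> Enum.map(fn [_name, key | _rest] -> slot_fun.(key) end)
      |> Enum.uniq()

    case slots do
      # Zero or one distinct slot: safe to run as a single transaction.
      [] -> :ok
      [_single] -> :ok
      _many -> {:error, :cross_slot}
    end
  end
end
```

Any function that maps keys to slots can be passed as slot_fun, which keeps the check independent of how slots are computed.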

watch_transaction(cluster, keys, fun, opts \\ [])

@spec watch_transaction(
  GenServer.server(),
  [String.t()],
  (GenServer.server() -> [[String.t()]] | {:abort, term()}),
  keyword()
) :: {:ok, [term()]} | {:error, term()}

Executes a WATCH-based optimistic locking transaction in cluster mode.

All watched keys and all commands produced by the function must target the same hash slot. Returns {:error, :cross_slot} if they don't.

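Redis.Cluster.watch_transaction(cluster, ["{acct:1}.balance"], fn conn ->
  # Read the watched key; GET returns nil when the key does not exist.
  {:ok, bal} = Redis.Connection.command(conn, ["GET", "{acct:1}.balance"])
  new_bal = String.to_integer(bal || "0") + 100
  # Return the commands to run in the transaction.
  [["SET", "{acct:1}.balance", to_string(new_bal)]]
end)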