BullMQ.RedisConnection (BullMQ v1.2.7)
View SourceRedis connection management for BullMQ.
This module provides a supervised Redis connection pool using NimblePool. It handles connection lifecycle, reconnection, and provides a clean API for executing Redis commands.
Connection Behavior
This module follows the same philosophy as Redix:
Supervised connections - Connections are started under a supervisor and automatically reconnect when the TCP connection drops.
Fail-fast - Commands fail immediately with
{:error, reason}if the connection is unavailable. No hidden retries are performed.Caller handles retries - It's the caller's responsibility to decide whether to retry failed commands. BullMQ's Worker handles retries for job processing automatically.
This design follows Erlang/OTP principles: let the connection supervision handle reconnection, and let callers decide retry policy based on their specific needs.
Error Handling
Commands return {:error, reason} on failure. Common errors include:
%Redix.ConnectionError{reason: :closed}- Connection is down%Redix.ConnectionError{reason: :timeout}- Command timed out%Redix.Error{}- Redis returned an error (e.g., wrong type)
Example handling:
case BullMQ.RedisConnection.command(conn, ["GET", "key"]) do
{:ok, value} -> value
{:error, %Redix.ConnectionError{}} -> :connection_error
{:error, %Redix.Error{message: msg}} -> {:redis_error, msg}
{:error, reason} -> {:error, reason}
endUsage
Add to your supervision tree:
children = [
{BullMQ.RedisConnection,
name: :bullmq_redis,
url: "redis://localhost:6379",
pool_size: 10}
]Then use it with queues and workers:
BullMQ.Queue.add("my_queue", "job", %{}, connection: :bullmq_redis)Lua Script Loading
BullMQ uses Lua scripts for atomic Redis operations. All scripts are automatically loaded into Redis's script cache when the connection starts. This ensures the connection is fully ready for BullMQ operations (Worker, Queue, QueueEvents, etc.) before it's used.
Unlike Node.js BullMQ which uses ioredis's defineCommand to register scripts
on the client, the Elixir version loads scripts via SCRIPT LOAD during
initialization and uses EVALSHA for execution with automatic EVAL fallback
on NOSCRIPT errors (in case Redis was restarted and lost its script cache).
Options
:name- The name to register the connection pool (required):url- Redis URL (e.g., "redis://localhost:6379"):host- Redis host (default: "localhost"):port- Redis port (default: 6379):password- Redis password (optional):database- Redis database number (default: 0):pool_size- Number of connections in the pool (default: 10):ssl- Enable SSL (default: false):socket_opts- Additional socket options:timeout- Connection timeout in ms (default: 5000)
Summary
Functions
Creates a dedicated blocking connection for operations like BRPOPLPUSH or BZPOPMIN.
Returns a specification to start this module under a supervisor.
Closes the Redis connection pool.
Closes a blocking connection.
Executes a Redis command.
Executes a Redis command, raising on error.
Disconnects a blocking connection for reconnection.
Executes a Lua script.
Executes a Lua script using EVALSHA (cached script). Falls back to EVAL if the script is not cached.
Gets the underlying redis options for creating new connections.
Executes a pipeline of Redis commands.
Executes a pipeline, raising on error.
Sets the Redis client name on a connection or pid.
Starts the Redis connection pool.
Executes multiple commands in a Redis transaction (MULTI/EXEC).
Types
Functions
@spec blocking_connection( connection(), keyword() ) :: {:ok, pid()} | {:error, term()}
Creates a dedicated blocking connection for operations like BRPOPLPUSH or BZPOPMIN.
Returns a connection that can be used for blocking operations without affecting the main pool.
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec close(connection(), timeout()) :: :ok | {:error, term()}
Closes the Redis connection pool.
@spec close_blocking(connection(), pid()) :: :ok
Closes a blocking connection.
@spec command(connection(), command(), keyword()) :: {:ok, term()} | {:error, term()}
Executes a Redis command.
Examples
BullMQ.RedisConnection.command(:my_redis, ["SET", "key", "value"])
#=> {:ok, "OK"}
BullMQ.RedisConnection.command(:my_redis, ["GET", "key"])
#=> {:ok, "value"}
@spec command!(connection(), command(), keyword()) :: term()
Executes a Redis command, raising on error.
@spec disconnect_blocking(pid()) :: :ok
Disconnects a blocking connection for reconnection.
@spec eval(connection(), String.t(), [String.t()], [term()], keyword()) :: {:ok, term()} | {:error, term()}
Executes a Lua script.
Examples
BullMQ.RedisConnection.eval(:my_redis, "return KEYS[1]", ["mykey"], [])
#=> {:ok, "mykey"}
@spec evalsha(connection(), String.t(), String.t(), [String.t()], [term()], keyword()) :: {:ok, term()} | {:error, term()}
Executes a Lua script using EVALSHA (cached script). Falls back to EVAL if the script is not cached.
@spec get_redis_opts(connection()) :: keyword()
Gets the underlying redis options for creating new connections.
@spec pipeline(connection(), pipeline(), keyword()) :: {:ok, [term()]} | {:error, term()}
Executes a pipeline of Redis commands.
Examples
BullMQ.RedisConnection.pipeline(:my_redis, [
["SET", "key1", "value1"],
["SET", "key2", "value2"],
["GET", "key1"]
])
#=> {:ok, ["OK", "OK", "value1"]}
@spec pipeline!(connection(), pipeline(), keyword()) :: [term()]
Executes a pipeline, raising on error.
@spec set_client_name(connection() | pid(), String.t()) :: :ok | {:error, term()}
Sets the Redis client name on a connection or pid.
@spec start_link(keyword()) :: Supervisor.on_start()
Starts the Redis connection pool.
@spec transaction(connection(), pipeline(), keyword()) :: {:ok, [term()]} | {:error, term()}
Executes multiple commands in a Redis transaction (MULTI/EXEC).
All commands are executed atomically - either all succeed or none do.
Returns {:ok, results} where results is a list of command results,
or {:error, reason} if the transaction fails.
Examples
BullMQ.RedisConnection.transaction(:my_redis, [
["SET", "key1", "value1"],
["SET", "key2", "value2"],
["GET", "key1"]
])
#=> {:ok, ["OK", "OK", "value1"]}