BullMQ.RedisConnection (BullMQ v1.0.1)

Redis connection management for BullMQ.

This module provides a supervised pool of Redis connections built on NimblePool. It manages the connection lifecycle and reconnection, and exposes functions for running commands, pipelines, transactions, and Lua scripts.

Usage

Add to your supervision tree:

children = [
  {BullMQ.RedisConnection,
    name: :bullmq_redis,
    url: "redis://localhost:6379",
    pool_size: 10}
]

Then use it with queues and workers:

BullMQ.Queue.add("my_queue", "job", %{}, connection: :bullmq_redis)

Options

  • :name - The name under which the connection pool is registered (required)
  • :url - Redis URL (e.g., "redis://localhost:6379")
  • :host - Redis host (default: "localhost")
  • :port - Redis port (default: 6379)
  • :password - Redis password (optional)
  • :database - Redis database number (default: 0)
  • :pool_size - Number of connections in the pool (default: 10)
  • :ssl - Enable SSL (default: false)
  • :socket_opts - Additional socket options
  • :timeout - Connection timeout in ms (default: 5000)
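
As an alternative to :url, the pool can presumably be configured from the discrete options listed above. The following is a sketch only: the password and the other values are placeholders, and the source does not say how :url and the discrete options interact when both are given.

```elixir
# Sketch: configure the pool with discrete options instead of :url.
# All values are placeholders; only the option names come from the list above.
children = [
  {BullMQ.RedisConnection,
    name: :bullmq_redis,
    host: "localhost",
    port: 6379,
    password: "secret",   # placeholder, load real credentials from config
    database: 0,
    pool_size: 10,
    ssl: false,
    timeout: 5_000}       # connection timeout in ms
]

Supervisor.start_link(children, strategy: :one_for_one)
```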

Summary

Functions

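blocking_connection(conn, opts \\ [])
Creates a dedicated blocking connection for operations like BRPOPLPUSH or BZPOPMIN.

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

close_blocking(conn, pid)
Closes a blocking connection.

command(conn, command, opts \\ [])
Executes a Redis command.

command!(conn, command, opts \\ [])
Executes a Redis command, raising on error.

disconnect_blocking(pid)
Disconnects a blocking connection for reconnection.

eval(conn, script, keys, args, opts \\ [])
Executes a Lua script.

evalsha(conn, sha, script, keys, args, opts \\ [])
Executes a Lua script using EVALSHA (cached script). Falls back to EVAL if the script is not cached.

get_redis_opts(conn)
Gets the underlying redis options for creating new connections.

pipeline(conn, commands, opts \\ [])
Executes a pipeline of Redis commands.

pipeline!(conn, commands, opts \\ [])
Executes a pipeline, raising on error.

start_link(opts)
Starts the Redis connection pool.

transaction(conn, commands, opts \\ [])
Executes multiple commands in a Redis transaction (MULTI/EXEC).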

Types

command()

@type command() :: [binary() | integer()]

connection()

@type connection() :: atom() | pid()

pipeline()

@type pipeline() :: [command()]

Functions

blocking_connection(conn, opts \\ [])

@spec blocking_connection(
  connection(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

Creates a dedicated blocking connection for operations like BRPOPLPUSH or BZPOPMIN.

The returned connection is separate from the pool, so a long-running blocking call does not tie up a pooled connection.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

close_blocking(conn, pid)

@spec close_blocking(connection(), pid()) :: :ok

Closes a blocking connection.
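
Because a blocking connection lives outside the pool, the caller is responsible for closing it. A minimal sketch of pairing blocking_connection/2 with close_blocking/2 follows. MyApp.BlockingHelper and with_blocking/2 are hypothetical names, not part of BullMQ, and how the returned pid is used for the blocking command itself is left to the caller, since the source does not specify it.

```elixir
defmodule MyApp.BlockingHelper do
  # Hypothetical helper, not part of BullMQ.
  alias BullMQ.RedisConnection

  @spec with_blocking(RedisConnection.connection(), (pid() -> result)) ::
          {:ok, result} | {:error, term()} when result: var
  def with_blocking(conn, fun) when is_function(fun, 1) do
    case RedisConnection.blocking_connection(conn) do
      {:ok, pid} ->
        try do
          # The caller decides how to issue the blocking command on `pid`.
          {:ok, fun.(pid)}
        after
          # Runs whether `fun` returns, raises, or throws.
          RedisConnection.close_blocking(conn, pid)
        end

      {:error, _reason} = error ->
        error
    end
  end
end
```

The try/after block keeps the dedicated connection from leaking if the function passed in fails.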

command(conn, command, opts \\ [])

@spec command(connection(), command(), keyword()) :: {:ok, term()} | {:error, term()}

Executes a Redis command.

Examples

BullMQ.RedisConnection.command(:my_redis, ["SET", "key", "value"])
#=> {:ok, "OK"}

BullMQ.RedisConnection.command(:my_redis, ["GET", "key"])
#=> {:ok, "value"}

command!(conn, command, opts \\ [])

@spec command!(connection(), command(), keyword()) :: term()

Executes a Redis command, raising on error.
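
The two variants suit different call sites: command/3 when a failure should be handled in place, command!/3 when a failure should crash the caller. A short sketch of the difference, where MyApp.Cache and its functions are hypothetical names used only for illustration:

```elixir
defmodule MyApp.Cache do
  # Hypothetical wrapper, not part of BullMQ.
  alias BullMQ.RedisConnection

  # Tagged-tuple form: fall back to `default` when Redis reports an error.
  def get_or_default(conn, key, default) do
    case RedisConnection.command(conn, ["GET", key]) do
      {:ok, value} -> value
      {:error, _reason} -> default
    end
  end

  # Bang form: returns the bare result, or raises if the command fails.
  def get!(conn, key) do
    RedisConnection.command!(conn, ["GET", key])
  end
end
```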

disconnect_blocking(pid)

@spec disconnect_blocking(pid()) :: :ok

Disconnects a blocking connection for reconnection.

eval(conn, script, keys, args, opts \\ [])

@spec eval(connection(), String.t(), [String.t()], [term()], keyword()) ::
  {:ok, term()} | {:error, term()}

Executes a Lua script.

Examples

BullMQ.RedisConnection.eval(:my_redis, "return KEYS[1]", ["mykey"], [])
#=> {:ok, "mykey"}

evalsha(conn, sha, script, keys, args, opts \\ [])

@spec evalsha(connection(), String.t(), String.t(), [String.t()], [term()], keyword()) ::
  {:ok, term()} | {:error, term()}

Executes a Lua script using EVALSHA (cached script). Falls back to EVAL if the script is not cached.
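
Redis identifies a cached script by the lowercase hexadecimal SHA-1 digest of its source, so the sha argument can be computed once from the script text. A minimal sketch, assuming evalsha/6 expects that digest; MyApp.Script is a hypothetical module, not part of BullMQ:

```elixir
defmodule MyApp.Script do
  # Hypothetical helper, not part of BullMQ.
  alias BullMQ.RedisConnection

  @source "return KEYS[1]"

  # Lowercase hex SHA-1 of the script source, computed at compile time.
  @sha Base.encode16(:crypto.hash(:sha, @source), case: :lower)

  def sha, do: @sha

  # Passes both the digest and the source, so the call can fall back to EVAL.
  def run(conn, keys, args \\ []) do
    RedisConnection.evalsha(conn, @sha, @source, keys, args)
  end
end
```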

get_redis_opts(conn)

@spec get_redis_opts(connection()) :: keyword()

Gets the underlying redis options for creating new connections.

pipeline(conn, commands, opts \\ [])

@spec pipeline(connection(), pipeline(), keyword()) ::
  {:ok, [term()]} | {:error, term()}

Executes a pipeline of Redis commands.

Examples

BullMQ.RedisConnection.pipeline(:my_redis, [
  ["SET", "key1", "value1"],
  ["SET", "key2", "value2"],
  ["GET", "key1"]
])
#=> {:ok, ["OK", "OK", "value1"]}

pipeline!(conn, commands, opts \\ [])

@spec pipeline!(connection(), pipeline(), keyword()) :: [term()]

Executes a pipeline, raising on error.

start_link(opts)

@spec start_link(keyword()) :: Supervisor.on_start()

Starts the Redis connection pool.

transaction(conn, commands, opts \\ [])

@spec transaction(connection(), pipeline(), keyword()) ::
  {:ok, [term()]} | {:error, term()}

Executes multiple commands in a Redis transaction (MULTI/EXEC).

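The commands are queued and run as one isolated unit, so commands from other clients are never interleaved with them. Redis does not roll back on a runtime error: if one command fails during EXEC, the others still take effect. Returns {:ok, results}, where results is the list of command results, or {:error, reason} if the transaction fails.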

Examples

BullMQ.RedisConnection.transaction(:my_redis, [
  ["SET", "key1", "value1"],
  ["SET", "key2", "value2"],
  ["GET", "key1"]
])
#=> {:ok, ["OK", "OK", "value1"]}