BullMQ.Queue (BullMQ v1.0.1)

Queue management for BullMQ.

A Queue is the main entry point for adding jobs to be processed. It provides methods for adding single jobs, bulk jobs, and managing queue state.

Usage

The simplest way to use a queue is through the module functions:

# Add a job
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"},
  connection: :my_redis)

# Add a delayed job
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"},
  connection: :my_redis,
  delay: 60_000)  # 1 minute delay

# Add a prioritized job
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"},
  connection: :my_redis,
  priority: 1)  # Lower number = higher priority

Using as a GenServer

For more control, you can start a Queue as a supervised process:

children = [
  {BullMQ.Queue, name: :my_queue, queue: "my_queue", connection: :my_redis}
]

# Then use it by name
{:ok, job} = BullMQ.Queue.add(:my_queue, "email", %{to: "user@example.com"})
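
As a rough sketch of how the supervised setup might be wired together, the child list below starts a Redis connection before the queue. It assumes that :my_redis is a named Redix process (as in the connection pool example for add_bulk/3); MyApp.QueueChildren is a hypothetical module name.

```elixir
defmodule MyApp.QueueChildren do
  @moduledoc "Hypothetical helper that builds the child list for a supervisor."

  # The Redis connection is listed first so it is running before the queue starts.
  # Assumption: :my_redis is a named Redix connection on localhost.
  def children do
    [
      {Redix, host: "localhost", name: :my_redis},
      {BullMQ.Queue, name: :my_queue, queue: "my_queue", connection: :my_redis}
    ]
  end
end

# In an application callback, this list would be handed to the supervisor:
# Supervisor.start_link(MyApp.QueueChildren.children(), strategy: :one_for_one)
```

Supervisors start children in list order, which is why the connection precedes the queue that depends on it.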

Job Options

When adding jobs, you can specify various options:

  • :job_id - Custom job ID (auto-generated if not provided)
  • :delay - Delay in milliseconds before the job becomes active
  • :priority - Job priority (lower number = higher priority; default: 0, the highest)
  • :attempts - Max number of retry attempts (default: 1)
  • :backoff - Retry backoff configuration
  • :lifo - Use LIFO ordering instead of FIFO
  • :timeout - Job timeout in milliseconds
  • :remove_on_complete - Remove job after completion (true/false/age/count)
  • :remove_on_fail - Remove job after failure (true/false/age/count)
  • :repeat - Repeat/scheduling options
  • :parent - Parent job reference for job flows

Queue Operations

# Pause the queue
:ok = BullMQ.Queue.pause("my_queue", connection: :my_redis)

# Resume the queue
:ok = BullMQ.Queue.resume("my_queue", connection: :my_redis)

# Get queue counts
{:ok, counts} = BullMQ.Queue.get_counts("my_queue", connection: :my_redis)
# %{waiting: 10, active: 2, delayed: 5, completed: 100, failed: 3}

# Drain the queue (remove all waiting jobs)
:ok = BullMQ.Queue.drain("my_queue", connection: :my_redis)

Summary

Functions

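  • add(queue, name, data, opts \\ []) - Adds a job to the queue.
  • add_bulk(queue, jobs, opts \\ []) - Adds multiple jobs to the queue atomically in a single operation.
  • child_spec(init_arg) - Returns a specification to start this module under a supervisor.
  • clean(queue, state, grace, opts \\ []) - Cleans jobs in a specific state older than a grace period.
  • count(queue, opts \\ []) - Returns the number of jobs waiting to be processed.
  • drain(queue, opts \\ []) - Drains the queue, removing all waiting jobs.
  • export_prometheus_metrics(queue, opts \\ []) - Exports queue metrics in Prometheus format.
  • get_active(queue, opts \\ []) - Returns jobs in the "active" state.
  • get_active_count(queue, opts \\ []) - Returns the number of jobs in the "active" state.
  • get_completed(queue, opts \\ []) - Returns jobs in the "completed" state.
  • get_completed_count(queue, opts \\ []) - Returns the number of jobs in the "completed" state.
  • get_counts(queue, opts \\ []) - Gets job counts by state.
  • get_deduplication_job_id(queue, dedup_id, opts \\ []) - Gets the job ID from a deduplication identifier.
  • get_delayed(queue, opts \\ []) - Returns jobs in the "delayed" state.
  • get_delayed_count(queue, opts \\ []) - Returns the number of jobs in the "delayed" state.
  • get_failed(queue, opts \\ []) - Returns jobs in the "failed" state.
  • get_failed_count(queue, opts \\ []) - Returns the number of jobs in the "failed" state.
  • get_global_concurrency(queue, opts \\ []) - Gets the global concurrency value for the queue.
  • get_global_rate_limit(queue, opts \\ []) - Gets the global rate limit configuration for the queue.
  • get_job(queue, job_id, opts \\ []) - Gets a job by ID.
  • get_job_count_by_types(queue, types, opts \\ []) - Gets total job count for specific types.
  • get_job_counts(queue, types, opts \\ []) - Gets job counts for specific types.
  • get_job_logs(queue, job_id, opts \\ []) - Returns the logs for a given job.
  • get_job_state(queue, job_id, opts \\ []) - Gets the current state of a job.
  • get_jobs(queue, states, opts \\ []) - Gets jobs in specific state(s).
  • get_meta(queue, opts \\ []) - Gets queue metadata.
  • get_metrics(queue, type, opts \\ []) - Gets queue metrics for completed or failed jobs.
  • get_prioritized(queue, opts \\ []) - Returns jobs in the "prioritized" state.
  • get_prioritized_count(queue, opts \\ []) - Returns the number of jobs in the "prioritized" state.
  • get_rate_limit_ttl(queue, opts \\ []) - Gets the time to live for a rate limited key in milliseconds.
  • get_version(queue, opts \\ []) - Gets the BullMQ version stored in the queue's metadata.
  • get_waiting(queue, opts \\ []) - Returns jobs in the "waiting" state.
  • get_waiting_children(queue, opts \\ []) - Returns jobs in the "waiting-children" state.
  • get_waiting_children_count(queue, opts \\ []) - Returns the number of jobs in the "waiting-children" state.
  • get_waiting_count(queue, opts \\ []) - Returns the number of jobs in the "waiting" or "paused" states.
  • get_workers(queue, opts \\ []) - Gets the list of workers connected to the queue.
  • get_workers_count(queue, opts \\ []) - Gets the count of workers connected to the queue.
  • pause(queue, opts \\ []) - Pauses the queue.
  • paused?(queue, opts \\ []) - Checks if the queue is paused.
  • remove_deduplication_key(queue, dedup_id, opts \\ []) - Removes a deduplication key.
  • remove_job(queue, job_id, opts \\ []) - Removes a job from the queue.
  • resume(queue, opts \\ []) - Resumes a paused queue.
  • retry_job(queue, job_id, opts \\ []) - Retries a failed job.
  • start_link(opts) - Starts a Queue GenServer.
  • update_meta(queue, opts) - Updates queue metadata (version and maxLenEvents) in Redis.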

Types

t()

@type t() :: %BullMQ.Queue{
  connection: BullMQ.Types.redis_connection(),
  default_job_opts: map(),
  keys: BullMQ.Keys.queue_context(),
  name: String.t(),
  prefix: String.t(),
  telemetry: module() | nil
}

Functions

add(queue, name, data, opts \\ [])

@spec add(
  atom() | pid() | String.t(),
  BullMQ.Types.job_name(),
  BullMQ.Types.job_data(),
  keyword()
) ::
  {:ok, BullMQ.Job.t()} | {:error, term()}

Adds a job to the queue.

Parameters

  • queue - Queue name (string) or Queue GenServer name (atom/pid)
  • name - Job name/type
  • data - Job data payload
  • opts - Job and connection options

Options

  • :connection - Redis connection (required when using queue name string)
  • :prefix - Queue prefix (default: "bull")
  • :job_id - Custom job ID
  • :delay - Delay in milliseconds
  • :priority - Job priority (0 = highest)
  • :attempts - Max retry attempts
  • :backoff - Backoff configuration
  • :lifo - Use LIFO ordering
  • :remove_on_complete - Job removal after completion
  • :remove_on_fail - Job removal after failure

Examples

# Add a simple job
{:ok, job} = BullMQ.Queue.add("emails", "welcome", %{user_id: 123},
  connection: :redis)

# Add a delayed job
{:ok, job} = BullMQ.Queue.add("emails", "reminder", %{user_id: 123},
  connection: :redis,
  delay: :timer.hours(24))

# Add a job with retries
{:ok, job} = BullMQ.Queue.add("payments", "process", %{amount: 100},
  connection: :redis,
  attempts: 5,
  backoff: %{type: :exponential, delay: 1000})
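
To make the retry example more concrete, here is a small sketch of how exponential backoff delays grow. It assumes the formula used by the Node.js BullMQ library, delay * 2^(attempts_made - 1); whether this library computes delays identically is not stated here, and BackoffSketch is a hypothetical module.

```elixir
defmodule BackoffSketch do
  # Assumed formula (taken from Node.js BullMQ): base_delay * 2^(attempts_made - 1).
  def exponential_delay(base_delay, attempts_made) when attempts_made >= 1 do
    base_delay * Integer.pow(2, attempts_made - 1)
  end
end

# Delays in milliseconds before the first four retries with `delay: 1000`.
delays = Enum.map(1..4, &BackoffSketch.exponential_delay(1000, &1))
IO.inspect(delays)
# prints [1000, 2000, 4000, 8000]
```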

add_bulk(queue, jobs, opts \\ [])

@spec add_bulk(
  atom() | pid() | String.t(),
  [{BullMQ.Types.job_name(), BullMQ.Types.job_data(), keyword()}],
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Adds multiple jobs to the queue atomically in a single operation.

Jobs are added in transactional batches using Redis MULTI/EXEC: within a batch, either all jobs are added or none are. This is significantly more efficient than calling add/4 repeatedly and provides stronger guarantees. With a connection pool, throughput reaches up to 10x that of sequential adds (~60,000 jobs/sec vs ~6,000 jobs/sec).

Parameters

  • queue - Queue name (string) or Queue GenServer reference (atom/pid)
  • jobs - List of job tuples: {name, data, opts}
  • opts - Bulk operation options

Options

  • :connection - Redis connection (required when using queue name string)
  • :prefix - Queue prefix (default: "bull")
  • :pipeline - Use transactional pipelining for efficiency (default: true)
  • :chunk_size - Number of jobs per transaction batch (default: 100)
  • :connection_pool - List of Redis connections for parallel processing
  • :concurrency - Max parallel tasks (default: 8, capped by pool size)

Performance Tips

For maximum throughput when adding large numbers of jobs:

  1. Transactional pipelining (default): Already enabled, ~4x faster than sequential
  2. Connection pool: Provide multiple connections for parallel chunk processing
  3. Chunk size: Default of 100 is optimal for most cases
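
To illustrate what :chunk_size means in practice, the sketch below splits a job list into batches with Enum.chunk_every/2. It only shows the batch sizes that result; that the library chunks the list in exactly this way is an assumption, and ChunkSketch is a hypothetical module.

```elixir
defmodule ChunkSketch do
  # Splits a job list into batches of `chunk_size`; the last batch may be smaller.
  def batches(jobs, chunk_size \\ 100) do
    Enum.chunk_every(jobs, chunk_size)
  end
end

jobs = for i <- 1..250, do: {"email", %{to: "user#{i}@example.com"}, []}

jobs
|> ChunkSketch.batches()
|> Enum.map(&length/1)
|> IO.inspect()
# prints [100, 100, 50]
```

With a connection pool, batches like these are what would be distributed across the pooled connections.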

Examples

# Basic bulk add (uses pipelining automatically)
jobs = [
  {"email", %{to: "user1@example.com"}, []},
  {"email", %{to: "user2@example.com"}, []},
  {"email", %{to: "user3@example.com"}, [priority: 1]}
]
{:ok, added_jobs} = BullMQ.Queue.add_bulk("emails", jobs, connection: :redis)

# High-performance bulk add with connection pool (~60K jobs/sec)
# First, create a pool of connections
pool = for i <- 1..8 do
  name = :"redis_pool_#{i}"
  {:ok, _} = Redix.start_link(host: "localhost", name: name)
  name
end

# Then use the pool for parallel processing
{:ok, jobs} = BullMQ.Queue.add_bulk("emails", large_job_list,
  connection: :redis,
  connection_pool: pool,
  chunk_size: 100
)

# Disable pipelining (sequential mode, slower)
{:ok, jobs} = BullMQ.Queue.add_bulk("emails", jobs,
  connection: :redis,
  pipeline: false
)

Notes

  • All jobs in a batch are added atomically (all or nothing)
  • Standard jobs (no delay or priority) use optimized transactional pipelining
  • Delayed and prioritized jobs fall back to sequential processing
  • Returns {:error, {:partial_failure, results}} if some jobs fail
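
Callers will usually want to distinguish the three return shapes mentioned here. The following is a minimal sketch of such a case analysis; BulkResult is a hypothetical module, and because the structure of the partial-failure results is not described above, they are passed through unchanged.

```elixir
defmodule BulkResult do
  # Turns the documented return shapes of add_bulk/3 into a short summary.
  def summarize({:ok, jobs}) when is_list(jobs), do: {:added, length(jobs)}

  # The structure of `results` is not specified, so it is returned untouched.
  def summarize({:error, {:partial_failure, results}}), do: {:partial, results}

  # Any other error, such as a connection problem.
  def summarize({:error, reason}), do: {:failed, reason}
end

# Intended use (requires a running Redis connection):
# "emails"
# |> BullMQ.Queue.add_bulk(jobs, connection: :redis)
# |> BulkResult.summarize()
```

The partial-failure clause must come before the generic error clause, since both match an {:error, _} tuple.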

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

clean(queue, state, grace, opts \\ [])

@spec clean(
  atom() | pid() | String.t(),
  BullMQ.Types.job_state(),
  non_neg_integer(),
  keyword()
) ::
  {:ok, [BullMQ.Types.job_id()]} | {:error, term()}

Cleans jobs in a specific state older than a grace period.

Parameters

  • queue - Queue name or GenServer
  • state - State to clean (:completed, :failed, :delayed, :waiting)
  • grace - Grace period in milliseconds

Options

  • :limit - Maximum number of jobs to clean (default: 1000)

Examples

# Clean completed jobs older than 1 hour
{:ok, cleaned_ids} = BullMQ.Queue.clean("my_queue", :completed, 3_600_000, connection: :redis)
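
Grace periods are easier to read when built with Erlang's :timer helpers, which return milliseconds. The sketch below keeps a per-state retention policy in one place; the Retention module and the chosen periods are hypothetical.

```elixir
defmodule Retention do
  # Hypothetical retention policy: job state => grace period in milliseconds.
  def policy do
    %{
      completed: :timer.hours(1),
      failed: :timer.hours(24 * 7)
    }
  end
end

# Each entry would map onto one clean/4 call, for example:
# for {state, grace} <- Retention.policy() do
#   BullMQ.Queue.clean("my_queue", state, grace, connection: :redis)
# end
IO.inspect(Retention.policy())
```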

count(queue, opts \\ [])

@spec count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs waiting to be processed.

This includes jobs that are "waiting", "delayed", "prioritized", or "waiting-children".

Examples

{:ok, count} = BullMQ.Queue.count("my_queue", connection: :redis)
# 15

drain(queue, opts \\ [])

@spec drain(
  atom() | pid() | String.t(),
  keyword()
) :: :ok | {:error, term()}

Drains the queue, removing all waiting jobs.

Options

  • :delayed - Also remove delayed jobs (default: false)

Examples

:ok = BullMQ.Queue.drain("my_queue", connection: :redis)
:ok = BullMQ.Queue.drain("my_queue", connection: :redis, delayed: true)

export_prometheus_metrics(queue, opts \\ [])

@spec export_prometheus_metrics(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, String.t()} | {:error, term()}

Exports queue metrics in Prometheus format.

Automatically exports all the counts returned by get_job_counts/2.

Options

  • :global_variables - Additional labels to add to all metrics

Examples

{:ok, metrics} = BullMQ.Queue.export_prometheus_metrics("my_queue", connection: :redis)
# "# HELP bullmq_job_count Number of jobs in the queue by state\n..."

{:ok, metrics} = BullMQ.Queue.export_prometheus_metrics("my_queue",
  connection: :redis,
  global_variables: %{"env" => "production"})

get_active(queue, opts \\ [])

@spec get_active(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Returns jobs in the "active" state.

Examples

{:ok, jobs} = BullMQ.Queue.get_active("my_queue", connection: :redis)

get_active_count(queue, opts \\ [])

@spec get_active_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs in the "active" state.

Examples

{:ok, count} = BullMQ.Queue.get_active_count("my_queue", connection: :redis)

get_completed(queue, opts \\ [])

@spec get_completed(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Returns jobs in the "completed" state.

Examples

{:ok, jobs} = BullMQ.Queue.get_completed("my_queue", connection: :redis)

get_completed_count(queue, opts \\ [])

@spec get_completed_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs in the "completed" state.

Examples

{:ok, count} = BullMQ.Queue.get_completed_count("my_queue", connection: :redis)

get_counts(queue, opts \\ [])

@spec get_counts(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Gets job counts by state.

Examples

{:ok, counts} = BullMQ.Queue.get_counts("my_queue", connection: :redis)
# %{waiting: 10, active: 2, delayed: 5, completed: 100, failed: 3}

get_deduplication_job_id(queue, dedup_id, opts \\ [])

@spec get_deduplication_job_id(atom() | pid() | String.t(), String.t(), keyword()) ::
  {:ok, String.t() | nil} | {:error, term()}

Gets the job ID from a deduplication identifier.

Returns the job ID that started the deduplicated state, or nil if not found.

Examples

{:ok, job_id} = BullMQ.Queue.get_deduplication_job_id("my_queue", "dedup-123", connection: :redis)

get_delayed(queue, opts \\ [])

@spec get_delayed(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Returns jobs in the "delayed" state.

Examples

{:ok, jobs} = BullMQ.Queue.get_delayed("my_queue", connection: :redis)

get_delayed_count(queue, opts \\ [])

@spec get_delayed_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs in the "delayed" state.

Examples

{:ok, count} = BullMQ.Queue.get_delayed_count("my_queue", connection: :redis)

get_failed(queue, opts \\ [])

@spec get_failed(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Returns jobs in the "failed" state.

Examples

{:ok, jobs} = BullMQ.Queue.get_failed("my_queue", connection: :redis)

get_failed_count(queue, opts \\ [])

@spec get_failed_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs in the "failed" state.

Examples

{:ok, count} = BullMQ.Queue.get_failed_count("my_queue", connection: :redis)

get_global_concurrency(queue, opts \\ [])

@spec get_global_concurrency(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer() | nil} | {:error, term()}

Gets the global concurrency value for the queue.

Returns nil if no value is set.

Examples

{:ok, concurrency} = BullMQ.Queue.get_global_concurrency("my_queue", connection: :redis)
# {:ok, 10} or {:ok, nil}

get_global_rate_limit(queue, opts \\ [])

@spec get_global_rate_limit(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, %{max: integer(), duration: integer()} | nil} | {:error, term()}

Gets the global rate limit configuration for the queue.

Returns nil if no rate limit is set.

Examples

{:ok, rate_limit} = BullMQ.Queue.get_global_rate_limit("my_queue", connection: :redis)
# {:ok, %{max: 100, duration: 60000}} or {:ok, nil}
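
The returned map can be turned into a more familiar jobs-per-second figure. This sketch assumes that :duration is expressed in milliseconds, which the example value suggests but the text does not state; RateLimitSketch is a hypothetical module.

```elixir
defmodule RateLimitSketch do
  # nil means that no global rate limit is configured.
  def jobs_per_second(nil), do: :unlimited

  # Assumption: `duration` is in milliseconds.
  def jobs_per_second(%{max: max, duration: duration}) when duration > 0 do
    max * 1000 / duration
  end
end

IO.inspect(RateLimitSketch.jobs_per_second(%{max: 100, duration: 50_000}))
# prints 2.0
```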

get_job(queue, job_id, opts \\ [])

@spec get_job(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) ::
  {:ok, BullMQ.Job.t() | nil} | {:error, term()}

Gets a job by ID.

Examples

{:ok, job} = BullMQ.Queue.get_job("my_queue", "123", connection: :redis)

get_job_count_by_types(queue, types, opts \\ [])

@spec get_job_count_by_types(
  atom() | pid() | String.t(),
  [BullMQ.Types.job_state()],
  keyword()
) ::
  {:ok, integer()} | {:error, term()}

Gets total job count for specific types.

Returns the sum of job counts for all specified types.

Examples

{:ok, total} = BullMQ.Queue.get_job_count_by_types("my_queue", [:waiting, :delayed], connection: :redis)
# 25
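
The relationship between this function and get_job_counts can be illustrated with plain data: the total is the sum of the per-state counts. This is a sketch of the documented behaviour rather than the library's implementation, and CountSketch is a hypothetical module.

```elixir
defmodule CountSketch do
  # Sums the counts of the given states; states missing from the map count as 0.
  def total(counts, types) do
    types
    |> Enum.map(&Map.get(counts, &1, 0))
    |> Enum.sum()
  end
end

counts = %{waiting: 10, delayed: 15, completed: 50}
IO.inspect(CountSketch.total(counts, [:waiting, :delayed]))
# prints 25
```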

get_job_counts(queue, types, opts \\ [])

@spec get_job_counts(
  atom() | pid() | String.t(),
  [BullMQ.Types.job_state()],
  keyword()
) ::
  {:ok, map()} | {:error, term()}

Gets job counts for specific types.

Returns a map with job counts for each type specified.

Examples

{:ok, counts} = BullMQ.Queue.get_job_counts("my_queue", [:waiting, :completed], connection: :redis)
# %{waiting: 10, completed: 50}

get_job_logs(queue, job_id, opts \\ [])

@spec get_job_logs(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) ::
  {:ok, %{logs: [String.t()], count: integer()}} | {:error, term()}

Returns the logs for a given job.

Options

  • :start - Start index (default: 0)
  • :end - End index (default: -1)
  • :asc - If true, return logs in ascending order (default: true)

Examples

{:ok, %{logs: logs, count: count}} = BullMQ.Queue.get_job_logs("my_queue", "123", connection: :redis)

get_job_state(queue, job_id, opts \\ [])

@spec get_job_state(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) ::
  {:ok, BullMQ.Types.job_state()} | {:error, term()}

Gets the current state of a job.

Examples

{:ok, :waiting} = BullMQ.Queue.get_job_state("my_queue", "123", connection: :redis)

get_jobs(queue, states, opts \\ [])

@spec get_jobs(
  atom() | pid() | String.t(),
  BullMQ.Types.job_state() | [BullMQ.Types.job_state()],
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Gets jobs in specific state(s).

Accepts either a single state atom or a list of states.

Options

  • :start - Start index (default: 0)
  • :end - End index (default: -1, meaning all)
  • :asc - If true, return jobs in ascending order (default: false)

Examples

{:ok, jobs} = BullMQ.Queue.get_jobs("my_queue", :waiting, connection: :redis)
{:ok, jobs} = BullMQ.Queue.get_jobs("my_queue", [:waiting, :active], connection: :redis)
{:ok, jobs} = BullMQ.Queue.get_jobs("my_queue", :failed, connection: :redis, start: 0, end: 9)
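
For paging through large result sets, the :start and :end options can be derived from a page number. The sketch below assumes that :end is inclusive, as the start: 0, end: 9 example suggests; PageSketch is a hypothetical module.

```elixir
defmodule PageSketch do
  # Converts a 1-based page number into :start/:end options.
  # Assumption: :end is an inclusive index.
  def range(page, page_size) when page >= 1 and page_size >= 1 do
    start = (page - 1) * page_size
    [start: start, end: start + page_size - 1]
  end
end

IO.inspect(PageSketch.range(3, 10))
# prints [start: 20, end: 29]

# Intended use (requires a running Redis connection):
# opts = [connection: :redis] ++ PageSketch.range(3, 10)
# {:ok, jobs} = BullMQ.Queue.get_jobs("my_queue", :failed, opts)
```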

get_meta(queue, opts \\ [])

@spec get_meta(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Gets queue metadata.

Returns metadata including paused state, version, concurrency, and rate limit settings.

Examples

{:ok, meta} = BullMQ.Queue.get_meta("my_queue", connection: :redis)
# %{paused: false, version: "bullmq:5.0.0", concurrency: 10, max: nil, duration: nil}

get_metrics(queue, type, opts \\ [])

@spec get_metrics(atom() | pid() | String.t(), :completed | :failed, keyword()) ::
  {:ok, map()} | {:error, term()}

Gets queue metrics for completed or failed jobs.

The metrics are represented as an array of job counts per unit of time (1 minute).

Parameters

  • queue - Queue name or GenServer
  • type - :completed or :failed
  • opts - Options

Options

  • :start - Start point of the metrics (default: 0, newest)
  • :end - End point of the metrics (default: -1, oldest)

Examples

{:ok, metrics} = BullMQ.Queue.get_metrics("my_queue", :completed, connection: :redis)
# %{
#   meta: %{count: 100, prev_ts: 1234567890, prev_count: 5},
#   data: [10, 15, 20, ...],
#   count: 60
# }
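
Since the data list holds one count per minute, simple aggregates are easy to derive. The following sketch works on a map shaped like the example above, using made-up numbers; MetricsSketch is a hypothetical module.

```elixir
defmodule MetricsSketch do
  # `data` holds one job count per minute; an empty list is rejected by the guard.
  def summarize(%{data: data}) when data != [] do
    total = Enum.sum(data)
    %{total: total, per_minute: total / length(data), peak: Enum.max(data)}
  end
end

summary = MetricsSketch.summarize(%{data: [10, 15, 20]})
IO.inspect(summary.per_minute)
# prints 15.0
```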

get_prioritized(queue, opts \\ [])

@spec get_prioritized(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Returns jobs in the "prioritized" state.

Examples

{:ok, jobs} = BullMQ.Queue.get_prioritized("my_queue", connection: :redis)

get_prioritized_count(queue, opts \\ [])

@spec get_prioritized_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs in the "prioritized" state.

Examples

{:ok, count} = BullMQ.Queue.get_prioritized_count("my_queue", connection: :redis)

get_rate_limit_ttl(queue, opts \\ [])

@spec get_rate_limit_ttl(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Gets the time to live for a rate limited key in milliseconds.

Returns

  • -2 if the key does not exist
  • -1 if the key exists but has no associated expire
  • TTL in milliseconds otherwise

Examples

{:ok, ttl} = BullMQ.Queue.get_rate_limit_ttl("my_queue", connection: :redis)
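
The three documented return values can be separated with function clauses. This is a minimal sketch; RateLimitTtl and the atoms it returns are hypothetical names.

```elixir
defmodule RateLimitTtl do
  # -2: the rate limit key does not exist.
  def describe(-2), do: :no_key

  # -1: the key exists but has no expiry.
  def describe(-1), do: :no_expiry

  # Any non-negative value is the remaining TTL in milliseconds.
  def describe(ttl) when is_integer(ttl) and ttl >= 0, do: {:expires_in_ms, ttl}
end

# Intended use (requires a running Redis connection):
# {:ok, ttl} = BullMQ.Queue.get_rate_limit_ttl("my_queue", connection: :redis)
# RateLimitTtl.describe(ttl)
```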

get_version(queue, opts \\ [])

@spec get_version(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, String.t() | nil} | {:error, term()}

Gets the BullMQ version stored in the queue's metadata.

This version indicates the Lua script version/capabilities of the queue, which corresponds to the Node.js BullMQ version. This is important for frontend tools like Bull Board to match features.

Returns nil if no version has been set (e.g., the queue was never used or was created with skip_meta_update: true).

Examples

{:ok, version} = BullMQ.Queue.get_version("my_queue", connection: :redis)
# "bullmq:5.65.1"
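
A tool that needs to compare versions can strip the prefix and use Elixir's Version module. The sketch assumes that the stored string always has the form "bullmq:<semver>", as in the example output; VersionSketch is a hypothetical module.

```elixir
defmodule VersionSketch do
  # Parses strings such as "bullmq:5.65.1".
  # Returns :error for nil or any value without the assumed "bullmq:" prefix.
  def parse("bullmq:" <> version), do: Version.parse(version)
  def parse(_other), do: :error
end

{:ok, version} = VersionSketch.parse("bullmq:5.65.1")
IO.inspect(Version.match?(version, ">= 5.0.0"))
# prints true
```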

get_waiting(queue, opts \\ [])

@spec get_waiting(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Returns jobs in the "waiting" state.

Parameters

  • queue - Queue name or GenServer
  • opts - Options

Options

  • :start - Start index (default: 0)
  • :end - End index (default: -1)
  • :connection - Redis connection (required for string queue)
  • :prefix - Queue prefix (default: "bull")

Examples

{:ok, jobs} = BullMQ.Queue.get_waiting("my_queue", connection: :redis)
{:ok, jobs} = BullMQ.Queue.get_waiting("my_queue", connection: :redis, start: 0, end: 9)

get_waiting_children(queue, opts \\ [])

@spec get_waiting_children(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}

Returns jobs in the "waiting-children" state.

These are parent jobs that have at least one child that has not completed yet.

Examples

{:ok, jobs} = BullMQ.Queue.get_waiting_children("my_queue", connection: :redis)

get_waiting_children_count(queue, opts \\ [])

@spec get_waiting_children_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs in the "waiting-children" state.

Examples

{:ok, count} = BullMQ.Queue.get_waiting_children_count("my_queue", connection: :redis)

get_waiting_count(queue, opts \\ [])

@spec get_waiting_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Returns the number of jobs in the "waiting" or "paused" states.

Examples

{:ok, count} = BullMQ.Queue.get_waiting_count("my_queue", connection: :redis)

get_workers(queue, opts \\ [])

@spec get_workers(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, [map()]} | {:error, term()}

Gets the list of workers connected to the queue.

Note: This may not work on all Redis providers (e.g., GCP doesn't support CLIENT LIST).

Examples

{:ok, workers} = BullMQ.Queue.get_workers("my_queue", connection: :redis)

get_workers_count(queue, opts \\ [])

@spec get_workers_count(
  atom() | pid() | String.t(),
  keyword()
) :: {:ok, integer()} | {:error, term()}

Gets the count of workers connected to the queue.

Examples

{:ok, count} = BullMQ.Queue.get_workers_count("my_queue", connection: :redis)

pause(queue, opts \\ [])

@spec pause(
  atom() | pid() | String.t(),
  keyword()
) :: :ok | {:error, term()}

Pauses the queue.

When paused, workers will not pick up new jobs. Active jobs will continue to completion.

Examples

:ok = BullMQ.Queue.pause("my_queue", connection: :redis)

paused?(queue, opts \\ [])

@spec paused?(
  atom() | pid() | String.t(),
  keyword()
) :: boolean()

Checks if the queue is paused.
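
Because paused?/2 returns a plain boolean, it can be used directly in a conditional. The sketch below resumes a queue only when it is paused; PauseSketch is a hypothetical module, and the queue module is injectable purely so the logic can be tried without Redis.

```elixir
defmodule PauseSketch do
  # `queue_mod` defaults to BullMQ.Queue; passing another module with
  # paused?/2 and resume/2 allows the logic to run without Redis.
  def resume_if_paused(queue, opts, queue_mod \\ BullMQ.Queue) do
    if queue_mod.paused?(queue, opts) do
      queue_mod.resume(queue, opts)
    else
      :already_running
    end
  end
end

# Intended use (requires a running Redis connection):
# PauseSketch.resume_if_paused("my_queue", connection: :redis)
```

The check and the resume are two separate calls, so another process may change the queue state in between.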

remove_deduplication_key(queue, dedup_id, opts \\ [])

@spec remove_deduplication_key(atom() | pid() | String.t(), String.t(), keyword()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Removes a deduplication key.

This allows new jobs with the same deduplication ID to be added immediately, even if the TTL hasn't expired or the original job hasn't completed.

Returns the number of keys removed (0 or 1).

Examples

# Stop deduplication for a specific ID
{:ok, 1} = BullMQ.Queue.remove_deduplication_key("my_queue", "dedup-123", connection: :redis)

# Key doesn't exist
{:ok, 0} = BullMQ.Queue.remove_deduplication_key("my_queue", "unknown", connection: :redis)

remove_job(queue, job_id, opts \\ [])

@spec remove_job(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) ::
  {:ok, integer()} | {:error, term()}

Removes a job from the queue.

Options

  • :remove_children - Also remove child jobs (default: false)

Examples

{:ok, 1} = BullMQ.Queue.remove_job("my_queue", "123", connection: :redis)

resume(queue, opts \\ [])

@spec resume(
  atom() | pid() | String.t(),
  keyword()
) :: :ok | {:error, term()}

Resumes a paused queue.

Examples

:ok = BullMQ.Queue.resume("my_queue", connection: :redis)

retry_job(queue, job_id, opts \\ [])

@spec retry_job(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) ::
  :ok | {:error, term()}

Retries a failed job.

Options

  • :lifo - Add to front of queue (default: false)

Examples

:ok = BullMQ.Queue.retry_job("my_queue", "123", connection: :redis)

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a Queue GenServer.

Options

  • :name - GenServer name (required)
  • :queue - Queue name in Redis (required)
  • :connection - Redis connection (required)
  • :prefix - Queue prefix (default: "bull")
  • :default_job_opts - Default options for all jobs

Examples

{:ok, pid} = BullMQ.Queue.start_link(
  name: :my_queue,
  queue: "my_queue",
  connection: :my_redis
)

update_meta(queue, opts)

@spec update_meta(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Updates queue metadata (version and maxLenEvents) in Redis.

This is automatically called when using a Queue GenServer, but for stateless usage you may want to call this explicitly to ensure the version is set. This is important for frontend tools like Bull Board to properly detect queue capabilities.

Options

  • :connection - Redis connection (required)
  • :prefix - Queue prefix (default: "bull")
  • :max_len_events - Maximum length of the event stream (default: 10_000)

Examples

:ok = BullMQ.Queue.update_meta("my_queue", connection: :redis)
:ok = BullMQ.Queue.update_meta("my_queue", connection: :redis, max_len_events: 50_000)
