BullMQ.Queue (BullMQ v1.0.1)
View SourceQueue management for BullMQ.
A Queue is the main entry point for adding jobs to be processed. It provides methods for adding single jobs, bulk jobs, and managing queue state.
Usage
The simplest way to use a queue is through the module functions:
# Add a job
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"},
connection: :my_redis)
# Add a delayed job
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"},
connection: :my_redis,
delay: 60_000) # 1 minute delay
# Add a prioritized job
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"},
connection: :my_redis,
priority: 1) # Higher priority (lower number = higher priority)Using as a GenServer
For more control, you can start a Queue as a supervised process:
children = [
{BullMQ.Queue, name: :my_queue, queue: "my_queue", connection: :my_redis}
]
# Then use it by name
{:ok, job} = BullMQ.Queue.add(:my_queue, "email", %{to: "user@example.com"})Job Options
When adding jobs, you can specify various options:
:job_id- Custom job ID (auto-generated if not provided):delay- Delay in milliseconds before the job becomes active:priority- Job priority (0 = highest, default):attempts- Max number of retry attempts (default: 1):backoff- Retry backoff configuration:lifo- Use LIFO ordering instead of FIFO:timeout- Job timeout in milliseconds:remove_on_complete- Remove job after completion (true/false/age/count):remove_on_fail- Remove job after failure (true/false/age/count):repeat- Repeat/scheduling options:parent- Parent job reference for job flows
Queue Operations
# Pause the queue
:ok = BullMQ.Queue.pause("my_queue", connection: :my_redis)
# Resume the queue
:ok = BullMQ.Queue.resume("my_queue", connection: :my_redis)
# Get queue counts
{:ok, counts} = BullMQ.Queue.get_counts("my_queue", connection: :my_redis)
# %{waiting: 10, active: 2, delayed: 5, completed: 100, failed: 3}
# Drain the queue (remove all waiting jobs)
:ok = BullMQ.Queue.drain("my_queue", connection: :my_redis)
Summary
Functions
Adds a job to the queue.
Adds multiple jobs to the queue atomically in a single operation.
Returns a specification to start this module under a supervisor.
Cleans jobs in a specific state older than a grace period.
Returns the number of jobs waiting to be processed.
Drains the queue, removing all waiting jobs.
Exports queue metrics in Prometheus format.
Returns jobs in the "active" state.
Returns the number of jobs in the "active" state.
Returns jobs in the "completed" state.
Returns the number of jobs in the "completed" state.
Gets job counts by state.
Gets the job ID from a deduplication identifier.
Returns jobs in the "delayed" state.
Returns the number of jobs in the "delayed" state.
Returns jobs in the "failed" state.
Returns the number of jobs in the "failed" state.
Gets the global concurrency value for the queue.
Gets the global rate limit configuration for the queue.
Gets a job by ID.
Gets total job count for specific types.
Gets job counts for specific types.
Returns the logs for a given job.
Gets the current state of a job.
Gets jobs in specific state(s).
Gets queue metadata.
Gets queue metrics for completed or failed jobs.
Returns jobs in the "prioritized" state.
Returns the number of jobs in the "prioritized" state.
Gets the time to live for a rate limited key in milliseconds.
Gets the BullMQ version stored in the queue's metadata.
Returns jobs in the "waiting" state.
Returns jobs in the "waiting-children" state.
Returns the number of jobs in the "waiting-children" state.
Returns the number of jobs in the "waiting" or "paused" states.
Gets the list of workers connected to the queue.
Gets the count of workers connected to the queue.
Pauses the queue.
Checks if the queue is paused.
Removes a deduplication key.
Removes a job from the queue.
Resumes a paused queue.
Retries a failed job.
Starts a Queue GenServer.
Updates queue metadata (version and maxLenEvents) in Redis.
Types
@type t() :: %BullMQ.Queue{ connection: BullMQ.Types.redis_connection(), default_job_opts: map(), keys: BullMQ.Keys.queue_context(), name: String.t(), prefix: String.t(), telemetry: module() | nil }
Functions
@spec add( atom() | pid() | String.t(), BullMQ.Types.job_name(), BullMQ.Types.job_data(), keyword() ) :: {:ok, BullMQ.Job.t()} | {:error, term()}
Adds a job to the queue.
Parameters
queue- Queue name (string) or Queue GenServer name (atom/pid)name- Job name/typedata- Job data payloadopts- Job and connection options
Options
:connection- Redis connection (required when using queue name string):prefix- Queue prefix (default: "bull"):job_id- Custom job ID:delay- Delay in milliseconds:priority- Job priority (0 = highest):attempts- Max retry attempts:backoff- Backoff configuration:lifo- Use LIFO ordering:remove_on_complete- Job removal after completion:remove_on_fail- Job removal after failure
Examples
# Add a simple job
{:ok, job} = BullMQ.Queue.add("emails", "welcome", %{user_id: 123},
connection: :redis)
# Add a delayed job
{:ok, job} = BullMQ.Queue.add("emails", "reminder", %{user_id: 123},
connection: :redis,
delay: :timer.hours(24))
# Add a job with retries
{:ok, job} = BullMQ.Queue.add("payments", "process", %{amount: 100},
connection: :redis,
attempts: 5,
backoff: %{type: :exponential, delay: 1000})
@spec add_bulk( atom() | pid() | String.t(), [{BullMQ.Types.job_name(), BullMQ.Types.job_data(), keyword()}], keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Adds multiple jobs to the queue atomically in a single operation.
This operation is atomic: either all jobs are added or none are. It uses
Redis MULTI/EXEC transactions to ensure atomicity, which is significantly
more efficient than calling add/4 multiple times and provides stronger
guarantees. Achieves up to 10x throughput compared to sequential adds
(~60,000 jobs/sec vs ~6,000 jobs/sec).
Arguments
queue- Queue name (string) or Queue GenServer reference (atom/pid)jobs- List of job tuples:{name, data, opts}opts- Bulk operation options
Options
:connection- Redis connection (required when using queue name string):prefix- Queue prefix (default: "bull"):pipeline- Use transactional pipelining for efficiency (default:true):chunk_size- Number of jobs per transaction batch (default:100):connection_pool- List of Redis connections for parallel processing:concurrency- Max parallel tasks (default:8, capped by pool size)
Performance Tips
For maximum throughput when adding large numbers of jobs:
- Transactional pipelining (default): Already enabled, ~4x faster than sequential
- Connection pool: Provide multiple connections for parallel chunk processing
- Chunk size: Default of 100 is optimal for most cases
Examples
# Basic bulk add (uses pipelining automatically)
jobs = [
{"email", %{to: "user1@example.com"}, []},
{"email", %{to: "user2@example.com"}, []},
{"email", %{to: "user3@example.com"}, [priority: 1]}
]
{:ok, added_jobs} = BullMQ.Queue.add_bulk("emails", jobs, connection: :redis)
# High-performance bulk add with connection pool (~60K jobs/sec)
# First, create a pool of connections
pool = for i <- 1..8 do
name = :"redis_pool_#{i}"
{:ok, _} = Redix.start_link(host: "localhost", name: name)
name
end
# Then use the pool for parallel processing
{:ok, jobs} = BullMQ.Queue.add_bulk("emails", large_job_list,
connection: :redis,
connection_pool: pool,
chunk_size: 100
)
# Disable pipelining (sequential mode, slower)
{:ok, jobs} = BullMQ.Queue.add_bulk("emails", jobs,
connection: :redis,
pipeline: false
)Notes
- All jobs in a batch are added atomically (all or nothing)
- Standard jobs (no delay or priority) use optimized transactional pipelining
- Delayed and prioritized jobs fall back to sequential processing
- Returns
{:error, {:partial_failure, results}}if some jobs fail
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec clean( atom() | pid() | String.t(), BullMQ.Types.job_state(), non_neg_integer(), keyword() ) :: {:ok, [BullMQ.Types.job_id()]} | {:error, term()}
Cleans jobs in a specific state older than a grace period.
Parameters
queue- Queue name or GenServerstate- State to clean (:completed, :failed, :delayed, :waiting)grace- Grace period in milliseconds
Options
:limit- Maximum number of jobs to clean (default: 1000)
Examples
# Clean completed jobs older than 1 hour
{:ok, cleaned_ids} = BullMQ.Queue.clean("my_queue", :completed, 3600_000, connection: :redis)
Returns the number of jobs waiting to be processed.
This includes jobs that are "waiting", "delayed", "prioritized", or "waiting-children".
Examples
{:ok, count} = BullMQ.Queue.count("my_queue", connection: :redis)
# 15
Drains the queue, removing all waiting jobs.
Options
:delayed- Also remove delayed jobs (default: false)
Examples
:ok = BullMQ.Queue.drain("my_queue", connection: :redis)
:ok = BullMQ.Queue.drain("my_queue", connection: :redis, delayed: true)
@spec export_prometheus_metrics( atom() | pid() | String.t(), keyword() ) :: {:ok, String.t()} | {:error, term()}
Exports queue metrics in Prometheus format.
Automatically exports all the counts returned by get_job_counts/2.
Options
:global_variables- Additional labels to add to all metrics
Examples
{:ok, metrics} = BullMQ.Queue.export_prometheus_metrics("my_queue", connection: :redis)
# "# HELP bullmq_job_count Number of jobs in the queue by state\n..."
{:ok, metrics} = BullMQ.Queue.export_prometheus_metrics("my_queue",
connection: :redis,
global_variables: %{"env" => "production"})
@spec get_active( atom() | pid() | String.t(), keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Returns jobs in the "active" state.
Examples
{:ok, jobs} = BullMQ.Queue.get_active("my_queue", connection: :redis)
@spec get_active_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Returns the number of jobs in the "active" state.
Examples
{:ok, count} = BullMQ.Queue.get_active_count("my_queue", connection: :redis)
@spec get_completed( atom() | pid() | String.t(), keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Returns jobs in the "completed" state.
Examples
{:ok, jobs} = BullMQ.Queue.get_completed("my_queue", connection: :redis)
@spec get_completed_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Returns the number of jobs in the "completed" state.
Examples
{:ok, count} = BullMQ.Queue.get_completed_count("my_queue", connection: :redis)
Gets job counts by state.
Examples
{:ok, counts} = BullMQ.Queue.get_counts("my_queue", connection: :redis)
# %{waiting: 10, active: 2, delayed: 5, completed: 100, failed: 3}
@spec get_deduplication_job_id(atom() | pid() | String.t(), String.t(), keyword()) :: {:ok, String.t() | nil} | {:error, term()}
Gets the job ID from a deduplication identifier.
Returns the job ID that started the deduplicated state, or nil if not found.
Examples
{:ok, job_id} = BullMQ.Queue.get_deduplication_job_id("my_queue", "dedup-123", connection: :redis)
@spec get_delayed( atom() | pid() | String.t(), keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Returns jobs in the "delayed" state.
Examples
{:ok, jobs} = BullMQ.Queue.get_delayed("my_queue", connection: :redis)
@spec get_delayed_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Returns the number of jobs in the "delayed" state.
Examples
{:ok, count} = BullMQ.Queue.get_delayed_count("my_queue", connection: :redis)
@spec get_failed( atom() | pid() | String.t(), keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Returns jobs in the "failed" state.
Examples
{:ok, jobs} = BullMQ.Queue.get_failed("my_queue", connection: :redis)
@spec get_failed_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Returns the number of jobs in the "failed" state.
Examples
{:ok, count} = BullMQ.Queue.get_failed_count("my_queue", connection: :redis)
@spec get_global_concurrency( atom() | pid() | String.t(), keyword() ) :: {:ok, integer() | nil} | {:error, term()}
Gets the global concurrency value for the queue.
Returns nil if no value is set.
Examples
{:ok, concurrency} = BullMQ.Queue.get_global_concurrency("my_queue", connection: :redis)
# {:ok, 10} or {:ok, nil}
@spec get_global_rate_limit( atom() | pid() | String.t(), keyword() ) :: {:ok, %{max: integer(), duration: integer()} | nil} | {:error, term()}
Gets the global rate limit configuration for the queue.
Returns nil if no rate limit is set.
Examples
{:ok, rate_limit} = BullMQ.Queue.get_global_rate_limit("my_queue", connection: :redis)
# {:ok, %{max: 100, duration: 60000}} or {:ok, nil}
@spec get_job(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) :: {:ok, BullMQ.Job.t() | nil} | {:error, term()}
Gets a job by ID.
Examples
{:ok, job} = BullMQ.Queue.get_job("my_queue", "123", connection: :redis)
@spec get_job_count_by_types( atom() | pid() | String.t(), [BullMQ.Types.job_state()], keyword() ) :: {:ok, integer()} | {:error, term()}
Gets total job count for specific types.
Returns the sum of job counts for all specified types.
Examples
{:ok, total} = BullMQ.Queue.get_job_count_by_types("my_queue", [:waiting, :delayed], connection: :redis)
# 25
@spec get_job_counts( atom() | pid() | String.t(), [BullMQ.Types.job_state()], keyword() ) :: {:ok, map()} | {:error, term()}
Gets job counts for specific types.
Returns a map with job counts for each type specified.
Examples
{:ok, counts} = BullMQ.Queue.get_job_counts("my_queue", [:waiting, :completed], connection: :redis)
# %{waiting: 10, completed: 50}
@spec get_job_logs(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) :: {:ok, %{logs: [String.t()], count: integer()}} | {:error, term()}
Returns the logs for a given job.
Options
:start- Start index (default: 0):end- End index (default: -1):asc- If true, return logs in ascending order (default: true)
Examples
{:ok, %{logs: logs, count: count}} = BullMQ.Queue.get_job_logs("my_queue", "123", connection: :redis)
@spec get_job_state(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) :: {:ok, BullMQ.Types.job_state()} | {:error, term()}
Gets the current state of a job.
Examples
{:ok, :waiting} = BullMQ.Queue.get_job_state("my_queue", "123", connection: :redis)
@spec get_jobs( atom() | pid() | String.t(), BullMQ.Types.job_state() | [BullMQ.Types.job_state()], keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Gets jobs in specific state(s).
Accepts either a single state atom or a list of states.
Options
:start- Start index (default: 0):end- End index (default: -1, meaning all):asc- If true, return jobs in ascending order (default: false)
Examples
{:ok, jobs} = BullMQ.Queue.get_jobs("my_queue", :waiting, connection: :redis)
{:ok, jobs} = BullMQ.Queue.get_jobs("my_queue", [:waiting, :active], connection: :redis)
{:ok, jobs} = BullMQ.Queue.get_jobs("my_queue", :failed, connection: :redis, start: 0, end: 9)
Gets queue metadata.
Returns metadata including paused state, version, concurrency, and rate limit settings.
Examples
{:ok, meta} = BullMQ.Queue.get_meta("my_queue", connection: :redis)
# %{paused: false, version: "bullmq:5.0.0", concurrency: 10, max: nil, duration: nil}
@spec get_metrics(atom() | pid() | String.t(), :completed | :failed, keyword()) :: {:ok, map()} | {:error, term()}
Gets queue metrics for completed or failed jobs.
The metrics are represented as an array of job counts per unit of time (1 minute).
Parameters
queue- Queue name or GenServertype-:completedor:failedopts- Options
Options
:start- Start point of the metrics (default: 0, newest):end- End point of the metrics (default: -1, oldest)
Examples
{:ok, metrics} = BullMQ.Queue.get_metrics("my_queue", :completed, connection: :redis)
# %{
# meta: %{count: 100, prev_ts: 1234567890, prev_count: 5},
# data: [10, 15, 20, ...],
# count: 60
# }
@spec get_prioritized( atom() | pid() | String.t(), keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Returns jobs in the "prioritized" state.
Examples
{:ok, jobs} = BullMQ.Queue.get_prioritized("my_queue", connection: :redis)
@spec get_prioritized_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Returns the number of jobs in the "prioritized" state.
Examples
{:ok, count} = BullMQ.Queue.get_prioritized_count("my_queue", connection: :redis)
@spec get_rate_limit_ttl( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Gets the time to live for a rate limited key in milliseconds.
Returns
-2if the key does not exist-1if the key exists but has no associated expire- TTL in milliseconds otherwise
Examples
{:ok, ttl} = BullMQ.Queue.get_rate_limit_ttl("my_queue", connection: :redis)
@spec get_version( atom() | pid() | String.t(), keyword() ) :: {:ok, String.t() | nil} | {:error, term()}
Gets the BullMQ version stored in the queue's metadata.
This version indicates the Lua script version/capabilities of the queue, which corresponds to the Node.js BullMQ version. This is important for frontend tools like Bull Board to match features.
Returns nil if no version has been set (e.g., the queue was never used
or was created with skip_meta_update: true).
Examples
{:ok, version} = BullMQ.Queue.get_version("my_queue", connection: :redis)
# "bullmq:5.65.1"See Also
BullMQ.Version- Module containing the BullMQ version constants
@spec get_waiting( atom() | pid() | String.t(), keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Returns jobs in the "waiting" state.
Parameters
queue- Queue name or GenServeropts- Options
Options
:start- Start index (default: 0):end- End index (default: -1):connection- Redis connection (required for string queue):prefix- Queue prefix (default: "bull")
Examples
{:ok, jobs} = BullMQ.Queue.get_waiting("my_queue", connection: :redis)
{:ok, jobs} = BullMQ.Queue.get_waiting("my_queue", connection: :redis, start: 0, end: 9)
@spec get_waiting_children( atom() | pid() | String.t(), keyword() ) :: {:ok, [BullMQ.Job.t()]} | {:error, term()}
Returns jobs in the "waiting-children" state.
These are parent jobs that have at least one child that has not completed yet.
Examples
{:ok, jobs} = BullMQ.Queue.get_waiting_children("my_queue", connection: :redis)
@spec get_waiting_children_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Returns the number of jobs in the "waiting-children" state.
Examples
{:ok, count} = BullMQ.Queue.get_waiting_children_count("my_queue", connection: :redis)
@spec get_waiting_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Returns the number of jobs in the "waiting" or "paused" states.
Examples
{:ok, count} = BullMQ.Queue.get_waiting_count("my_queue", connection: :redis)
Gets the list of workers connected to the queue.
Note: This may not work on all Redis providers (e.g., GCP doesn't support CLIENT LIST).
Examples
{:ok, workers} = BullMQ.Queue.get_workers("my_queue", connection: :redis)
@spec get_workers_count( atom() | pid() | String.t(), keyword() ) :: {:ok, integer()} | {:error, term()}
Gets the count of workers connected to the queue.
Examples
{:ok, count} = BullMQ.Queue.get_workers_count("my_queue", connection: :redis)
Pauses the queue.
When paused, workers will not pick up new jobs. Active jobs will continue to completion.
Examples
:ok = BullMQ.Queue.pause("my_queue", connection: :redis)
Checks if the queue is paused.
@spec remove_deduplication_key(atom() | pid() | String.t(), String.t(), keyword()) :: {:ok, non_neg_integer()} | {:error, term()}
Removes a deduplication key.
This allows new jobs with the same deduplication ID to be added immediately, even if the TTL hasn't expired or the original job hasn't completed.
Returns the number of keys removed (0 or 1).
Examples
# Stop deduplication for a specific ID
{:ok, 1} = BullMQ.Queue.remove_deduplication_key("my_queue", "dedup-123", connection: :redis)
# Key doesn't exist
{:ok, 0} = BullMQ.Queue.remove_deduplication_key("my_queue", "unknown", connection: :redis)
@spec remove_job(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) :: {:ok, integer()} | {:error, term()}
Removes a job from the queue.
Options
:remove_children- Also remove child jobs (default: false)
Examples
{:ok, 1} = BullMQ.Queue.remove_job("my_queue", "123", connection: :redis)
Resumes a paused queue.
Examples
:ok = BullMQ.Queue.resume("my_queue", connection: :redis)
@spec retry_job(atom() | pid() | String.t(), BullMQ.Types.job_id(), keyword()) :: :ok | {:error, term()}
Retries a failed job.
Options
:lifo- Add to front of queue (default: false)
Examples
:ok = BullMQ.Queue.retry_job("my_queue", "123", connection: :redis)
@spec start_link(keyword()) :: GenServer.on_start()
Starts a Queue GenServer.
Options
:name- GenServer name (required):queue- Queue name in Redis (required):connection- Redis connection (required):prefix- Queue prefix (default: "bull"):default_job_opts- Default options for all jobs
Examples
{:ok, pid} = BullMQ.Queue.start_link(
name: :my_queue,
queue: "my_queue",
connection: :my_redis
)
Updates queue metadata (version and maxLenEvents) in Redis.
This is automatically called when using a Queue GenServer, but for stateless usage you may want to call this explicitly to ensure the version is set. This is important for frontend tools like Bull Board to properly detect queue capabilities.
Options
:connection- Redis connection (required):prefix- Queue prefix (default: "bull"):max_len_events- Maximum length of the event stream (default: 10_000)
Examples
:ok = BullMQ.Queue.update_meta("my_queue", connection: :redis)
:ok = BullMQ.Queue.update_meta("my_queue", connection: :redis, max_len_events: 50_000)See Also
BullMQ.Version- The BullMQ version that will be set