BullMQ.Worker (BullMQ v1.0.1)
View SourceWorker process for processing BullMQ jobs.
A Worker is responsible for fetching jobs from a queue and processing them. It supports configurable concurrency, automatic retries, rate limiting, and stalled job detection.
Usage
Add a worker to your supervision tree:
children = [
{BullMQ.RedisConnection, name: :redis, url: "redis://localhost:6379"},
{BullMQ.Worker,
name: :my_worker,
queue: "my_queue",
connection: :redis,
processor: &MyApp.Jobs.process/1,
concurrency: 10}
]Processor Function
The processor function receives a BullMQ.Job struct and should return:
{:ok, result}- Job completed successfully with result:ok- Job completed successfully (no return value){:error, reason}- Job failed with reason{:delay, milliseconds}- Delay job and retry later (does not increment attempts){:rate_limit, milliseconds}- Move job to delayed due to rate limiting:waiting- Move job back to waiting queue:waiting_children- Move job to waiting-children state (waits for child jobs)
Examples
defmodule MyApp.Jobs do
def process(%BullMQ.Job{name: "email", data: data}) do
case send_email(data) do
:ok -> {:ok, %{sent: true}}
{:error, reason} -> {:error, reason}
end
end
def process(%BullMQ.Job{name: "heavy_task", data: data} = job) do
# Update progress
BullMQ.Worker.update_progress(job, 0)
result = do_work(data)
BullMQ.Worker.update_progress(job, 100)
{:ok, result}
end
endOptions
:name- Process name for registration (atom):queue- Queue name to process (required, string):connection- Redis connection name or pid (required):processor- Job processor function (required). Set tonilwithautorun: falsefor manual processing:prefix- Redis key prefix (default: "bull"):concurrency- Max concurrent jobs (default: 1):lock_duration- Job lock TTL in ms (default: 30000):stalled_interval- Stalled check interval in ms (default: 30000):max_stalled_count- Max stalls before failure (default: 1):limiter- Rate limiter config%{max: n, duration: ms}:autorun- Start processing immediately (default: true)
Event Callbacks
:on_completed-fn job, result -> ... endwhen a job completes:on_failed-fn job, reason -> ... endwhen a job fails:on_error-fn error -> ... endfor worker errors:on_active-fn job -> ... endwhen a job becomes active:on_progress-fn job, progress -> ... endon progress updates:on_stalled-fn job_id -> ... endwhen a job stalls
Concurrency
Unlike Node.js which uses a single thread with async operations, Elixir workers use true parallelism with multiple processes. Each concurrent job runs in its own process under the worker's supervision.
Summary
Functions
Gets the count of currently active jobs.
Gets a list of all active job IDs.
Cancels all jobs currently being processed by this worker.
Cancels a specific job currently being processed by this worker.
Returns a specification to start this module under a supervisor.
Closes the worker gracefully.
Manually fetches the next job from the queue.
Adds a log entry to a job.
Pauses the worker.
Checks if the worker is paused.
Resumes a paused worker.
Checks if the worker is running (processing jobs).
Starts a worker process.
Starts the stalled jobs checker timer.
Stops the stalled jobs checker timer.
Updates the data of a job.
Updates the progress of a job being processed.
Types
@type processor() :: (BullMQ.Job.t() -> {:ok, term()} | :ok | {:error, term()} | {:delay, non_neg_integer()}) | (BullMQ.Job.t(), String.t() -> {:ok, term()} | :ok | {:error, term()} | {:delay, non_neg_integer()}) | (BullMQ.Job.t(), String.t(), BullMQ.CancellationToken.t() -> {:ok, term()} | :ok | {:error, term()} | {:delay, non_neg_integer()})
@type t() :: %BullMQ.Worker{ active_jobs: map(), blocking_conn: pid() | nil, cancellation_tokens: map(), closing: boolean(), concurrency: pos_integer(), connection: BullMQ.Types.redis_connection(), keys: BullMQ.Keys.queue_context(), limiter: map() | nil, lock_duration: pos_integer(), lock_manager: pid() | nil, max_stalled_count: non_neg_integer(), name: atom() | nil, on_active: (BullMQ.Job.t() -> any()) | nil, on_completed: (BullMQ.Job.t(), term() -> any()) | nil, on_error: (term() -> any()) | nil, on_failed: (BullMQ.Job.t(), String.t() -> any()) | nil, on_lock_renewal_failed: ([String.t()] -> any()) | nil, on_progress: (BullMQ.Job.t(), term() -> any()) | nil, on_stalled: (String.t() -> any()) | nil, opts: map(), paused: boolean(), prefix: String.t(), processor: processor(), processor_supports_cancellation: boolean(), queue_name: String.t(), running: boolean(), stalled_interval: pos_integer(), stalled_timer: reference() | nil, telemetry: module() | nil, token: String.t() }
Functions
@spec active_count(GenServer.server()) :: non_neg_integer()
Gets the count of currently active jobs.
@spec active_job_ids(GenServer.server()) :: [String.t()]
Gets a list of all active job IDs.
@spec cancel_all_jobs(GenServer.server(), BullMQ.CancellationToken.reason()) :: :ok
Cancels all jobs currently being processed by this worker.
All active job processor functions will be notified of cancellation.
Parameters
worker- The worker processreason- Optional reason for the cancellation (default: nil)
Example
BullMQ.Worker.cancel_all_jobs(worker, "System shutdown")
@spec cancel_job(GenServer.server(), String.t(), BullMQ.CancellationToken.reason()) :: boolean()
Cancels a specific job currently being processed by this worker.
The job's processor function can detect cancellation by:
- Checking
BullMQ.CancellationToken.cancelled?(cancel_token) - Subscribing with
BullMQ.CancellationToken.subscribe(cancel_token)
Parameters
worker- The worker processjob_id- The ID of the job to cancelreason- Optional reason for the cancellation (default: nil)
Returns
trueif the job was found and cancelledfalseif the job was not found (not active)
Example
BullMQ.Worker.cancel_job(worker, "job-123", "User requested cancellation")
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec close( GenServer.server(), keyword() ) :: :ok
Closes the worker gracefully.
Options
:force- Don't wait for active jobs (default: false):timeout- Max wait time in ms (default: 30000)
@spec get_next_job(GenServer.server(), String.t(), keyword()) :: {:ok, BullMQ.Job.t() | nil} | {:error, term()}
Manually fetches the next job from the queue.
This is used for manual job processing where you want to control the job lifecycle yourself instead of using automatic worker processing.
When processing jobs manually:
- Create a worker without a processor (set
autorun: false) - Call
get_next_job/2to fetch jobs - Process the job
- Call
Job.move_to_completed/4orJob.move_to_failed/4 - Optionally call
start_stalled_check_timer/1to handle stalled jobs
Parameters
worker- The worker processtoken- A unique token representing this worker's ownership of the job
Options
:block- Iftrue(default), usesBZPOPMINto efficiently wait for a job.If `false`, returns immediately with `nil` if no job is available.:timeout- Timeout in seconds for blocking wait (default: 5). Only used when`block: true`. After timeout, returns `{:ok, nil}`.
Returns
{:ok, job}- A job was fetched successfully{:ok, nil}- No job available (timeout orblock: false) or worker is paused/closing{:error, reason}- An error occurred
Blocking Behavior
When block: true (the default), this function uses Redis's BZPOPMIN command
to efficiently wait for jobs without polling. This is the same mechanism used
by Node.js BullMQ. The function will:
- First try to fetch a job immediately
- If no job is available, wait using
BZPOPMINon the marker key - When a job becomes available (marker is set), fetch and return it
- If timeout is reached, return
{:ok, nil}
Token
The token represents ownership of the job's lock. Use a unique value (like a UUID)
for each job. The same token must be passed to Job.move_to_completed/4,
Job.move_to_failed/4, or Job.extend_lock/3.
Example
# Create a worker without automatic processing
{:ok, worker} = Worker.start_link(
queue: "my-queue",
connection: :redis,
processor: nil,
autorun: false
)
# Start stalled job checker
:ok = Worker.start_stalled_check_timer(worker)
# Processing loop - blocks up to 10 seconds waiting for jobs
token = UUID.uuid4()
case Worker.get_next_job(worker, token, timeout: 10) do
{:ok, nil} ->
# Timeout - no job available
:ok
{:ok, job} ->
# Process the job
case do_work(job.data) do
{:ok, result} ->
Job.move_to_completed(job, result, token)
{:error, reason} ->
Job.move_to_failed(job, reason, token)
end
end
# Non-blocking fetch
case Worker.get_next_job(worker, token, block: false) do
{:ok, nil} -> :no_job
{:ok, job} -> process(job)
end
@spec log(BullMQ.Job.t(), String.t()) :: :ok | {:error, term()}
Adds a log entry to a job.
Can be called from within the processor function.
Note: Consider using Job.log/3 instead, which provides the same functionality
with additional options.
@spec pause( GenServer.server(), keyword() ) :: :ok
Pauses the worker.
When paused, the worker will not pick up new jobs but will continue processing active jobs until completion.
Options
:force- Don't wait for active jobs to complete (default: false)
@spec paused?(GenServer.server()) :: boolean()
Checks if the worker is paused.
@spec resume(GenServer.server()) :: :ok
Resumes a paused worker.
@spec running?(GenServer.server()) :: boolean()
Checks if the worker is running (processing jobs).
@spec start_link(keyword()) :: GenServer.on_start()
Starts a worker process.
Options
:name- Process name for registration:queue- Queue name (required):connection- Redis connection (required):processor- Processor function (required):concurrency- Max concurrent jobs (default: 1):lock_duration- Lock TTL in ms (default: 30000):stalled_interval- Stalled check interval (default: 30000):max_stalled_count- Max stalls before failure (default: 1):limiter- Rate limiter config:autorun- Start processing immediately (default: true):prefix- Queue prefix (default: "bull")
@spec start_stalled_check_timer(GenServer.server()) :: :ok
Starts the stalled jobs checker timer.
When processing jobs manually, you should start this timer to ensure stalled jobs (whose locks have expired) are moved back to the waiting state or failed (if they've exceeded max stalled count).
The checker runs periodically based on the worker's stalled_interval option.
Example
{:ok, worker} = Worker.start_link(
queue: "my-queue",
connection: :redis,
processor: nil,
autorun: false,
stalled_interval: 30_000
)
:ok = Worker.start_stalled_check_timer(worker)
@spec stop_stalled_check_timer(GenServer.server()) :: :ok
Stops the stalled jobs checker timer.
Example
:ok = Worker.stop_stalled_check_timer(worker)
@spec update_data(BullMQ.Job.t(), term()) :: :ok | {:error, term()}
Updates the data of a job.
Can be called from within the processor function.
@spec update_progress(BullMQ.Job.t(), BullMQ.Types.job_progress()) :: :ok | {:error, term()}
Updates the progress of a job being processed.
Can only be called from within the processor function.
This updates the progress in Redis and emits a progress event that can be
received by QueueEvents subscribers and triggers the worker's on_progress callback.