BullMQ.Worker (BullMQ v1.0.1)

Worker process for processing BullMQ jobs.

A Worker is responsible for fetching jobs from a queue and processing them. It supports configurable concurrency, automatic retries, rate limiting, and stalled job detection.

Usage

Add a worker to your supervision tree:

children = [
  {BullMQ.RedisConnection, name: :redis, url: "redis://localhost:6379"},
  {BullMQ.Worker,
    name: :my_worker,
    queue: "my_queue",
    connection: :redis,
    processor: &MyApp.Jobs.process/1,
    concurrency: 10}
]

Processor Function

The processor function receives a BullMQ.Job struct and should return:

  • {:ok, result} - Job completed successfully with result
  • :ok - Job completed successfully (no return value)
  • {:error, reason} - Job failed with reason
  • {:delay, milliseconds} - Delay job and retry later (does not increment attempts)
  • {:rate_limit, milliseconds} - Move job to delayed due to rate limiting
  • :waiting - Move job back to waiting queue
  • :waiting_children - Move job to waiting-children state (waits for child jobs)

Examples

defmodule MyApp.Jobs do
  def process(%BullMQ.Job{name: "email", data: data}) do
    case send_email(data) do
      :ok -> {:ok, %{sent: true}}
      {:error, reason} -> {:error, reason}
    end
  end

  def process(%BullMQ.Job{name: "heavy_task", data: data} = job) do
    # Update progress
    BullMQ.Worker.update_progress(job, 0)

    result = do_work(data)

    BullMQ.Worker.update_progress(job, 100)
    {:ok, result}
  end
end
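
The back-off return value can be sketched in the same style. The module below is a hypothetical processor for a "sync" job: `MyApp.SyncJobs`, `fetch_remote/1`, and the `"status"`/`"body"` data fields are stand-ins invented for illustration and are not part of BullMQ. It matches on a plain map shape, which also matches a `%BullMQ.Job{}` struct, so the clauses can be tried without a running queue.

```elixir
defmodule MyApp.SyncJobs do
  # Hypothetical processor; matches any map with :name and :data keys,
  # which includes the %BullMQ.Job{} struct.
  def process(%{name: "sync", data: data}) do
    case fetch_remote(data) do
      {:ok, body} -> {:ok, %{bytes: byte_size(body)}}
      # Upstream asked us to back off: retry later without counting an attempt.
      {:busy, retry_after_ms} -> {:delay, retry_after_ms}
      {:error, reason} -> {:error, reason}
    end
  end

  # Stand-in for a real HTTP call (an assumption made for this sketch).
  defp fetch_remote(%{"status" => "ok", "body" => body}), do: {:ok, body}
  defp fetch_remote(%{"status" => "busy"}), do: {:busy, 5_000}
  defp fetch_remote(_other), do: {:error, :unexpected_response}
end
```

Returning `{:delay, ms}` rather than `{:error, reason}` is the design choice here: a busy upstream is not treated as a failed attempt.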

Options

  • :name - Process name for registration (atom)
  • :queue - Queue name to process (required, string)
  • :connection - Redis connection name or pid (required)
  • :processor - Job processor function (required). Set to nil with autorun: false for manual processing
  • :prefix - Redis key prefix (default: "bull")
  • :concurrency - Max concurrent jobs (default: 1)
  • :lock_duration - Job lock TTL in ms (default: 30000)
  • :stalled_interval - Stalled check interval in ms (default: 30000)
  • :max_stalled_count - Max stalls before failure (default: 1)
  • :limiter - Rate limiter config %{max: n, duration: ms}
  • :autorun - Start processing immediately (default: true)
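
As a rough illustration of how these options combine, the sketch below builds an option list for a rate-limited worker. The module name `MyApp.WorkerConfig`, the queue name, and all numeric values are assumptions chosen for the example, not recommended settings.

```elixir
defmodule MyApp.WorkerConfig do
  # Builds the option list for a rate-limited worker. Values are illustrative.
  def email_worker_opts do
    [
      name: :email_worker,
      queue: "emails",
      connection: :redis,
      processor: &MyApp.WorkerConfig.process/1,
      concurrency: 5,
      # Rate limiter: at most 100 jobs per 60_000 ms.
      limiter: %{max: 100, duration: 60_000},
      # Lock TTL and stalled-check interval, both in milliseconds.
      lock_duration: 60_000,
      stalled_interval: 30_000
    ]
  end

  # Placeholder processor for the sketch.
  def process(_job), do: :ok

  # Child spec tuple suitable for a supervisor's children list.
  def child, do: {BullMQ.Worker, email_worker_opts()}
end
```

`MyApp.WorkerConfig.child()` can then be placed in the supervision tree next to the `BullMQ.RedisConnection` entry.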

Event Callbacks

  • :on_completed - fn job, result -> ... end when a job completes
  • :on_failed - fn job, reason -> ... end when a job fails
  • :on_error - fn error -> ... end for worker errors
  • :on_active - fn job -> ... end when a job becomes active
  • :on_progress - fn job, progress -> ... end on progress updates
  • :on_stalled - fn job_id -> ... end when a job stalls
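
A minimal sketch of wiring callbacks, assuming the job struct exposes an `id` field (not confirmed by this page) and using a hypothetical `MyApp.WorkerHooks` module that only logs:

```elixir
defmodule MyApp.WorkerHooks do
  require Logger

  # Each function matches the arity listed for its callback option.
  # `job.id` is an assumed field name used only for log output.
  def completed(job, result), do: Logger.info("job #{job.id} completed: #{inspect(result)}")
  def failed(job, reason), do: Logger.warning("job #{job.id} failed: #{inspect(reason)}")
  def stalled(job_id), do: Logger.warning("job #{job_id} stalled")

  # Callback options to append to the worker's option list.
  def opts do
    [
      on_completed: &__MODULE__.completed/2,
      on_failed: &__MODULE__.failed/2,
      on_stalled: &__MODULE__.stalled/1
    ]
  end
end
```

The list can be appended to the other options, for example `[queue: "my_queue", connection: :redis, processor: &MyApp.Jobs.process/1] ++ MyApp.WorkerHooks.opts()`.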

Concurrency

Unlike Node.js, which runs jobs on a single thread with async operations, Elixir workers use true parallelism across multiple processes. Each concurrent job runs in its own process under the worker's supervision.
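
The process-per-job idea can be illustrated with the standard library alone. The snippet below is an analogy using `Task.async_stream/3`, not the worker's actual implementation: it runs four stand-in "jobs" with `max_concurrency: 4` and records which process executed each one.

```elixir
# Run four stand-in "jobs" concurrently, each in its own BEAM process,
# and collect the pid that executed each one.
pids =
  1..4
  |> Task.async_stream(fn _job -> self() end, max_concurrency: 4)
  |> Enum.map(fn {:ok, pid} -> pid end)

# Count the distinct processes and check whether the caller ran any job.
distinct = pids |> Enum.uniq() |> length()
caller_ran_job = self() in pids

IO.puts("distinct processes: #{distinct}, caller ran a job: #{caller_ran_job}")
```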

Types

processor()

@type processor() ::
  (BullMQ.Job.t() ->
     {:ok, term()} | :ok | {:error, term()} | {:delay, non_neg_integer()})
  | (BullMQ.Job.t(), String.t() ->
       {:ok, term()} | :ok | {:error, term()} | {:delay, non_neg_integer()})
  | (BullMQ.Job.t(), String.t(), BullMQ.CancellationToken.t() ->
       {:ok, term()} | :ok | {:error, term()} | {:delay, non_neg_integer()})

t()

@type t() :: %BullMQ.Worker{
  active_jobs: map(),
  blocking_conn: pid() | nil,
  cancellation_tokens: map(),
  closing: boolean(),
  concurrency: pos_integer(),
  connection: BullMQ.Types.redis_connection(),
  keys: BullMQ.Keys.queue_context(),
  limiter: map() | nil,
  lock_duration: pos_integer(),
  lock_manager: pid() | nil,
  max_stalled_count: non_neg_integer(),
  name: atom() | nil,
  on_active: (BullMQ.Job.t() -> any()) | nil,
  on_completed: (BullMQ.Job.t(), term() -> any()) | nil,
  on_error: (term() -> any()) | nil,
  on_failed: (BullMQ.Job.t(), String.t() -> any()) | nil,
  on_lock_renewal_failed: ([String.t()] -> any()) | nil,
  on_progress: (BullMQ.Job.t(), term() -> any()) | nil,
  on_stalled: (String.t() -> any()) | nil,
  opts: map(),
  paused: boolean(),
  prefix: String.t(),
  processor: processor(),
  processor_supports_cancellation: boolean(),
  queue_name: String.t(),
  running: boolean(),
  stalled_interval: pos_integer(),
  stalled_timer: reference() | nil,
  telemetry: module() | nil,
  token: String.t()
}

Functions

active_count(worker)

@spec active_count(GenServer.server()) :: non_neg_integer()

Gets the count of currently active jobs.

active_job_ids(worker)

@spec active_job_ids(GenServer.server()) :: [String.t()]

Gets a list of all active job IDs.

cancel_all_jobs(worker, reason \\ nil)

@spec cancel_all_jobs(GenServer.server(), BullMQ.CancellationToken.reason()) :: :ok

Cancels all jobs currently being processed by this worker.

All active job processor functions will be notified of cancellation.

Parameters

  • worker - The worker process
  • reason - Optional reason for the cancellation (default: nil)

Example

BullMQ.Worker.cancel_all_jobs(worker, "System shutdown")

cancel_job(worker, job_id, reason \\ nil)

Cancels a specific job currently being processed by this worker.

The job's processor function can detect cancellation by:

  • Checking BullMQ.CancellationToken.cancelled?(cancel_token)
  • Subscribing with BullMQ.CancellationToken.subscribe(cancel_token)

Parameters

  • worker - The worker process
  • job_id - The ID of the job to cancel
  • reason - Optional reason for the cancellation (default: nil)

Returns

  • true if the job was found and cancelled
  • false if the job was not found (not active)

Example

BullMQ.Worker.cancel_job(worker, "job-123", "User requested cancellation")
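
A cooperative cancellation check might look like the sketch below. `MyApp.Cancellable` is a hypothetical helper; the cancellation test is passed in as a zero-arity function so the helper can be tried without a worker. Returning `{:error, :cancelled}` is an assumption about how a processor could report cancellation, not documented behaviour.

```elixir
defmodule MyApp.Cancellable do
  # Processes items one at a time, calling `cancelled_fun` (zero-arity,
  # returns a boolean) before each item. Returns {:ok, results} or
  # {:error, :cancelled} as soon as cancellation is observed.
  def run(items, work, cancelled_fun) do
    items
    |> Enum.reduce_while({:ok, []}, fn item, {:ok, acc} ->
      if cancelled_fun.() do
        {:halt, {:error, :cancelled}}
      else
        {:cont, {:ok, [work.(item) | acc]}}
      end
    end)
    |> case do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      error -> error
    end
  end
end

# Possible use inside a three-argument processor (handle_item/1 and the
# "items" data key are hypothetical):
#
#   def process(job, _token, cancel_token) do
#     MyApp.Cancellable.run(job.data["items"], &handle_item/1, fn ->
#       BullMQ.CancellationToken.cancelled?(cancel_token)
#     end)
#   end
```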

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

close(worker, opts \\ [])

@spec close(
  GenServer.server(),
  keyword()
) :: :ok

Closes the worker gracefully.

Options

  • :force - Don't wait for active jobs (default: false)
  • :timeout - Max wait time in ms (default: 30000)

get_next_job(worker, token, opts \\ [])

@spec get_next_job(GenServer.server(), String.t(), keyword()) ::
  {:ok, BullMQ.Job.t() | nil} | {:error, term()}

Manually fetches the next job from the queue.

Use this for manual job processing, where you control the job lifecycle yourself instead of relying on automatic worker processing.

When processing jobs manually:

  1. Create a worker without a processor (set autorun: false)
  2. Call get_next_job/2 to fetch jobs
  3. Process the job
  4. Call Job.move_to_completed/4 or Job.move_to_failed/4
  5. Optionally call start_stalled_check_timer/1 to handle stalled jobs

Parameters

  • worker - The worker process
  • token - A unique token representing this worker's ownership of the job

Options

  • :block - If true (default), uses BZPOPMIN to wait efficiently for a job. If false, returns {:ok, nil} immediately when no job is available.
  • :timeout - Timeout in seconds for the blocking wait (default: 5). Only used when block: true. After the timeout, returns {:ok, nil}.

Returns

  • {:ok, job} - A job was fetched successfully
  • {:ok, nil} - No job available (timeout or block: false) or worker is paused/closing
  • {:error, reason} - An error occurred

Blocking Behavior

When block: true (the default), this function uses Redis's BZPOPMIN command to efficiently wait for jobs without polling. This is the same mechanism used by Node.js BullMQ. The function will:

  1. First try to fetch a job immediately
  2. If no job is available, wait using BZPOPMIN on the marker key
  3. When a job becomes available (marker is set), fetch and return it
  4. If timeout is reached, return {:ok, nil}

Token

The token represents ownership of the job's lock. Use a unique value (like a UUID) for each job. The same token must be passed to Job.move_to_completed/4, Job.move_to_failed/4, or Job.extend_lock/3.

Example

alias BullMQ.{Job, Worker}

# Create a worker without automatic processing
{:ok, worker} = Worker.start_link(
  queue: "my-queue",
  connection: :redis,
  processor: nil,
  autorun: false
)

# Start stalled job checker
:ok = Worker.start_stalled_check_timer(worker)

# Fetch one job, blocking for up to 10 seconds.
# UUID.uuid4/0 comes from a third-party UUID package; any unique string works as a token.
token = UUID.uuid4()

case Worker.get_next_job(worker, token, timeout: 10) do
  {:ok, nil} ->
    # Timeout - no job available
    :ok

  {:ok, job} ->
    # Process the job
    case do_work(job.data) do
      {:ok, result} ->
        Job.move_to_completed(job, result, token)
      {:error, reason} ->
        Job.move_to_failed(job, reason, token)
    end
end

# Non-blocking fetch
case Worker.get_next_job(worker, token, block: false) do
  {:ok, nil} -> :no_job
  {:ok, job} -> process(job)
end
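
To repeat the fetch step until the queue is empty, the three return shapes can be handled in a small recursive loop. The sketch below is one hedged way to structure it: `MyApp.ManualLoop` is hypothetical, and the token generator, fetch, and handler are passed in as functions so the loop itself has no dependency on a live worker.

```elixir
defmodule MyApp.ManualLoop do
  # Generates a fresh token, fetches a job with it, and hands both to
  # `handle`. Repeats until `fetch` reports no job. Returns {:ok, count}
  # with the number of jobs handled, or the first fetch error.
  def drain(new_token, fetch, handle, count \\ 0) do
    token = new_token.()

    case fetch.(token) do
      {:ok, nil} ->
        {:ok, count}

      {:ok, job} ->
        handle.(job, token)
        drain(new_token, fetch, handle, count + 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# Possible wiring against a worker (complete_or_fail/2 is hypothetical):
#
#   MyApp.ManualLoop.drain(
#     &UUID.uuid4/0,
#     &BullMQ.Worker.get_next_job(worker, &1, block: false),
#     fn job, token -> complete_or_fail(job, token) end
#   )
```

Generating the token inside the loop keeps one token per job, as the Token section recommends.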

log(job, message)

@spec log(BullMQ.Job.t(), String.t()) :: :ok | {:error, term()}

Adds a log entry to a job.

Can be called from within the processor function.

Note: Consider using Job.log/3 instead, which provides the same functionality with additional options.

pause(worker, opts \\ [])

@spec pause(
  GenServer.server(),
  keyword()
) :: :ok

Pauses the worker.

When paused, the worker will not pick up new jobs but will continue processing active jobs until completion.

Options

  • :force - Don't wait for active jobs to complete (default: false)

paused?(worker)

@spec paused?(GenServer.server()) :: boolean()

Checks if the worker is paused.

resume(worker)

@spec resume(GenServer.server()) :: :ok

Resumes a paused worker.
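
Pausing and resuming are often paired around a maintenance task. A minimal sketch, assuming a hypothetical `MyApp.Maintenance` helper; the `api` argument defaults to `BullMQ.Worker` and exists only so the helper can be tried with a stand-in module:

```elixir
defmodule MyApp.Maintenance do
  # Pauses `worker`, runs `fun`, and always resumes afterwards, even if
  # `fun` raises. Returns whatever `fun` returns.
  def with_paused(worker, fun, api \\ BullMQ.Worker) do
    :ok = api.pause(worker)

    try do
      fun.()
    after
      api.resume(worker)
    end
  end
end
```

For example, `MyApp.Maintenance.with_paused(:my_worker, fn -> run_migration() end)`, where `run_migration/0` stands for any task that should run while no new jobs are picked up.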

running?(worker)

@spec running?(GenServer.server()) :: boolean()

Checks if the worker is running (processing jobs).

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a worker process.

Options

  • :name - Process name for registration
  • :queue - Queue name (required)
  • :connection - Redis connection (required)
  • :processor - Processor function (required; pass nil together with autorun: false for manual processing)
  • :concurrency - Max concurrent jobs (default: 1)
  • :lock_duration - Lock TTL in ms (default: 30000)
  • :stalled_interval - Stalled check interval in ms (default: 30000)
  • :max_stalled_count - Max stalls before failure (default: 1)
  • :limiter - Rate limiter config
  • :autorun - Start processing immediately (default: true)
  • :prefix - Queue prefix (default: "bull")

start_stalled_check_timer(worker)

@spec start_stalled_check_timer(GenServer.server()) :: :ok

Starts the stalled jobs checker timer.

When processing jobs manually, start this timer so that stalled jobs (those whose locks have expired) are moved back to the waiting state, or failed once they exceed max_stalled_count.

The checker runs periodically based on the worker's stalled_interval option.

Example

{:ok, worker} = Worker.start_link(
  queue: "my-queue",
  connection: :redis,
  processor: nil,
  autorun: false,
  stalled_interval: 30_000
)

:ok = Worker.start_stalled_check_timer(worker)

stop_stalled_check_timer(worker)

@spec stop_stalled_check_timer(GenServer.server()) :: :ok

Stops the stalled jobs checker timer.

Example

:ok = Worker.stop_stalled_check_timer(worker)

update_data(job, data)

@spec update_data(BullMQ.Job.t(), term()) :: :ok | {:error, term()}

Updates the data of a job.

Can be called from within the processor function.

update_progress(job, progress)

@spec update_progress(BullMQ.Job.t(), BullMQ.Types.job_progress()) ::
  :ok | {:error, term()}

Updates the progress of a job being processed.

Can only be called from within the processor function. It writes the progress to Redis and emits a progress event, which QueueEvents subscribers receive and which triggers the worker's on_progress callback.
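
For work that proceeds item by item, progress can be derived from the item count. The helper below is a sketch under that assumption: `MyApp.Progress` is hypothetical, and the reporting function is injected so the arithmetic can be checked on its own.

```elixir
defmodule MyApp.Progress do
  # Applies `work` to each item and calls `report` with an integer
  # percentage (1..100) after each one. Returns the list of results.
  def each_with_progress(items, work, report) do
    total = length(items)

    items
    |> Enum.with_index(1)
    |> Enum.map(fn {item, done} ->
      result = work.(item)
      report.(div(done * 100, total))
      result
    end)
  end
end

# Possible use inside a processor (handle_row/1 and the "rows" data key
# are hypothetical):
#
#   MyApp.Progress.each_with_progress(
#     job.data["rows"],
#     &handle_row/1,
#     &BullMQ.Worker.update_progress(job, &1)
#   )
```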