BullMQ.Job (BullMQ v1.0.1)

View Source

Represents a job in a BullMQ queue.

A job contains the data to be processed, along with metadata about its state, attempts, progress, and results. Jobs are persisted in Redis and can be processed by workers across multiple nodes.

Job Lifecycle

Jobs transition through the following states:

  1. waiting - Job is in the queue waiting to be processed
  2. active - Job is currently being processed by a worker
  3. delayed - Job is scheduled to be processed at a future time
  4. prioritized - Job is in the priority queue
  5. completed - Job finished successfully
  6. failed - Job failed after exhausting all retries
  7. waiting-children - Parent job waiting for child jobs to complete

Examples

# Jobs are typically created through the Queue module
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"})

# Access job properties
job.id         #=> "1"
job.name       #=> "email"
job.data       #=> %{to: "user@example.com"}
job.state      #=> :waiting

# In a worker, you receive the job and can update progress
defmodule MyWorker do
  def process(job) do
    BullMQ.Job.update_progress(job, 50)
    result = do_work(job.data)
    BullMQ.Job.update_progress(job, 100)
    result
  end
end

Flow Methods

When processing parent jobs in a flow, you can access child results:

def process(job) do
  # Get children that completed successfully
  {:ok, children_values} = BullMQ.Job.get_children_values(job)

  # Get children that failed but were ignored
  {:ok, ignored_failures} = BullMQ.Job.get_ignored_children_failures(job)

  # Get pending dependencies
  {:ok, deps} = BullMQ.Job.get_dependencies(job)

  {:ok, aggregate(children_values)}
end

Summary

Functions

Checks if the job is currently being processed.

Calculates the backoff delay for the next retry attempt.

Checks if the job has been completed.

Calculates the delay before the job should be processed.

Checks if the job is delayed.

Returns the estimated state of the job based on its properties.

Extends the lock on a job.

Checks if the job has failed.

Formats the job for logging/display.

Reconstructs a job from Redis hash data.

Gets the return values of this job's children.

Gets the pending dependencies (unprocessed children) of this job.

Gets the count of pending dependencies for this job.

Gets the failures of child jobs that were explicitly ignored.

Checks if the job has a parent job (is a child in a flow).

Increments the attempts made counter.

Returns the Redis key for this job's hash.

Returns the Redis key for this job's lock.

Adds a log entry to the job.

Returns the Redis key for this job's logs.

Marks the job as started processing.

Marks the job as completed with a return value.

Marks the job as failed with an error.

Moves the job to the completed state.

Moves the job to the failed state.

Moves the job back to the wait state.

Creates a new job struct with the given parameters.

Checks if the job should be retried based on attempts configuration.

Converts a job to a map suitable for Redis storage.

Updates the job progress.

Types

t()

@type t() :: %BullMQ.Job{
  attempts_made: non_neg_integer(),
  attempts_started: non_neg_integer(),
  connection: BullMQ.Types.redis_connection() | nil,
  data: BullMQ.Types.job_data(),
  deduplication_id: String.t() | nil,
  deferred_failure: String.t() | nil,
  delay: BullMQ.Types.duration_ms(),
  failed_reason: String.t() | nil,
  finished_on: BullMQ.Types.timestamp_ms() | nil,
  id: BullMQ.Types.job_id(),
  name: BullMQ.Types.job_name(),
  opts: map(),
  parent: map() | nil,
  parent_key: String.t() | nil,
  prefix: String.t(),
  priority: BullMQ.Types.priority(),
  processed_by: String.t() | nil,
  processed_on: BullMQ.Types.timestamp_ms() | nil,
  progress: BullMQ.Types.job_progress(),
  queue_name: BullMQ.Types.queue_name(),
  repeat_job_key: String.t() | nil,
  return_value: term(),
  stacktrace: [String.t()],
  stalled_counter: non_neg_integer(),
  timestamp: BullMQ.Types.timestamp_ms(),
  token: BullMQ.Types.lock_token() | nil,
  worker: pid() | nil
}

Functions

active?(arg1)

@spec active?(t()) :: boolean()

Checks if the job is currently being processed.

calculate_backoff(job)

@spec calculate_backoff(t()) :: BullMQ.Types.duration_ms()

Calculates the backoff delay for the next retry attempt.

completed?(arg1)

@spec completed?(t()) :: boolean()

Checks if the job has been completed.

delay_until(job)

@spec delay_until(t()) :: BullMQ.Types.timestamp_ms()

Calculates the delay before the job should be processed.

Takes into account the job's delay option and timestamp.

delayed?(arg1)

@spec delayed?(t()) :: boolean()

Checks if the job is delayed.

estimated_state(job)

@spec estimated_state(t()) :: BullMQ.Types.job_state()

Returns the estimated state of the job based on its properties.

Note: For accurate state, use BullMQ.Queue.get_job_state/2 which checks Redis directly.

extend_lock(job, token, duration)

@spec extend_lock(t(), String.t(), non_neg_integer()) ::
  {:ok, term()} | {:error, term()}

Extends the lock on a job.

When manually processing jobs, locks are not automatically renewed. Call this method to extend the lock if processing takes longer than the lock duration.

Parameters

  • job - The job struct (must have connection set)
  • token - The lock token
  • duration - Duration in milliseconds to extend the lock

Returns

  • {:ok, result} - Lock extended successfully
  • {:error, reason} - Failed to extend lock

Examples

# Extend lock by 30 seconds
{:ok, _} = Job.extend_lock(job, token, 30_000)

failed?(arg1)

@spec failed?(t()) :: boolean()

Checks if the job has failed.

format(job)

@spec format(t()) :: String.t()

Formats the job for logging/display.

from_redis(job_id, queue_name, data, opts \\ [])

@spec from_redis(BullMQ.Types.job_id(), BullMQ.Types.queue_name(), map(), keyword()) ::
  t()

Reconstructs a job from Redis hash data.

Parameters

  • job_id - The job ID
  • queue_name - The queue name
  • data - Map of field-value pairs from Redis HGETALL
  • opts - Additional options like connection and prefix

get_children_values(job)

@spec get_children_values(t()) :: {:ok, map()} | {:error, term()}

Gets the return values of this job's children.

When a parent job is processed, it can access the results of all its completed children using this method.

Parameters

  • job - The job struct (must have connection set)

Returns

  • {:ok, map} - Map of child job keys to their return values
  • {:error, reason} - Failed to get children values

Examples

def process(job) do
  {:ok, children_values} = BullMQ.Job.get_children_values(job)
  # children_values: %{"bull:queue:123" => %{result: "done"}, ...}
  {:ok, aggregate(children_values)}
end

get_dependencies(job)

@spec get_dependencies(t()) :: {:ok, [String.t()]} | {:error, term()}

Gets the pending dependencies (unprocessed children) of this job.

Returns a list of child job keys that haven't completed yet.

Parameters

  • job - The job struct (must have connection set)

Returns

  • {:ok, list} - List of pending child job keys
  • {:error, reason} - Failed to get dependencies

Examples

{:ok, deps} = BullMQ.Job.get_dependencies(job)
# deps: ["bull:queue:789", "bull:queue:790"]

get_dependencies_count(job)

@spec get_dependencies_count(t()) :: {:ok, non_neg_integer()} | {:error, term()}

Gets the count of pending dependencies for this job.

Parameters

  • job - The job struct (must have connection set)

Returns

  • {:ok, count} - Number of pending child jobs
  • {:error, reason} - Failed to get count

Examples

{:ok, count} = BullMQ.Job.get_dependencies_count(job)
# count: 3

get_ignored_children_failures(job)

@spec get_ignored_children_failures(t()) :: {:ok, map()} | {:error, term()}

Gets the failures of child jobs that were explicitly ignored.

When using the ignore_dependency_on_failure option, failed children don't fail the parent. This method retrieves those ignored failures.

Parameters

  • job - The job struct (must have connection set)

Returns

  • {:ok, map} - Map of child job keys to their failure reasons
  • {:error, reason} - Failed to get ignored failures

Examples

{:ok, ignored} = BullMQ.Job.get_ignored_children_failures(job)
# ignored: %{"bull:queue:456" => "Timeout error", ...}

has_parent?(arg1)

@spec has_parent?(t()) :: boolean()

Checks if the job has a parent job (is a child in a flow).

increment_attempts(job)

@spec increment_attempts(t()) :: t()

Increments the attempts made counter.

key(job)

@spec key(t()) :: String.t()

Returns the Redis key for this job's hash.

lock_key(job)

@spec lock_key(t()) :: String.t()

Returns the Redis key for this job's lock.

log(job, message, opts \\ [])

@spec log(t(), String.t(), keyword()) :: {:ok, integer()} | {:error, term()}

Adds a log entry to the job.

Logs are stored in Redis and can be retrieved later. This is useful for tracking the progress of long-running jobs or debugging.

Example

def process(job) do
  Job.log(job, "Starting processing")
  result = do_work(job.data)
  Job.log(job, "Completed with result: #{inspect(result)}")
  {:ok, result}
end

Options

  • :keep_logs - Maximum number of log entries to keep. Older entries will be removed when this limit is exceeded. If not provided, all logs are kept.

Returns the total number of log entries for this job.

logs_key(job)

@spec logs_key(t()) :: String.t()

Returns the Redis key for this job's logs.

mark_as_active(job, token)

@spec mark_as_active(t(), BullMQ.Types.lock_token()) :: t()

Marks the job as started processing.

mark_as_completed(job, return_value)

@spec mark_as_completed(t(), term()) :: t()

Marks the job as completed with a return value.

mark_as_failed(job, reason, stacktrace \\ [])

@spec mark_as_failed(t(), String.t(), [String.t()]) :: t()

Marks the job as failed with an error.

move_to_completed(job, return_value, token, opts \\ [])

@spec move_to_completed(t(), term(), String.t(), keyword()) ::
  {:ok, nil | {list(), String.t()}} | {:error, term()}

Moves the job to the completed state.

This is used when manually processing jobs. The job will be marked as completed with the given return value.

Parameters

  • job - The job struct (must have connection and token set)
  • return_value - The result to store with the completed job
  • opts - Options:
    • :fetch_next - If true, returns the next job data (default: true)
    • :remove_on_complete - Job removal settings

Returns

  • {:ok, nil} - Job completed, no next job available
  • {:ok, {job_data, job_id}} - Job completed, next job available (when fetch_next: true)
  • {:error, reason} - Failed to move job

Examples

# Complete job and get next job
{:ok, next} = Job.move_to_completed(job, %{result: "done"}, token)

# Complete job without fetching next
{:ok, nil} = Job.move_to_completed(job, %{result: "done"}, token, fetch_next: false)

move_to_failed(job, error, token, opts \\ [])

@spec move_to_failed(t(), term(), String.t(), keyword()) ::
  {:ok, nil | {list(), String.t()}} | {:error, term()}

Moves the job to the failed state.

This is used when manually processing jobs. The job will be marked as failed with the given error.

Parameters

  • job - The job struct (must have connection and token set)
  • error - The error (can be an Exception or a string/term)
  • token - The lock token
  • opts - Options:
    • :fetch_next - If true, returns the next job data (default: false)
    • :remove_on_fail - Job removal settings

Returns

  • {:ok, nil} - Job failed, no next job available
  • {:ok, {job_data, job_id}} - Job failed, next job available (when fetch_next: true)
  • {:error, reason} - Failed to move job

Examples

# Fail job
{:ok, nil} = Job.move_to_failed(job, "Processing error", token)

# Fail job with exception
{:ok, nil} = Job.move_to_failed(job, %RuntimeError{message: "oops"}, token)

move_to_wait(job, token \\ "0")

@spec move_to_wait(t(), String.t()) :: {:ok, non_neg_integer()} | {:error, term()}

Moves the job back to the wait state.

This is useful when you need to release a job back to the queue, for example when rate limiting is applied. The job will be available for processing again.

Parameters

  • job - The job struct (must have connection set)
  • token - The lock token (use "0" if no token)

Returns

  • {:ok, pttl} - Job moved back to wait, returns the rate limit TTL (or 0)
  • {:error, reason} - Failed to move job

Examples

# Move job back to wait due to rate limiting
await Queue.rate_limit(queue, 60_000)
{:ok, _pttl} = Job.move_to_wait(job, token)

new(queue_name, name, data, opts \\ [])

Creates a new job struct with the given parameters.

Parameters

  • queue_name - The name of the queue
  • name - The job type/name identifier
  • data - The job payload data
  • opts - Job options (see BullMQ.Types.job_opts())

Examples

iex> job = BullMQ.Job.new("my_queue", "email", %{to: "test@example.com"})
iex> job.name
"email"

iex> job = BullMQ.Job.new("my_queue", "email", %{to: "test@example.com"},
...>   job_id: "custom-id", priority: 5)
iex> job.id
"custom-id"
iex> job.priority
5

should_retry?(job)

@spec should_retry?(t()) :: boolean()

Checks if the job should be retried based on attempts configuration.

to_redis(job)

@spec to_redis(t()) :: map()

Converts a job to a map suitable for Redis storage.

update_progress(job, progress)

@spec update_progress(t(), BullMQ.Types.job_progress()) :: t()

Updates the job progress.