BullMQ.Job (BullMQ v1.0.1)
View SourceRepresents a job in a BullMQ queue.
A job contains the data to be processed, along with metadata about its state, attempts, progress, and results. Jobs are persisted in Redis and can be processed by workers across multiple nodes.
Job Lifecycle
Jobs transition through the following states:
- waiting - Job is in the queue waiting to be processed
- active - Job is currently being processed by a worker
- delayed - Job is scheduled to be processed at a future time
- prioritized - Job is in the priority queue
- completed - Job finished successfully
- failed - Job failed after exhausting all retries
- waiting-children - Parent job waiting for child jobs to complete
Examples
# Jobs are typically created through the Queue module
{:ok, job} = BullMQ.Queue.add("my_queue", "email", %{to: "user@example.com"})
# Access job properties
job.id #=> "1"
job.name #=> "email"
job.data #=> %{to: "user@example.com"}
job.state #=> :waiting
# In a worker, you receive the job and can update progress
defmodule MyWorker do
def process(job) do
BullMQ.Job.update_progress(job, 50)
result = do_work(job.data)
BullMQ.Job.update_progress(job, 100)
result
end
endFlow Methods
When processing parent jobs in a flow, you can access child results:
def process(job) do
# Get children that completed successfully
{:ok, children_values} = BullMQ.Job.get_children_values(job)
# Get children that failed but were ignored
{:ok, ignored_failures} = BullMQ.Job.get_ignored_children_failures(job)
# Get pending dependencies
{:ok, deps} = BullMQ.Job.get_dependencies(job)
{:ok, aggregate(children_values)}
end
Summary
Functions
Checks if the job is currently being processed.
Calculates the backoff delay for the next retry attempt.
Checks if the job has been completed.
Calculates the delay before the job should be processed.
Checks if the job is delayed.
Returns the estimated state of the job based on its properties.
Extends the lock on a job.
Checks if the job has failed.
Formats the job for logging/display.
Reconstructs a job from Redis hash data.
Gets the return values of this job's children.
Gets the pending dependencies (unprocessed children) of this job.
Gets the count of pending dependencies for this job.
Gets the failures of child jobs that were explicitly ignored.
Checks if the job has a parent job (is a child in a flow).
Increments the attempts made counter.
Returns the Redis key for this job's hash.
Returns the Redis key for this job's lock.
Adds a log entry to the job.
Returns the Redis key for this job's logs.
Marks the job as started processing.
Marks the job as completed with a return value.
Marks the job as failed with an error.
Moves the job to the completed state.
Moves the job to the failed state.
Moves the job back to the wait state.
Creates a new job struct with the given parameters.
Checks if the job should be retried based on attempts configuration.
Converts a job to a map suitable for Redis storage.
Updates the job progress.
Types
@type t() :: %BullMQ.Job{ attempts_made: non_neg_integer(), attempts_started: non_neg_integer(), connection: BullMQ.Types.redis_connection() | nil, data: BullMQ.Types.job_data(), deduplication_id: String.t() | nil, deferred_failure: String.t() | nil, delay: BullMQ.Types.duration_ms(), failed_reason: String.t() | nil, finished_on: BullMQ.Types.timestamp_ms() | nil, id: BullMQ.Types.job_id(), name: BullMQ.Types.job_name(), opts: map(), parent: map() | nil, parent_key: String.t() | nil, prefix: String.t(), priority: BullMQ.Types.priority(), processed_by: String.t() | nil, processed_on: BullMQ.Types.timestamp_ms() | nil, progress: BullMQ.Types.job_progress(), queue_name: BullMQ.Types.queue_name(), repeat_job_key: String.t() | nil, return_value: term(), stacktrace: [String.t()], stalled_counter: non_neg_integer(), timestamp: BullMQ.Types.timestamp_ms(), token: BullMQ.Types.lock_token() | nil, worker: pid() | nil }
Functions
Checks if the job is currently being processed.
@spec calculate_backoff(t()) :: BullMQ.Types.duration_ms()
Calculates the backoff delay for the next retry attempt.
Checks if the job has been completed.
@spec delay_until(t()) :: BullMQ.Types.timestamp_ms()
Calculates the delay before the job should be processed.
Takes into account the job's delay option and timestamp.
Checks if the job is delayed.
@spec estimated_state(t()) :: BullMQ.Types.job_state()
Returns the estimated state of the job based on its properties.
Note: For accurate state, use BullMQ.Queue.get_job_state/2 which
checks Redis directly.
@spec extend_lock(t(), String.t(), non_neg_integer()) :: {:ok, term()} | {:error, term()}
Extends the lock on a job.
When manually processing jobs, locks are not automatically renewed. Call this method to extend the lock if processing takes longer than the lock duration.
Parameters
job- The job struct (must haveconnectionset)token- The lock tokenduration- Duration in milliseconds to extend the lock
Returns
{:ok, result}- Lock extended successfully{:error, reason}- Failed to extend lock
Examples
# Extend lock by 30 seconds
{:ok, _} = Job.extend_lock(job, token, 30_000)
Checks if the job has failed.
Formats the job for logging/display.
@spec from_redis(BullMQ.Types.job_id(), BullMQ.Types.queue_name(), map(), keyword()) :: t()
Reconstructs a job from Redis hash data.
Parameters
job_id- The job IDqueue_name- The queue namedata- Map of field-value pairs from Redis HGETALLopts- Additional options like connection and prefix
Gets the return values of this job's children.
When a parent job is processed, it can access the results of all its completed children using this method.
Parameters
job- The job struct (must haveconnectionset)
Returns
{:ok, map}- Map of child job keys to their return values{:error, reason}- Failed to get children values
Examples
def process(job) do
{:ok, children_values} = BullMQ.Job.get_children_values(job)
# children_values: %{"bull:queue:123" => %{result: "done"}, ...}
{:ok, aggregate(children_values)}
end
Gets the pending dependencies (unprocessed children) of this job.
Returns a list of child job keys that haven't completed yet.
Parameters
job- The job struct (must haveconnectionset)
Returns
{:ok, list}- List of pending child job keys{:error, reason}- Failed to get dependencies
Examples
{:ok, deps} = BullMQ.Job.get_dependencies(job)
# deps: ["bull:queue:789", "bull:queue:790"]
@spec get_dependencies_count(t()) :: {:ok, non_neg_integer()} | {:error, term()}
Gets the count of pending dependencies for this job.
Parameters
job- The job struct (must haveconnectionset)
Returns
{:ok, count}- Number of pending child jobs{:error, reason}- Failed to get count
Examples
{:ok, count} = BullMQ.Job.get_dependencies_count(job)
# count: 3
Gets the failures of child jobs that were explicitly ignored.
When using the ignore_dependency_on_failure option, failed children
don't fail the parent. This method retrieves those ignored failures.
Parameters
job- The job struct (must haveconnectionset)
Returns
{:ok, map}- Map of child job keys to their failure reasons{:error, reason}- Failed to get ignored failures
Examples
{:ok, ignored} = BullMQ.Job.get_ignored_children_failures(job)
# ignored: %{"bull:queue:456" => "Timeout error", ...}
Checks if the job has a parent job (is a child in a flow).
Increments the attempts made counter.
Returns the Redis key for this job's hash.
Returns the Redis key for this job's lock.
Adds a log entry to the job.
Logs are stored in Redis and can be retrieved later. This is useful for tracking the progress of long-running jobs or debugging.
Example
def process(job) do
Job.log(job, "Starting processing")
result = do_work(job.data)
Job.log(job, "Completed with result: #{inspect(result)}")
{:ok, result}
endOptions
:keep_logs- Maximum number of log entries to keep. Older entries will be removed when this limit is exceeded. If not provided, all logs are kept.
Returns the total number of log entries for this job.
Returns the Redis key for this job's logs.
@spec mark_as_active(t(), BullMQ.Types.lock_token()) :: t()
Marks the job as started processing.
Marks the job as completed with a return value.
Marks the job as failed with an error.
@spec move_to_completed(t(), term(), String.t(), keyword()) :: {:ok, nil | {list(), String.t()}} | {:error, term()}
Moves the job to the completed state.
This is used when manually processing jobs. The job will be marked as completed with the given return value.
Parameters
job- The job struct (must haveconnectionandtokenset)return_value- The result to store with the completed jobopts- Options::fetch_next- Iftrue, returns the next job data (default:true):remove_on_complete- Job removal settings
Returns
{:ok, nil}- Job completed, no next job available{:ok, {job_data, job_id}}- Job completed, next job available (whenfetch_next: true){:error, reason}- Failed to move job
Examples
# Complete job and get next job
{:ok, next} = Job.move_to_completed(job, %{result: "done"}, token)
# Complete job without fetching next
{:ok, nil} = Job.move_to_completed(job, %{result: "done"}, token, fetch_next: false)
@spec move_to_failed(t(), term(), String.t(), keyword()) :: {:ok, nil | {list(), String.t()}} | {:error, term()}
Moves the job to the failed state.
This is used when manually processing jobs. The job will be marked as failed with the given error.
Parameters
job- The job struct (must haveconnectionandtokenset)error- The error (can be an Exception or a string/term)token- The lock tokenopts- Options::fetch_next- Iftrue, returns the next job data (default:false):remove_on_fail- Job removal settings
Returns
{:ok, nil}- Job failed, no next job available{:ok, {job_data, job_id}}- Job failed, next job available (whenfetch_next: true){:error, reason}- Failed to move job
Examples
# Fail job
{:ok, nil} = Job.move_to_failed(job, "Processing error", token)
# Fail job with exception
{:ok, nil} = Job.move_to_failed(job, %RuntimeError{message: "oops"}, token)
@spec move_to_wait(t(), String.t()) :: {:ok, non_neg_integer()} | {:error, term()}
Moves the job back to the wait state.
This is useful when you need to release a job back to the queue, for example when rate limiting is applied. The job will be available for processing again.
Parameters
job- The job struct (must haveconnectionset)token- The lock token (use "0" if no token)
Returns
{:ok, pttl}- Job moved back to wait, returns the rate limit TTL (or 0){:error, reason}- Failed to move job
Examples
# Move job back to wait due to rate limiting
await Queue.rate_limit(queue, 60_000)
{:ok, _pttl} = Job.move_to_wait(job, token)
@spec new( BullMQ.Types.queue_name(), BullMQ.Types.job_name(), BullMQ.Types.job_data(), keyword() | map() ) :: t()
Creates a new job struct with the given parameters.
Parameters
queue_name- The name of the queuename- The job type/name identifierdata- The job payload dataopts- Job options (seeBullMQ.Types.job_opts())
Examples
iex> job = BullMQ.Job.new("my_queue", "email", %{to: "test@example.com"})
iex> job.name
"email"
iex> job = BullMQ.Job.new("my_queue", "email", %{to: "test@example.com"},
...> job_id: "custom-id", priority: 5)
iex> job.id
"custom-id"
iex> job.priority
5
Checks if the job should be retried based on attempts configuration.
Converts a job to a map suitable for Redis storage.
@spec update_progress(t(), BullMQ.Types.job_progress()) :: t()
Updates the job progress.