BullMQ.Types (BullMQ v1.0.1)

View Source

Type definitions for BullMQ.

This module defines the core types used throughout the BullMQ library. All types are designed to be compatible with the Node.js BullMQ library.

Summary

Types

Backoff configuration.

Backoff strategy type.

Deduplication options.

Duration in milliseconds.

Error reason.

Worker failure behavior for parent jobs.

Finished job states.

Ignore dependency behavior.

Job data payload. Can be any JSON-serializable term.

Job identifier, typically a string representation of an integer or UUID.

Job JSON representation for Redis storage.

Job name/type identifier.

Job options for adding a job to the queue.

Job progress value - either a number (0-100) or custom progress data.

Job return value after processing.

Job state in the queue.

Job removal configuration.

Token used for job locking.

Metrics options.

Parent job reference options.

Priority level (0 = highest priority).

Processor result type.

Queue event types.

Queue name.

Queue options.

Queue settings.

Rate limiter configuration.

Redis connection specification.

Remove dependency behavior.

Repeat/scheduling options.

Result type with error.

Timestamp in milliseconds since Unix epoch.

Worker options.

Types

backoff_opts()

@type backoff_opts() :: %{
  optional(:type) => backoff_type(),
  optional(:delay) => duration_ms(),
  optional(:jitter) => float()
}

Backoff configuration.

backoff_type()

@type backoff_type() :: :fixed | :exponential | atom()

Backoff strategy type.

deduplication_opts()

@type deduplication_opts() :: %{
  :id => String.t(),
  optional(:ttl) => duration_ms(),
  optional(:extend) => boolean(),
  optional(:replace) => boolean()
}

Deduplication options.

Modes

  • Simple Mode: Only :id is provided. Jobs are deduplicated until completion or failure.
  • Throttle Mode: :id and :ttl provided. Jobs are deduplicated for the TTL duration.
  • Debounce Mode: :id, :ttl, :extend, and :replace all set. Each new job with the same ID extends the TTL and replaces the existing job data.

Options

  • :id - (required) Unique identifier for deduplication
  • :ttl - Time-to-live in milliseconds for the deduplication key
  • :extend - If true, extend the TTL on each duplicate job
  • :replace - If true, replace the job data when a duplicate is added (while delayed)

duration_ms()

@type duration_ms() :: non_neg_integer()

Duration in milliseconds.

error_reason()

@type error_reason() :: atom() | String.t() | Exception.t()

Error reason.

fail_parent_on_failure()

@type fail_parent_on_failure() :: boolean()

Worker failure behavior for parent jobs.

finished_status()

@type finished_status() :: :completed | :failed

Finished job states.

ignore_dependency()

@type ignore_dependency() :: boolean()

Ignore dependency behavior.

job_data()

@type job_data() :: map() | list() | String.t() | number() | boolean() | nil

Job data payload. Can be any JSON-serializable term.

job_id()

@type job_id() :: String.t()

Job identifier, typically a string representation of an integer or UUID.

job_json()

@type job_json() :: %{
  :id => job_id(),
  :name => job_name(),
  :data => String.t(),
  :opts => String.t(),
  :timestamp => timestamp_ms(),
  optional(:delay) => duration_ms(),
  optional(:priority) => priority(),
  optional(:processedOn) => timestamp_ms(),
  optional(:finishedOn) => timestamp_ms(),
  optional(:progress) => String.t(),
  optional(:returnvalue) => String.t(),
  optional(:failedReason) => String.t(),
  optional(:stacktrace) => String.t(),
  optional(:attemptsMade) => non_neg_integer(),
  optional(:attemptsStarted) => non_neg_integer(),
  optional(:stalledCounter) => non_neg_integer(),
  optional(:parentKey) => String.t(),
  optional(:parent) => String.t(),
  optional(:processedBy) => String.t(),
  optional(:rjk) => String.t(),
  optional(:deid) => String.t(),
  optional(:df) => String.t()
}

Job JSON representation for Redis storage.

job_name()

@type job_name() :: String.t()

Job name/type identifier.

job_opts()

@type job_opts() :: %{
  optional(:job_id) => job_id() | nil,
  optional(:priority) => priority(),
  optional(:delay) => duration_ms(),
  optional(:attempts) => pos_integer(),
  optional(:backoff) => backoff_opts(),
  optional(:lifo) => boolean(),
  optional(:timeout) => duration_ms(),
  optional(:remove_on_complete) => keep_jobs(),
  optional(:remove_on_fail) => keep_jobs(),
  optional(:timestamp) => timestamp_ms(),
  optional(:parent) => parent_opts(),
  optional(:repeat) => repeat_opts(),
  optional(:deduplication) => deduplication_opts(),
  optional(:fail_parent_on_failure) => fail_parent_on_failure(),
  optional(:ignore_dependency) => ignore_dependency(),
  optional(:remove_dependency) => remove_dependency(),
  optional(:telemetry_metadata) => String.t(),
  optional(:omit_context) => boolean()
}

Job options for adding a job to the queue.

job_progress()

@type job_progress() :: number() | map()

Job progress value - either a number (0-100) or custom progress data.

job_return_value()

@type job_return_value() :: term()

Job return value after processing.

job_state()

@type job_state() ::
  :waiting
  | :active
  | :delayed
  | :prioritized
  | :completed
  | :failed
  | :waiting_children
  | :unknown

Job state in the queue.

keep_jobs()

@type keep_jobs() ::
  boolean()
  | non_neg_integer()
  | %{age: duration_ms()}
  | %{count: non_neg_integer()}

Job removal configuration.

  • true - Remove immediately after completion/failure
  • false - Never remove
  • positive integer - Remove after this many milliseconds
  • %{age: ms} - Remove after job is older than this
  • %{count: n} - Keep only the last n jobs

lock_token()

@type lock_token() :: String.t()

Token used for job locking.

metrics_opts()

@type metrics_opts() :: %{optional(:max_data_points) => pos_integer()}

Metrics options.

parent_opts()

@type parent_opts() :: %{
  :id => job_id(),
  :queue => queue_name(),
  optional(:prefix) => String.t()
}

Parent job reference options.

priority()

@type priority() :: non_neg_integer()

Priority level (0 = highest priority).

processor_result()

@type processor_result() ::
  {:ok, job_return_value()}
  | :ok
  | {:error, term()}
  | {:delay, duration_ms()}
  | {:rate_limit, duration_ms()}
  | :waiting
  | :waiting_children

Processor result type.

Processors can return various tagged tuples to control job flow:

  • {:ok, result} - Job completed successfully with result
  • :ok - Job completed successfully (no result)
  • {:error, reason} - Job failed with error
  • {:delay, milliseconds} - Move job to delayed queue (does not increment attempts)
  • {:rate_limit, milliseconds} - Move job back to wait and pause worker for duration
  • :waiting - Move job back to waiting queue
  • :waiting_children - Move job to waiting-children state (wait for child jobs)

queue_event()

@type queue_event() ::
  :added
  | :waiting
  | :active
  | :progress
  | :completed
  | :failed
  | :delayed
  | :stalled
  | :removed
  | :drained
  | :paused
  | :resumed
  | :duplicated
  | :deduplicated
  | :retries_exhausted
  | :waiting_children
  | :cleaned

Queue event types.

queue_name()

@type queue_name() :: String.t()

Queue name.

queue_opts()

@type queue_opts() :: %{
  optional(:prefix) => String.t(),
  optional(:default_job_opts) => job_opts(),
  optional(:settings) => queue_settings(),
  optional(:telemetry) => module()
}

Queue options.

queue_settings()

@type queue_settings() :: %{
  optional(:stalled_interval) => duration_ms(),
  optional(:max_stalled_count) => non_neg_integer(),
  optional(:lock_duration) => duration_ms()
}

Queue settings.

rate_limiter_opts()

@type rate_limiter_opts() :: %{
  :max => pos_integer(),
  :duration => duration_ms(),
  optional(:group_key) => String.t()
}

Rate limiter configuration.

redis_connection()

@type redis_connection() :: atom() | pid() | Redix.connection()

Redis connection specification.

remove_dependency()

@type remove_dependency() :: boolean()

Remove dependency behavior.

repeat_opts()

@type repeat_opts() :: %{
  optional(:pattern) => String.t(),
  optional(:every) => duration_ms(),
  optional(:limit) => pos_integer(),
  optional(:start_date) => DateTime.t() | timestamp_ms(),
  optional(:end_date) => DateTime.t() | timestamp_ms(),
  optional(:tz) => String.t(),
  optional(:immediately) => boolean(),
  optional(:offset) => duration_ms(),
  optional(:count) => non_neg_integer()
}

Repeat/scheduling options.

result(ok_type)

@type result(ok_type) :: {:ok, ok_type} | {:error, error_reason()}

Result type with error.

timestamp_ms()

@type timestamp_ms() :: non_neg_integer()

Timestamp in milliseconds since Unix epoch.

worker_opts()

@type worker_opts() :: %{
  optional(:name) => atom() | String.t(),
  optional(:concurrency) => pos_integer(),
  optional(:lock_duration) => duration_ms(),
  optional(:lock_renew_time) => duration_ms(),
  optional(:stalled_interval) => duration_ms(),
  optional(:max_stalled_count) => non_neg_integer(),
  optional(:drain_delay) => duration_ms(),
  optional(:limiter) => rate_limiter_opts(),
  optional(:skip_stalled_check) => boolean(),
  optional(:remove_on_complete) => keep_jobs(),
  optional(:remove_on_fail) => keep_jobs(),
  optional(:autorun) => boolean(),
  optional(:prefix) => String.t(),
  optional(:metrics) => metrics_opts(),
  optional(:telemetry) => module(),
  optional(:on_completed) => (term(), term() -> any()),
  optional(:on_failed) => (term(), String.t() -> any()),
  optional(:on_error) => (term() -> any()),
  optional(:on_active) => (term() -> any()),
  optional(:on_progress) => (term(), term() -> any()),
  optional(:on_stalled) => (String.t() -> any()),
  optional(:on_lock_renewal_failed) => ([String.t()] -> any())
}

Worker options.

Most options have sensible defaults and don't need to be changed:

  • :lock_duration - Default: 30,000ms. Time before a job lock expires. Should normally not be changed unless you have jobs that legitimately take longer than 30 seconds between progress updates.

  • :stalled_interval - Default: 30,000ms. How often to check for stalled jobs. Should normally not be changed. Must be less than :lock_duration.

  • :max_stalled_count - Default: 1. Number of times a job can stall before being moved to failed. We consider stalled jobs a rare occurrence, so stalling more than once typically indicates a more serious issue (e.g., worker crashes, resource exhaustion). Increasing this value is not recommended unless you have a specific use case.