BullMQ.Scripts (BullMQ v1.0.1)

View Source

Manages Lua scripts for BullMQ Redis operations.

This module loads Lua scripts from priv/scripts at compile time, extracting the number of keys from the filename pattern scriptName-numberOfKeys.lua.

All scripts are loaded and cached for efficient execution using Redis EVALSHA.

Script Location

Scripts are copied from the root rawScripts/ directory to priv/scripts/ before compilation. Run mix scripts.copy to update the scripts, or they will be copied automatically during CI builds.

Summary

Functions

Adds a delayed job to the queue.

Adds a parent job to the queue (waiting-children state).

Adds a prioritized job to the queue.

Adds a standard job to the queue.

Adds multiple standard jobs atomically in a single transaction (MULTI/EXEC). Much more efficient than calling add_standard_job multiple times.

Builds a command for adding a parent job without executing it. Used for building flow transactions where all jobs are added atomically.

Builds a command for adding a standard job without executing it. Used for pipelining multiple job additions.

Builds a Redis command for a Lua script without executing it. Useful for pipelining multiple script calls.

Drains the queue (removes all jobs).

Ensures scripts are loaded into Redis cache by executing a dummy SCRIPT LOAD. Call this before using pipelined operations to avoid NOSCRIPT errors.

Executes a Lua script against Redis by name.

Executes multiple script commands in a pipeline. Returns a list of results in the same order as the commands.

Executes a raw Lua script against Redis.

Executes multiple script commands in a Redis transaction (MULTI/EXEC). All commands are executed atomically - either all succeed or none do.

Checks if a script exists.

Extends locks for multiple jobs in a single call.

Returns the script content and number of keys for a given script name.

Returns the script content for a given script name.

Gets job counts for the queue.

Returns the number of keys for a given script.

Gets queue metrics for completed or failed jobs.

Gets the state of a job.

Checks if the queue is at its max limit.

Lists all available script names.

Moves a job from active state back to wait.

Moves a job to the active state for processing.

Moves a job to delayed state (for retry with delay).

Moves a job to finished (completed/failed) state.

Moves a job from active to waiting-children state.

Obliterates the queue (removes everything including meta).

Pauses or resumes the queue.

Promotes a delayed job to wait.

Releases the lock on a job.

Removes a job from the queue.

Updates the data of a job.

Updates the progress of a job.

Types

queue_context()

@type queue_context() :: BullMQ.Keys.queue_context()

script_name()

@type script_name() :: atom()

script_result()

@type script_result() :: {:ok, any()} | {:error, any()}

Functions

add_delayed_job(conn, ctx, job, opts)

@spec add_delayed_job(atom(), queue_context(), map() | struct(), map()) ::
  script_result()

Adds a delayed job to the queue.

add_log(conn, ctx, job_id, log_message, keep_logs)

@spec add_log(
  atom(),
  queue_context(),
  String.t(),
  String.t(),
  non_neg_integer() | nil
) ::
  script_result()

Adds a log entry to a job.

add_parent_job(conn, ctx, job, opts)

@spec add_parent_job(atom(), queue_context(), map() | struct(), map()) ::
  script_result()

Adds a parent job to the queue (waiting-children state).

Parent jobs are added in waiting-children state until all children complete. Used by FlowProducer to create job hierarchies.

add_prioritized_job(conn, ctx, job, opts)

@spec add_prioritized_job(atom(), queue_context(), map() | struct(), map()) ::
  script_result()

Adds a prioritized job to the queue.

add_standard_job(conn, ctx, job, opts)

@spec add_standard_job(atom(), queue_context(), map() | struct(), map()) ::
  script_result()

Adds a standard job to the queue.

add_standard_jobs_pipelined(conn, ctx, jobs_with_opts)

@spec add_standard_jobs_pipelined(atom(), queue_context(), [{map() | struct(), map()}]) ::
  {:ok, [String.t()]} | {:error, term()}

Adds multiple standard jobs atomically in a single transaction (MULTI/EXEC). Much more efficient than calling add_standard_job multiple times.

This operation is atomic - all jobs are added or none are.

Returns {:ok, job_ids} on success or {:error, reason} on failure.

build_add_parent_job_command(ctx, job, opts)

@spec build_add_parent_job_command(queue_context(), map() | struct(), map()) ::
  {:ok, [String.t()]}

Builds a command for adding a parent job without executing it. Used for building flow transactions where all jobs are added atomically.

build_add_standard_job_command(ctx, job, opts)

@spec build_add_standard_job_command(queue_context(), map() | struct(), map()) ::
  {:ok, [String.t()]}

Builds a command for adding a standard job without executing it. Used for pipelining multiple job additions.

build_command(script_name, keys, args)

@spec build_command(script_name(), [String.t()], [any()]) ::
  {:ok, [String.t()]} | {:error, term()}

Builds a Redis command for a Lua script without executing it. Useful for pipelining multiple script calls.

Returns {:ok, command} where command is a list that can be passed to Redis pipeline, or {:error, reason} if the script is not found.

drain(conn, ctx, delayed?)

@spec drain(atom(), queue_context(), boolean()) :: script_result()

Drains the queue (removes all jobs).

ensure_scripts_loaded(conn, script_names)

@spec ensure_scripts_loaded(atom(), [script_name()]) :: :ok | {:error, term()}

Ensures scripts are loaded into Redis cache by executing a dummy SCRIPT LOAD. Call this before using pipelined operations to avoid NOSCRIPT errors.

execute(conn, script_name, keys, args)

@spec execute(atom(), script_name(), [String.t()], [any()]) :: script_result()

Executes a Lua script against Redis by name.

Uses EVALSHA for efficiency, falling back to EVAL if the script is not yet cached in Redis.

Parameters

  • conn - The Redis connection pool name
  • script_name - The script name as an atom
  • keys - List of Redis keys
  • args - List of arguments to pass to the script

Returns

  • {:ok, result} on success
  • {:error, reason} on failure

execute_pipeline(conn, commands)

@spec execute_pipeline(atom(), [[String.t()]]) :: {:ok, [any()]} | {:error, term()}

Executes multiple script commands in a pipeline. Returns a list of results in the same order as the commands.

Note: If any script is not cached (NOSCRIPT error), this will fail. Use ensure_scripts_loaded/2 first to cache scripts.

execute_raw(conn, script, keys, args)

@spec execute_raw(atom(), String.t(), [String.t()], [any()]) :: script_result()

Executes a raw Lua script against Redis.

execute_transaction(conn, commands)

@spec execute_transaction(atom(), [[String.t()]]) :: {:ok, [any()]} | {:error, term()}

Executes multiple script commands in a Redis transaction (MULTI/EXEC). All commands are executed atomically - either all succeed or none do.

Returns {:ok, results} on success or {:error, reason} on failure.

Note: If any script is not cached (NOSCRIPT error), this will fail. Use ensure_scripts_loaded/2 first to cache scripts.

exists?(name)

@spec exists?(script_name()) :: boolean()

Checks if a script exists.

extend_lock(conn, ctx, job_id, token, duration)

@spec extend_lock(atom(), queue_context(), String.t(), String.t(), non_neg_integer()) ::
  script_result()

Extends the lock on a job.

extend_locks(conn, ctx, job_ids, tokens, duration)

@spec extend_locks(
  atom(),
  queue_context(),
  [String.t()],
  [String.t()],
  non_neg_integer()
) ::
  script_result()

Extends locks for multiple jobs in a single call.

This is more efficient than calling extend_lock multiple times when processing many concurrent jobs, as it uses a single Redis call.

Returns a list of results (1 for success, job_id for failures).

get(name)

@spec get(script_name()) :: {String.t(), non_neg_integer()} | nil

Returns the script content and number of keys for a given script name.

Parameters

  • name - The script name as an atom (e.g., :extend_lock, :move_to_active)

Returns

  • {content, key_count} tuple if script exists
  • nil if script not found

Examples

iex> {content, keys} = BullMQ.Scripts.get(:extend_lock)
iex> is_binary(content) and is_integer(keys)
true

get_content(name)

@spec get_content(script_name()) :: String.t() | nil

Returns the script content for a given script name.

get_counts(conn, ctx)

@spec get_counts(atom(), queue_context()) :: script_result()

Gets job counts for the queue.

get_key_count(name)

@spec get_key_count(script_name()) :: non_neg_integer() | nil

Returns the number of keys for a given script.

get_metrics(conn, ctx, type, start_idx \\ 0, end_idx \\ -1)

@spec get_metrics(atom(), queue_context(), :completed | :failed, integer(), integer()) ::
  script_result()

Gets queue metrics for completed or failed jobs.

Returns metrics data including count, previous timestamp, previous count, data points, and total number of points.

get_rate_limit_ttl(conn, ctx, opts \\ [])

@spec get_rate_limit_ttl(atom(), queue_context(), keyword()) :: script_result()

Gets the rate limit TTL.

Options

  • :max_jobs - Maximum jobs for rate limit (default: 0, uses meta key)

get_state(conn, ctx, job_id)

@spec get_state(atom(), queue_context(), String.t()) :: script_result()

Gets the state of a job.

is_maxed(conn, ctx)

@spec is_maxed(atom(), queue_context()) :: script_result()

Checks if the queue is at its max limit.

list_scripts()

@spec list_scripts() :: [script_name()]

Lists all available script names.

move_job_from_active_to_wait(conn, ctx, job_id, token \\ "0")

@spec move_job_from_active_to_wait(atom(), queue_context(), String.t(), String.t()) ::
  script_result()

Moves a job from active state back to wait.

This is useful when manually processing jobs and you need to release a job back to the queue (e.g., due to rate limiting).

Returns

  • {:ok, pttl} - The rate limit TTL in milliseconds (0 if no rate limit)

move_stalled_jobs_to_wait(conn, ctx, max_stalled_count, opts \\ [])

@spec move_stalled_jobs_to_wait(atom(), queue_context(), non_neg_integer(), keyword()) ::
  script_result()

Moves stalled jobs back to wait.

move_to_active(conn, ctx, token, opts \\ [])

@spec move_to_active(atom(), queue_context(), String.t(), keyword()) ::
  script_result()

Moves a job to the active state for processing.

move_to_completed(conn, ctx, job_id, token, return_value, opts \\ [])

@spec move_to_completed(
  atom(),
  queue_context(),
  String.t(),
  String.t(),
  any(),
  keyword()
) ::
  script_result()

Moves a job to completed state.

move_to_delayed(conn, ctx, job_id, token, delay, opts \\ [])

@spec move_to_delayed(
  atom(),
  queue_context(),
  String.t(),
  String.t(),
  non_neg_integer(),
  keyword()
) :: script_result()

Moves a job to delayed state (for retry with delay).

move_to_failed(conn, ctx, job_id, token, error, opts \\ [])

@spec move_to_failed(
  atom(),
  queue_context(),
  String.t(),
  String.t(),
  any(),
  keyword()
) ::
  script_result()

Moves a job to failed state.

move_to_finished(conn, ctx, job_id, token, result, target, opts \\ [])

@spec move_to_finished(
  atom(),
  queue_context(),
  String.t(),
  String.t(),
  any(),
  atom(),
  keyword()
) ::
  script_result()

Moves a job to finished (completed/failed) state.

move_to_waiting_children(conn, ctx, job_id, token, opts \\ [])

@spec move_to_waiting_children(
  atom(),
  queue_context(),
  String.t(),
  String.t(),
  keyword()
) ::
  script_result()

Moves a job from active to waiting-children state.

This is used when a job needs to wait for its child jobs to complete before continuing. The job will be automatically moved back to waiting when all children complete.

Returns

  • {:ok, 0} - Successfully moved to waiting-children
  • {:ok, 1} - No pending dependencies
  • {:ok, -1} - Missing job
  • {:ok, -2} - Missing lock
  • {:ok, -3} - Job not in active set
  • {:ok, -9} - Job has failed children

obliterate(conn, ctx, count)

@spec obliterate(atom(), queue_context(), non_neg_integer()) :: script_result()

Obliterates the queue (removes everything including meta).

pause(conn, ctx, paused?)

@spec pause(atom(), queue_context(), boolean()) :: script_result()

Pauses or resumes the queue.

promote(conn, ctx, job_id)

@spec promote(atom(), queue_context(), String.t()) :: script_result()

Promotes a delayed job to wait.

release_lock(conn, ctx, job_id, token)

@spec release_lock(atom(), queue_context(), String.t(), String.t()) :: script_result()

Releases the lock on a job.

remove_job(conn, ctx, job_id, remove_children)

@spec remove_job(atom(), queue_context(), String.t(), boolean()) :: script_result()

Removes a job from the queue.

retry_job(conn, ctx, job_id, lifo, token)

@spec retry_job(atom(), queue_context(), String.t(), boolean(), String.t()) ::
  script_result()

Retries a failed job.

update_data(conn, ctx, job_id, data)

@spec update_data(atom(), queue_context(), String.t(), map()) :: script_result()

Updates the data of a job.

update_job_scheduler(conn, ctx, scheduler_id, next_millis, template_data, job_opts, producer_id)

@spec update_job_scheduler(
  atom(),
  queue_context(),
  String.t(),
  non_neg_integer(),
  String.t(),
  binary(),
  String.t()
) :: script_result()

Updates a job scheduler and adds the next delayed job.

Called by the worker after completing a repeatable job to schedule the next iteration.

Parameters

  • conn - Redis connection
  • ctx - Queue context (keys structure)
  • scheduler_id - The job scheduler ID (repeat_job_key)
  • next_millis - Next execution time in milliseconds
  • template_data - JSON-encoded job data
  • job_opts - Msgpacked job options
  • producer_id - The ID of the job that produced this iteration

Returns

  • {:ok, job_id} - The ID of the next scheduled job
  • {:ok, nil} - Scheduler doesn't exist or duplicate
  • {:error, reason} - Error

update_progress(conn, ctx, job_id, progress)

@spec update_progress(atom(), queue_context(), String.t(), any()) :: script_result()

Updates the progress of a job.