BullMQ.Scripts (BullMQ v1.0.1)
Manages Lua scripts for BullMQ Redis operations.
This module loads Lua scripts from priv/scripts at compile time,
extracting the number of keys from the filename pattern scriptName-numberOfKeys.lua.
All scripts are loaded and cached for efficient execution using Redis EVALSHA.
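The filename convention above can be illustrated with a small parser. This is a minimal sketch, not the module's actual loader: `ScriptNameSketch` and `parse/1` are hypothetical names, and the filename used in the usage note is made up for illustration.

```elixir
defmodule ScriptNameSketch do
  # Hypothetical helper, not part of BullMQ.Scripts.
  # Splits "scriptName-numberOfKeys.lua" into {name, key_count}.
  @spec parse(String.t()) :: {:ok, {String.t(), non_neg_integer()}} | :error
  def parse(filename) do
    # The trailing "-<digits>.lua" carries the number of Redis keys.
    pattern = ~r/^(?<name>.+)-(?<keys>\d+)\.lua$/

    case Regex.named_captures(pattern, Path.basename(filename)) do
      %{"name" => name, "keys" => keys} -> {:ok, {name, String.to_integer(keys)}}
      nil -> :error
    end
  end
end
```

Under these assumptions, `ScriptNameSketch.parse("priv/scripts/exampleScript-3.lua")` yields the name `"exampleScript"` with a key count of 3, while a file that does not follow the pattern yields `:error`.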
Script Location
Scripts are copied from the root rawScripts/ directory to priv/scripts/
before compilation. Run mix scripts.copy to update the scripts, or they
will be copied automatically during CI builds.
Types
Functions
@spec add_delayed_job(atom(), queue_context(), map() | struct(), map()) :: script_result()
Adds a delayed job to the queue.
@spec add_log(atom(), queue_context(), String.t(), String.t(), non_neg_integer() | nil) :: script_result()
Adds a log entry to a job.
@spec add_parent_job(atom(), queue_context(), map() | struct(), map()) :: script_result()
Adds a parent job to the queue (waiting-children state).
Parent jobs are added in waiting-children state until all children complete. Used by FlowProducer to create job hierarchies.
@spec add_prioritized_job(atom(), queue_context(), map() | struct(), map()) :: script_result()
Adds a prioritized job to the queue.
@spec add_standard_job(atom(), queue_context(), map() | struct(), map()) :: script_result()
Adds a standard job to the queue.
@spec add_standard_jobs_pipelined(atom(), queue_context(), [{map() | struct(), map()}]) :: {:ok, [String.t()]} | {:error, term()}
Adds multiple standard jobs atomically in a single transaction (MULTI/EXEC). Much more efficient than calling add_standard_job multiple times.
This operation is atomic: either all jobs are added or none are.
Returns {:ok, job_ids} on success or {:error, reason} on failure.
@spec build_add_parent_job_command(queue_context(), map() | struct(), map()) :: {:ok, [String.t()]}
Builds a command for adding a parent job without executing it. Used for building flow transactions where all jobs are added atomically.
@spec build_add_standard_job_command(queue_context(), map() | struct(), map()) :: {:ok, [String.t()]}
Builds a command for adding a standard job without executing it. Used for pipelining multiple job additions.
@spec build_command(script_name(), [String.t()], [any()]) :: {:ok, [String.t()]} | {:error, term()}
Builds a Redis command for a Lua script without executing it. Useful for pipelining multiple script calls.
Returns {:ok, command} where command is a list that can be passed to Redis pipeline,
or {:error, reason} if the script is not found.
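For orientation, a pipelinable script command could look like the list built below. This is a sketch under assumptions: it follows the standard Redis syntax `EVALSHA sha1 numkeys key [key ...] arg [arg ...]`, but `EvalshaSketch.build/3` is a hypothetical helper and the command that build_command/3 actually returns may be laid out differently.

```elixir
defmodule EvalshaSketch do
  # Hypothetical helper; not the library's build_command/3.
  @spec build(String.t(), [String.t()], [String.t() | number() | atom()]) :: [String.t()]
  def build(script_content, keys, args) do
    # Redis identifies a cached script by the lowercase hex SHA1 of its source.
    sha = :crypto.hash(:sha, script_content) |> Base.encode16(case: :lower)

    # EVALSHA sha1 numkeys key [key ...] arg [arg ...]
    ["EVALSHA", sha, Integer.to_string(length(keys))] ++
      keys ++ Enum.map(args, &to_string/1)
  end
end
```

Because the result is a plain list of strings, several such commands can be collected and sent together in one pipeline or transaction.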
@spec drain(atom(), queue_context(), boolean()) :: script_result()
Drains the queue (removes all jobs).
@spec ensure_scripts_loaded(atom(), [script_name()]) :: :ok | {:error, term()}
Ensures scripts are loaded into Redis cache by executing a dummy SCRIPT LOAD. Call this before using pipelined operations to avoid NOSCRIPT errors.
@spec execute(atom(), script_name(), [String.t()], [any()]) :: script_result()
Executes a Lua script against Redis by name.
Uses EVALSHA for efficiency, falling back to EVAL if the script is not yet cached in Redis.
Parameters
conn - The Redis connection pool name
script_name - The script name as an atom
keys - List of Redis keys
args - List of arguments to pass to the script
Returns
{:ok, result} on success
{:error, reason} on failure
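The EVALSHA-then-EVAL fallback described for executing a script by name can be sketched as follows. This is not the library's implementation: `EvalFallbackSketch` is a hypothetical module, and `run` is an assumed function that sends one command to Redis and returns `{:ok, result}` or `{:error, message}` with a binary message. Real clients may wrap the error differently.

```elixir
defmodule EvalFallbackSketch do
  # `run` is a hypothetical one-argument function that sends a single command
  # to Redis. Its error reason is assumed to be the raw Redis error string.
  def execute(run, script, keys, args) when is_function(run, 1) do
    sha = :crypto.hash(:sha, script) |> Base.encode16(case: :lower)
    tail = [Integer.to_string(length(keys))] ++ keys ++ args

    case run.(["EVALSHA", sha | tail]) do
      # Redis replies with a NOSCRIPT error when the SHA is not in its script
      # cache; EVAL sends the full source and caches it as a side effect.
      {:error, "NOSCRIPT" <> _rest} -> run.(["EVAL", script | tail])
      other -> other
    end
  end
end
```

Passing the Redis call in as a function keeps the sketch independent of any particular client and lets the fallback be exercised with a stub.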
Executes multiple script commands in a pipeline. Returns a list of results in the same order as the commands.
Note: If any script is not cached (NOSCRIPT error), this will fail.
Use ensure_scripts_loaded/2 first to cache scripts.
@spec execute_raw(atom(), String.t(), [String.t()], [any()]) :: script_result()
Executes a raw Lua script against Redis.
Executes multiple script commands in a Redis transaction (MULTI/EXEC). All commands are executed atomically: either all succeed or none do.
Returns {:ok, results} on success or {:error, reason} on failure.
Note: If any script is not cached (NOSCRIPT error), this will fail.
Use ensure_scripts_loaded/2 first to cache scripts.
@spec exists?(script_name()) :: boolean()
Checks if a script exists.
@spec extend_lock(atom(), queue_context(), String.t(), String.t(), non_neg_integer()) :: script_result()
Extends the lock on a job.
@spec extend_locks(atom(), queue_context(), [String.t()], [String.t()], non_neg_integer()) :: script_result()
Extends locks for multiple jobs in a single call.
This is more efficient than calling extend_lock multiple times when processing many concurrent jobs, as it uses a single Redis call.
Returns a list of results (1 for success, job_id for failures).
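Given that result shape, a caller might collect the jobs whose locks were not extended. A minimal sketch, assuming each list entry is either the integer 1 or the failing job's ID as a string; `ExtendLocksSketch` is a hypothetical helper, not part of the library.

```elixir
defmodule ExtendLocksSketch do
  # Hypothetical helper: keeps only the entries that are not the success
  # marker 1, which under the stated assumption are the failing job IDs.
  @spec failed_job_ids([1 | String.t()]) :: [String.t()]
  def failed_job_ids(results), do: Enum.reject(results, &(&1 == 1))
end
```

For example, a result list of `[1, "42", 1]` would report `"42"` as the only job whose lock could not be extended.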
@spec get(script_name()) :: {String.t(), non_neg_integer()} | nil
Returns the script content and number of keys for a given script name.
Parameters
name - The script name as an atom (e.g., :extend_lock, :move_to_active)
Returns
{content, key_count} tuple if the script exists
nil if the script is not found
Examples
iex> {content, keys} = BullMQ.Scripts.get(:extend_lock)
iex> is_binary(content) and is_integer(keys)
true
@spec get_content(script_name()) :: String.t() | nil
Returns the script content for a given script name.
@spec get_counts(atom(), queue_context()) :: script_result()
Gets job counts for the queue.
@spec get_key_count(script_name()) :: non_neg_integer() | nil
Returns the number of keys for a given script.
@spec get_metrics(atom(), queue_context(), :completed | :failed, integer(), integer()) :: script_result()
Gets queue metrics for completed or failed jobs.
Returns metrics data including count, previous timestamp, previous count, data points, and total number of points.
@spec get_rate_limit_ttl(atom(), queue_context(), keyword()) :: script_result()
Gets the rate limit TTL.
Options
:max_jobs - Maximum jobs for rate limit (default: 0, uses meta key)
@spec get_state(atom(), queue_context(), String.t()) :: script_result()
Gets the state of a job.
@spec is_maxed(atom(), queue_context()) :: script_result()
Checks if the queue is at its max limit.
@spec list_scripts() :: [script_name()]
Lists all available script names.
@spec move_job_from_active_to_wait(atom(), queue_context(), String.t(), String.t()) :: script_result()
Moves a job from active state back to wait.
This is useful when manually processing jobs and you need to release a job back to the queue (e.g., due to rate limiting).
Returns
{:ok, pttl} - The rate limit TTL in milliseconds (0 if no rate limit)
@spec move_stalled_jobs_to_wait(atom(), queue_context(), non_neg_integer(), keyword()) :: script_result()
Moves stalled jobs back to wait.
@spec move_to_active(atom(), queue_context(), String.t(), keyword()) :: script_result()
Moves a job to the active state for processing.
@spec move_to_completed(atom(), queue_context(), String.t(), String.t(), any(), keyword()) :: script_result()
Moves a job to completed state.
@spec move_to_delayed(atom(), queue_context(), String.t(), String.t(), non_neg_integer(), keyword()) :: script_result()
Moves a job to delayed state (for retry with delay).
@spec move_to_failed(atom(), queue_context(), String.t(), String.t(), any(), keyword()) :: script_result()
Moves a job to failed state.
@spec move_to_finished(atom(), queue_context(), String.t(), String.t(), any(), atom(), keyword()) :: script_result()
Moves a job to finished (completed/failed) state.
@spec move_to_waiting_children(atom(), queue_context(), String.t(), String.t(), keyword()) :: script_result()
Moves a job from active to waiting-children state.
This is used when a job needs to wait for its child jobs to complete before continuing. The job will be automatically moved back to waiting when all children complete.
Returns
{:ok, 0} - Successfully moved to waiting-children
{:ok, 1} - No pending dependencies
{:ok, -1} - Missing job
{:ok, -2} - Missing lock
{:ok, -3} - Job not in active set
{:ok, -9} - Job has failed children
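Since every outcome arrives as `{:ok, integer}`, callers may find it convenient to translate the codes into tagged atoms. The sketch below does that for the codes listed for the move to waiting-children. `WaitingChildrenSketch` and the atom names are hypothetical choices for illustration, not part of the library.

```elixir
defmodule WaitingChildrenSketch do
  # Hypothetical helper: maps the documented integer codes to tagged atoms.
  def interpret({:ok, 0}), do: {:ok, :moved_to_waiting_children}
  def interpret({:ok, 1}), do: {:ok, :no_pending_dependencies}
  def interpret({:ok, -1}), do: {:error, :missing_job}
  def interpret({:ok, -2}), do: {:error, :missing_lock}
  def interpret({:ok, -3}), do: {:error, :not_in_active_set}
  def interpret({:ok, -9}), do: {:error, :failed_children}
  # Pass Redis or connection errors through unchanged.
  def interpret({:error, _reason} = error), do: error
  # Anything else is surfaced rather than silently treated as success.
  def interpret(other), do: {:error, {:unexpected, other}}
end
```

Treating the negative codes as errors is a design choice of this sketch. Code 1 is kept as a success because it only signals that there was nothing left to wait for.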
@spec obliterate(atom(), queue_context(), non_neg_integer()) :: script_result()
Obliterates the queue (removes everything including meta).
@spec pause(atom(), queue_context(), boolean()) :: script_result()
Pauses or resumes the queue.
@spec promote(atom(), queue_context(), String.t()) :: script_result()
Promotes a delayed job to wait.
@spec release_lock(atom(), queue_context(), String.t(), String.t()) :: script_result()
Releases the lock on a job.
@spec remove_job(atom(), queue_context(), String.t(), boolean()) :: script_result()
Removes a job from the queue.
@spec retry_job(atom(), queue_context(), String.t(), boolean(), String.t()) :: script_result()
Retries a failed job.
@spec update_data(atom(), queue_context(), String.t(), map()) :: script_result()
Updates the data of a job.
@spec update_job_scheduler(atom(), queue_context(), String.t(), non_neg_integer(), String.t(), binary(), String.t()) :: script_result()
Updates a job scheduler and adds the next delayed job.
Called by the worker after completing a repeatable job to schedule the next iteration.
Parameters
conn - Redis connection
ctx - Queue context (keys structure)
scheduler_id - The job scheduler ID (repeat_job_key)
next_millis - Next execution time in milliseconds
template_data - JSON-encoded job data
job_opts - Msgpacked job options
producer_id - The ID of the job that produced this iteration
Returns
{:ok, job_id} - The ID of the next scheduled job
{:ok, nil} - Scheduler doesn't exist or duplicate
{:error, reason} - Error
@spec update_progress(atom(), queue_context(), String.t(), any()) :: script_result()
Updates the progress of a job.