BullMQ.JobScheduler (BullMQ v1.0.1)

View Source

Job scheduler for creating recurring jobs.

The JobScheduler allows you to create jobs that repeat on a schedule, using either cron expressions or fixed intervals. This supersedes the older "repeatable jobs" concept.

Node.js Interoperability Note

Seconds field: Node.js supports an optional 6th field for seconds at the beginning (second minute hour day month weekday). Elixir's crontab library uses standard 5-field format by default. 6-field expressions with seconds from Node.js will fail to parse in Elixir.

Sunday: Elixir uses 7 for Sunday, Node.js uses 0 (or 7). Use 7 for Sunday to ensure compatibility.

FeatureElixirNode.jsCompatible?
5-field (no seconds)✅ Yes
6-field (with seconds)❌ No
Sunday = 7✅ Yes
Sunday = 0❌ No

For cross-platform compatibility:

  • Use 5-field cron expressions (no seconds)
  • Use 7 for Sunday, not 0
  • Or use interval-based schedulers (:every)

Usage

# Create a scheduler with cron pattern
{:ok, job} = BullMQ.JobScheduler.upsert(conn, queue_name, "daily_report",
  %{pattern: "0 9 * * *"},  # Every day at 9 AM
  "report",
  %{type: "daily"},
  []
)

# Create an interval-based scheduler (RECOMMENDED for cross-platform)
{:ok, job} = BullMQ.JobScheduler.upsert(conn, queue_name, "heartbeat",
  %{every: 60_000},  # Every minute
  "ping",
  %{},
  []
)

Repeat Options

  • :pattern - Cron expression (e.g., "/5 *" for every 5 minutes)
  • :every - Interval in milliseconds (mutually exclusive with :pattern)
  • :limit - Maximum number of times to repeat
  • :start_date - When to start the schedule (milliseconds or DateTime)
  • :end_date - When to stop the schedule (milliseconds or DateTime)
  • :tz - Timezone for cron expressions (default: UTC)
  • :immediately - Run the first job immediately (only with pattern)
  • :offset - Offset in milliseconds for every-based jobs

Cron Expressions

Cron expressions follow the standard 5-field format:

 minute (0 - 59)
  hour (0 - 23)
   day of month (1 - 31)
    month (1 - 12)
     day of week (1 - 7) (Monday to Sunday)
    
* * * * *

Weekday Numbering

DayElixirNode.jsCompatible?
Monday11✅ Yes
Tuesday22✅ Yes
Wednesday33✅ Yes
Thursday44✅ Yes
Friday55✅ Yes
Saturday66✅ Yes
Sunday70 or 7⚠️ Use 7

Use 7 for Sunday - it works in both Elixir and Node.js. Avoid 0 for Sunday as it fails to parse in Elixir.

Examples:

  • "0 * * * *" - Every hour
  • "*/15 * * * *" - Every 15 minutes
  • "0 9 * * 1-5" - Every weekday at 9 AM
  • "0 0 1 * *" - First day of every month at midnight
  • "0 0 * * 7" - Every Sunday at midnight (Elixir only)

Summary

Functions

Calculates the next execution time for a repeat configuration.

Gets the count of job schedulers for a queue.

Lists all job schedulers for a queue.

Removes a job scheduler and its next scheduled job.

Removes a job scheduler by its key (alias for remove/4).

Types

job_opts()

@type job_opts() :: %{
  optional(:priority) => non_neg_integer(),
  optional(:lifo) => boolean(),
  optional(:delay) => non_neg_integer(),
  optional(:attempts) => pos_integer(),
  optional(:backoff) => map(),
  optional(:remove_on_complete) => boolean() | map(),
  optional(:remove_on_fail) => boolean() | map()
}

repeat_opts()

@type repeat_opts() :: %{
  optional(:pattern) => String.t(),
  optional(:every) => non_neg_integer(),
  optional(:limit) => pos_integer(),
  optional(:start_date) => DateTime.t() | non_neg_integer(),
  optional(:end_date) => DateTime.t() | non_neg_integer(),
  optional(:tz) => String.t(),
  optional(:immediately) => boolean(),
  optional(:offset) => non_neg_integer(),
  optional(:count) => non_neg_integer()
}

scheduler_json()

@type scheduler_json() :: %{
  :key => String.t(),
  :name => String.t(),
  optional(:next) => non_neg_integer(),
  optional(:iteration_count) => non_neg_integer(),
  optional(:limit) => non_neg_integer(),
  optional(:start_date) => non_neg_integer(),
  optional(:end_date) => non_neg_integer(),
  optional(:tz) => String.t(),
  optional(:pattern) => String.t(),
  optional(:every) => non_neg_integer(),
  optional(:offset) => non_neg_integer(),
  optional(:template) => map()
}

Functions

calculate_next_millis(repeat_opts, reference_time \\ System.system_time(:millisecond))

@spec calculate_next_millis(repeat_opts(), non_neg_integer()) ::
  non_neg_integer() | nil

Calculates the next execution time for a repeat configuration.

Parameters

  • repeat_opts - Repeat options with pattern or every
  • reference_time - Reference time in milliseconds (default: now)

Returns

  • The next execution time in milliseconds, or nil if no next time

Examples

# Next minute
next = BullMQ.JobScheduler.calculate_next_millis(%{every: 60_000}, now)

# Next cron execution
next = BullMQ.JobScheduler.calculate_next_millis(%{pattern: "0 * * * *"}, now)

count(conn, queue_name, opts \\ [])

@spec count(pid() | atom(), String.t(), keyword()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Gets the count of job schedulers for a queue.

Examples

{:ok, count} = BullMQ.JobScheduler.count(conn, "my_queue")
# => {:ok, 5}

get(conn, queue_name, scheduler_id, opts \\ [])

@spec get(pid() | atom(), String.t(), String.t(), keyword()) ::
  {:ok, scheduler_json() | nil} | {:error, term()}

Gets a job scheduler by ID.

Parameters

  • conn - Redis connection
  • queue_name - Queue name
  • scheduler_id - The scheduler ID
  • opts - Options (prefix)

Returns

  • {:ok, scheduler} - The scheduler data
  • {:ok, nil} - Scheduler not found
  • {:error, reason} - Redis error

Examples

{:ok, scheduler} = BullMQ.JobScheduler.get(conn, "my_queue", "daily_report")
# => %{
#   key: "daily_report",
#   name: "daily_report",
#   pattern: "0 9 * * *",
#   next: 1699999999000,
#   iteration_count: 5,
#   template: %{data: %{}, opts: %{}}
# }

list(conn, queue_name, opts \\ [])

@spec list(pid() | atom(), String.t(), keyword()) ::
  {:ok, [scheduler_json()]} | {:error, term()}

Lists all job schedulers for a queue.

Parameters

  • conn - Redis connection
  • queue_name - Queue name
  • opts - Options:
    • :start - Start index (default: 0)
    • :end - End index (default: -1 for all)
    • :asc - Sort ascending (default: false)
    • :prefix - Key prefix (default: "bull")

Returns

  • {:ok, schedulers} - List of scheduler data
  • {:error, reason} - Redis error

Examples

{:ok, schedulers} = BullMQ.JobScheduler.list(conn, "my_queue")
{:ok, first_10} = BullMQ.JobScheduler.list(conn, "my_queue", start: 0, end: 9)

remove(conn, queue_name, scheduler_id, opts \\ [])

@spec remove(pid() | atom(), String.t(), String.t(), keyword()) ::
  {:ok, boolean()} | {:error, term()}

Removes a job scheduler and its next scheduled job.

Parameters

  • conn - Redis connection
  • queue_name - Queue name
  • scheduler_id - The scheduler ID to remove
  • opts - Options (prefix)

Returns

  • {:ok, true} - Scheduler was removed
  • {:ok, false} - Scheduler not found
  • {:error, reason} - Redis error

Examples

{:ok, true} = BullMQ.JobScheduler.remove(conn, "my_queue", "daily_report")

remove_by_key(conn, queue_name, key, opts \\ [])

@spec remove_by_key(pid() | atom(), String.t(), String.t(), keyword()) ::
  {:ok, boolean()} | {:error, term()}

Removes a job scheduler by its key (alias for remove/4).

For backwards compatibility with the "repeatable jobs" API.

upsert(conn, queue_name, scheduler_id, repeat_opts, job_name, job_data \\ %{}, opts \\ [])

@spec upsert(
  pid() | atom(),
  String.t(),
  String.t(),
  repeat_opts(),
  String.t(),
  map(),
  keyword()
) :: {:ok, BullMQ.Job.t()} | {:error, atom()}

Creates or updates a job scheduler.

If a scheduler with the given ID already exists, it will be updated. The scheduler will create a delayed job for the next scheduled execution.

Parameters

  • conn - Redis connection
  • queue_name - Queue name
  • scheduler_id - Unique identifier for the scheduler
  • repeat_opts - Repeat configuration (pattern, every, limit, etc.)
  • job_name - Name for the jobs created by this scheduler
  • job_data - Data to be passed to each job
  • opts - Job options (priority, attempts, backoff, etc.)

Returns

  • {:ok, job} - The job struct for the next scheduled execution
  • {:error, :both_pattern_and_every} - Both pattern and every specified
  • {:error, :no_pattern_or_every} - Neither pattern nor every specified
  • {:error, :immediately_with_start_date} - Both immediately and start_date specified
  • {:error, :limit_reached} - Job has reached its iteration limit
  • {:error, :end_date_reached} - Job has passed its end date
  • {:error, :job_id_collision} - A job with the same ID already exists in a non-updatable state
  • {:error, :job_slots_busy} - Both current and next time slots have jobs

Examples

# Every day at 9 AM
{:ok, job} = BullMQ.JobScheduler.upsert(conn, "emails", "daily_digest",
  %{pattern: "0 9 * * *"},
  "send_digest",
  %{recipient: "all"},
  priority: 10
)

# Every 5 minutes, limited to 100 executions
{:ok, job} = BullMQ.JobScheduler.upsert(conn, "health", "heartbeat",
  %{every: 300_000, limit: 100},
  "ping",
  %{timestamp: true},
  []
)

# Start immediately then repeat every hour
{:ok, job} = BullMQ.JobScheduler.upsert(conn, "sync", "data_sync",
  %{pattern: "0 * * * *", immediately: true},
  "sync_data",
  %{},
  []
)