View Source Oban.Engine behaviour (Oban v2.19.0)

Defines an Engine for job orchestration.

Engines are responsible for all non-plugin database interaction, from inserting through executing jobs.

Oban ships with three Engine implementations:

  1. Basic — The default engine for development, production, and manual testing mode.
  2. Inline — Designed specifically for testing, it executes jobs immediately, in-memory, as they are inserted.
  3. Lite - The engine for running Oban using SQLite3.

🌟 SmartEngine

The Basic engine lacks advanced functionality such as global limits, rate limits, and unique bulk insert. For those features and more, see the Smart engine in Oban Pro.

Summary

Callbacks

Mark many executing, available, scheduled or retryable job as cancelled to prevent them from running.

Mark an executing, available, scheduled or retryable job as cancelled to prevent it from running.

Check for a list of queues with available jobs.

Format engine meta in a digestible format for queue inspection.

Record that a job completed successfully.

Delete many jobs that aren't currently executing.

Delete a job that isn't currently executing.

Transition a job to discarded and record an optional reason that it shouldn't be ran again.

Record an executing job's errors and either retry or discard it, depending on whether it has exhausted its available attempts.

Fetch available jobs for the given queue, up to configured limits.

Initialize metadata for a queue engine.

Insert multiple jobs into the database.

Insert a job into the database.

Delete completed, cancelled and discarded jobs.

Store the given key/value pair in the engine meta.

Refresh a queue to indicate that it is still alive.

Transition stuck jobs to available or discarded state based on attempts.

Mark many jobs as available, adding attempts if already maxed out. Any jobs currently available, executing or scheduled should be ignored.

Mark a job as available, adding attempts if already maxed out. If the job is currently available, executing or scheduled it should be ignored.

Prepare a queue engine for shutdown.

Reschedule an executing job to run some number of seconds in the future.

Transition scheduled or retryable jobs to available prior to execution.

Types

conf()

@type conf() :: Oban.Config.t()

job()

@type job() :: Oban.Job.t()

meta()

@type meta() :: map()

opts()

@type opts() :: Keyword.t()

queryable()

@type queryable() :: Ecto.Queryable.t()

running()

@type running() :: map()

seconds()

@type seconds() :: non_neg_integer()

Callbacks

cancel_all_jobs(conf, queryable)

@callback cancel_all_jobs(conf(), queryable()) :: {:ok, [map()]}

Mark many executing, available, scheduled or retryable job as cancelled to prevent them from running.

cancel_job(conf, t)

@callback cancel_job(conf(), Oban.Job.t()) :: :ok

Mark an executing, available, scheduled or retryable job as cancelled to prevent it from running.

check_available(conf)

(optional)
@callback check_available(conf()) :: {:ok, [String.t()]}

Check for a list of queues with available jobs.

check_meta(conf, meta, running)

@callback check_meta(conf(), meta(), running()) :: map()

Format engine meta in a digestible format for queue inspection.

complete_job(conf, t)

@callback complete_job(conf(), Oban.Job.t()) :: :ok

Record that a job completed successfully.

delete_all_jobs(conf, queryable)

(optional)
@callback delete_all_jobs(conf(), queryable()) :: {:ok, [map()]}

Delete many jobs that aren't currently executing.

delete_job(conf, t)

(optional)
@callback delete_job(conf(), Oban.Job.t()) :: :ok

Delete a job that isn't currently executing.

discard_job(conf, t)

@callback discard_job(conf(), Oban.Job.t()) :: :ok

Transition a job to discarded and record an optional reason that it shouldn't be ran again.

error_job(conf, t, seconds)

@callback error_job(conf(), Oban.Job.t(), seconds()) :: :ok

Record an executing job's errors and either retry or discard it, depending on whether it has exhausted its available attempts.

fetch_jobs(conf, meta, running)

@callback fetch_jobs(conf(), meta(), running()) ::
  {:ok, {meta(), [Oban.Job.t()]}} | {:error, term()}

Fetch available jobs for the given queue, up to configured limits.

init(conf, opts)

@callback init(conf(), opts()) :: {:ok, meta()} | {:error, term()}

Initialize metadata for a queue engine.

Queue metadata is used to identify and track subsequent actions such as fetching or staging jobs.

insert_all_jobs(conf, list, opts)

@callback insert_all_jobs(conf(), [Oban.Job.changeset()], opts()) :: [Oban.Job.t()]

Insert multiple jobs into the database.

insert_all_jobs(conf, t, name, list, opts)

(optional)
This callback is deprecated. Handled automatically by engine dispatch..
@callback insert_all_jobs(
  conf(),
  Ecto.Multi.t(),
  Ecto.Multi.name(),
  [Oban.Job.changeset()],
  opts()
) ::
  Ecto.Multi.t()

insert_job(conf, changeset, opts)

@callback insert_job(conf(), Oban.Job.changeset(), opts()) ::
  {:ok, Oban.Job.t()} | {:error, term()}

Insert a job into the database.

insert_job(conf, t, name, list, opts)

(optional)
This callback is deprecated. Handled automatically by engine dispatch..
@callback insert_job(
  conf(),
  Ecto.Multi.t(),
  Ecto.Multi.name(),
  [Oban.Job.changeset()],
  opts()
) ::
  Ecto.Multi.t()

prune_jobs(conf, queryable, opts)

(optional)
@callback prune_jobs(conf(), queryable(), opts()) :: {:ok, [map()]}

Delete completed, cancelled and discarded jobs.

put_meta(conf, meta, atom, term)

@callback put_meta(conf(), meta(), atom(), term()) :: meta()

Store the given key/value pair in the engine meta.

refresh(conf, meta)

@callback refresh(conf(), meta()) :: meta()

Refresh a queue to indicate that it is still alive.

rescue_jobs(conf, queryable, opts)

(optional)
@callback rescue_jobs(conf(), queryable(), opts()) :: {:ok, [map()]}

Transition stuck jobs to available or discarded state based on attempts.

retry_all_jobs(conf, queryable)

@callback retry_all_jobs(conf(), queryable()) :: {:ok, [map()]}

Mark many jobs as available, adding attempts if already maxed out. Any jobs currently available, executing or scheduled should be ignored.

retry_job(conf, t)

@callback retry_job(conf(), Oban.Job.t()) :: :ok

Mark a job as available, adding attempts if already maxed out. If the job is currently available, executing or scheduled it should be ignored.

shutdown(conf, meta)

@callback shutdown(conf(), meta()) :: meta()

Prepare a queue engine for shutdown.

The queue process is expected to stop processing new jobs after shutdown starts, though it may continue executing jobs that are already running.

snooze_job(conf, t, seconds)

@callback snooze_job(conf(), Oban.Job.t(), seconds()) :: :ok

Reschedule an executing job to run some number of seconds in the future.

stage_jobs(conf, queryable, opts)

(optional)
@callback stage_jobs(conf(), queryable(), opts()) :: {:ok, [Oban.Job.t()]}

Transition scheduled or retryable jobs to available prior to execution.