Oban.Engine behaviour (Oban v2.13.0)

Defines an Engine for job orchestration.

Engines are responsible for all non-plugin database interaction, from inserting through executing jobs.

Oban ships with two Engine implementations:

  1. Basic — The default engine for development, production, and manual testing mode.
  2. Inline — Designed specifically for testing, it executes jobs immediately, in-memory, as they are inserted.

🌟 SmartEngine

The Basic engine lacks advanced functionality such as global limits, rate limits, and unique bulk insert. For those features and more, see the SmartEngine in Oban Pro.

Summary

Callbacks

Mark many executing, available, scheduled or retryable jobs as cancelled to prevent them from running.

Mark an executing, available, scheduled or retryable job as cancelled to prevent it from running.

Format engine meta in a digestible format for queue inspection.

Record that a job completed successfully.

Transition a job to discarded and record an optional reason that it shouldn't be run again.

Record an executing job's errors and either retry or discard it, depending on whether it has exhausted its available attempts.

Fetch available jobs for the given queue, up to configured limits.

Initialize metadata for a queue engine.

Insert multiple jobs into the database.

Insert a job into the database.

Store the given key/value pair in the engine meta.

Refresh a queue to indicate that it is still alive.

Mark many jobs as available, adding attempts if already maxed out. Any jobs currently available, executing or scheduled should be ignored.

Mark a job as available, adding attempts if already maxed out. If the job is currently available, executing or scheduled it should be ignored.

Prepare a queue engine for shutdown.

Reschedule an executing job to run some number of seconds in the future.

Types

Callbacks

cancel_all_jobs(conf, queryable)

@callback cancel_all_jobs(conf(), queryable()) ::
  {:ok, {non_neg_integer(), [Oban.Job.t()]}}

Mark many executing, available, scheduled or retryable jobs as cancelled to prevent them from running.

@callback cancel_job(conf(), Oban.Job.t()) :: :ok

Mark an executing, available, scheduled or retryable job as cancelled to prevent it from running.
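The state rules described for the two cancel callbacks can be pictured with a small in-memory sketch. This is not the Basic engine's implementation: `CancelSketch` is a hypothetical module, plain maps stand in for `Oban.Job` structs, and the `conf` and `queryable` arguments are left out because nothing here touches a database.

```elixir
defmodule CancelSketch do
  # States that the callback descriptions list as cancellable.
  @cancellable ~w(executing available scheduled retryable)

  # Cancel a single job map; a job in any other state is returned unchanged.
  def cancel_job(%{state: state} = job) when state in @cancellable do
    %{job | state: "cancelled"}
  end

  def cancel_job(job), do: job

  # Cancel many jobs, returning the count and the jobs that changed, in the
  # same `{:ok, {count, jobs}}` shape as the `cancel_all_jobs/2` callback.
  def cancel_all_jobs(jobs) do
    cancelled =
      jobs
      |> Enum.filter(&(&1.state in @cancellable))
      |> Enum.map(&cancel_job/1)

    {:ok, {length(cancelled), cancelled}}
  end
end
```

Jobs that are already completed, discarded or cancelled pass through untouched, which is why the returned count can be lower than the number of jobs given.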

check_meta(conf, meta, running)

@callback check_meta(conf(), meta(), running()) :: map()

Format engine meta in a digestible format for queue inspection.

@callback complete_job(conf(), Oban.Job.t()) :: :ok

Record that a job completed successfully.

@callback discard_job(conf(), Oban.Job.t()) :: :ok

Transition a job to discarded and record an optional reason that it shouldn't be run again.

error_job(conf, t, seconds)

@callback error_job(conf(), Oban.Job.t(), seconds()) :: :ok

Record an executing job's errors and either retry or discard it, depending on whether it has exhausted its available attempts.
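The retry-or-discard decision can be sketched with plain maps. Everything below is an assumption made for illustration: `ErrorSketch` is a hypothetical module, the `reason` and `now` parameters are not part of the callback, and `seconds` is taken to be the delay before the next attempt.

```elixir
defmodule ErrorSketch do
  # Record an error on a job map, then either schedule a retry or discard it.
  # `now` is injectable so the result is deterministic in tests.
  def error_job(job, reason, seconds, now \\ DateTime.utc_now()) do
    job = %{job | errors: job.errors ++ [%{attempt: job.attempt, error: reason}]}

    if job.attempt >= job.max_attempts do
      # Attempts are exhausted, so the job will not run again.
      %{job | state: "discarded"}
    else
      # Attempts remain, so the job becomes retryable after the delay.
      %{job | state: "retryable", scheduled_at: DateTime.add(now, seconds, :second)}
    end
  end
end
```

A job on attempt 1 of 3 would come back as retryable with a later `scheduled_at`, while a job on attempt 3 of 3 would come back as discarded.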

fetch_jobs(conf, meta, running)

@callback fetch_jobs(conf(), meta(), running()) ::
  {:ok, {meta(), [Oban.Job.t()]}} | {:error, term()}

Fetch available jobs for the given queue, up to configured limits.
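One way to read "up to configured limits" is that a queue only asks for as many jobs as it has free slots. The sketch below assumes that reading. `FetchSketch` and the `:queue` and `:limit` meta keys are hypothetical, and a list of maps stands in for the jobs table.

```elixir
defmodule FetchSketch do
  # `meta` carries the queue name and its concurrency limit (hypothetical keys).
  # `running` maps references to jobs that are already executing.
  def fetch_jobs(jobs, %{queue: queue, limit: limit} = meta, running) do
    # Free slots: the limit minus what is already running, never negative.
    demand = max(limit - map_size(running), 0)

    fetched =
      jobs
      |> Enum.filter(&(&1.queue == queue and &1.state == "available"))
      |> Enum.take(demand)
      |> Enum.map(&%{&1 | state: "executing"})

    # Same `{:ok, {meta, jobs}}` shape as the callback's success return.
    {:ok, {meta, fetched}}
  end
end
```

Returning `meta` alongside the jobs mirrors the callback's return type, which lets an engine update its own bookkeeping on every fetch.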

@callback init(conf(), opts()) :: {:ok, meta()} | {:error, term()}

Initialize metadata for a queue engine.

Queue metadata is used to identify and track subsequent actions such as fetching or staging jobs.

insert_all_jobs(conf, changesets_or_wrapper, opts)

@callback insert_all_jobs(conf(), Oban.changesets_or_wrapper(), opts()) :: [Oban.Job.t()]

Insert multiple jobs into the database.

insert_all_jobs(conf, t, name, changesets_or_wrapper_or_fun, opts)

@callback insert_all_jobs(
  conf(),
  Ecto.Multi.t(),
  Ecto.Multi.name(),
  Oban.changesets_or_wrapper_or_fun(),
  opts()
) :: Ecto.Multi.t()

Insert multiple jobs within an Ecto.Multi.

insert_job(conf, changeset, opts)

@callback insert_job(conf(), Oban.Job.changeset(), opts()) ::
  {:ok, Oban.Job.t()} | {:error, term()}

Insert a job into the database.

insert_job(conf, t, name, changeset_or_fun, opts)

@callback insert_job(
  conf(),
  Ecto.Multi.t(),
  Ecto.Multi.name(),
  Oban.changeset_or_fun(),
  opts()
) ::
  Ecto.Multi.t()

Insert a job within an Ecto.Multi.

put_meta(conf, meta, atom, term)

@callback put_meta(conf(), meta(), atom(), term()) :: meta()

Store the given key/value pair in the engine meta.

@callback refresh(conf(), meta()) :: meta()

Refresh a queue to indicate that it is still alive.

retry_all_jobs(conf, queryable)

@callback retry_all_jobs(conf(), queryable()) :: {:ok, non_neg_integer()}

Mark many jobs as available, adding attempts if already maxed out. Any jobs currently available, executing or scheduled should be ignored.

@callback retry_job(conf(), Oban.Job.t()) :: :ok

Mark a job as available, adding attempts if already maxed out. If the job is currently available, executing or scheduled it should be ignored.
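The retry rule has two parts: skip jobs in certain states, and grant an extra attempt only when none remain. A rough sketch under stated assumptions: `RetrySketch` is a hypothetical module, jobs are plain maps, and "adding attempts" is interpreted as raising `max_attempts` to one more than the attempts already used.

```elixir
defmodule RetrySketch do
  # States the description says should be ignored.
  @ignored ~w(available executing scheduled)

  # Jobs that are already runnable or running are left as they are.
  def retry_job(%{state: state} = job) when state in @ignored, do: job

  def retry_job(job) do
    # Raise max_attempts only if the job has already used every attempt.
    max_attempts = max(job.max_attempts, job.attempt + 1)

    %{job | state: "available", max_attempts: max_attempts}
  end
end
```

Under this interpretation a discarded job at attempt 3 of 3 becomes available with 4 maximum attempts, while a cancelled job at attempt 1 of 5 keeps its maximum of 5.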

@callback shutdown(conf(), meta()) :: meta()

Prepare a queue engine for shutdown.

The queue process is expected to stop processing new jobs after shutdown starts, though it may continue executing jobs that are already running.
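The metadata callbacks (`init`, `put_meta`, `refresh`, `shutdown` and `check_meta`) all pass a `meta` value along, so it can help to see them side by side. The sketch below is illustrative only: `MetaSketch` is a hypothetical module, `meta` is assumed to be a plain map, every key in it is invented for the example, and the `conf` argument is omitted.

```elixir
defmodule MetaSketch do
  # Build the initial metadata for a queue from its options.
  def init(opts) do
    case Keyword.fetch(opts, :queue) do
      {:ok, queue} ->
        limit = Keyword.get(opts, :limit, 10)
        {:ok, %{queue: queue, limit: limit, paused: false, updated_at: nil}}

      :error ->
        {:error, :missing_queue}
    end
  end

  # Store a key/value pair in the metadata.
  def put_meta(meta, key, value), do: Map.put(meta, key, value)

  # Record a heartbeat so the queue is known to be alive.
  def refresh(meta, now \\ DateTime.utc_now()), do: %{meta | updated_at: now}

  # Stop taking new jobs; jobs that are already running are not touched.
  def shutdown(meta), do: %{meta | paused: true}

  # Summarize the metadata and the running job ids for inspection.
  def check_meta(meta, running) do
    ids = for {_ref, job} <- running, do: job.id

    meta
    |> Map.take([:queue, :limit, :paused])
    |> Map.put(:running, ids)
  end
end
```

Marking the queue as paused in `shutdown` is one simple way to meet the expectation that no new jobs are processed while in-flight jobs are allowed to finish.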

snooze_job(conf, t, seconds)

@callback snooze_job(conf(), Oban.Job.t(), seconds()) :: :ok

Reschedule an executing job to run some number of seconds in the future.
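Snoozing amounts to moving a job back to a scheduled state with a later run time. A minimal sketch, assuming plain maps for jobs: `SnoozeSketch` and the injectable `now` parameter are hypothetical, and any other bookkeeping a real engine performs when snoozing is not modelled.

```elixir
defmodule SnoozeSketch do
  # Push a job `seconds` into the future and mark it as scheduled.
  def snooze_job(job, seconds, now \\ DateTime.utc_now())
      when is_integer(seconds) and seconds >= 0 do
    %{job | state: "scheduled", scheduled_at: DateTime.add(now, seconds, :second)}
  end
end
```

Passing `now` explicitly keeps the function deterministic, which makes the resulting `scheduled_at` easy to assert on in a test.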