Oban.Job (Oban v2.17.0)

A Job is an Ecto schema used for asynchronous execution.

Job changesets are created by your application code and inserted into the database for asynchronous execution. Jobs can be inserted along with other application data as part of a transaction, which guarantees that jobs will only be triggered from a successful transaction.

Summary

Functions

format_attempt(job)

Normalize, blame, and format a job's unsaved_error into the stored error format.

new(args, opts \\ [])

Construct a new job changeset ready for insertion into the database.

states()

A canonical list of all possible job states.

to_map(changeset)

Convert a Job changeset into a map suitable for database insertion.

Types

args()

@type args() :: map()

changeset()

(since 0.1.0)
@type changeset() :: Ecto.Changeset.t(t())

changeset_fun()

(since 0.1.0)
@type changeset_fun() :: (map() -> changeset())

changeset_list()

(since 0.1.0)
@type changeset_list() :: [changeset()]

changeset_list_fun()

(since 0.1.0)
@type changeset_list_fun() :: (map() -> changeset_list())

errors()

(since 0.1.0)
@type errors() :: [%{at: DateTime.t(), attempt: pos_integer(), error: binary()}]

option()

(since 0.1.0)
@type option() ::
  {:args, args()}
  | {:max_attempts, pos_integer()}
  | {:meta, map()}
  | {:priority, pos_integer()}
  | {:queue, atom() | binary()}
  | {:schedule_in, schedule_in_option()}
  | {:replace_args, boolean()}
  | {:replace, [replace_option() | replace_by_state_option()]}
  | {:scheduled_at, DateTime.t()}
  | {:tags, tags()}
  | {:unique, [unique_option()]}
  | {:worker, atom() | binary()}

replace_by_state_option()

(since 0.1.0)
@type replace_by_state_option() :: [
  available: [replace_option()],
  cancelled: [replace_option()],
  completed: [replace_option()],
  discarded: [replace_option()],
  executing: [replace_option()],
  retryable: [replace_option()],
  scheduled: [replace_option()]
]

replace_option()

(since 0.1.0)
@type replace_option() :: [
  :args
  | :max_attempts
  | :meta
  | :priority
  | :queue
  | :scheduled_at
  | :tags
  | :worker
]

schedule_in_option()

(since 0.1.0)
@type schedule_in_option() :: pos_integer() | {pos_integer(), time_unit()}

t()

@type t() :: %Oban.Job{
  __meta__: term(),
  args: args(),
  attempt: non_neg_integer(),
  attempted_at: DateTime.t() | nil,
  attempted_by: [binary()] | nil,
  cancelled_at: DateTime.t() | nil,
  completed_at: DateTime.t() | nil,
  conf: Oban.Config.t() | nil,
  conflict?: boolean(),
  discarded_at: DateTime.t() | nil,
  errors: errors(),
  id: pos_integer(),
  inserted_at: DateTime.t(),
  max_attempts: pos_integer(),
  meta: map(),
  priority: non_neg_integer(),
  queue: binary(),
  replace: [replace_option() | replace_by_state_option()] | nil,
  scheduled_at: DateTime.t(),
  state: binary(),
  tags: tags(),
  unique:
    %{
      fields: [unique_field()],
      period: unique_period(),
      states: [unique_state()]
    }
    | nil,
  unsaved_error:
    %{
      kind: Exception.kind(),
      reason: term(),
      stacktrace: Exception.stacktrace()
    }
    | nil,
  worker: binary()
}

tags()

@type tags() :: [binary()]

time_unit()

(since 0.1.0)
@type time_unit() ::
  :second
  | :seconds
  | :minute
  | :minutes
  | :hour
  | :hours
  | :day
  | :days
  | :week
  | :weeks

unique_field()

(since 0.1.0)
@type unique_field() :: [:args | :meta | :queue | :worker]

unique_option()

(since 0.1.0)
@type unique_option() ::
  {:fields, [unique_field()]}
  | {:keys, [atom()]}
  | {:period, unique_period()}
  | {:states, [unique_state()]}
  | {:timestamp, :inserted_at | :scheduled_at}

unique_period()

(since 0.1.0)
@type unique_period() :: pos_integer() | {pos_integer(), time_unit()} | :infinity

unique_state()

(since 0.1.0)
@type unique_state() :: [
  :available
  | :cancelled
  | :completed
  | :discarded
  | :executing
  | :retryable
  | :scheduled
]

Functions

format_attempt(job)

(since 2.14.0)

Normalize, blame, and format a job's unsaved_error into the stored error format.

Formatted errors are stored in a job's errors field.
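
As a rough illustration of the normalize, blame, and format steps, the sketch below works on a plain map rather than a real %Oban.Job{} struct. FormatAttemptSketch is a hypothetical module, not Oban's implementation; it only shows how the standard library's Exception.blame/3 and Exception.format/3 can produce an entry with the at, attempt, and error keys from the errors() type.

```elixir
defmodule FormatAttemptSketch do
  # Hypothetical stand-in for illustration; it is not part of Oban.
  # It accepts a plain map with the :attempt and :unsaved_error keys.
  def format(%{attempt: attempt, unsaved_error: unsaved}) do
    %{kind: kind, reason: reason, stacktrace: stacktrace} = unsaved

    # Exception.blame/3 normalizes the error and may attach extra debugging details.
    {blamed, stacktrace} = Exception.blame(kind, reason, stacktrace)

    # The result mirrors one entry of the errors() type: at, attempt, and error.
    %{
      at: DateTime.utc_now(),
      attempt: attempt,
      error: Exception.format(kind, blamed, stacktrace)
    }
  end
end

unsaved = %{kind: :error, reason: %RuntimeError{message: "boom"}, stacktrace: []}
entry = FormatAttemptSketch.format(%{attempt: 1, unsaved_error: unsaved})

IO.inspect(entry.attempt)
IO.puts(entry.error)
```

The error value is a single binary, which is why it can be stored directly in the errors field.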

new(args, opts \\ [])

(since 0.1.0)
@spec new(args(), [option()]) :: changeset()

Construct a new job changeset ready for insertion into the database.

Options

  • :max_attempts - the maximum number of times a job can be retried if there are errors during execution

  • :meta - a map containing additional information about the job

  • :priority - a numerical indicator from 0 to 9 of how important this job is relative to other jobs in the same queue. The lower the number, the higher the job's priority.

  • :queue - a named queue to push the job into. Jobs may be pushed into any queue, regardless of whether jobs are currently being processed for the queue.

  • :replace - a list of keys to replace per state on a unique conflict

  • :scheduled_at - a time in the future after which the job should be executed

  • :schedule_in - the number of seconds until the job should be executed or a tuple containing a number and unit

  • :tags - a list of tags to group and organize related jobs, e.g. to identify scheduled jobs

  • :unique - a keyword list of options specifying how uniqueness will be calculated. The options define which fields will be used, for how long, with which keys, and for which states.

  • :worker - a module to execute the job in. The module must implement the Oban.Worker behaviour.
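
To make the :priority rule concrete, here is a small sketch that orders plain maps the way the description reads: 0 runs before 9. The maps and the PrioritySketch module are hypothetical, and the real ordering is applied by Oban when it fetches jobs from a queue, where other fields may also play a part.

```elixir
defmodule PrioritySketch do
  # Hypothetical helper for illustration; not part of Oban.
  # Lower numbers sort first, so priority 0 is picked before priority 9.
  def order(jobs), do: Enum.sort_by(jobs, & &1.priority)
end

jobs = [
  %{id: 1, priority: 3},
  %{id: 2, priority: 0},
  %{id: 3, priority: 9}
]

ordered_ids = jobs |> PrioritySketch.order() |> Enum.map(& &1.id)
IO.inspect(ordered_ids)
```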

Examples

Insert a job with the :default queue:

%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: MyApp.Worker)
|> Oban.insert()

Generate a pre-configured job for MyApp.Worker:

MyApp.Worker.new(%{id: 1, user_id: 2})

Schedule a job to run in 5 seconds:

MyApp.Worker.new(%{id: 1}, schedule_in: 5)

Schedule a job to run in 5 minutes:

MyApp.Worker.new(%{id: 1}, schedule_in: {5, :minutes})
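
Both schedule_in forms resolve to an offset from the current time. The sketch below shows one way to read them; ScheduleInSketch and its functions are hypothetical names for illustration, and Oban's own conversion may differ in detail.

```elixir
defmodule ScheduleInSketch do
  # Hypothetical helper for illustration; not part of Oban.

  # A bare positive integer is already a number of seconds.
  def to_seconds(seconds) when is_integer(seconds) and seconds > 0, do: seconds

  # A {number, unit} tuple is scaled by the length of the unit in seconds.
  def to_seconds({number, unit}) when is_integer(number) and number > 0 do
    number * unit_seconds(unit)
  end

  # The resulting timestamp is the given time shifted by the offset.
  def scheduled_at(schedule_in, now \\ DateTime.utc_now()) do
    DateTime.add(now, to_seconds(schedule_in), :second)
  end

  # Singular and plural unit names are treated the same.
  defp unit_seconds(unit) when unit in [:second, :seconds], do: 1
  defp unit_seconds(unit) when unit in [:minute, :minutes], do: 60
  defp unit_seconds(unit) when unit in [:hour, :hours], do: 3_600
  defp unit_seconds(unit) when unit in [:day, :days], do: 86_400
  defp unit_seconds(unit) when unit in [:week, :weeks], do: 604_800
end

now = ~U[2024-01-01 00:00:00Z]
IO.inspect(ScheduleInSketch.to_seconds({5, :minutes}))
IO.inspect(ScheduleInSketch.scheduled_at({5, :minutes}, now))
```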

Insert a job, ensuring that it is unique within the past minute:

MyApp.Worker.new(%{id: 1}, unique: [period: {1, :minute}])

Insert a unique job where the period is compared to the scheduled_at timestamp rather than inserted_at:

MyApp.Worker.new(%{id: 1}, unique: [period: 60, timestamp: :scheduled_at])

Insert a unique job based only on the worker field, and within multiple states:

fields = [:worker]
states = [:available, :scheduled, :executing, :retryable, :completed]

MyApp.Worker.new(%{id: 1}, unique: [fields: fields, period: 60, states: states])

Insert a unique job considering only the worker and specified keys in the args:

keys = [:account_id, :url]
args = %{account_id: 1, url: "https://example.com"}

MyApp.Worker.new(args, unique: [fields: [:args, :worker], keys: keys])
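
The keys option narrows the comparison to a subset of args. As a rough mental model only, two jobs for the same worker collide when the selected keys hold equal values. UniqueKeysSketch below is a hypothetical helper that mimics this on plain maps, whereas Oban performs the actual uniqueness check when the job is inserted.

```elixir
defmodule UniqueKeysSketch do
  # Hypothetical helper for illustration; not part of Oban.
  # Only the listed keys take part in the comparison, other args are ignored.
  def conflict?(args_a, args_b, keys) do
    Map.take(args_a, keys) == Map.take(args_b, keys)
  end
end

keys = [:account_id, :url]
first = %{account_id: 1, url: "https://example.com", note: "a"}
second = %{account_id: 1, url: "https://example.com", note: "b"}
third = %{account_id: 2, url: "https://example.com"}

# first and second differ only in :note, which is not one of the keys.
IO.inspect(UniqueKeysSketch.conflict?(first, second, keys))
# third has a different :account_id, so it does not collide with first.
IO.inspect(UniqueKeysSketch.conflict?(first, third, keys))
```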

Insert a unique job considering only specified keys in the meta:

unique = [fields: [:meta], keys: [:slug]]

MyApp.Worker.new(%{id: 1}, meta: %{slug: "unique-key"}, unique: unique)

states()

A canonical list of all possible job states.

This may be used to build up :unique options without duplicating states in application code.

Examples

iex> Oban.Job.states() -- [:completed, :discarded]
[:scheduled, :available, :executing, :retryable, :cancelled]

Job State Transitions

  • :scheduled - Jobs inserted with scheduled_at in the future are :scheduled. After the scheduled_at time has elapsed, the Oban.Plugins.Stager will transition them to :available

  • :available - Jobs without a future scheduled_at timestamp are inserted as :available and may execute immediately

  • :executing - Available jobs may be run, at which point they are :executing

  • :retryable - Jobs that fail and haven't exceeded their max attempts are transitioned to :retryable and rescheduled until after a backoff period. Once the backoff has elapsed, the Oban.Plugins.Stager will transition them back to :available

  • :completed - Jobs that finish executing successfully are marked :completed

  • :discarded - Jobs that fail and exhaust their max attempts, or return a :discard tuple during execution, are marked :discarded

  • :cancelled - Jobs that are cancelled intentionally
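
The transitions listed above can be summarized as a small lookup table. JobStateSketch is a hypothetical module for illustration only; it covers just the transitions described in this list, and it assumes that cancellation is possible from any state that is not yet final.

```elixir
defmodule JobStateSketch do
  # Hypothetical illustration of the transitions described above; not part of Oban.
  # Assumption: :cancelled is reachable from every non-final state.
  @transitions %{
    scheduled: [:available, :cancelled],
    available: [:executing, :cancelled],
    executing: [:completed, :retryable, :discarded, :cancelled],
    retryable: [:available, :cancelled],
    completed: [],
    discarded: [],
    cancelled: []
  }

  # Returns the states a job may move to from the given state.
  def next_states(state), do: Map.fetch!(@transitions, state)

  # Checks whether a single step from one state to another is allowed.
  def transition?(from, to), do: to in next_states(from)

  # A state is final when no further transitions are listed for it.
  def final?(state), do: next_states(state) == []
end

IO.inspect(JobStateSketch.next_states(:retryable))
IO.inspect(JobStateSketch.transition?(:completed, :available))
```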

to_map(changeset)

(since 0.9.0)
@spec to_map(Ecto.Changeset.t(t())) :: map()

Convert a Job changeset into a map suitable for database insertion.

Examples

Convert a worker generated changeset into a plain map:

%{id: 123}
|> MyApp.Worker.new()
|> Oban.Job.to_map()