Oban.Job (Oban v2.10.1)

A Job is an Ecto schema used for asynchronous execution.

Job changesets are created by your application code and inserted into the database for asynchronous execution. Jobs can be inserted along with other application data as part of a transaction, which guarantees that jobs will only be triggered from a successful transaction.
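As a rough illustration of inserting a job inside a transaction, the sketch below chains `Oban.insert/3` onto an `Ecto.Multi`. The `MyApp.Signup`, `MyApp.Repo`, and `MyApp.WelcomeWorker` modules are hypothetical names assumed for this example. The function passed to `Oban.insert/3` has the `changeset_fun()` shape: it receives the map of earlier multi results and returns a job changeset.

```elixir
defmodule MyApp.Signup do
  # Hypothetical module. MyApp.Repo (an Ecto repo) and MyApp.WelcomeWorker
  # (an Oban worker) are assumed to exist in the host application.
  def register(user_changeset) do
    Ecto.Multi.new()
    |> Ecto.Multi.insert(:user, user_changeset)
    # The job changeset is built from the user inserted in the previous step.
    |> Oban.insert(:welcome_job, fn %{user: user} ->
      MyApp.WelcomeWorker.new(%{user_id: user.id})
    end)
    # If any step fails the transaction rolls back and no job is inserted.
    |> MyApp.Repo.transaction()
  end
end
```

Under these assumptions, a failed user insert leaves no job row behind, so the worker never runs for a user that was not persisted.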

Summary

Functions

new(args, opts \\ [])

Construct a new job changeset ready for insertion into the database.

states()

A canonical list of all possible job states.

to_map(changeset)

Convert a Job changeset into a map suitable for database insertion.

Types

Specs

args() :: map()

Specs

changeset() :: Ecto.Changeset.t(t())

Specs

changeset_fun() :: (map() -> changeset())

Specs

changeset_list() :: [changeset()]

Specs

changeset_list_fun() :: (map() -> changeset_list())

Specs

errors() :: [%{at: DateTime.t(), attempt: pos_integer(), error: binary()}]

Specs

option() ::
  {:args, args()}
  | {:max_attempts, pos_integer()}
  | {:meta, map()}
  | {:priority, pos_integer()}
  | {:queue, atom() | binary()}
  | {:schedule_in, schedule_in_option()}
  | {:replace_args, boolean()}
  | {:replace, [replace_option()]}
  | {:scheduled_at, DateTime.t()}
  | {:tags, tags()}
  | {:unique, [unique_option()]}
  | {:worker, atom() | binary()}

Specs

replace_option() :: [
  :args
  | :max_attempts
  | :meta
  | :priority
  | :queue
  | :scheduled_at
  | :tags
  | :worker
]

Specs

schedule_in_option() ::
  pos_integer()
  | {pos_integer(),
     :second
     | :seconds
     | :minute
     | :minutes
     | :hour
     | :hours
     | :day
     | :days
     | :week
     | :weeks}

Specs

t() :: %Oban.Job{
  __meta__: term(),
  args: args(),
  attempt: non_neg_integer(),
  attempted_at: DateTime.t(),
  attempted_by: [binary()],
  cancelled_at: DateTime.t(),
  completed_at: DateTime.t(),
  conf: Oban.Config.t(),
  conflict?: boolean(),
  discarded_at: DateTime.t(),
  errors: errors(),
  id: pos_integer(),
  inserted_at: DateTime.t(),
  max_attempts: pos_integer(),
  meta: map(),
  priority: pos_integer(),
  queue: binary(),
  replace: [replace_option()],
  scheduled_at: DateTime.t(),
  state: binary(),
  tags: tags(),
  unique: %{
    fields: [unique_field()],
    period: unique_period(),
    states: [unique_state()]
  },
  unsaved_error: %{
    kind: atom(),
    reason: term(),
    stacktrace: Exception.stacktrace()
  },
  worker: binary()
}

Specs

tags() :: [binary()]

Specs

unique_field() :: [:args | :meta | :queue | :worker]

Specs

unique_option() ::
  {:fields, [unique_field()]}
  | {:keys, [atom()]}
  | {:period, unique_period()}
  | {:states, [unique_state()]}

Specs

unique_period() :: pos_integer() | :infinity

Specs

unique_state() :: [
  :available
  | :scheduled
  | :executing
  | :retryable
  | :completed
  | :discarded
  | :cancelled
]

Functions

new(args, opts \\ [])

(since 0.1.0)

Specs

new(args(), [option()]) :: changeset()

Construct a new job changeset ready for insertion into the database.

Options

  • :max_attempts - the maximum number of times a job can be retried if there are errors during execution
  • :meta - a map containing additional information about the job
  • :priority - a numerical indicator from 0 to 3 of how important this job is relative to other jobs in the same queue. The lower the number, the higher the job's priority.
  • :queue - a named queue to push the job into. Jobs may be pushed into any queue, regardless of whether jobs are currently being processed for the queue.
  • :schedule_in - the number of seconds until the job should be executed, or a tuple containing a number and a unit
  • :replace - a list of keys to replace on a unique conflict
  • :scheduled_at - a time in the future after which the job should be executed
  • :tags - a list of tags to group and organize related jobs, e.g. to identify scheduled jobs
  • :unique - a keyword list of options specifying how uniqueness will be calculated. The options define which fields will be used, for how long, with which keys, and for which states.
  • :worker - a module to execute the job in. The module must implement the Oban.Worker behaviour.

Examples

Insert a job into the :default queue:

%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: MyApp.Worker)
|> Oban.insert()

Generate a pre-configured job for MyApp.Worker and push it:

%{id: 1, user_id: 2} |> MyApp.Worker.new() |> Oban.insert()

Schedule a job to run in 5 seconds:

%{id: 1} |> MyApp.Worker.new(schedule_in: 5) |> Oban.insert()

Schedule a job to run in 5 minutes:

%{id: 1} |> MyApp.Worker.new(schedule_in: {5, :minutes}) |> Oban.insert()
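To make the two `schedule_in` forms concrete, here is a minimal sketch of how a bare integer or a `{count, unit}` tuple could be turned into a number of seconds and an absolute timestamp. `ScheduleInSketch` is a hypothetical module written for illustration and is not Oban's implementation; it only mirrors the units listed in the `schedule_in_option()` type.

```elixir
defmodule ScheduleInSketch do
  # Hypothetical helper, not part of Oban.

  # A bare positive integer is already a number of seconds.
  def to_seconds(seconds) when is_integer(seconds) and seconds > 0, do: seconds

  # A {count, unit} tuple is multiplied out by the length of the unit.
  def to_seconds({count, unit}) when is_integer(count) and count > 0 do
    count * unit_seconds(unit)
  end

  # Singular and plural unit names are treated the same.
  defp unit_seconds(unit) when unit in [:second, :seconds], do: 1
  defp unit_seconds(unit) when unit in [:minute, :minutes], do: 60
  defp unit_seconds(unit) when unit in [:hour, :hours], do: 3_600
  defp unit_seconds(unit) when unit in [:day, :days], do: 86_400
  defp unit_seconds(unit) when unit in [:week, :weeks], do: 604_800

  # The timestamp obtained by adding the offset to `now`.
  def scheduled_at(schedule_in, now \\ DateTime.utc_now()) do
    DateTime.add(now, to_seconds(schedule_in), :second)
  end
end
```

Under this reading, `schedule_in: {5, :minutes}` and `schedule_in: 300` describe the same delay.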

Insert a job, ensuring that it is unique within the past minute:

%{id: 1} |> MyApp.Worker.new(unique: [period: 60]) |> Oban.insert()

Insert a unique job based only on the worker field, and within multiple states:

fields = [:worker]
states = [:available, :scheduled, :executing, :retryable, :completed]

%{id: 1}
|> MyApp.Worker.new(unique: [fields: fields, period: 60, states: states])
|> Oban.insert()

Insert a unique job considering only the worker and specified keys in the args:

keys = [:account_id, :url]

%{account_id: 1, url: "https://example.com"}
|> MyApp.Worker.new(unique: [fields: [:args, :worker], keys: keys])
|> Oban.insert()

Insert a unique job considering only specified keys in the meta:

unique = [fields: [:meta], keys: [:slug]]

%{id: 1}
|> MyApp.Worker.new(meta: %{slug: "unique-key"}, unique: unique)
|> Oban.insert()

states()

A canonical list of all possible job states.

This may be used to build up :unique options without duplicating states in application code.

Examples

iex> Oban.Job.states() -- [:completed, :discarded]
[:scheduled, :available, :executing, :retryable, :cancelled]
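One possible way to use the state list when building :unique options is sketched below. `UniqueStatesSketch` is a hypothetical helper, not part of Oban; it takes the full state list as an argument so that the subtraction is written only once.

```elixir
defmodule UniqueStatesSketch do
  # Hypothetical helper, not part of Oban. Builds a :unique keyword list that
  # covers every state in `all_states` except those listed in `excluded`.
  def unique_opts(all_states, excluded, period \\ 60) do
    [period: period, states: all_states -- excluded]
  end
end
```

In an application, the first argument would typically be `Oban.Job.states()`, and the result would be passed as the `unique:` option to a worker's `new/2`.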

to_map(changeset)

(since 0.9.0)

Specs

to_map(Ecto.Changeset.t(t())) :: map()

Convert a Job changeset into a map suitable for database insertion.

Examples

Convert a worker generated changeset into a plain map:

%{id: 123}
|> MyApp.Worker.new()
|> Oban.Job.to_map()