Oban.Worker behaviour (Oban v2.10.1)
Defines a behavior and macro to guide the creation of worker modules.
Worker modules do the work of processing a job. At a minimum they must define a perform/1
function, which is called with the full Oban.Job struct.
Defining Workers
Worker modules are defined by using Oban.Worker. A bare use Oban.Worker invocation sets a
worker with these defaults:
- :max_attempts — 20
- :priority — 0
- :queue — :default
- :unique — no uniqueness set
The following example defines a worker module to process jobs in the events queue. It then
dials down the priority from 0 to 1, limits retrying on failures to 10, adds a "business" tag,
and ensures that duplicate jobs aren't enqueued within a 30 second period:
defmodule MyApp.Workers.Business do
  use Oban.Worker,
    queue: :events,
    priority: 1,
    max_attempts: 10,
    tags: ["business"],
    unique: [period: 30]

  @impl Oban.Worker
  def perform(%Oban.Job{attempt: attempt}) when attempt > 3 do
    IO.inspect(attempt)
  end

  def perform(job) do
    IO.inspect(job.args)
  end
end
The perform/1 function receives an Oban.Job struct as argument. This allows workers to
change the behavior of perform/1 based on attributes of the job, e.g. the number of
execution attempts or when it was inserted.
The value returned from perform/1 can control whether the job is a success or a failure:
- :ok or {:ok, value} — the job is successful; for success tuples the value is ignored
- :discard or {:discard, reason} — discard the job and prevent it from being retried again. An error is recorded using the optional reason, though the job is still successful
- {:error, error} — the job failed, record the error and schedule a retry if possible
- {:snooze, seconds} — mark the job as snoozed and schedule it to run again seconds in the future. See Snoozing for more details.
In addition to explicit return values, any unhandled exception, exit or throw will fail the job and schedule a retry if possible.
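One way to keep perform/1 tidy is to translate the outcome of the real work into one of the return values above in a single place. The following is a minimal sketch rather than anything Oban provides: the MyApp.ResultMapper module, its to_result/1 helper, and the :not_found and :rate_limited reasons are all hypothetical.

```elixir
defmodule MyApp.ResultMapper do
  # Hypothetical helper (not part of Oban): maps the outcome of some business
  # call onto the return values that perform/1 may hand back.

  # Success: the value in a success tuple would be ignored, so return :ok.
  def to_result({:ok, _value}), do: :ok

  # Retrying cannot help when the record is gone, so discard the job.
  def to_result({:error, :not_found}), do: {:discard, :not_found}

  # Rate limited: ask for the job to run again in 60 seconds.
  def to_result({:error, :rate_limited}), do: {:snooze, 60}

  # Anything else is a failure; the job is retried if attempts remain.
  def to_result({:error, reason}), do: {:error, reason}
end
```

A worker could then end its perform/1 with a call such as MyApp.ResultMapper.to_result(result), keeping the retry policy separate from the business logic.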
As an example of error tuple handling, this worker will return an error tuple when the value
is one or less:
defmodule MyApp.Workers.ErrorExample do
  use Oban.Worker

  @impl Worker
  def perform(%{args: %{"value" => value}}) do
    if value > 1 do
      :ok
    else
      {:error, "invalid value given: " <> inspect(value)}
    end
  end
end
The error tuple is wrapped in an Oban.PerformError with a formatted message. The error tuple
itself is available through the exception's :reason field.
Enqueuing Jobs
All workers implement a new/2 function that converts an args map into a job changeset
suitable for inserting into the database for later execution:
%{in_the: "business", of_doing: "business"}
|> MyApp.Workers.Business.new()
|> Oban.insert()
The worker's defaults may be overridden by passing options:
%{vote_for: "none of the above"}
|> MyApp.Workers.Business.new(queue: "special", max_attempts: 5)
|> Oban.insert()
Uniqueness options may also be overridden by passing options:
%{expensive: "business"}
|> MyApp.Workers.Business.new(unique: [period: 120, fields: [:worker]])
|> Oban.insert()
Note that unique options aren't merged; they are overridden entirely.
See Oban.Job for all available options.
Customizing Backoff
When jobs fail they may be retried again in the future using a backoff algorithm. By default the backoff is exponential with a fixed padding of 15 seconds and a small amount of jitter. The jitter helps to prevent jobs that fail simultaneously from consistently retrying at the same time. The default backoff is clamped to a maximum of 24 days, the equivalent of the 20th attempt.
If the default strategy is too aggressive or otherwise unsuited to your app's workload you can
define a custom backoff function using the backoff/1 callback.
The following worker defines a backoff/1 function that delays retries using a variant of the
historic Resque/Sidekiq algorithm:
defmodule MyApp.SidekiqBackoffWorker do
  use Oban.Worker

  @impl Worker
  def backoff(%Job{attempt: attempt}) do
    trunc(:math.pow(attempt, 4) + 15 + :rand.uniform(30) * attempt)
  end

  @impl Worker
  def perform(_job) do
    :do_business
  end
end
Here are some alternative backoff strategies to consider:
- constant — delay by a fixed number of seconds, e.g. 1→15, 2→15, 3→15
- linear — delay for the same number of seconds as the current attempt, e.g. 1→1, 2→2, 3→3
- squared — delay by attempt number squared, e.g. 1→1, 2→4, 3→9
- sidekiq — delay by a base amount plus some jitter, e.g. 1→32, 2→61, 3→135
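The strategies listed above can be sketched as plain functions from the attempt number to a delay in seconds. This is only an illustration: the MyApp.BackoffSketch module and its function names are hypothetical, and the sidekiq variant simply reuses the formula from the worker shown earlier.

```elixir
defmodule MyApp.BackoffSketch do
  # Hypothetical module: each function takes the attempt number and returns
  # the delay, in seconds, before the next retry.

  # constant: always wait 15 seconds.
  def constant(_attempt), do: 15

  # linear: wait as many seconds as the attempt number.
  def linear(attempt), do: attempt

  # squared: wait attempt * attempt seconds.
  def squared(attempt), do: attempt * attempt

  # sidekiq: attempt^4, plus a 15 second base, plus jitter scaled by attempt.
  def sidekiq(attempt) do
    trunc(:math.pow(attempt, 4) + 15 + :rand.uniform(30) * attempt)
  end
end
```

Inside a worker, a backoff/1 callback could delegate to one of these, for example by matching %Job{attempt: attempt} and returning MyApp.BackoffSketch.squared(attempt).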
Contextual Backoff
Any error, catch or throw is temporarily recorded in the job's unsaved_error map. The unsaved
error map can be used by backoff/1 to calculate a custom backoff based on the exact error
that failed the job. In this example the backoff/1 callback checks to see if the error was
due to rate limiting and adjusts the backoff accordingly:
defmodule MyApp.ApiWorker do
  use Oban.Worker

  @five_minutes 5 * 60

  @impl Worker
  def perform(%{args: args}) do
    MyApp.make_external_api_call(args)
  end

  @impl Worker
  def backoff(%Job{attempt: attempt, unsaved_error: unsaved_error}) do
    %{kind: _, reason: reason, stacktrace: _} = unsaved_error

    case reason do
      %MyApp.ApiError{status: 429} -> @five_minutes
      _ -> trunc(:math.pow(attempt, 4))
    end
  end
end
Snoozing Jobs
When returning {:snooze, snooze_time} in perform/1, the job is postponed for at
least snooze_time seconds. Snoozing is done by incrementing the job's max_attempts field and
scheduling execution for snooze_time seconds in the future.
Snoozing does not change the number of retries remaining on the job, but it does increment the attempt
number each time the job snoozes, which will affect the default exponential backoff
algorithm. In the example below the backoff/1 callback compensates for snoozing:
defmodule MyApp.SnoozingWorker do
  @max_attempts 20

  use Oban.Worker, max_attempts: @max_attempts

  @impl Worker
  def backoff(%Job{} = job) do
    corrected_attempt = @max_attempts - (job.max_attempts - job.attempt)

    Worker.backoff(%{job | attempt: corrected_attempt})
  end

  @impl Worker
  def perform(job) do
    if MyApp.something?(job), do: :ok, else: {:snooze, 60}
  end
end
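To see why the correction works, it may help to run the arithmetic on concrete numbers. The sketch below assumes each snooze adds one to max_attempts, as described above; the MyApp.SnoozeMath module is hypothetical and only mirrors the subtraction used in the worker.

```elixir
defmodule MyApp.SnoozeMath do
  # Hypothetical helper mirroring the correction in MyApp.SnoozingWorker.
  #
  # base_max_attempts: the worker's configured max_attempts (20 above)
  # max_attempts, attempt: the job's current field values
  def corrected_attempt(base_max_attempts, max_attempts, attempt) do
    base_max_attempts - (max_attempts - attempt)
  end
end

# Two snoozes raised max_attempts from 20 to 22, and the third execution
# (attempt 3) is the first to fail, so backoff should see attempt 1.
1 = MyApp.SnoozeMath.corrected_attempt(20, 22, 3)

# Without any snoozes the attempt number is left untouched.
4 = MyApp.SnoozeMath.corrected_attempt(20, 20, 4)
```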
Limiting Execution Time
By default, individual jobs may execute indefinitely. If this is undesirable you may define a
timeout in milliseconds with the timeout/1 callback on your worker module.
For example, to limit a worker's execution time to 30 seconds:
defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(_job) do
    something_that_may_take_a_long_time()

    :ok
  end

  @impl Oban.Worker
  def timeout(_job), do: :timer.seconds(30)
end
The timeout/1 function accepts an Oban.Job struct, so you can customize the timeout using
any job attributes.
Define the timeout value through job args:
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
Define the timeout based on the number of attempts:
def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)
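The two clauses above can also be combined, with the args-based clause taking precedence and the attempt-based clause as a fallback. The sketch below uses plain maps in place of Oban.Job structs so it runs without Oban; MyApp.TimeoutSketch is a hypothetical module, and in a real worker the same clauses would sit under @impl Oban.Worker.

```elixir
defmodule MyApp.TimeoutSketch do
  # Hypothetical stand-in for a worker's timeout/1 callback. The patterns
  # match plain maps here, and would match Oban.Job structs the same way.

  # An explicit, positive "timeout" arg (in milliseconds) wins.
  def timeout(%{args: %{"timeout" => timeout}})
      when is_integer(timeout) and timeout > 0,
      do: timeout

  # Otherwise allow five more seconds for every attempt.
  def timeout(%{attempt: attempt}), do: attempt * :timer.seconds(5)
end

# Explicit timeout from the args.
2_000 = MyApp.TimeoutSketch.timeout(%{args: %{"timeout" => 2_000}, attempt: 1})

# Third attempt without an explicit timeout: 3 * 5_000 ms.
15_000 = MyApp.TimeoutSketch.timeout(%{args: %{}, attempt: 3})
```

The guard keeps a malformed "timeout" arg from reaching Oban, since the callback is expected to return :infinity or a positive integer.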
Summary
Callbacks
backoff(job)
Calculate the execution backoff.
new(args, opts)
Build a job changeset for this worker with optional overrides.
perform(job)
The perform/1 function is called to execute a job.
timeout(job)
Set a job's maximum execution time in milliseconds.
Types
Specs
result() :: :ok | :discard | {:discard, reason :: term()} | {:ok, ignored :: term()} | {:error, reason :: term()} | {:snooze, seconds :: pos_integer()}
Specs
t() :: module()
Callbacks
Specs
backoff(job :: Oban.Job.t()) :: pos_integer()
Calculate the execution backoff.
In this context backoff specifies the number of seconds to wait before retrying a failed job.
Defaults to an exponential algorithm with a minimum delay of 15 seconds and a small amount of jitter.
Specs
new(args :: Oban.Job.args(), opts :: [Oban.Job.option()]) :: Oban.Job.changeset()
Build a job changeset for this worker with optional overrides.
See Oban.Job.new/2 for the available options.
Specs
perform(job :: Oban.Job.t()) :: result()
The perform/1 function is called to execute a job.
Each perform/1 function should return :ok or a success tuple. When the return is an error
tuple, an uncaught exception or a throw then the error is recorded and the job may be retried if
there are any attempts remaining.
Note that the args map provided to perform/1 will always have string keys, regardless of
the key type when the job was enqueued. The args are stored as jsonb in PostgreSQL and the
serialization process automatically stringifies all keys.
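The effect on keys can be approximated without a database. The sketch below is not how Oban serializes args (that happens through JSON encoding); the hypothetical MyApp.ArgsSketch.stringify_keys/1 only converts top-level keys to show what perform/1 ends up matching on.

```elixir
defmodule MyApp.ArgsSketch do
  # Hypothetical approximation of the JSON round trip for top-level keys.
  # Nested maps are left untouched in this sketch.
  def stringify_keys(args) when is_map(args) do
    Map.new(args, fn {key, value} -> {to_string(key), value} end)
  end
end

# Args enqueued with atom keys...
enqueued = %{id: 1, action: "sync"}

# ...are seen with string keys, so perform/1 should match on "id", not :id.
%{"id" => 1, "action" => "sync"} = MyApp.ArgsSketch.stringify_keys(enqueued)
```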
Specs
timeout(job :: Oban.Job.t()) :: :infinity | pos_integer()
Set a job's maximum execution time in milliseconds.
Jobs that exceed the time limit are considered a failure and may be retried.
Defaults to :infinity.
Functions
Specs
from_string(String.t()) :: {:ok, module()} | {:error, Exception.t()}
Resolve a module from a worker string.
Examples
iex> Oban.Worker.from_string("Oban.Integration.Worker")
{:ok, Oban.Integration.Worker}
iex> defmodule NotAWorker, do: []
...> Oban.Worker.from_string("NotAWorker")
{:error, %RuntimeError{message: "module is not a worker: NotAWorker"}}
iex> Oban.Worker.from_string("RandomWorker")
{:error, %RuntimeError{message: "unknown worker: RandomWorker"}}
Specs
to_string(module() | String.t()) :: String.t()
Return a string representation of a worker module.
This is particularly useful for normalizing worker names when building custom Ecto queries.
Examples
iex> Oban.Worker.to_string(MyApp.SomeWorker)
"MyApp.SomeWorker"
iex> Oban.Worker.to_string(Elixir.MyApp.SomeWorker)
"MyApp.SomeWorker"
iex> Oban.Worker.to_string("Elixir.MyApp.SomeWorker")
"MyApp.SomeWorker"