Oban v0.5.0 Oban.Worker behaviour
Defines a behavior and macro to guide the creation of worker modules.
Worker modules do the work of processing a job. At a minimum they must define a perform/1 function, which will be called with an args map.
Defining Workers
Define a worker to process jobs in the events queue:
defmodule MyApp.Workers.Business do
  use Oban.Worker, queue: "events", max_attempts: 10
  @impl true
  def perform(args) do
    IO.inspect(args)
  end
end
The perform/1 function always receives a job's args map. In this example the worker simply inspects any arguments that are provided. A job is considered complete if perform/1 returns a non-error value and does not raise an exception or have an unhandled exit or throw.
Any of these return values or error events will fail the job:
- return {:error, error}
- return :error
- an unhandled exception
- an unhandled exit or throw
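The rules above can be sketched in plain Elixir without Oban. The module below is a hypothetical helper, not Oban's actual executor: it runs a zero-arity function standing in for perform/1 and classifies the outcome as a success or a failure according to the list above.

```elixir
defmodule OutcomeSketch do
  # Hypothetical helper, not part of Oban: runs a function and classifies
  # the result using the success and failure rules listed above.
  def classify(fun) when is_function(fun, 0) do
    case fun.() do
      :error -> {:failure, :error}
      {:error, error} -> {:failure, error}
      _other -> :success
    end
  rescue
    # An unhandled exception fails the job.
    exception -> {:failure, exception}
  catch
    # An unhandled exit or throw fails the job as well.
    kind, value -> {:failure, {kind, value}}
  end
end

IO.inspect(OutcomeSketch.classify(fn -> :ok end))
IO.inspect(OutcomeSketch.classify(fn -> {:error, "nope"} end))
IO.inspect(OutcomeSketch.classify(fn -> raise "bad" end))
IO.inspect(OutcomeSketch.classify(fn -> exit(:boom) end))
```

Any return value other than :error or an error tuple counts as a success in this sketch, which matches the "non-error value" wording above.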
As an example of error tuple handling, this worker returns an error tuple when the value is not greater than one:
defmodule MyApp.Workers.ErrorExample do
  use Oban.Worker
  @impl true
  # Args always arrive with string keys, so match on "value" rather than :value.
  def perform(%{"value" => value}) do
    if value > 1 do
      :ok
    else
      {:error, "invalid value given: " <> inspect(value)}
    end
  end
end
Enqueuing Jobs
All workers implement a new/2 function that converts an args map into a job changeset suitable for inserting into the database for later execution:
%{in_the: "business", of_doing: "business"}
|> MyApp.Workers.Business.new()
|> MyApp.Repo.insert()
The worker's defaults may be overridden by passing options:
%{vote_for: "none of the above"}
|> MyApp.Workers.Business.new(queue: "special", max_attempts: 5)
|> MyApp.Repo.insert()
See Oban.Job for all available options.
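How overrides take precedence over a worker's defaults can be illustrated with a keyword list merge. This is only a sketch under the assumption that caller options replace defaults key by key; the OptionsSketch module and its resolve/1 function are hypothetical and are not Oban's implementation of new/2.

```elixir
defmodule OptionsSketch do
  # Hypothetical defaults, mirroring `use Oban.Worker, queue: "events", max_attempts: 10`.
  @defaults [queue: "events", max_attempts: 10]

  # Options passed by the caller win over the defaults; untouched keys are kept.
  def resolve(overrides \\ []) do
    Keyword.merge(@defaults, overrides)
  end
end

IO.inspect(OptionsSketch.resolve())
IO.inspect(OptionsSketch.resolve(queue: "special", max_attempts: 5))
IO.inspect(OptionsSketch.resolve(max_attempts: 3))
```

In the last call only max_attempts changes, so the job would still land in the default queue.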
Customizing Backoff
When jobs fail they may be retried in the future using a backoff algorithm. By default the backoff is exponential with a fixed padding of 15 seconds. This may be too aggressive for jobs that are resource intensive or need more time between retries. To make backoff scheduling flexible, a worker module may define a custom backoff function.
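To get a feel for how quickly an exponential backoff with 15 seconds of padding grows, here is a minimal sketch. It assumes the exponential term is 2 raised to the attempt number; that exact formula is an assumption for illustration, and the DefaultBackoffSketch module is hypothetical rather than Oban's own code.

```elixir
defmodule DefaultBackoffSketch do
  # Fixed padding in seconds, as described above.
  @padding 15

  # Assumed form: 2^attempt seconds plus the fixed padding.
  def backoff(attempt) when is_integer(attempt) and attempt > 0 do
    trunc(:math.pow(2, attempt)) + @padding
  end
end

for attempt <- 1..5 do
  IO.puts("attempt #{attempt}: retry in #{DefaultBackoffSketch.backoff(attempt)}s")
end
```

Under this assumption the first five retries are delayed by 17, 19, 23, 31 and 47 seconds, so early retries arrive in quick succession.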
This worker defines a backoff function that delays retries using a variant of the historic Resque/Sidekiq algorithm:
defmodule MyApp.SidekiqBackoffWorker do
  use Oban.Worker
  @impl true
  def backoff(attempt) do
    # :math.pow/2 returns a float; truncate so the result is an integer number of seconds.
    trunc(:math.pow(attempt, 4) + 15 + :rand.uniform(30) * attempt)
  end
  @impl true
  def perform(_args) do
    :do_business
  end
end
Here are some alternative backoff strategies to consider:
- constant — delay by a fixed number of seconds, e.g. 1→15, 2→15, 3→15
- linear — delay for the same number of seconds as the current attempt, e.g. 1→1, 2→2, 3→3
- squared — delay by attempt number squared, e.g. 1→1, 2→4, 3→9
- sidekiq — delay by a base amount plus some jitter, e.g. 1→32, 2→61, 3→135
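The strategies listed above can be written as plain functions from an attempt number to a delay in seconds. This is a sketch only: the BackoffStrategies module and its function names are made up for illustration, and the body of any one of them could serve as a worker's backoff/1 callback.

```elixir
defmodule BackoffStrategies do
  # constant: always wait 15 seconds.
  def constant(_attempt), do: 15

  # linear: wait as many seconds as the current attempt number.
  def linear(attempt), do: attempt

  # squared: wait attempt * attempt seconds.
  def squared(attempt), do: attempt * attempt

  # sidekiq: attempt^4 plus a 15 second base plus random jitter scaled by attempt.
  def sidekiq(attempt) do
    trunc(:math.pow(attempt, 4)) + 15 + :rand.uniform(30) * attempt
  end
end

for attempt <- 1..3 do
  IO.inspect(
    attempt: attempt,
    constant: BackoffStrategies.constant(attempt),
    linear: BackoffStrategies.linear(attempt),
    squared: BackoffStrategies.squared(attempt),
    sidekiq: BackoffStrategies.sidekiq(attempt)
  )
end
```

The sidekiq values differ between runs because of the jitter; for attempt 1 the result falls between 17 and 46 seconds.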
Summary
Callbacks
backoff(attempt)
Calculate the execution backoff, or the number of seconds to wait before retrying a failed job.
new(args, opts)
Build a job changeset for this worker with optional overrides.
perform(args)
The perform/1 function is called when the job is executed.
Callbacks
backoff(attempt)
backoff(attempt :: pos_integer()) :: pos_integer()
Calculate the execution backoff, or the number of seconds to wait before retrying a failed job.
new(args, opts)
new(args :: Oban.Job.args(), opts :: [Oban.Job.option()]) :: Ecto.Changeset.t()
Build a job changeset for this worker with optional overrides.
See Oban.Job.new/2 for the available options.
perform(args)
The perform/1 function is called when the job is executed.
The function is passed a job's args, which is always a map with string keys.
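Because args always arrive with string keys, a perform/1 clause that matches on atom keys will not match a stored job. The sketch below simulates this in plain Elixir. The StringKeysSketch module is hypothetical, and its stringify/1 function is only a stand-in for whatever conversion happens when a job is stored and loaded; it is not an Oban function.

```elixir
defmodule StringKeysSketch do
  # Hypothetical stand-in for storage: every top-level key becomes a string.
  def stringify(args) when is_map(args) do
    Map.new(args, fn {key, value} -> {to_string(key), value} end)
  end

  # Matches the string-keyed shape a worker actually receives.
  def perform(%{"value" => value}), do: {:ok, value}

  # Anything else, including atom-keyed maps, falls through to here.
  def perform(_args), do: {:error, :missing_value}
end

enqueued = %{value: 2}
stored = StringKeysSketch.stringify(enqueued)

IO.inspect(stored)
IO.inspect(StringKeysSketch.perform(stored))
IO.inspect(StringKeysSketch.perform(enqueued))
```

The atom-keyed map passed when enqueuing does not match the first clause, while its string-keyed counterpart does.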
A job is considered a success when perform/1 returns a non-error value without raising an exception. If the function returns :error or {:error, error}, raises an exception, or has an unhandled exit or throw, the job is a failure and may be scheduled for a retry.