Oban v2.0.0-rc.1 Oban.Worker behaviour View Source

Defines a behavior and macro to guide the creation of worker modules.

Worker modules do the work of processing a job. At a minimum they must define a perform/1 function, which is called with the full Oban.Job struct.

Defining Workers

Worker modules are defined by using Oban.Worker. A bare use Oban.Worker invocation sets a worker with these defaults:

  • :max_attempts — 20
  • :priority — 0
  • :queue:default
  • :unique — no uniqueness set

The following example defines a worker module to process jobs in the events queue. It then dials down the priority from 0 to 1, limits retrying on failures to 10, adds a "business" tag, and ensures that duplicate jobs aren't enqueued within a 30 second period:

defmodule MyApp.Workers.Business do
  use Oban.Worker,
    queue: :events,
    priority: 1,
    max_attempts: 10,
    tags: ["business"],
    unique: [period: 30]

  @impl Oban.Worker
  def perform(%Oban.Job{attempt: attempt}) when attempt > 3 do
    IO.inspect(attempt)
  end

  def perform(job) do
    IO.inspect(job.args)
  end
end

The perform/1 function receives an Oban.Job struct as argument. This allows workers to change the behavior of perform/1 based on attributes of the job, e.g. the number of execution attempts or when it was inserted.

The value returned from perform/1 can control whether the job is a success or a failure:

  • :ok or {:ok, value} — the job is successful; for success tuples the value is ignored
  • :discard — discard the job and prevent it from being retried again
  • {:error, error} — the job failed, record the error and schedule a retry if possible
  • {:snooze, seconds} — consider the job a success and schedule it to run seconds in the future. Snoozing will also increase the max_attempts by one to ensure that the job isn't accidentally discarded before it can run.

In addition to explicit return values, any unhandled exception, exit or throw will fail the job and schedule a retry if possible.

As an example of error tuple handling, this worker will return an error tuple when the value is less than one:

defmodule MyApp.Workers.ErrorExample do
  use Oban.Worker

  @impl Worker
  def perform(%{args: %{"value" => value}}) do
    if value > 1 do
      :ok
    else
      {:error, "invalid value given: " <> inspect(value)}
    end
  end
end

Enqueuing Jobs

All workers implement a new/2 function that converts an args map into a job changeset suitable for inserting into the database for later execution:

%{in_the: "business", of_doing: "business"}
|> MyApp.Workers.Business.new()
|> Oban.insert()

The worker's defaults may be overridden by passing options:

%{vote_for: "none of the above"}
|> MyApp.Workers.Business.new(queue: "special", max_attempts: 5)
|> Oban.insert()

Uniqueness options may also be overridden by passing options:

%{expensive: "business"}
|> MyApp.Workers.Business.new(unique: [period: 120, fields: [:worker]])
|> Oban.insert()

Note that unique options aren't merged, they are overridden entirely.

See Oban.Job for all available options.

Customizing Backoff

When jobs fail they may be retried again in the future using a backoff algorithm. By default the backoff is exponential with a fixed padding of 15 seconds. The default backoff is clamped to a maximum of 24 days, the equivalent of the 20th attempt.

If the default strategy is too aggressive or otherwise unsuited to your app's workload you can define a custom backoff function using the backoff/1 callback.

The following worker defines a backoff/1 function that delays retries using a variant of the historic Resque/Sidekiq algorithm:

defmodule MyApp.SidekiqBackoffWorker do
  use Oban.Worker

  @impl Worker
  def backoff(%Job{attempt: attempt}) do
    trunc(:math.pow(attempt, 4) + 15 + :rand.uniform(30) * attempt)
  end

  @impl Worker
  def perform(_job) do
    :do_business
  end
end

Here are some alternative backoff strategies to consider:

  • constant — delay by a fixed number of seconds, e.g. 1→15, 2→15, 3→15
  • linear — delay for the same number of seconds as the current attempt, e.g. 1→1, 2→2, 3→3
  • squared — delay by attempt number squared, e.g. 1→1, 2→4, 3→9
  • sidekiq — delay by a base amount plus some jitter, e.g. 1→32, 2→61, 3→135

Contextual Backoff

Any error, catch or throw is temporarily recorded in the job's unsaved_error map. The unsaved error map can be used by backoff/1 to calculate a custom backoff based on the exact error that failed the job. In this example the backoff/1 callback checks to see if the error was due to rate limiting and adjusts the backoff accordingly:

defmodule MyApp.ApiWorker do
  use Oban.Worker

  @five_minutes 5 * 60

  @impl Worker
  def perform(%{args: args}) do
    MyApp.make_external_api_call(args)
  end

  @impl Worker
  def backoff(%Job{attempt: attempt, unsaved_error: unsaved_error}) do
    %{kind: _, reason: reason, stacktrace: _} = unsaved_error

    case reason do
      %MyApp.ApiError{status: 429} -> @five_minutes
      _ -> trunc(:math.pow(attempt, 4))
    end
  end
end

Limiting Execution Time

By default, individual jobs may execute indefinitely. If this is undesirable you may define a timeout in milliseconds with the timeout/1 callback on your worker module.

For example, to limit a worker's execution time to 30 seconds:

def MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(_job) do
    something_that_may_take_a_long_time()

    :ok
  end

  @impl Oban.Worker
  def timeout(_job), do: :timer.seconds(30)
end

The timeout/1 function accepts an Oban.Job struct, so you can customize the timeout using any job attributes.

Define the timeout value through job args:

def timeout(%_{args: %{"timeout" => timeout}}), do: timeout

Define the timeout based on the number of attempts:

def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)

Link to this section Summary

Callbacks

Calculate the execution backoff.

Build a job changeset for this worker with optional overrides.

The perform/1 function is called to execute a job.

Set a job's maximum execution time in milliseconds.

Link to this section Types

Specs

result() ::
  :ok
  | :discard
  | {:ok, ignored :: term()}
  | {:error, reason :: term()}
  | {:snooze, seconds :: pos_integer()}

Specs

t() :: module()

Link to this section Callbacks

Specs

backoff(job :: Oban.Job.t()) :: pos_integer()

Calculate the execution backoff.

In this context backoff specifies the number of seconds to wait before retrying a failed job.

Defaults to an exponential algorithm with a minimum delay of 15 seconds.

Specs

new(args :: Oban.Job.args(), opts :: [Oban.Job.option()]) :: Ecto.Changeset.t()

Build a job changeset for this worker with optional overrides.

See Oban.Job.new/2 for the available options.

Specs

perform(job :: Oban.Job.t()) :: result()

The perform/1 function is called to execute a job.

Each perform/1 function should return :ok or a success tuple. When the return is an error tuple, an uncaught exception or a throw then the error is recorded and the job may be retried if there are any attempts remaining.

Note that the args map provided to perform/1 will always have string keys, regardless of the key type when the job was enqueued. The args are stored as jsonb in PostgreSQL and the serialization process automatically stringifies all keys.

Specs

timeout(job :: Oban.Job.t()) :: :infinity | pos_integer()

Set a job's maximum execution time in milliseconds.

Jobs that exceed the time limit are considered a failure and may be retried.

Defaults to :infinity.