Oban v0.5.0

Oban isn't an application and won't be started automatically. It is started by a supervisor that must be included in your application's supervision tree. All of your configuration is passed into the Oban supervisor, allowing you to configure Oban like the rest of your application.

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10, events: 50, media: 20]

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false

  use Application

  alias MyApp.{Endpoint, Repo}

  def start(_type, _args) do
    children = [
      Repo,
      Endpoint,
      {Oban, Application.get_env(:my_app, Oban)}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

Configuring Queues

Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:

queues: [default: 10, mailers: 20, events: 50, media: 5]

There isn't a limit to the number of queues or how many jobs may execute concurrently. Here are a few caveats and guidelines:

  • Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (e.g. database connections) to handle the concurrent load.
  • Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.
  • Be careful how many concurrent jobs make expensive system calls (e.g. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out commands.
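
As a rough aid for the first guideline, the worst-case number of jobs running at once is the sum of the configured limits. The sketch below computes that sum for the example configuration. The `pool_size` value is a hypothetical number used only for comparison, and how many connections each job actually needs depends on your workers.

```elixir
# Queue limits from the example configuration above.
queues = [default: 10, mailers: 20, events: 50, media: 5]

# Worst case: every queue is running at its configured limit.
max_concurrent_jobs = queues |> Keyword.values() |> Enum.sum()

# Hypothetical Repo pool size, used only for comparison here.
pool_size = 20

if max_concurrent_jobs > pool_size do
  IO.puts("Up to #{max_concurrent_jobs} jobs may compete for #{pool_size} connections")
end
```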

Creating Workers

Worker modules do the work of processing a job. At a minimum they must define a perform/1 function, which is called with an args map.

Define a worker to process jobs in the events queue:

defmodule MyApp.Workers.Business do
  use Oban.Worker, queue: "events", max_attempts: 10

  @impl Oban.Worker
  def perform(%{"id" => id}) do
    model = MyApp.Repo.get(MyApp.Business.Man, id)

    IO.inspect(model)
  end
end

The value returned from perform/1 is ignored unless it is an {:error, reason} tuple. When perform/1 returns an error tuple, raises an uncaught exception, or throws, the error is reported and the job is retried (provided there are attempts remaining).

See Oban.Worker for more details on failure conditions and Oban.Telemetry for details on job reporting.
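
The rule for return values can be illustrated with a small standalone sketch. `RetrySketch` is a hypothetical module written for this example. It only mirrors the behaviour described above and is not how Oban itself is implemented.

```elixir
defmodule RetrySketch do
  # Illustration only: classifies the result of calling a perform-like
  # function the way the text describes. This is not Oban's implementation.
  def outcome(perform, args) do
    case perform.(args) do
      {:error, reason} -> {:retry, reason}
      _ignored -> :ok
    end
  rescue
    exception -> {:retry, exception}
  catch
    kind, value -> {:retry, {kind, value}}
  end
end

# Any ordinary return value counts as success.
IO.inspect(RetrySketch.outcome(fn _args -> :anything end, %{}))

# An error tuple, an exception, and a throw would each lead to a retry.
IO.inspect(RetrySketch.outcome(fn _args -> {:error, "no dice"} end, %{}))
IO.inspect(RetrySketch.outcome(fn _args -> raise "boom" end, %{}))
IO.inspect(RetrySketch.outcome(fn _args -> throw(:halt) end, %{}))
```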

Enqueueing Jobs

Jobs are simply Ecto structs and are enqueued by inserting them into the database. For convenience and consistency all workers provide a new/2 function that converts an args map into a job changeset suitable for insertion:

%{in_the: "business", of_doing: "business"}
|> MyApp.Workers.Business.new()
|> MyApp.Repo.insert()

The worker's defaults may be overridden by passing options:

%{vote_for: "none of the above"}
|> MyApp.Workers.Business.new(queue: "special", max_attempts: 5)
|> MyApp.Repo.insert()

Jobs may also be scheduled down to the second any time in the future:

%{id: 1}
|> MyApp.Workers.Business.new(schedule_in: 5)
|> MyApp.Repo.insert()

Occasionally you may need to insert a job for a worker that exists in another application. In that case you can use Oban.Job.new/2 to build the changeset manually:

%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> MyApp.Repo.insert()

See Oban.Job.new/2 for a full list of job options.

Testing

As noted in the Usage section above there are some guidelines for running tests:

  • Disable all job dispatching by setting queues: false or queues: nil in your test.exs config. Keyword configuration is deep merged, so setting queues: [] won't have any effect.

  • Disable pruning via prune: :disabled. Pruning isn't necessary in testing mode because jobs created within the sandbox are rolled back at the end of the test. Additionally, the periodic pruning queries will raise DBConnection.OwnershipError when the application boots.

  • Be sure to use the Ecto Sandbox for testing. Oban makes use of database pubsub events to dispatch jobs, but pubsub events never fire within a transaction. Since sandbox tests run within a transaction no events will fire and jobs won't be dispatched.

  config :my_app, MyApp.Repo, pool: Ecto.Adapters.SQL.Sandbox
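
The deep-merge caveat in the first guideline can be reproduced outside of a config file. This is a minimal sketch, assuming an Elixir version that ships `Config.Reader` (1.9 or later). The `base` keyword list stands in for what config.exs would define, and `MyApp.Repo` is used only as an atom here.

```elixir
# Base configuration, standing in for config/config.exs.
base = [my_app: [{Oban, [repo: MyApp.Repo, queues: [default: 10, events: 50]]}]]

# An empty keyword list is deep merged into the base queues, so nothing changes.
merged_empty = Config.Reader.merge(base, my_app: [{Oban, [queues: []]}])
IO.inspect(merged_empty[:my_app][Oban][:queues])

# `false` is not a keyword list, so it replaces the base value outright.
merged_false = Config.Reader.merge(base, my_app: [{Oban, [queues: false]}])
IO.inspect(merged_false[:my_app][Oban][:queues])
```

The first merge still contains both queues, while the second leaves `queues` set to `false`, which is why `false` or `nil` is needed to disable dispatching in tests.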

Oban provides some helpers to facilitate testing. The helpers handle the boilerplate of making assertions on which jobs are enqueued. To use the assert_enqueued/1 and refute_enqueued/1 helpers in your tests you must include them in your testing module and specify your app's Ecto repo:

use Oban.Testing, repo: MyApp.Repo

Now you can assert or refute that jobs have been enqueued within your tests:

assert_enqueued worker: MyWorker, args: %{id: 1}

# or

refute_enqueued queue: "special", args: %{id: 2}

See the Oban.Testing module for more details on making assertions.

Integration Testing

During integration testing it may be necessary to run jobs because they do work essential for the test to complete, e.g. sending an email or processing media. You can execute all available jobs in a particular queue by calling Oban.drain_queue/1 directly from your tests.

For example, to process all pending jobs in the "mailer" queue while testing some business logic:

defmodule MyApp.BusinessTest do
  use MyApp.DataCase, async: true

  alias MyApp.{Business, Worker}

  test "we stay in the business of doing business" do
    :ok = Business.schedule_a_meeting(%{email: "monty@brewster.com"})

    assert %{success: 1, failure: 0} == Oban.drain_queue(:mailer)

    # Make an assertion about the email delivery
  end
end

See Oban.drain_queue/1 for additional details.

Error Handling

When a job returns an error value, raises an error, or exits during execution, the details are recorded within the errors array on the job. When the number of execution attempts is below the configured max_attempts limit, the job will automatically be retried in the future.

The retry delay has an exponential backoff, meaning the job's second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.
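
The quoted delays are consistent with a delay of `attempt^4 + 15` seconds, where `attempt` is the number of attempts already made. That formula is inferred here from the three figures above rather than taken from this document, so treat it as an assumption and check Oban.Worker for the actual default. `BackoffSketch` is a hypothetical module name.

```elixir
defmodule BackoffSketch do
  # Assumed formula, inferred from the delays quoted above (16s, 31s, 1m 36s).
  def delay_in_seconds(attempt) do
    trunc(:math.pow(attempt, 4)) + 15
  end
end

for attempt <- 1..3 do
  seconds = BackoffSketch.delay_in_seconds(attempt)
  IO.puts("after attempt #{attempt}: retry in #{seconds}s")
end
```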

See the Oban.Worker documentation on "Customizing Backoff" for alternative backoff strategies.

Error Details

Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:

  • at - The UTC timestamp when the error occurred
  • attempt - The attempt number when the error occurred
  • error - A formatted error message and stacktrace

See the Instrumentation docs below for an example of integrating with external error reporting systems.
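
To make the shape of an error entry concrete, the sketch below builds a map with the three documented keys from a rescued exception. `ErrorEntrySketch` is a hypothetical module. The use of atom keys and of `Exception.format/3` for the message are assumptions made for illustration, since this document does not show how Oban formats or stores the entry.

```elixir
defmodule ErrorEntrySketch do
  # Illustration only: builds a map with the three documented keys.
  # Key types and formatting are assumptions, not Oban's storage format.
  def build(attempt, exception, stacktrace) do
    %{
      at: DateTime.utc_now(),
      attempt: attempt,
      error: Exception.format(:error, exception, stacktrace)
    }
  end
end

entry =
  try do
    raise ArgumentError, "invalid business"
  rescue
    exception -> ErrorEntrySketch.build(1, exception, __STACKTRACE__)
  end

IO.puts(entry.error)
```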

Limiting Retries

By default jobs will be retried up to 20 times. The number of retries is controlled by the max_attempts value, which can be set at the Worker or Job level. For example, to instruct a worker to discard jobs after three failures:

use Oban.Worker, queue: "limited", max_attempts: 3

Pruning Historic Jobs

Job stats and queue introspection are built on keeping job rows in the database after they have completed. This allows administrators to review completed jobs and build informative aggregates, but at the expense of storage and an unbounded table size. To prevent the oban_jobs table from growing indefinitely, Oban provides active pruning of completed jobs.

By default, pruning is disabled. To enable pruning, pass the :prune option when configuring the Oban supervision tree. There are three distinct modes of pruning:

  • :disabled - This is the default, where no pruning happens at all
  • {:maxlen, count} - Pruning is based on the number of rows in the table; any rows beyond the configured count will be deleted
  • {:maxage, seconds} - Pruning is based on a row's age; any rows older than the configured number of seconds will be deleted. The age unit is always specified in seconds, but values on the scale of days, weeks or months are perfectly acceptable.

Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified length or age may not be pruned immediately after jobs complete. Prune timing is based on the configured prune_interval, which is one minute by default.

Only jobs in a completed or discarded state will be deleted. Currently executing jobs and older jobs that are still in the available state will be retained.
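
For instance, age-based pruning could be enabled as sketched below. The one-week retention, the queue list, and the alternative row count in the comment are arbitrary values chosen for illustration.

```elixir
# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  # Keep completed and discarded jobs for roughly one week (value is in seconds).
  prune: {:maxage, 60 * 60 * 24 * 7}

# Alternatively, cap the table by row count instead of age:
#   prune: {:maxlen, 100_000}
```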

Instrumentation & Logging

Oban provides integration with Telemetry, a dispatching library for metrics. It is easy to report Oban metrics to any backend by attaching to :oban events.

For instructions on using the default structured logger and information on event metadata see docs for the Oban.Telemetry module.

Summary

Functions

Returns a specification to start this module under a supervisor.

Retrieve the current config struct.

Synchronously execute all available jobs in a queue. All execution happens within the current process and it is guaranteed not to raise an error or exit.

Kill an actively executing job and mark it as discarded, ensuring that it won't be retried.

Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.

Resume executing jobs in a paused queue.

Scale the concurrency for a queue.

Starts an Oban supervision tree linked to the current process.

Types

option()
option() ::
  {:name, module()}
  | {:node, binary()}
  | {:poll_interval, pos_integer()}
  | {:prune, :disabled | {:maxlen, pos_integer()} | {:maxage, pos_integer()}}
  | {:prune_interval, pos_integer()}
  | {:prune_limit, pos_integer()}
  | {:queues, [{atom(), pos_integer()}]}
  | {:repo, module()}
  | {:shutdown_grace_period, timeout()}

Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

Retrieve the current config struct.

drain_queue(queue) (since 0.4.0)
drain_queue(queue :: atom() | binary()) :: %{
  success: non_neg_integer(),
  failure: non_neg_integer()
}

Synchronously execute all available jobs in a queue. All execution happens within the current process and it is guaranteed not to raise an error or exit.

Draining a queue from within the current process is especially useful for testing. Jobs that are enqueued by a process when Ecto is in sandbox mode are only visible to that process. Calling drain_queue/1 allows you to control when the jobs are executed and to wait synchronously for all jobs to complete.

Failures & Retries

Draining a queue uses the same execution mechanism as regular job dispatch. That means that any job failures or crashes will be captured and result in a retry. Retries are scheduled in the future with backoff and won't run immediately.

Exceptions are not raised into the calling process. If you expect jobs to fail, would like to track failures, or need to check for specific errors you can use one of these mechanisms:

  • Check for side effects from job execution
  • Use telemetry events to track success and failure
  • Check the database for jobs with errors

Example

Drain a queue with three available jobs, two of which succeed and one of which fails:

Oban.drain_queue(:default)
%{success: 2, failure: 1}

kill_job(job_id) (since 0.2.0)
kill_job(job_id :: pos_integer()) :: :ok

Kill an actively executing job and mark it as discarded, ensuring that it won't be retried.

If the job happens to fail before it can be killed the state is set to discarded. However, if it manages to complete successfully then the state will still be completed.

Example

Kill a long running job with an id of 1:

Oban.kill_job(1)
:ok

pause_queue(queue) (since 0.2.0)
pause_queue(queue :: atom()) :: :ok

Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.

When shutdown begins all queues are paused.

Example

Pause the default queue:

Oban.pause_queue(:default)
:ok

resume_queue(queue) (since 0.2.0)
resume_queue(queue :: atom()) :: :ok

Resume executing jobs in a paused queue.

Example

Resume a paused default queue:

Oban.resume_queue(:default)
:ok

scale_queue(queue, scale) (since 0.2.0)
scale_queue(queue :: atom(), scale :: pos_integer()) :: :ok

Scale the concurrency for a queue.

Example

Scale a queue up, triggering immediate execution of queued jobs:

Oban.scale_queue(:default, 50)
:ok

Scale the queue back down, allowing executing jobs to finish:

Oban.scale_queue(:default, 5)
:ok

start_link(opts) (since 0.1.0)
start_link([option()]) :: Supervisor.on_start()

Starts an Oban supervision tree linked to the current process.

Options

  • :name - used for supervisor name registration

  • :node - used to identify the node that the supervision tree is running in. If no value is provided it will use the node name in a distributed system, or the hostname on an isolated node. See "Node Name" below.

  • :repo - specifies the Ecto repo used to insert and retrieve jobs.

  • :queues - a keyword list where the keys are queue names and the values are the concurrency setting. For example, setting queues to [default: 10, exports: 5] would start the queues default and exports with a combined concurrency level of 15. The concurrency setting specifies how many jobs each queue will run concurrently.

    For testing purposes :queues may be set to false or nil, which effectively disables all job dispatching.

  • :poll_interval - the number of milliseconds between polling for new jobs in a queue. This is directly tied to the resolution of scheduled jobs. For example, with a poll_interval of 5_000ms, scheduled jobs are checked every 5 seconds. The default is 1_000ms.

  • :prune - configures job pruning behavior, see "Pruning Historic Jobs" for more information

  • :prune_interval - the number of milliseconds between calls to prune historic jobs. The default is 60_000ms, or one minute.

  • :prune_limit - the maximum number of jobs that will be pruned at each prune interval. The default is 5_000.

  • :shutdown_grace_period - the amount of time a queue will wait for executing jobs to complete before hard shutdown, specified in milliseconds. The default is 15_000, or 15 seconds.

Note that any options passed to start_link will override options set through the using macro.

Examples

To start an Oban supervisor within an application's supervision tree:

def start(_type, _args) do
  children = [MyApp.Repo, {Oban, repo: MyApp.Repo, queues: [default: 50]}]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
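
As a further sketch, the child list below spells out every documented option. The numeric values are the defaults stated in the options list above, and the queue names are illustrative. Leaving :name and :node out lets Oban fall back to its own defaults for them.

```elixir
children = [
  MyApp.Repo,
  {Oban,
   repo: MyApp.Repo,
   queues: [default: 10, exports: 5],
   # Check for scheduled jobs once per second.
   poll_interval: 1_000,
   # No pruning; see "Pruning Historic Jobs" for the other modes.
   prune: :disabled,
   prune_interval: 60_000,
   prune_limit: 5_000,
   # Give executing jobs 15 seconds to finish before a hard shutdown.
   shutdown_grace_period: 15_000}
]
```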

Node Name

When the node value hasn't been configured it will be generated based on the environment:

  • In a distributed system the node name is used
  • In a Heroku environment the system environment's DYNO value is used
  • Otherwise, the system hostname is used
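
The fallback order can be sketched as a small function. `NodeNameSketch` is a hypothetical module that follows the three rules above. It is not Oban's implementation, and the `alive?` and `env` arguments exist only to make the sketch easy to exercise.

```elixir
defmodule NodeNameSketch do
  # Illustration of the documented fallback order; not Oban's implementation.
  def resolve(alive? \\ Node.alive?(), env \\ System.get_env()) do
    cond do
      # Distributed system: use the node name.
      alive? -> to_string(node())
      # Heroku: use the DYNO environment variable when present.
      is_binary(env["DYNO"]) -> env["DYNO"]
      # Otherwise fall back to the system hostname.
      true -> hostname()
    end
  end

  defp hostname do
    {:ok, name} = :inet.gethostname()
    to_string(name)
  end
end

IO.puts(NodeNameSketch.resolve(false, %{"DYNO" => "web.1"}))
```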