Oban v1.1.0

Oban is a robust job processing library which uses PostgreSQL for storage and coordination.

Each Oban instance is a supervision tree and not an application. That means it won't be started automatically and must be included in your application's supervision tree. All of your configuration is passed into the supervisor, allowing you to configure Oban like the rest of your application:

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  prune: {:maxlen, 10_000},
  queues: [default: 10, events: 50, media: 20]

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false

  use Application

  alias MyApp.{Endpoint, Repo}

  def start(_type, _args) do
    children = [
      Repo,
      Endpoint,
      {Oban, Application.get_env(:my_app, Oban)}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

If you are running tests (which you should be), you'll want to disable pruning, periodic jobs and job dispatching altogether when testing:

# config/test.exs
config :my_app, Oban, crontab: false, queues: false, prune: :disabled

Unless dispatch and pruning are disabled, Ecto will raise constant ownership errors and you won't be able to run tests.

Configuring Queues

Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:

queues: [default: 10, mailers: 20, events: 50, media: 5]

There isn't a limit to the number of queues or how many jobs may execute concurrently in each queue. Here are a few caveats and guidelines:

Caveats & Guidelines

  • Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (e.g. database connections) to handle the concurrent load.

  • Queue limits are local (per-node), not global (per-cluster). For example, running a queue with a local limit of one on three separate nodes is effectively a global limit of three. If you require a global limit you must restrict the number of nodes running a particular queue.

  • Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.

  • Be careful how many concurrent jobs make expensive system calls (e.g. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out commands.
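
Because limits are per-node, it can help to work out the totals before sizing a database pool. The following is a minimal sketch in plain Elixir and is not part of Oban's API; the module name QueueMath and the node count of 3 are assumptions made purely for illustration.

```elixir
defmodule QueueMath do
  # Maximum number of jobs a single node may run at once across all queues.
  def per_node_total(queues), do: queues |> Keyword.values() |> Enum.sum()

  # Effective cluster-wide limit per queue, assuming every node runs every queue.
  def cluster_limits(queues, node_count) do
    Enum.map(queues, fn {name, limit} -> {name, limit * node_count} end)
  end
end

queues = [default: 10, mailers: 20, events: 50, media: 5]

QueueMath.per_node_total(queues)
# => 85

QueueMath.cluster_limits(queues, 3)
# => [default: 30, mailers: 60, events: 150, media: 15]
```

The per-node total is a rough upper bound on how many jobs may want a database connection at the same moment.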

Defining Workers

Worker modules do the work of processing a job. At a minimum they must define a perform/2 function, which is called with an args map and the job struct.

Note that the args map passed to perform/2 will always have string keys, regardless of the key type when the job was enqueued. The args are stored as jsonb in PostgreSQL and the serialization process automatically stringifies all keys.
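
The effect of that serialization can be pictured with a small sketch. This is not Oban's serializer; ArgsSketch is a hypothetical helper that only mimics what a JSON round trip does to the keys of plain maps (structs are not handled).

```elixir
defmodule ArgsSketch do
  # Recursively convert map keys to strings, as a JSON round trip would.
  def stringify(%{} = map) do
    Map.new(map, fn {key, value} -> {to_string(key), stringify(value)} end)
  end

  def stringify(list) when is_list(list), do: Enum.map(list, &stringify/1)
  def stringify(other), do: other
end

ArgsSketch.stringify(%{id: 1, meta: %{source: "api"}})
# => %{"id" => 1, "meta" => %{"source" => "api"}}
```

This is why the perform/2 examples pattern match on "id" rather than :id.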

Define a worker to process jobs in the events queue:

defmodule MyApp.Business do
  use Oban.Worker, queue: :events

  @impl Oban.Worker
  def perform(%{"id" => id} = args, _job) do
    model = MyApp.Repo.get(MyApp.Business.Man, id)

    case args do
      %{"in_the" => "business"} ->
        IO.inspect(model)

      %{"vote_for" => vote} ->
        IO.inspect([vote, model])

      _ ->
        IO.inspect(model)
    end

    :ok
  end
end

The use macro also accepts options to customize max attempts, priority, tags, and uniqueness:

defmodule MyApp.LazyBusiness do
  use Oban.Worker,
    queue: :events,
    priority: 3,
    max_attempts: 3,
    tags: ["business"],
    unique: [period: 30]

  @impl Oban.Worker
  def perform(_args, _job) do
    # do business slowly

    :ok
  end
end

The value returned from perform/2 is ignored, unless it is an {:error, reason} tuple. When perform/2 returns an error tuple, raises an uncaught exception, or throws, the error is reported and the job is retried (provided there are attempts remaining).

See the Oban.Worker docs for more details on failure conditions and Oban.Telemetry for details on job reporting.

Enqueueing Jobs

Jobs are simply Ecto structs and are enqueued by inserting them into the database. For convenience and consistency all workers provide a new/2 function that converts an args map into a job changeset suitable for insertion:

%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()

The worker's defaults may be overridden by passing options:

%{id: 1, vote_for: "none of the above"}
|> MyApp.Business.new(queue: :special, max_attempts: 5)
|> Oban.insert()

Jobs may be scheduled at a specific datetime in the future:

%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()

Jobs may also be scheduled down to the second any time in the future:

%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()

Unique jobs can be configured in the worker, or when the job is built:

%{email: "brewster@example.com"}
|> MyApp.Mailer.new(unique: [period: 300, fields: [:queue, :worker]])
|> Oban.insert()

Job priority can be specified using an integer from 0 to 3, with 0 being the default and highest priority:

%{id: 1}
|> MyApp.Backfiller.new(priority: 2)
|> Oban.insert()

Any number of tags can be added to a job dynamically, at the time it is inserted:

id = 1

%{id: id}
|> MyApp.OnboardMailer.new(tags: ["mailer", "record-#{id}"])
|> Oban.insert()

Multiple jobs can be inserted in a single transaction:

Ecto.Multi.new()
|> Oban.insert(:b_job, MyApp.Business.new(%{id: 1}))
|> Oban.insert(:m_job, MyApp.Mailer.new(%{email: "brewster@example.com"}))
|> Repo.transaction()

Occasionally you may need to insert a job for a worker that exists in another application. In that case you can use Oban.Job.new/2 to build the changeset manually:

%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> Oban.insert()

Oban.insert/2,4 is the preferred way of inserting jobs as it provides some of Oban's advanced features (e.g. unique jobs). However, you can use your application's Repo.insert/2 function if necessary.

See Oban.Job.new/2 for a full list of job options.

Pruning Historic Jobs

Job stats and queue introspection are built on keeping job rows in the database after they have completed. This allows administrators to review completed jobs and build informative aggregates, at the expense of storage and an unbounded table size. To prevent the oban_jobs table from growing indefinitely, Oban provides active pruning of completed and discarded jobs.

By default, pruning retains a conservatively low 1,000 jobs. Pruning is configured with the :prune option. There are three distinct modes of pruning:

  • :disabled - No pruning happens at all, primarily useful for testing.

  • {:maxlen, count} - Pruning is based on the number of rows in the table; any rows beyond the configured count may be deleted. This is the default mode.

  • {:maxage, seconds} - Pruning is based on a row's age; any rows older than the configured number of seconds are deleted. The age unit is always specified in seconds, but values on the scale of days, weeks or months are perfectly acceptable.

Caveats & Guidelines

  • Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified length or age may not be pruned immediately after jobs complete. Prune timing is based on the configured prune_interval, which is one minute by default.

  • If you're using a row-limited database service, like Heroku's hobby plan with 10M rows, and you have pruning :disabled, you could hit that row limit quickly by filling up the oban_beats table. Instead of fully disabling pruning, consider setting a far-out limit: {:maxage, 60 * 60 * 24 * 365} (1 year). You will get the benefit of retaining completed and discarded jobs for a year without an unwieldy beats table.

  • Pruning is only applied to jobs that are completed or discarded (those that have reached the maximum number of retries or have been manually killed). It'll never delete a new job, a scheduled job or a job that failed and will be retried.
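
The three pruning modes can be illustrated with an in-memory sketch. This is not how Oban implements pruning (that happens through database queries); PruneSketch, the plain job maps and the :age field in seconds are assumptions for illustration, and :maxlen is interpreted here as counting only completed and discarded jobs.

```elixir
defmodule PruneSketch do
  @prunable ["completed", "discarded"]

  # Returns the jobs that a pruning pass would be allowed to delete.
  def prunable(_jobs, :disabled), do: []

  def prunable(jobs, {:maxlen, count}) do
    jobs
    |> finished()
    # Newest first, so everything beyond `count` is the oldest surplus.
    |> Enum.sort_by(& &1.id, &>=/2)
    |> Enum.drop(count)
  end

  def prunable(jobs, {:maxage, seconds}) do
    jobs |> finished() |> Enum.filter(&(&1.age > seconds))
  end

  # Only completed or discarded jobs are ever candidates.
  defp finished(jobs), do: Enum.filter(jobs, &(&1.state in @prunable))
end

jobs = [
  %{id: 1, state: "completed", age: 500},
  %{id: 2, state: "available", age: 400},
  %{id: 3, state: "discarded", age: 50},
  %{id: 4, state: "completed", age: 10}
]

jobs |> PruneSketch.prunable({:maxlen, 2}) |> Enum.map(& &1.id)
# => [1]

jobs |> PruneSketch.prunable({:maxage, 100}) |> Enum.map(& &1.id)
# => [1]
```

Note that job 2 is never selected, whatever its age, because it has not finished.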

Unique Jobs

The unique jobs feature lets you specify constraints to prevent enqueuing duplicate jobs. Uniqueness is based on a combination of args, queue, worker, state and insertion time. It is configured at the worker or job level using the following options:

  • :period — The number of seconds until a job is no longer considered duplicate. You should always specify a period.

  • :fields — The fields to compare when evaluating uniqueness. The available fields are :args, :queue and :worker; by default all three are used.

  • :states — The job states that are checked for duplicates. The available states are :available, :scheduled, :executing, :retryable and :completed. By default all states are checked, which prevents any duplicates, even if the previous job has been completed.

For example, configure a worker to be unique across all fields and states for 60 seconds:

use Oban.Worker, unique: [period: 60]

Configure the worker to be unique only by :worker and :queue:

use Oban.Worker, unique: [fields: [:queue, :worker], period: 60]

Or, configure a worker to be unique until it has executed:

use Oban.Worker, unique: [period: 300, states: [:available, :scheduled, :executing]]
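
To make the interaction between :period, :fields and :states concrete, here is a minimal in-memory sketch of a duplicate check. It is not Oban's implementation, which works through database queries; UniqueSketch, the plain job maps and the integer timestamps in seconds are all assumptions for illustration.

```elixir
defmodule UniqueSketch do
  @default_fields [:args, :queue, :worker]
  @default_states ~w(available scheduled executing retryable completed)a

  # `existing` is a list of job maps with :args, :queue, :worker, :state and
  # :inserted_at keys. `new` is the candidate job and `now` the current time.
  def duplicate?(existing, new, now, opts) do
    period = Keyword.fetch!(opts, :period)
    fields = Keyword.get(opts, :fields, @default_fields)
    states = Keyword.get(opts, :states, @default_states)

    Enum.any?(existing, fn job ->
      job.state in states and
        now - job.inserted_at <= period and
        Map.take(job, fields) == Map.take(new, fields)
    end)
  end
end

existing = [
  %{args: %{"id" => 1}, queue: "events", worker: "MyApp.Business",
    state: :completed, inserted_at: 100}
]

new = %{args: %{"id" => 1}, queue: "events", worker: "MyApp.Business"}

UniqueSketch.duplicate?(existing, new, 130, period: 60)
# => true

UniqueSketch.duplicate?(existing, new, 130,
  period: 60,
  states: [:available, :scheduled, :executing]
)
# => false
```

The second call returns false because the earlier job has already completed, which matches the "unique until it has executed" configuration.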

Strong Guarantees

Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.

Performance Note

If your application makes heavy use of unique jobs you may want to add an index on the args column of the oban_jobs table. The other columns considered for uniqueness are already covered by indexes.

Periodic Jobs

Oban allows jobs to be registered with a cron-like schedule and enqueued automatically. Periodic jobs are registered as a list of {cron, worker} or {cron, worker, options} tuples:

config :my_app, Oban, repo: MyApp.Repo, crontab: [
  {"* * * * *", MyApp.MinuteWorker},
  {"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
  {"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
  {"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]}
]

These jobs would be executed as follows:

  • MyApp.MinuteWorker — Executed once every minute
  • MyApp.HourlyWorker — Executed at the first minute of every hour with custom args
  • MyApp.DailyWorker — Executed at midnight every day with no retries
  • MyApp.MondayWorker — Executed at noon every Monday in the "scheduled" queue

The crontab format respects all standard rules and has one minute resolution. Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.

Cron Expressions

Standard Cron expressions are composed of rules specifying the minutes, hours, days, months and weekdays. Rules for each field are comprised of literal values, wildcards, step values or ranges:

  • * — Wildcard, matches any value (0, 1, 2, ...)
  • 0 — Literal, matches only itself (only 0)
  • */15 — Step, matches any value that is a multiple (0, 15, 30, 45)
  • 0-5 — Range, matches any value within the range (0, 1, 2, 3, 4, 5)

Each part may have multiple rules, where rules are separated by a comma. The allowed values for each field are as follows:

  • minute — 0-59
  • hour — 0-23
  • days — 1-31
  • month — 1-12 (or aliases, JAN, FEB, MAR, etc.)
  • weekdays — 0-6 (or aliases, SUN, MON, TUE, etc.)

Some specific examples that demonstrate the full range of expressions:

  • 0 * * * * — The first minute of every hour
  • */15 9-17 * * * — Every fifteen minutes during standard business hours
  • 0 0 * DEC * — Once a day at midnight during December
  • 0 7-9,16-18 13 * FRI — Once an hour during both rush hours on Friday the 13th
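
The rule types described above (wildcard, literal, step, range, and comma-separated combinations) can be sketched as a matcher for a single numeric field. This is an illustrative sketch rather than Oban's cron parser; CronFieldSketch is a hypothetical module, and it does not handle aliases such as MON or DEC.

```elixir
defmodule CronFieldSketch do
  # Returns true when `value` satisfies one cron field, e.g. "*/15" or "0-5,30".
  def matches?(field, value) when is_binary(field) and is_integer(value) do
    field
    |> String.split(",")
    |> Enum.any?(&rule_matches?(&1, value))
  end

  # Wildcard: any value matches.
  defp rule_matches?("*", _value), do: true

  # Step: the value must be a multiple of the step.
  defp rule_matches?("*/" <> step, value), do: rem(value, String.to_integer(step)) == 0

  # Literal ("7") or range ("7-9").
  defp rule_matches?(rule, value) do
    case String.split(rule, "-") do
      [literal] -> String.to_integer(literal) == value
      [first, last] -> value in String.to_integer(first)..String.to_integer(last)
    end
  end
end

CronFieldSketch.matches?("*/15", 30)
# => true

CronFieldSketch.matches?("0-5", 7)
# => false

CronFieldSketch.matches?("0-5,30", 30)
# => true
```

A full expression would apply a check like this to each of the five fields in turn.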

For more in depth information see the man documentation for cron and crontab in your system. Alternatively you can experiment with various expressions online at Crontab Guru.

Caveats & Guidelines

  • All schedules are evaluated as UTC unless a different timezone is configured. See Oban.start_link/1 for information about configuring a timezone.

  • Workers can be used for regular and scheduled jobs so long as they accept different arguments.

  • Duplicate jobs are prevented through transactional locks and unique constraints. Workers that are used for regular and scheduled jobs must not specify unique options less than 60s.

  • Long running jobs may execute simultaneously if the scheduling interval is shorter than it takes to execute the job. You can prevent overlap by passing custom unique opts in the crontab config:

  custom_args = %{scheduled: true}

  unique_opts = [
    period: 60 * 60 * 24,
    states: [:available, :scheduled, :executing]
  ]

  config :my_app, Oban, repo: MyApp.Repo, crontab: [
    {"* * * * *", MyApp.SlowWorker, args: custom_args, unique: unique_opts},
  ]

Prioritizing Jobs

Normally, all available jobs within a queue are executed in the order they were scheduled. You can override the normal behavior and prioritize or de-prioritize a job by assigning a numerical priority.

  • Priorities from 0-3 are allowed, where 0 is the highest priority and 3 is the lowest.

  • The default priority is 0. Unless specified otherwise, all jobs have an equally high priority.

  • All jobs with a higher priority will execute before any jobs with a lower priority. Within a particular priority jobs are executed in their scheduled order.
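
The ordering rules above amount to sorting by priority first and scheduled time second. The following is a minimal sketch over plain maps, not Oban's actual query; PrioritySketch and the integer :scheduled_at values are assumptions for illustration.

```elixir
defmodule PrioritySketch do
  # Lower priority numbers run first; ties fall back to scheduled order.
  def order(jobs), do: Enum.sort_by(jobs, &{&1.priority, &1.scheduled_at})
end

jobs = [
  %{id: 1, priority: 3, scheduled_at: 10},
  %{id: 2, priority: 0, scheduled_at: 30},
  %{id: 3, priority: 0, scheduled_at: 20},
  %{id: 4, priority: 1, scheduled_at: 5}
]

jobs |> PrioritySketch.order() |> Enum.map(& &1.id)
# => [3, 2, 4, 1]
```

Job 1 was scheduled before jobs 2 and 3, yet it runs last because it has the lowest priority.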

Testing

Oban provides some helpers to facilitate testing. The helpers handle the boilerplate of making assertions on which jobs are enqueued. To use the assert_enqueued/1 and refute_enqueued/1 helpers in your tests you must include them in your testing module and specify your app's Ecto repo:

use Oban.Testing, repo: MyApp.Repo

Now you can assert, refute or list jobs that have been enqueued within your tests:

assert_enqueued worker: MyWorker, args: %{id: 1}

# or

refute_enqueued queue: :special, args: %{id: 2}

# or

assert [%{args: %{"id" => 1}}] = all_enqueued worker: MyWorker

See the Oban.Testing module for more details.

Caveats & Guidelines

As noted in Usage, there are some guidelines for running tests:

  • Disable all job dispatching by setting queues: false or queues: nil in your test.exs config. Keyword configuration is deep merged, so setting queues: [] won't have any effect.

  • Disable pruning via prune: :disabled. Pruning isn't necessary in testing mode because jobs created within the sandbox are rolled back at the end of the test. Additionally, the periodic pruning queries will raise DBConnection.OwnershipError when the application boots.

  • Disable cron jobs via crontab: false. Periodic jobs aren't useful while testing and scheduling can lead to random ownership issues.

  • Be sure to use the Ecto Sandbox for testing. Oban makes use of database pubsub events to dispatch jobs, but pubsub events never fire within a transaction. Since sandbox tests run within a transaction no events will fire and jobs won't be dispatched.

  config :my_app, MyApp.Repo, pool: Ecto.Adapters.SQL.Sandbox

Integration Testing

During integration testing it may be necessary to run jobs because they do work essential for the test to complete, e.g. sending an email, processing media, etc. You can execute all available jobs in a particular queue by calling Oban.drain_queue/1 directly from your tests.

For example, to process all pending jobs in the "mailer" queue while testing some business logic:

defmodule MyApp.BusinessTest do
  use MyApp.DataCase, async: true

  alias MyApp.{Business, Worker}

  test "we stay in the business of doing business" do
    :ok = Business.schedule_a_meeting(%{email: "monty@brewster.com"})

    assert %{success: 1, failure: 0} == Oban.drain_queue(:mailer)

    # Now, make an assertion about the email delivery
  end
end

See Oban.drain_queue/1 for additional details.

Error Handling

When a job returns an error value, raises an error or exits during execution the details are recorded within the errors array on the job. When the number of execution attempts is below the configured max_attempts limit, the job will automatically be retried in the future.

The retry delay has an exponential backoff, meaning the job's second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.
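
The delays quoted above are consistent with raising the attempt number to the fourth power and adding 15 seconds. The sketch below reproduces those numbers under that assumption; BackoffSketch is a hypothetical module and the formula is inferred from the stated delays rather than taken from this page.

```elixir
defmodule BackoffSketch do
  # Seconds to wait after the given failed attempt, assuming attempt^4 + 15.
  def seconds(attempt) when is_integer(attempt) and attempt > 0 do
    attempt * attempt * attempt * attempt + 15
  end
end

Enum.map(1..3, &BackoffSketch.seconds/1)
# => [16, 31, 96]
```

The third value, 96 seconds, corresponds to the 1m 36s delay before the fourth attempt.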

See the Oban.Worker documentation on "Customizing Backoff" for alternative backoff strategies.

Error Details

Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:

  • at - The UTC timestamp when the error occurred
  • attempt - The attempt number when the error occurred
  • error - A formatted error message and stacktrace

See the Instrumentation docs for an example of integrating with external error reporting systems.

Limiting Retries

By default, jobs are retried up to 20 times. The number of retries is controlled by the max_attempts value, which can be set at the Worker or Job level. For example, to instruct a worker to discard jobs after three failures:

use Oban.Worker, queue: :limited, max_attempts: 3

Limiting Execution Time

By default, individual jobs may execute indefinitely. If this is undesirable you may define a timeout in milliseconds with the timeout/1 callback on your worker module.

For example, to limit a worker's execution time to 30 seconds:

defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(_args, _job) do
    something_that_may_take_a_long_time()

    :ok
  end

  @impl Oban.Worker
  def timeout(_job), do: :timer.seconds(30)
end

The timeout/1 function accepts an Oban.Job struct, so you can customize the timeout using any job attributes.

Define the timeout value through job args:

def timeout(%_{args: %{"timeout" => timeout}}), do: timeout

Define the timeout based on the number of attempts:

def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)

Instrumentation and Logging

Oban provides integration with Telemetry, a dispatching library for metrics. It is easy to report Oban metrics to any backend by attaching to :oban events.

Here is an example of an unstructured log handler:

defmodule MyApp.ObanLogger do
  require Logger

  def handle_event([:oban, event], measure, meta, nil) when event in [:success, :failure] do
    Logger.warn("[Oban #{event}]: #{meta.worker} ran in #{measure.duration}")
  end
end

Attach the handler to success and failure events in application.ex:

events = [[:oban, :success], [:oban, :failure]]

:telemetry.attach_many("oban-logger", events, &MyApp.ObanLogger.handle_event/4, nil)

The Oban.Telemetry module provides a robust structured logger that handles all of Oban's telemetry events. As in the example above, attach it within your application.ex module:

:ok = Oban.Telemetry.attach_default_logger()

For more details on the default structured logger and information on event metadata see docs for the Oban.Telemetry module.

Reporting Errors

Another great use of execution data is error reporting. Here is an example of integrating with Honeybadger to report job failures:

defmodule ErrorReporter do
  def handle_event([:oban, :failure], measure, meta, _) do
    context =
      meta
      |> Map.take([:id, :args, :queue, :worker])
      |> Map.merge(measure)

    Honeybadger.notify(meta.error, context, meta.stack)
  end

  def handle_event([:oban, :trip_circuit], _measure, meta, _) do
    context = Map.take(meta, [:name])

    Honeybadger.notify(meta.error, context, meta.stack)
  end
end

:telemetry.attach_many(
  "oban-errors",
  [[:oban, :failure], [:oban, :trip_circuit]],
  &ErrorReporter.handle_event/4,
  nil
)

Isolation

Oban supports namespacing through PostgreSQL schemas, also called "prefixes" in Ecto. With prefixes your jobs table can reside outside of your primary schema (usually public) and you can have multiple separate job tables.

To use a prefix you first have to specify it within your migration:

defmodule MyApp.Repo.Migrations.AddPrefixedObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migrations.up(prefix: "private")
  end

  def down do
    Oban.Migrations.down(prefix: "private")
  end
end

The migration will create the "private" schema and all tables, functions and triggers within that schema. With the database migrated you'll then specify the prefix in your configuration:

config :my_app, Oban,
  prefix: "private",
  repo: MyApp.Repo,
  queues: [default: 10]

Now all jobs are inserted and executed using the private.oban_jobs table. Note that Oban.insert/2,4 will write jobs to the private.oban_jobs table automatically, but you'll need to specify a prefix manually if you insert jobs directly through a repo.

Supervisor Isolation

Not only is the oban_jobs table isolated within the schema, but all notification events are also isolated. That means that insert/update events will only dispatch new jobs for their prefix. You can run multiple Oban instances with different prefixes on the same system and have them entirely isolated, provided you give each supervisor a distinct id.

Here we configure our application to start three Oban supervisors using the "public", "special" and "private" prefixes, respectively:

def start(_type, _args) do
  children = [
    Repo,
    Endpoint,
    Supervisor.child_spec({Oban, name: ObanA, repo: Repo}, id: ObanA),
    Supervisor.child_spec({Oban, name: ObanB, repo: Repo, prefix: "special"}, id: ObanB),
    Supervisor.child_spec({Oban, name: ObanC, repo: Repo, prefix: "private"}, id: ObanC)
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

Pulse Tracking

Historic introspection is a defining feature of Oban. In addition to retaining completed jobs Oban also generates "heartbeat" records every second for each running queue.

Heartbeat records are recorded in the oban_beats table and pruned to five minutes of backlog. The recorded information is used for a couple of purposes:

  1. To track active jobs. When a job executes it records the node and queue that ran it in the attempted_by column. Zombie jobs (jobs that were left executing when a producer crashes or the node is shut down) are found by comparing the attempted_by values with recent heartbeat records and resurrected accordingly.
  2. Each heartbeat records information about a node/queue pair such as whether it is paused, what the execution limit is and exactly which jobs are running. These records can power additional logging or metrics (and are the backbone of the Oban UI).
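
The zombie check described in the first point can be sketched as a comparison between executing jobs and recently seen node/queue pairs. This is an in-memory illustration only; RescueSketch is a hypothetical module, and the [node, queue] shape of attempted_by is an assumption based on the description above.

```elixir
defmodule RescueSketch do
  # `executing` is a list of job maps whose :attempted_by holds [node, queue].
  # `recent_beats` is a list of {node, queue} pairs seen in recent heartbeats.
  def zombies(executing, recent_beats) do
    live = MapSet.new(recent_beats)

    # A job is a zombie when no recent heartbeat vouches for its producer.
    Enum.reject(executing, fn %{attempted_by: [node, queue]} ->
      MapSet.member?(live, {node, queue})
    end)
  end
end

executing = [
  %{id: 1, attempted_by: ["web.1", "default"]},
  %{id: 2, attempted_by: ["web.2", "media"]}
]

executing
|> RescueSketch.zombies([{"web.1", "default"}])
|> Enum.map(& &1.id)
# => [2]
```

Job 2 is flagged because no recent heartbeat exists for the "web.2"/"media" pair that claimed it.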

Troubleshooting

Querying the Jobs Table

Oban.Job defines an Ecto schema and the jobs table can therefore be queried as usual, e.g.:

MyApp.Repo.all(
  from j in Oban.Job,
    where: j.worker == "MyApp.Business",
    where: j.state == "discarded"
)

Heroku

If your app crashes on launch, be sure to confirm you are running the correct version of Elixir and Erlang (view requirements). If using the hashnuke/elixir buildpack, you can update the elixir_buildpack.config file in your application's root directory to something like:

# Elixir version
elixir_version=1.9.0

# Erlang version
erlang_version=22.0.3

The available Erlang versions are listed here.

Summary

Functions

Returns a specification to start this module under a supervisor.

Retrieve the config struct for a named Oban supervision tree.

Synchronously execute all available jobs in a queue.

Synchronously execute all available jobs in a queue.

Synchronously execute all available jobs in a queue. All execution happens within the current process and it is guaranteed not to raise an error or exit.

Insert a new job into the database for execution.

Similar to insert/2, but raises an Ecto.InvalidChangesetError if the job can't be inserted.

Insert multiple jobs into the database for execution.

Kill an actively executing job and mark it as discarded, ensuring that it won't be retried.

Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.

Resume executing jobs in a paused queue.

Scale the concurrency for a queue.

Starts an Oban supervision tree linked to the current process.

Start a new supervised queue across all connected nodes.

Shutdown a queue's supervision tree and stop running jobs for that queue across all running nodes.

Types

drain_option()

drain_option() :: {:with_scheduled, boolean()}

drain_result()

drain_result() :: %{success: non_neg_integer(), failure: non_neg_integer()}

option()

option() ::
  {:beats_maxage, pos_integer()}
  | {:circuit_backoff, timeout()}
  | {:crontab, [Oban.Config.cronjob()]}
  | {:dispatch_cooldown, pos_integer()}
  | {:name, module()}
  | {:node, binary()}
  | {:poll_interval, pos_integer()}
  | {:prefix, binary()}
  | {:prune, :disabled | {:maxlen, pos_integer()} | {:maxage, pos_integer()}}
  | {:prune_interval, pos_integer()}
  | {:prune_limit, pos_integer()}
  | {:queues, [{atom(), pos_integer()}]}
  | {:repo, module()}
  | {:rescue_after, pos_integer()}
  | {:rescue_interval, pos_integer()}
  | {:shutdown_grace_period, timeout()}
  | {:timezone, Calendar.time_zone()}
  | {:verbose, false | Logger.level()}

queue_name()

queue_name() :: atom() | binary()

Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

config(name \\ __MODULE__)

(since 0.2.0)
config(name :: atom()) :: Oban.Config.t()

Retrieve the config struct for a named Oban supervision tree.

drain_queue(queue)

(since 0.4.0)
drain_queue(queue :: atom() | binary()) :: drain_result()

Synchronously execute all available jobs in a queue.

See drain_queue/3.

drain_queue(queue, opts)

(since 0.4.0)

Synchronously execute all available jobs in a queue.

See drain_queue/3.

drain_queue(name, queue, opts)

(since 0.4.0)
drain_queue(name :: atom(), queue :: queue_name(), [drain_option()]) ::
  drain_result()

Synchronously execute all available jobs in a queue. All execution happens within the current process and it is guaranteed not to raise an error or exit.

Draining a queue from within the current process is especially useful for testing. Jobs that are enqueued by a process when Ecto is in sandbox mode are only visible to that process. Calling drain_queue/3 allows you to control when the jobs are executed and to wait synchronously for all jobs to complete.

Example

Drain a queue with three available jobs, two of which succeed and one of which fails:

Oban.drain_queue(:default)
%{success: 2, failure: 1}

Failures & Retries

Draining a queue uses the same execution mechanism as regular job dispatch. That means that any job failures or crashes are captured and result in a retry. Retries are scheduled in the future with backoff and won't be retried immediately.

Exceptions are not raised into the calling process. If you expect jobs to fail, would like to track failures, or need to check for specific errors you can use one of these mechanisms:

  • Check for side effects from job execution
  • Use telemetry events to track success and failure
  • Check the database for jobs with errors

Scheduled Jobs

By default, drain_queue/3 will execute all currently available jobs. In order to execute scheduled jobs, you may pass the :with_scheduled flag which will cause scheduled jobs to be marked as available beforehand.

# This will execute all scheduled jobs.
Oban.drain_queue(:default, with_scheduled: true)
%{success: 1, failure: 0}

insert(name \\ __MODULE__, changeset)

(since 0.7.0)
insert(name :: atom(), changeset :: Ecto.Changeset.t(Oban.Job.t())) ::
  {:ok, Oban.Job.t()} | {:error, Ecto.Changeset.t()}

Insert a new job into the database for execution.

This and the other insert variants are the recommended way to enqueue jobs because they support features like unique jobs.

See the section on "Unique Jobs" for more details.

Example

Insert a single job:

{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}))

Insert a job while ensuring that it is unique within the past 30 seconds:

{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}, unique: [period: 30]))

insert(name \\ __MODULE__, multi, multi_name, changeset)

(since 0.7.0)
insert(
  name :: atom(),
  multi :: Ecto.Multi.t(),
  multi_name :: term(),
  changeset :: Ecto.Changeset.t(Oban.Job.t())
) :: Ecto.Multi.t()

Put a job insert operation into an Ecto.Multi.

Like insert/2, this variant is recommended over Ecto.Multi.insert because it supports all of Oban's features, e.g. unique jobs.

See the section on "Unique Jobs" for more details.

Example

Ecto.Multi.new()
|> Oban.insert("job-1", MyApp.Worker.new(%{id: 1}))
|> Oban.insert("job-2", MyApp.Worker.new(%{id: 2}))
|> MyApp.Repo.transaction()

insert!(name \\ __MODULE__, changeset)

(since 0.7.0)
insert!(name :: atom(), changeset :: Ecto.Changeset.t(Oban.Job.t())) ::
  Oban.Job.t()

Similar to insert/2, but raises an Ecto.InvalidChangesetError if the job can't be inserted.

Example

job = Oban.insert!(MyApp.Worker.new(%{id: 1}))

insert_all(name \\ __MODULE__, changesets)

(since 0.9.0)
insert_all(name :: atom(), jobs :: [Ecto.Changeset.t(Oban.Job.t())]) :: [
  Oban.Job.t()
]

Insert multiple jobs into the database for execution.

Insertion respects prefix and verbose settings, but it does not use per-job unique configuration. You must use insert/2,4 or insert!/2 for per-job unique support.

There are a few important differences between this function and Ecto.Repo.insert_all/3:

  1. This function always returns a list rather than a tuple of {count, records}
  2. This function requires a list of changesets rather than a list of maps or keyword lists

Example

1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all()

insert_all(name \\ __MODULE__, multi, multi_name, changesets)

(since 0.9.0)
insert_all(
  name :: atom(),
  multi :: Ecto.Multi.t(),
  multi_name :: term(),
  changeset :: [Ecto.Changeset.t(Oban.Job.t())]
) :: Ecto.Multi.t()

Put an insert_all operation into an Ecto.Multi.

This function supports the same features and has the same caveats as insert_all/2.

Example

changesets = Enum.map(0..100, &MyApp.Worker.new(%{id: &1}))

Ecto.Multi.new()
|> Oban.insert_all(:jobs, changesets)
|> MyApp.Repo.transaction()

kill_job(name \\ __MODULE__, job_id)

(since 0.2.0)
kill_job(name :: atom(), job_id :: pos_integer()) :: :ok

Kill an actively executing job and mark it as discarded, ensuring that it won't be retried.

If the job happens to fail before it can be killed the state is set to discarded. However, if it manages to complete successfully then the state will still be completed.

Example

Kill a long running job with an id of 1:

Oban.kill_job(1)
:ok

pause_queue(name \\ __MODULE__, queue)

(since 0.2.0)
pause_queue(name :: atom(), queue :: queue_name()) :: :ok

Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.

When shutdown begins all queues are paused.

Example

Pause the default queue:

Oban.pause_queue(:default)
:ok

resume_queue(name \\ __MODULE__, queue)

(since 0.2.0)
resume_queue(name :: atom(), queue :: queue_name()) :: :ok

Resume executing jobs in a paused queue.

Example

Resume a paused default queue:

Oban.resume_queue(:default)
:ok

scale_queue(name \\ __MODULE__, queue, scale)

(since 0.2.0)
scale_queue(name :: atom(), queue :: queue_name(), scale :: pos_integer()) ::
  :ok

Scale the concurrency for a queue.

Example

Scale a queue up, triggering immediate execution of queued jobs:

Oban.scale_queue(:default, 50)
:ok

Scale the queue back down, allowing executing jobs to finish:

Oban.scale_queue(:default, 5)
:ok

start_link(opts)

(since 0.1.0)
start_link([option()]) :: Supervisor.on_start()

Starts an Oban supervision tree linked to the current process.

Options

These options are required; without them the supervisor won't start. Because :name has a default, :repo is the only one that must always be given explicitly.

  • :name — used for supervisor registration, defaults to Oban
  • :repo — specifies the Ecto repo used to insert and retrieve jobs.

Primary Options

These options determine what the system does at a high level, i.e. which queues to run or how vigorously to prune.

  • :crontab — a list of cron expressions that enqueue jobs on a periodic basis. See "Periodic (CRON) Jobs" in the module docs.

    For testing purposes :crontab may be set to false or nil, which disables scheduling.

  • :node — used to identify the node that the supervision tree is running in. If no value is provided it will use the node name in a distributed system, or the hostname in an isolated node. See "Node Name" below.

  • :prefix — the query prefix, or schema, to use for inserting and executing jobs. An oban_jobs table must exist within the prefix. See the "Prefix Support" section in the module documentation for more details.

  • :prune - configures job pruning behavior, see "Pruning Historic Jobs" for more information. Defaults to {:maxlen, 1_000}, meaning the jobs table will retain roughly 1,000 completed jobs.

  • :queues — a keyword list where the keys are queue names and the values are the concurrency setting. For example, setting queues to [default: 10, exports: 5] would start the queues default and exports with a combined concurrency level of 15. The concurrency setting specifies how many jobs each queue will run concurrently.

    For testing purposes :queues may be set to false or nil, which effectively disables all job dispatching.

  • :timezone — which timezone to use when scheduling cron jobs. To use a timezone other than the default of "Etc/UTC" you must have a timezone database like tzdata installed and configured.

  • :verbose — either false to disable logging or a standard log level (:error, :warn, :info, :debug). This determines whether queries are logged, overriding the repo's configured log level. Defaults to false, where no queries are logged.
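
The combined concurrency mentioned under :queues is simply the sum of the per-queue limits. The sketch below shows that arithmetic; the module QueueLimits and its functions are hypothetical helpers, not part of Oban. The cluster-wide figure assumes every node runs the same queue configuration, following the note in "Caveats & Guidelines" that limits are per-node.

```elixir
defmodule QueueLimits do
  # Hypothetical helper, not part of Oban.

  # The most jobs a single node can run at once: the sum of all queue limits.
  def combined(queues) when is_list(queues) do
    queues |> Keyword.values() |> Enum.sum()
  end

  # Limits are local to each node, so identical configuration on
  # `node_count` nodes multiplies the effective limit.
  def cluster_wide(queues, node_count) when is_integer(node_count) and node_count > 0 do
    combined(queues) * node_count
  end
end

QueueLimits.combined(default: 10, exports: 5)
# => 15

QueueLimits.cluster_wide([default: 10, exports: 5], 3)
# => 45
```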

Twiddly Options

Additional options used to tune system behaviour. These are primarily useful for testing or troubleshooting and don't usually need modification.

  • :beats_maxage — the number of seconds that heartbeat rows in the oban_beats table should be retained. The value must be greater than the value of rescue_after (60 seconds by default). Defaults to 300s, or five minutes.

  • :circuit_backoff — the number of milliseconds until queries are attempted after a database error. All processes communicating with the database are equipped with circuit breakers and will use this for the backoff. Defaults to 30_000ms.

  • :dispatch_cooldown — the minimum number of milliseconds a producer will wait before fetching and running more jobs. A slight cooldown period prevents a producer from flooding with messages and thrashing the database. The cooldown period directly impacts a producer's throughput: jobs per second for a single queue is calculated by (1000 / cooldown) * limit. For example, with a 5ms cooldown and a queue limit of 25 a single queue can run 5,000 jobs/sec.

    The default is 5ms and the minimum is 1ms, which is likely faster than the database can return new jobs to run.

  • :poll_interval - the number of milliseconds between polling for new jobs in a queue. This is directly tied to the resolution of scheduled jobs. For example, with a poll_interval of 5_000ms, scheduled jobs are checked every 5 seconds. The default is 1_000ms.

  • :prune_interval — the number of milliseconds between calls to prune historic jobs. The default is 60_000ms, or one minute.

  • :prune_limit – the maximum number of jobs that can be pruned at each prune interval. The default is 5_000.

  • :rescue_after — the number of seconds of pulse inactivity after which an executing job may be rescued. This value must be greater than the poll_interval. The default is 60s.

  • :rescue_interval — the number of milliseconds between calls to rescue orphaned jobs. The default is 60_000ms, or one minute.

  • :shutdown_grace_period - the amount of time a queue will wait for executing jobs to complete before hard shutdown, specified in milliseconds. The default is 15_000, or 15 seconds.
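
The throughput formula given for :dispatch_cooldown, (1000 / cooldown) * limit, can be checked with a few lines of Elixir. This is only a sketch of the arithmetic: ThroughputSketch is a hypothetical module, not part of Oban, and the result is a theoretical ceiling that ignores how quickly the database can actually return jobs.

```elixir
defmodule ThroughputSketch do
  # Hypothetical helper, not part of Oban: applies (1000 / cooldown) * limit.
  # The guards reflect the documented minimum cooldown of 1ms.
  def max_jobs_per_second(cooldown_ms, limit)
      when is_integer(cooldown_ms) and cooldown_ms >= 1 and
             is_integer(limit) and limit > 0 do
    1000 / cooldown_ms * limit
  end
end

# Default 5ms cooldown with a queue limit of 25
ThroughputSketch.max_jobs_per_second(5, 25)
# => 5000.0

# Minimum 1ms cooldown with the same limit
ThroughputSketch.max_jobs_per_second(1, 25)
# => 25000.0
```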

Examples

To start an Oban supervisor within an application's supervision tree:

def start(_type, _args) do
  children = [MyApp.Repo, {Oban, repo: MyApp.Repo, queues: [default: 50]}]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

Node Name

When the node value hasn't been configured it is generated based on the environment:

  • In a distributed system the node name is used
  • In a Heroku environment the system environment's DYNO value is used
  • Otherwise, the system hostname is used
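
The three fallback rules above can be sketched in plain Elixir. This illustrates the described order only and is not Oban's actual source; the module NodeNameSketch is hypothetical, and the env argument is there so the environment can be supplied explicitly when experimenting.

```elixir
defmodule NodeNameSketch do
  # Hypothetical module mirroring the documented rules, not Oban's own code.
  def resolve(env \\ System.get_env()) do
    cond do
      # Distributed system: use the Erlang node name.
      Node.alive?() ->
        to_string(node())

      # Heroku: use the DYNO environment variable when it is set.
      is_binary(env["DYNO"]) ->
        env["DYNO"]

      # Otherwise fall back to the system hostname.
      true ->
        {:ok, hostname} = :inet.gethostname()
        to_string(hostname)
    end
  end
end
```

Calling NodeNameSketch.resolve(%{"DYNO" => "web.1"}) on a non-distributed node returns "web.1", while NodeNameSketch.resolve(%{}) returns the machine's hostname.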

start_queue(name \\ __MODULE__, queue, limit)

(since 0.12.0)
start_queue(name :: atom(), queue :: queue_name(), limit :: pos_integer()) ::
  :ok

Start a new supervised queue across all connected nodes.

Example

Start the :priority queue with a concurrency limit of 10:

Oban.start_queue(:priority, 10)
:ok

stop_queue(name \\ __MODULE__, queue)

(since 0.12.0)
stop_queue(name :: atom(), queue :: queue_name()) :: :ok

Shut down a queue's supervision tree and stop running jobs for that queue across all running nodes.

The shutdown process pauses the queue first and allows current jobs to exit gracefully, provided they finish within the shutdown limit.

Example

Oban.stop_queue(:default)
:ok