Oban (Oban v2.4.0) View Source
Oban is a robust job processing library which uses PostgreSQL for storage and coordination.
Each Oban instance is a supervision tree and not an application. That means it won't be started automatically and must be included in your application's supervision tree. All of your configuration is passed into the supervisor, allowing you to configure Oban like the rest of your application:
# config/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
plugins: [Oban.Plugins.Pruner],
queues: [default: 10, events: 50, media: 20]
# lib/my_app/application.ex
defmodule MyApp.Application do
@moduledoc false
use Application
alias MyApp.Repo
alias MyAppWeb.Endpoint
def start(_type, _args) do
children = [
Repo,
Endpoint,
{Oban, oban_config()}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
# Conditionally disable queues or plugins here.
defp oban_config do
Application.get_env(:my_app, Oban)
end
end
If you are running tests (which you should be) you'll want to disable plugins, enqueueing scheduled jobs and job dispatching altogether when testing:
# config/test.exs
config :my_app, Oban, queues: false, plugins: false
See the installation instructions in the README or on the Hexdocs guide for details on how to migrate your database.
Configuring Queues
Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:
queues: [default: 10, mailers: 20, events: 50, media: 5]
You may also use an expanded form to configure queues with individual overrides:
queues: [
default: 10,
mailers: [limit: 20, poll_interval: :timer.seconds(30)],
events: [limit: 50, paused: true]
]
Here we've declared the mailers
with the same limit as above, but it will only
poll to deschedule jobs every 30 seconds. The events
queue will start in a
paused state, which means it won't process anything until Oban.resume_queue/2
is called to start it.
There isn't a limit to the number of queues or how many jobs may execute concurrently in each queue. Some additional guidelines:
Caveats & Guidelines
Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (i.e. database connections) to handle the concurrent load.
Queue limits are local (per-node), not global (per-cluster). For example, running a queue with a local limit of one on three separate nodes is effectively a global limit of three. If you require a global limit you must restrict the number of nodes running a particular queue.
Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.
Be careful how many concurrent jobs make expensive system calls (i.e. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out commands.
Defining Workers
Worker modules do the work of processing a job. At a minimum they must define a
perform/1
function, which is called with an %Oban.Job{}
struct.
Note that the args
field of the job struct will always have string keys,
regardless of the key type when the job was enqueued. The args
are stored as
jsonb
in PostgreSQL and the serialization process automatically stringifies
all keys.
Define a worker to process jobs in the events
queue:
defmodule MyApp.Business do
use Oban.Worker, queue: :events
@impl Oban.Worker
def perform(%Oban.Job{args: %{"id" => id} = args}) do
model = MyApp.Repo.get(MyApp.Business.Man, id)
case args do
%{"in_the" => "business"} ->
IO.inspect(model)
%{"vote_for" => vote} ->
IO.inspect([vote, model])
_ ->
IO.inspect(model)
end
:ok
end
end
The use
macro also accepts options to customize max attempts, priority, tags,
and uniqueness:
defmodule MyApp.LazyBusiness do
use Oban.Worker,
queue: :events,
priority: 3,
max_attempts: 3,
tags: ["business"],
unique: [period: 30]
@impl Oban.Worker
def perform(_job) do
# do business slowly
:ok
end
end
Successful jobs should return :ok
or an {:ok, value}
tuple. The value
returned from perform/1
is used to control whether the job is treated as a
success, a failure, discarded completely or deferred until later.
See the Oban.Worker
docs for more details on failure conditions and
Oban.Telemetry
for details on job reporting.
Enqueueing Jobs
Jobs are simply Ecto structs and are enqueued by inserting them into the
database. For convenience and consistency all workers provide a new/2
function that converts an args map into a job changeset suitable for insertion:
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
The worker's defaults may be overridden by passing options:
%{id: 1, vote_for: "none of the above"}
|> MyApp.Business.new(queue: :special, max_attempts: 5)
|> Oban.insert()
Jobs may be scheduled at a specific datetime in the future:
%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()
Jobs may also be scheduled down to the second any time in the future:
%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()
Unique jobs can be configured in the worker, or when the job is built:
%{email: "brewster@example.com"}
|> MyApp.Mailer.new(unique: [period: 300, fields: [:queue, :worker])
|> Oban.insert()
Job priority can be specified using an integer from 0 to 3, with 0 being the default and highest priority:
%{id: 1}
|> MyApp.Backfiller.new(priority: 2)
|> Oban.insert()
Any number of tags can be added to a job dynamically, at the time it is inserted:
id = 1
%{id: id}
|> MyApp.OnboardMailer.new(tags: ["mailer", "record-#{id}"])
|> Oban.insert()
Multiple jobs can be inserted in a single transaction:
Ecto.Multi.new()
|> Oban.insert(:b_job, MyApp.Business.new(%{id: 1}))
|> Oban.insert(:m_job, MyApp.Mailer.new(%{email: "brewser@example.com"}))
|> Repo.transaction()
Occasionally you may need to insert a job for a worker that exists in another
application. In that case you can use Oban.Job.new/2
to build the changeset
manually:
%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> Oban.insert()
Oban.insert/2,4
is the preferred way of inserting jobs as it provides some of
Oban's advanced features (i.e., unique jobs). However, you can use your
application's Repo.insert/2
function if necessary.
See Oban.Job.new/2
for a full list of job options.
Pruning Historic Jobs
Job stats and queue introspection are built on keeping job rows in the database
after they have completed. This allows administrators to review completed jobs
and build informative aggregates, at the expense of storage and an unbounded
table size. To prevent the oban_jobs
table from growing indefinitely, Oban
provides active pruning of completed
, cancelled
and discarded
jobs.
By default, the Pruner
plugin retains jobs for 60 seconds. You can configure a
longer retention period by providing a max_age
in seconds to the Pruner
plugin.
# Set the max_age for 5 minutes
config :my_app, Oban,
plugins: [{Oban.Plugins.Pruner, max_age: 300}]
...
Caveats & Guidelines
Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.
Pruning is only applied to jobs that are
completed
,cancelled
ordiscarded
. It'll never delete a new job, a scheduled job or a job that failed and will be retried.
Unique Jobs
The unique jobs feature lets you specify constraints to prevent enqueueing
duplicate jobs. Uniqueness is based on a combination of args
, queue
,
worker
, state
and insertion time. It is configured at the worker or job
level using the following options:
:period
— The number of seconds until a job is no longer considered duplicate. You should always specify a period.:infinity
can be used to indicate the job be considered a duplicate as long as jobs are retained.:fields
— The fields to compare when evaluating uniqueness. The available fields are:args
,:queue
and:worker
, by default all three are used.:keys
— A specific subset of the:args
to consider when comparing against historic jobs. This allows a job with multiple key/value pairs in the args to be compared using only a subset of them.:states
— The job states that are checked for duplicates. The available states are:available
,:scheduled
,:executing
,:retryable
,:completed
,:cancelled
and:discarded
. By default all states except for:discarded
and:cancelled
are checked, which prevents duplicates even if the previous job has been completed.
For example, configure a worker to be unique across all fields and states for 60 seconds:
use Oban.Worker, unique: [period: 60]
Configure the worker to be unique only by :worker
and :queue
:
use Oban.Worker, unique: [fields: [:queue, :worker], period: 60]
Or, configure a worker to be unique until it has executed:
use Oban.Worker, unique: [period: 300, states: [:available, :scheduled, :executing]]
Only consider the :url
key rather than the entire args
:
use Oban.Worker, unique: [fields: [:args, :worker], keys: [:url]]
You can use Oban.Job.states/0
to specify uniqueness across all states,
including :discarded
:
use Oban.Worker, unique: [period: 300, states: Oban.Job.states()]
Strong Guarantees
Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.
Performance Note
If your application makes heavy use of unique jobs you may want to add an index
on the args
column of the oban_jobs
table. The other columns considered for
uniqueness are already covered by indexes.
Periodic Jobs
Oban's Cron
plugin registers workers a cron-like schedule and enqueues jobs
automatically. Periodic jobs are declared as a list of {cron, worker}
or
{cron, worker, options}
tuples:
config :my_app, Oban,
repo: MyApp.Repo,
plugins: [
{Oban.Plugins.Cron,
crontab: [
{"* * * * *", MyApp.MinuteWorker},
{"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
{"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
{"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]},
{"@daily", MyApp.AnotherDailyWorker}
]}
]
The crontab would insert jobs as follows:
MyApp.MinuteWorker
— Inserted once every minuteMyApp.HourlyWorker
— Inserted at the first minute of every hour with custom argsMyApp.DailyWorker
— Inserted at midnight every day with no retriesMyApp.MondayWorker
— Inserted at noon every Monday in the "scheduled" queueMyApp.AnotherDailyWorker
— Inserted at midnight every day with no retries
The crontab format respects all standard rules and has one minute resolution. Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.
Like other jobs, recurring jobs will use the :queue
specified by the worker
module (or :default
if one is not specified).
Cron Expressions
Standard Cron expressions are composed of rules specifying the minutes, hours, days, months and weekdays. Rules for each field are comprised of literal values, wildcards, step values or ranges:
*
— Wildcard, matches any value (0, 1, 2, ...)0
— Literal, matches only itself (only 0)*/15
— Step, matches any value that is a multiple (0, 15, 30, 45)0-5
— Range, matches any value within the range (0, 1, 2, 3, 4, 5)0-9/2
- Step values can be used in conjunction with ranges (0, 2, 4, 6, 8)
Each part may have multiple rules, where rules are separated by a comma. The allowed values for each field are as follows:
minute
— 0-59hour
— 0-23days
— 1-31month
— 1-12 (or aliases,JAN
,FEB
,MAR
, etc.)weekdays
— 0-6 (or aliases,SUN
,MON
,TUE
, etc.)
The following Cron extensions are supported:
@hourly
—0 * * * *
@daily
(as well as@midnight
) —0 0 * * *
@weekly
—0 0 * * 0
@monthly
—0 0 1 * *
@yearly
(as well as@annually
) —0 0 1 1 *
@reboot
— Run once at boot across the entire cluster
Some specific examples that demonstrate the full range of expressions:
0 * * * *
— The first minute of every hour*/15 9-17 * * *
— Every fifteen minutes during standard business hours0 0 * DEC *
— Once a day at midnight during december0 7-9,4-6 13 * FRI
— Once an hour during both rush hours on Friday the 13th
For more in depth information see the man documentation for cron
and crontab
in your system. Alternatively you can experiment with various expressions
online at Crontab Guru.
Caveats & Guidelines
All schedules are evaluated as UTC unless a different timezone is provided. See
Oban.Plugins.Cron
for information about configuring a timezone.Workers can be used for regular and scheduled jobs so long as they accept different arguments.
Duplicate jobs are prevented through transactional locks and unique constraints. Workers that are used for regular and scheduled jobs must not specify
unique
options less than60s
.Long running jobs may execute simultaneously if the scheduling interval is shorter than it takes to execute the job. You can prevent overlap by passing custom
unique
opts in the crontab config:custom_args = %{scheduled: true} unique_opts = [ period: 60 * 60 * 24, states: [:available, :scheduled, :executing] ] config :my_app, Oban, repo: MyApp.Repo, plugins: [ {Oban.Plugins.Cron, crontab: [ {"* * * * *", MyApp.SlowWorker, args: custom_args, unique: unique_opts} ]} ]
Prioritizing Jobs
Normally, all available jobs within a queue are executed in the order they were
scheduled. You can override the normal behavior and prioritize or de-prioritize
a job by assigning a numerical priority
.
Priorities from 0-3 are allowed, where 0 is the highest priority and 3 is the lowest.
The default priority is 0, unless specified all jobs have an equally high priority.
All jobs with a higher priority will execute before any jobs with a lower priority. Within a particular priority jobs are executed in their scheduled order.
Testing
Oban provides some helpers to facilitate testing. The helpers handle the
boilerplate of making assertions on which jobs are enqueued. To use the
perform_job/2,3
, assert_enqueued/1
and refute_enqueued/1
helpers in your
tests you must include them in your testing module and specify your app's Ecto
repo:
use Oban.Testing, repo: MyApp.Repo
Now you can assert, refute or list jobs that have been enqueued within your integration tests:
assert_enqueued worker: MyWorker, args: %{id: 1}
# or
refute_enqueued queue: :special, args: %{id: 2}
# or
assert [%{args: %{"id" => 1}}] = all_enqueued worker: MyWorker
You can also easily unit test workers with the perform_job/2,3
function, which
automates validating job args, options, and worker results from a single
function call:
assert :ok = perform_job(MyWorker, %{id: 1})
# or
assert :ok = perform_job(MyWorker, %{id: 1}, attempt: 3, max_attempts: 3)
# or
assert {:error, _} = perform_job(MyWorker, %{bad: :arg})
See the Oban.Testing
module for more details.
Caveats & Guidelines
As noted in Usage, there are some guidelines for running tests:
Disable all job dispatching by setting
queues: false
orqueues: nil
in yourtest.exs
config. Keyword configuration is deep merged, so settingqueues: []
won't have any effect.Disable plugins via
plugins: false
. Default plugins, such as the fixed pruner, aren't necessary in testing mode because jobs created within the sandbox are rolled back at the end of the test. Additionally, the periodic pruning queries will raiseDBConnection.OwnershipError
when the application boots.Be sure to use the Ecto Sandbox for testing. Oban makes use of database pubsub events to dispatch jobs, but pubsub events never fire within a transaction. Since sandbox tests run within a transaction no events will fire and jobs won't be dispatched.
config :my_app, MyApp.Repo, pool: Ecto.Adapters.SQL.Sandbox
Integration Testing
During integration testing it may be necessary to run jobs because they do work
essential for the test to complete, i.e. sending an email, processing media,
etc. You can execute all available jobs in a particular queue by calling
Oban.drain_queue/1,2
directly from your tests.
For example, to process all pending jobs in the "mailer" queue while testing some business logic:
defmodule MyApp.BusinessTest do
use MyApp.DataCase, async: true
alias MyApp.{Business, Worker}
test "we stay in the business of doing business" do
:ok = Business.schedule_a_meeting(%{email: "monty@brewster.com"})
assert %{success: 1, failure: 0} == Oban.drain_queue(queue: :mailer)
# Now, make an assertion about the email delivery
end
end
See Oban.drain_queue/1,2
for additional details.
Error Handling
When a job returns an error value, raises an error or exits during execution the
details are recorded within the errors
array on the job. When the number of
execution attempts is below the configured max_attempts
limit, the job will
automatically be retried in the future.
The retry delay has an exponential backoff, meaning the job's second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.
See the Oban.Worker
documentation on "Customizing Backoff" for alternative
backoff strategies.
Error Details
Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:
at
The utc timestamp when the error occurred atattempt
The attempt number when the error occurrederror
A formatted error message and stacktrace
See the Instrumentation docs for an example of integrating with external error reporting systems.
Limiting Retries
By default, jobs are retried up to 20 times. The number of retries is controlled
by the max_attempts
value, which can be set at the Worker or Job level. For
example, to instruct a worker to discard jobs after three failures:
use Oban.Worker, queue: :limited, max_attempts: 3
Limiting Execution Time
By default, individual jobs may execute indefinitely. If this is undesirable you
may define a timeout in milliseconds with the timeout/1
callback on your
worker module.
For example, to limit a worker's execution time to 30 seconds:
def MyApp.Worker do
use Oban.Worker
@impl Oban.Worker
def perform(_job) do
something_that_may_take_a_long_time()
:ok
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(30)
end
The timeout/1
function accepts an Oban.Job
struct, so you can customize the
timeout using any job attributes.
Define the timeout
value through job args:
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
Define the timeout
based on the number of attempts:
def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)
Instrumentation and Logging
Oban provides integration with Telemetry, a dispatching library for
metrics. It is easy to report Oban metrics to any backend by attaching to
:oban
events.
Here is an example of a sample unstructured log handler:
defmodule MyApp.ObanLogger do
require Logger
def handle_event([:oban, :job, :start], measure, meta, _) do
Logger.warn("[Oban] :started #{meta.worker} at #{measure.system_time}")
end
def handle_event([:oban, :job, event], measure, meta, _) do
Logger.warn("[Oban] #{event} #{meta.worker} ran in #{measure.duration}")
end
end
Attach the handler to success and failure events in application.ex
:
events = [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]]
:telemetry.attach_many("oban-logger", events, &MyApp.ObanLogger.handle_event/4, [])
The Oban.Telemetry
module provides a robust structured logger that handles all
of Oban's telemetry events. As in the example above, attach it within your
application.ex
module:
:ok = Oban.Telemetry.attach_default_logger()
For more details on the default structured logger and information on event
metadata see docs for the Oban.Telemetry
module.
Reporting Errors
Another great use of execution data is error reporting. Here is an example of integrating with Sentry to report job failures:
defmodule ErrorReporter do
def handle_event([:oban, :job, :exception], measure, meta, _) do
extra =
meta
|> Map.take([:id, :args, :queue, :worker])
|> Map.merge(measure)
Sentry.capture_exception(meta.error, stacktrace: meta.stacktrace, extra: extra)
end
def handle_event([:oban, :circuit, :trip], _measure, meta, _) do
Sentry.capture_exception(meta.error, stacktrace: meta.stacktrace, extra: meta)
end
end
:telemetry.attach_many(
"oban-errors",
[[:oban, :job, :exception], [:oban, :circuit, :trip]],
&ErrorReporter.handle_event/4,
%{}
)
You can use exception events to send error reports to Honeybadger, Rollbar, AppSignal or any other application monitoring platform.
Isolation
Oban supports namespacing through PostgreSQL schemas, also called "prefixes" in Ecto. With prefixes your jobs table can reside outside of your primary schema (usually public) and you can have multiple separate job tables.
To use a prefix you first have to specify it within your migration:
defmodule MyApp.Repo.Migrations.AddPrefixedObanJobsTable do
use Ecto.Migration
def up do
Oban.Migrations.up(prefix: "private")
end
def down do
Oban.Migrations.down(prefix: "private")
end
end
The migration will create the "private" schema and all tables, functions and triggers within that schema. With the database migrated you'll then specify the prefix in your configuration:
config :my_app, Oban,
prefix: "private",
repo: MyApp.Repo,
queues: [default: 10]
Now all jobs are inserted and executed using the private.oban_jobs
table. Note
that Oban.insert/2,4
will write jobs in the private.oban_jobs
table, you'll
need to specify a prefix manually if you insert jobs directly through a repo.
Supervisor Isolation
Not only is the oban_jobs
table isolated within the schema, but all
notification events are also isolated. That means that insert/update events will
only dispatch new jobs for their prefix. You can run multiple Oban instances
with different prefixes on the same system and have them entirely isolated,
provided you give each supervisor a distinct id.
Here we configure our application to start three Oban supervisors using the "public", "special" and "private" prefixes, respectively:
def start(_type, _args) do
children = [
Repo,
Endpoint,
{Oban, name: ObanA, repo: Repo},
{Oban, name: ObanB, repo: Repo, prefix: "special"},
{Oban, name: ObanC, repo: Repo, prefix: "private"}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
Dynamic Repositories
Oban supports Ecto dynamic repositories through the
:get_dynamic_repo
option. To make this work, you need to run a separate Oban
instance per each dynamic repo instance. Most often it's worth bundling each
Oban and repo instance under the same supervisor:
def start_repo_and_oban(instance_id) do
children = [
{MyDynamicRepo, name: nil, url: repo_url(instance_id)},
{Oban, name: instance_id, get_dynamic_repo: fn -> repo_pid(instance_id) end}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
The function repo_pid/1
must return the pid of the repo for the given
instance. You can use Registry
to register the repo (for example in the repo's
init/2
callback) and discover it.
Link to this section Summary
Functions
Cancel an available
, scheduled
or retryable
job and mark it as discarded
to prevent it
from running. If the job is currently executing
it will be killed and otherwise it is ignored.
Check the current state of a queue producer.
Returns a specification to start this module under a supervisor.
Retrieve the config struct for a named Oban supervision tree.
Synchronously execute all available jobs in a queue.
Insert a new job into the database for execution.
Put a job insert operation into an Ecto.Multi
.
Similar to insert/2
, but raises an Ecto.InvalidChangesetError
if the job can't be inserted.
Insert multiple jobs into the database for execution.
Put an insert_all
operation into an Ecto.Multi
.
Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.
Resume executing jobs in a paused queue.
Sets a job as available
, adding attempts if already maxed out. If the job is currently
available
, executing
or scheduled
it will be ignored. The job is scheduled for immediate
execution.
Scale the concurrency for a queue.
Starts an Oban
supervision tree linked to the current process.
Start a new supervised queue.
Shutdown a queue's supervision tree and stop running jobs for that queue.
Returns the pid of the root oban process for the given name.
Link to this section Types
Specs
drain_option() :: {:queue, queue_name()} | {:with_safety, boolean()} | {:with_scheduled, boolean()}
Specs
drain_result() :: %{failure: non_neg_integer(), success: non_neg_integer()}
Specs
job_changeset() :: Ecto.Changeset.t(Oban.Job.t())
Specs
name() :: term()
Specs
option() :: {:circuit_backoff, timeout()} | {:dispatch_cooldown, pos_integer()} | {:get_dynamic_repo, nil | (() -> pid() | atom())} | {:log, false | Logger.level()} | {:name, name()} | {:node, binary()} | {:plugins, [module() | {module() | Keyword.t()}]} | {:prefix, binary()} | {:queues, [{queue_name(), pos_integer() | Keyword.t()}]} | {:repo, module()} | {:shutdown_grace_period, timeout()}
Specs
Specs
queue_option() :: {:queue, queue_name()} | {:limit, pos_integer()} | {:local_only, boolean()}
Specs
queue_state() :: %{ limit: pos_integer(), node: binary(), nonce: binary(), paused: boolean(), queue: queue_name(), running: [pos_integer()], started_at: DateTime.t() }
Link to this section Functions
Specs
cancel_job(name(), job_id :: pos_integer()) :: :ok
Cancel an available
, scheduled
or retryable
job and mark it as discarded
to prevent it
from running. If the job is currently executing
it will be killed and otherwise it is ignored.
If an executing job happens to fail before it can be cancelled the state is set to discarded
.
However, if it manages to complete successfully then the state will still be completed
.
Example
Cancel a scheduled job with the id 1
:
Oban.cancel_job(1)
:ok
Specs
check_queue(name(), opts :: [{:queue, queue_name()}]) :: queue_state()
Check the current state of a queue producer.
This allows you to introspect on a queue's health by retrieving key attributes of the producer's
state; values such as the current limit
, the running
job ids, and when the producer was
started.
Options
:queue
- a string or atom specifying the queue to check, required
Example
Oban.check_queue(queue: :default)
%{
limit: 10,
node: "me@local",
none: "a1b2c3d4",
paused: false,
queue: "default",
running: [100, 102],
started_at: ~D[2020-10-07 15:31:00]
}
Specs
child_spec([option()]) :: Supervisor.child_spec()
Returns a specification to start this module under a supervisor.
See Supervisor
.
Specs
config(name()) :: Oban.Config.t()
Retrieve the config struct for a named Oban supervision tree.
Specs
drain_queue(name(), [drain_option()]) :: drain_result()
Synchronously execute all available jobs in a queue.
All execution happens within the current process and it is guaranteed not to raise an error or exit.
Draining a queue from within the current process is especially useful for testing. Jobs that are
enqueued by a process when Ecto
is in sandbox mode are only visible to that process. Calling
drain_queue/2
allows you to control when the jobs are executed and to wait synchronously for
all jobs to complete.
Failures & Retries
Draining a queue uses the same execution mechanism as regular job dispatch. That means that any job failures or crashes are captured and result in a retry. Retries are scheduled in the future with backoff and won't be retried immediately.
By default jobs are executed in safe
mode, just as they are in production. Safe mode catches
any errors or exits and records the formatted error in the job's errors
array. That means
exceptions and crashes are not bubbled up to the calling process.
If you expect jobs to fail, would like to track failures, or need to check for specific errors
you can pass the with_safety: false
flag.
Scheduled Jobs
By default, drain_queue/2
will execute all currently available jobs. In order to execute
scheduled jobs, you may pass the :with_scheduled
flag which will cause scheduled jobs to be
marked as available
beforehand.
Options
:queue
- a string or atom specifying the queue to drain, required:with_scheduled
— whether to include any scheduled jobs when draining, defaultfalse
:with_safety
— whether to silently catch errors when draining, defaulttrue
Example
Drain a queue with three available jobs, two of which succeed and one of which fails:
Oban.drain_queue(queue: :default)
%{success: 2, failure: 1}
Drain a queue including any scheduled jobs:
Oban.drain_queue(queue: :default, with_scheduled: true)
%{success: 1, failure: 0}
Drain a queue and assert an error is raised:
assert_raise RuntimeError, fn -> Oban.drain_queue(queue: :risky, with_safety: false) end
Specs
insert(name(), job_changeset()) :: {:ok, Oban.Job.t()} | {:error, job_changeset()} | {:error, term()}
Insert a new job into the database for execution.
This and the other insert
variants are the recommended way to enqueue jobs because they
support features like unique jobs.
See the section on "Unique Jobs" for more details.
Example
Insert a single job:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}))
Insert a job while ensuring that it is unique within the past 30 seconds:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}, unique: [period: 30]))
insert(name \\ __MODULE__, multi, multi_name, changeset_or_fun)
View Source (since 0.7.0)Specs
insert( name(), multi :: Ecto.Multi.t(), multi_name :: Ecto.Multi.name(), changeset_or_fun :: job_changeset() | (... -> any()) ) :: Ecto.Multi.t()
Put a job insert operation into an Ecto.Multi
.
Like insert/2
, this variant is recommended over Ecto.Multi.insert
because it supports all of
Oban's features, i.e. unique jobs.
See the section on "Unique Jobs" for more details.
Example
Ecto.Multi.new()
|> Oban.insert("job-1", MyApp.Worker.new(%{id: 1}))
|> Oban.insert("job-2", fn _ -> MyApp.Worker.new(%{id: 2}) end)
|> MyApp.Repo.transaction()
Specs
insert!(name(), job_changeset()) :: Oban.Job.t()
Similar to insert/2
, but raises an Ecto.InvalidChangesetError
if the job can't be inserted.
Example
job = Oban.insert!(MyApp.Worker.new(%{id: 1}))
insert_all(name \\ __MODULE__, changesets_or_wrapper)
View Source (since 0.9.0)Specs
insert_all( name(), changesets_or_wrapper :: [job_changeset()] | %{changesets: [job_changeset()]} ) :: [Oban.Job.t()]
Insert multiple jobs into the database for execution.
Insertion respects prefix
and log
settings, but it does not use per-job unique
configuration. You must use insert/2,4
or insert!/2
for per-job unique support.
There are a few important differences between this function and Ecto.Repo.insert_all/3
:
- This function always returns a list rather than a tuple of
{count, records}
- This function requires a list of changesets rather than a list of maps or keyword lists
Example
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all()
insert_all(name \\ __MODULE__, multi, multi_name, changesets_or_wrapper)
View Source (since 0.9.0)Specs
insert_all( name(), multi :: Ecto.Multi.t(), multi_name :: Ecto.Multi.name(), changesets_or_wrapper :: [job_changeset()] | %{changesets: [job_changeset()]} ) :: Ecto.Multi.t()
Put an insert_all
operation into an Ecto.Multi
.
This function supports the same features and has the same caveats as insert_all/2
.
Example
changesets = Enum.map(0..100, MyApp.Worker.new(%{id: &1}))
Ecto.Multi.new()
|> Oban.insert_all(:jobs, changesets)
|> MyApp.Repo.transaction()
Specs
pause_queue(name(), opts :: [queue_option()]) :: :ok
Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.
When shutdown begins all queues are paused.
Options
:queue
- a string or atom specifying the queue to pause, required:local_only
- whether the queue will be paused only on the local node, default:false
Example
Pause the default queue:
Oban.pause_queue(queue: :default)
:ok
Pause the default queue, but only on the local node:
Oban.pause_queue(queue: :default, local_only: true)
:ok
Specs
resume_queue(name(), opts :: [queue_option()]) :: :ok
Resume executing jobs in a paused queue.
Options
:queue
- a string or atom specifying the queue to resume, required:local_only
- whether the queue will be resumed only on the local node, default:false
Example
Resume a paused default queue:
Oban.resume_queue(:default)
:ok
Resume the default queue, but only on the local node:
Oban.resume_queue(queue: :default, local_only: true)
:ok
Specs
retry_job(name :: atom(), job_id :: pos_integer()) :: :ok
Sets a job as available
, adding attempts if already maxed out. If the job is currently
available
, executing
or scheduled
it will be ignored. The job is scheduled for immediate
execution.
Example
Retry a discarded job with the id 1
:
Oban.retry_job(1)
:ok
Specs
scale_queue(name(), opts :: [queue_option()]) :: :ok
Scale the concurrency for a queue.
Options
:queue
- a string or atom specifying the queue to scale, required:limit
— the new concurrency limit:local_only
— whether the queue will be scaled only on the local node, default:false
Example
Scale a queue up, triggering immediate execution of queued jobs:
Oban.scale_queue(queue: :default, limit: 50)
:ok
Scale the queue back down, allowing executing jobs to finish:
Oban.scale_queue(queue: :default, limit: 5)
:ok
Scale the queue only on the local node:
Oban.scale_queue(queue: :default, limit: 10, local_only: true)
:ok
Specs
start_link([option()]) :: Supervisor.on_start()
Starts an Oban
supervision tree linked to the current process.
Options
These options are required; without them the supervisor won't start
:name
— used for supervisor registration, defaults toOban
:repo
— specifies the Ecto repo used to insert and retrieve jobs
Primary Options
These options determine what the system does at a high level, i.e. which queues to run.
:node
— used to identify the node that the supervision tree is running in. If no value is provided it will use thenode
name in a distributed system, or thehostname
in an isolated node. See "Node Name" below.:plugins
— a list or modules or module/option tuples that are started as children of an Oban supervisor. Any supervisable module is a valid plugin, i.e. aGenServer
or anAgent
.:prefix
— the query prefix, or schema, to use for inserting and executing jobs. Anoban_jobs
table must exist within the prefix. See the "Prefix Support" section in the module documentation for more details.:queues
— a keyword list where the keys are queue names and the values are the concurrency setting or a keyword list of queue options. For example, setting queues to[default: 10, exports: 5]
would start the queuesdefault
andexports
with a combined concurrency level of 15. The concurrency setting specifies how many jobs each queue will run concurrently.Queues accept additional override options to customize their behavior, e.g. by setting the
poll_interval
and thedispatch_cooldown
for a specific queue. For testing purposes:queues
may be set tofalse
ornil
, which effectively disables all job dispatching.:log
— eitherfalse
to disable logging or a standard log level (:error
,:warn
,:info
,:debug
). This determines whether queries are logged or not; overriding the repo's configured log level. Defaults tofalse
, where no queries are logged.
Twiddly Options
Additional options used to tune system behaviour. These are primarily useful for testing or troubleshooting and don't usually need modification.
:circuit_backoff
— the number of milliseconds until queries are attempted after a database error. All processes communicating with the database are equipped with circuit breakers and will use this for the backoff. Defaults to30_000ms
.:dispatch_cooldown
— the minimum number of milliseconds a producer will wait before fetching and running more jobs. A slight cooldown period prevents a producer from flooding with messages and thrashing the database. The cooldown period directly impacts a producer's throughput: jobs per second for a single queue is calculated by(1000 / cooldown) * limit
. For example, with a5ms
cooldown and a queue limit of25
a single queue can run 2,500 jobs/sec.The default is
5ms
and the minimum is1ms
, which is likely faster than the database can return new jobs to run.:shutdown_grace_period
- the amount of time a queue will wait for executing jobs to complete before hard shutdown, specified in milliseconds. The default is15_000
, or 15 seconds.
Example
To start an Oban
supervisor within an application's supervision tree:
def start(_type, _args) do
children = [MyApp.Repo, {Oban, queues: [default: 50]}]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
Node Name
When the node
value hasn't been configured it is generated based on the environment:
- In a distributed system the node name is used
- In a Heroku environment the system environment's
DYNO
value is used - Otherwise, the system hostname is used
Specs
start_queue(name(), opts :: [queue_option()]) :: :ok
Start a new supervised queue.
By default this starts a new supervised queue across all nodes running Oban on the same database
and prefix. You can pass the option local_only: true
if you prefer to start the queue only on
the local node.
Options
:queue
- a string or atom specifying the queue to start, required:limit
- set the concurrency limit, required:local_only
- whether the queue will be started only on the local node, default:false
Example
Start the :priority
queue with a concurrency limit of 10 across the connected nodes.
Oban.start_queue(queue: :priority, limit: 10)
:ok
Start the :media
queue with a concurrency limit of 5 only on the local node.
Oban.start_queue(queue: :media, limit: 5, local_only: true)
:ok
Specs
stop_queue(name(), opts :: [queue_option()]) :: :ok
Shutdown a queue's supervision tree and stop running jobs for that queue.
By default this action will occur across all the running nodes. Still, if you prefer to stop the
queue's supervision tree and stop running jobs for that queue only on the local node, you can
pass the option: local_only: true
The shutdown process pauses the queue first and allows current jobs to exit gracefully, provided they finish within the shutdown limit.
Options
:queue
- a string or atom specifying the queue to stop, required:local_only
- whether the queue will be stopped only on the local node, default:false
Example
Oban.stop_queue(queue: :default)
:ok
Oban.stop_queue(queue: :media, local_only: true)
:ok
Specs
Returns the pid of the root oban process for the given name.
Example
Find the default instance:
Oban.whereis(Oban)
Find a dynamically named instance:
Oban.whereis({:oban, 1})