Oban (Oban v2.16.3)
Oban is a robust job processing library which uses PostgreSQL or SQLite3 for storage and coordination.
Installation
See the installation guide for details on installing and configuring Oban in your application.
Requirements
Oban requires Elixir 1.12+, Erlang 22+, and PostgreSQL 12.0+ or SQLite3 3.37.0+.
Testing
Find testing setup, helpers, and strategies in the testing guide.
Oban Web+Pro
A web dashboard for managing Oban and an official set of extensions, plugins, and workers are available as licensed packages.
Learn more about pricing and licensing for Oban Web+Pro at getoban.pro.
Running with SQLite3
Oban ships with engines for PostgreSQL and SQLite3. Both engines support the same core functionality for a single node, while the Postgres engine is more advanced and designed to run in a distributed environment.
Running with SQLite3 requires adding ecto_sqlite3 to your app's dependencies and setting the Oban.Engines.Lite engine:
config :my_app, Oban,
engine: Oban.Engines.Lite,
queues: [default: 10],
repo: MyApp.Repo
Please note that SQLite3 may not be suitable for high-concurrency systems or for systems that need to handle large amounts of data. If you expect your background jobs to generate high loads, it would be better to use a more robust database solution that supports horizontal scalability, like Postgres.
Configuring Queues
Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:
config :my_app, Oban,
queues: [default: 10, mailers: 20, events: 50, media: 5],
repo: MyApp.Repo
You may also use an expanded form to configure queues with individual overrides:
queues: [
default: 10,
events: [limit: 50, paused: true]
]
The events queue will now start in a paused state, which means it won't process anything until Oban.resume_queue/2 is called to start it.
There isn't a limit to the number of queues or how many jobs may execute concurrently in each queue. Some additional guidelines:
Caveats & Guidelines
Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (e.g. database connections) to handle the concurrent load.
Queue limits are local (per-node), not global (per-cluster). For example, running a queue with a local limit of one on three separate nodes is effectively a global limit of three. If you require a global limit you must restrict the number of nodes running a particular queue.
Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.
Be careful how many concurrent jobs make expensive system calls (e.g. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out commands.
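Because limits are per-node, a rough upper bound on concurrency is simple arithmetic. The following is a minimal sketch in plain Elixir that makes no Oban calls; the QueueMath module and the node count of three are hypothetical and exist only for illustration.

```elixir
defmodule QueueMath do
  # Hypothetical helper, not part of Oban: sums the per-node limits of a
  # queues keyword list, accepting both the short and the expanded form.
  def local_limit(queues) do
    queues
    |> Enum.map(fn
      {_name, limit} when is_integer(limit) -> limit
      {_name, opts} when is_list(opts) -> Keyword.fetch!(opts, :limit)
    end)
    |> Enum.sum()
  end

  # Limits are local, so every node contributes its own full limit.
  def cluster_limit(queues, node_count), do: local_limit(queues) * node_count
end

queues = [default: 10, mailers: 20, events: [limit: 50, paused: true], media: 5]

IO.inspect(QueueMath.local_limit(queues), label: "max jobs per node")
IO.inspect(QueueMath.cluster_limit(queues, 3), label: "max jobs across 3 nodes")
```

With these example values the sketch reports 85 jobs per node and 255 across three nodes, which is a useful number to compare against the size of your database connection pool.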
Defining Workers
Worker modules do the work of processing a job. At a minimum they must define a perform/1 function, which is called with an %Oban.Job{} struct.
Note that the args field of the job struct will always have string keys, regardless of the key type when the job was enqueued. The args are stored as JSON and the serialization process automatically stringifies all keys.
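The effect of key stringification can be imitated in plain Elixir. This sketch is only a stand-in for the JSON round trip, not Oban's own code, and the stringify function is a hypothetical helper that handles top-level keys only.

```elixir
# Simulates what storage as JSON does to the top-level keys of args.
# This is NOT Oban's code, just a stand-in for the round trip.
stringify = fn args -> Map.new(args, fn {key, value} -> {to_string(key), value} end) end

enqueued_args = %{id: 1, in_the: "business"}
stored_args = stringify.(enqueued_args)

# Matching on string keys succeeds...
%{"id" => id} = stored_args
IO.inspect(id, label: "id")

# ...while an atom-key pattern does not match the stored args.
IO.inspect(match?(%{id: _}, stored_args), label: "atom keys match?")
```

This is why perform/1 clauses should always pattern match on string keys such as "id" rather than atom keys.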
Define a worker to process jobs in the events queue:
defmodule MyApp.Business do
use Oban.Worker, queue: :events
@impl Oban.Worker
def perform(%Oban.Job{args: %{"id" => id} = args}) do
model = MyApp.Repo.get(MyApp.Business.Man, id)
case args do
%{"in_the" => "business"} ->
IO.inspect(model)
%{"vote_for" => vote} ->
IO.inspect([vote, model])
_ ->
IO.inspect(model)
end
:ok
end
end
The use macro also accepts options to customize max_attempts, priority, tags, and unique options:
defmodule MyApp.LazyBusiness do
use Oban.Worker,
queue: :events,
priority: 3,
max_attempts: 3,
tags: ["business"],
unique: [period: 30]
@impl Oban.Worker
def perform(_job) do
# do business slowly
:ok
end
end
Like all use macros, options are defined at compile time. Avoid using Application.get_env/2 to define worker options. Instead, pass dynamic options at runtime by passing them to MyWorker.new/2:
MyApp.MyWorker.new(args, queue: dynamic_queue)
Successful jobs should return :ok or an {:ok, value} tuple. The value returned from perform/1 is used to control whether the job is treated as a success, a failure, cancelled or deferred for retrying later.
See the Oban.Worker docs for more details on failure conditions and Oban.Telemetry for details on job reporting.
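As a rough illustration of how return values map onto those outcomes, here is a sketch in plain Elixir. The ReturnValueSketch module is hypothetical; the tuple shapes follow the return values documented for Oban.Worker, which remains the authoritative list.

```elixir
defmodule ReturnValueSketch do
  # Illustrative only: maps return shapes of perform/1 onto the outcomes
  # described above. Oban handles this internally; this module is a
  # hypothetical model and is not part of the library.
  def outcome(:ok), do: :success
  def outcome({:ok, _value}), do: :success
  def outcome({:error, _reason}), do: :failure
  def outcome({:cancel, _reason}), do: :cancelled
  def outcome({:snooze, seconds}) when is_integer(seconds), do: :deferred
end

for result <- [:ok, {:ok, 42}, {:error, :timeout}, {:cancel, :stale}, {:snooze, 60}] do
  IO.inspect({result, ReturnValueSketch.outcome(result)})
end
```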
Enqueueing Jobs
Jobs are simply Ecto structs and are enqueued by inserting them into the
database. For convenience and consistency all workers provide a new/2
function that converts an args map into a job changeset suitable for insertion:
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
The worker's defaults may be overridden by passing options:
%{id: 1, vote_for: "none of the above"}
|> MyApp.Business.new(queue: :special, max_attempts: 5)
|> Oban.insert()
Unique jobs can be configured in the worker, or when the job is built:
%{email: "brewster@example.com"}
|> MyApp.Mailer.new(unique: [period: 300, fields: [:queue, :worker]])
|> Oban.insert()
Job priority can be specified using an integer from 0 to 3, with 0 being the default and highest priority:
%{id: 1}
|> MyApp.Backfiller.new(priority: 2)
|> Oban.insert()
Any number of tags can be added to a job dynamically, at the time it is inserted:
id = 1
%{id: id}
|> MyApp.OnboardMailer.new(tags: ["mailer", "record-#{id}"])
|> Oban.insert()
Multiple jobs can be inserted in a single transaction:
Ecto.Multi.new()
|> Oban.insert(:b_job, MyApp.Business.new(%{id: 1}))
|> Oban.insert(:m_job, MyApp.Mailer.new(%{email: "brewster@example.com"}))
|> Repo.transaction()
Occasionally you may need to insert a job for a worker that exists in another application. In that case you can use Oban.Job.new/2 to build the changeset manually:
%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> Oban.insert()
Oban.insert/2,4 is the preferred way of inserting jobs as it provides some of Oban's advanced features (e.g., unique jobs). However, you can use your application's Repo.insert/2 function if necessary.
See Oban.Job.new/2 for a full list of job options.
Scheduling Jobs
Jobs may be scheduled down to the second any time in the future:
%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()
Jobs may also be scheduled at a specific datetime in the future:
%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()
Scheduling is always in UTC. You'll have to shift timestamps in other zones to UTC before scheduling:
%{id: 1}
|> MyApp.Business.new(scheduled_at: DateTime.shift_zone!(datetime, "Etc/UTC"))
|> Oban.insert()
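DateTime.shift_zone!/2 needs a time zone database for any zone other than "Etc/UTC". When the timestamp arrives as an ISO 8601 string with an offset, the standard library can normalize it to UTC on its own. A minimal sketch, where the timestamp is an arbitrary example value:

```elixir
# An ISO 8601 timestamp with a -05:00 offset (an arbitrary example value).
# DateTime.from_iso8601/1 returns the datetime converted to UTC together
# with the original offset in seconds.
{:ok, utc_datetime, offset_seconds} = DateTime.from_iso8601("2020-12-25T14:00:56-05:00")

IO.inspect(utc_datetime, label: "scheduled_at (UTC)")
IO.inspect(offset_seconds, label: "original offset in seconds")
```

The resulting utc_datetime can be passed as the scheduled_at option directly.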
Caveats & Guidelines
Usually, scheduled job management operates in global mode and notifies queues of available jobs via PubSub to minimize database load. However, when PubSub isn't available, staging switches to a local mode where each queue polls independently.
Local mode is less efficient and will only happen if you're running in an environment where neither Postgres nor PG notifications work. That situation should be rare and limited to the following conditions:
- Running with a connection pooler, i.e., pg_bouncer, in transaction mode.
- Running without clustering, i.e., without Distributed Erlang.
If both of those criteria apply and PubSub notifications won't work, then staging will switch to polling in local mode.
Prioritizing Jobs
Normally, all available jobs within a queue are executed in the order they were scheduled. You can override the normal behavior and prioritize or de-prioritize a job by assigning a numerical priority.
Priorities from 0-3 are allowed, where 0 is the highest priority and 3 is the lowest.
The default priority is 0; unless specified otherwise, all jobs have an equally high priority.
All jobs with a higher priority will execute before any jobs with a lower priority. Within a particular priority, jobs are executed in their scheduled order.
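The ordering rule can be modelled in memory as a sort by priority and then by scheduled time. This is only a sketch of the rule as described here, not Oban's query; the job maps and their integer timestamps are made up for the example.

```elixir
# Plain maps standing in for jobs. scheduled_at is a Unix timestamp here so
# that ordinary term ordering sorts it correctly.
jobs = [
  %{id: 1, priority: 3, scheduled_at: 100},
  %{id: 2, priority: 0, scheduled_at: 300},
  %{id: 3, priority: 0, scheduled_at: 200},
  %{id: 4, priority: 1, scheduled_at: 50}
]

# Lower priority numbers run first; ties fall back to scheduled order.
execution_order =
  jobs
  |> Enum.sort_by(fn job -> {job.priority, job.scheduled_at} end)
  |> Enum.map(& &1.id)

IO.inspect(execution_order, label: "execution order")
```

Note that job 4 was scheduled earliest but still runs after both priority 0 jobs.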
Caveats & Guidelines
The default priority is defined in the jobs table. The least intrusive way to change it for all jobs is to change the column default:
alter table("oban_jobs") do
modify :priority, :integer, default: 1, from: {:integer, default: 0}
end
Unique Jobs
The unique jobs feature lets you specify constraints to prevent enqueueing duplicate jobs.
Uniqueness is based on a combination of args, queue, worker, state and insertion time. It is configured at the worker or job level using the following options:
- :period - The number of seconds until a job is no longer considered duplicate. You should always specify a period, otherwise Oban will default to 60 seconds. :infinity can be used to indicate the job be considered a duplicate as long as jobs are retained.
- :fields - The fields to compare when evaluating uniqueness. The available fields are :args, :queue, :worker, and :meta. By default, fields is set to [:worker, :queue, :args].
- :keys - A specific subset of the :args or :meta to consider when comparing against historic jobs. This allows a job with multiple key/value pairs in the args to be compared using only a subset of them.
- :states - The job states that are checked for duplicates. The available states are :available, :scheduled, :executing, :retryable, :completed, :cancelled and :discarded. By default all states except for :discarded and :cancelled are checked, which prevents duplicates even if the previous job has been completed.
- :timestamp - Which timestamp to check the period against. The available timestamps are :inserted_at or :scheduled_at, and it defaults to :inserted_at for legacy reasons.
For example, configure a worker to be unique across all fields and states for 60 seconds:
use Oban.Worker, unique: [period: 60]
Configure the worker to be unique only by :worker and :queue:
use Oban.Worker, unique: [fields: [:queue, :worker], period: 60]
Check the :scheduled_at timestamp instead of :inserted_at for uniqueness:
use Oban.Worker, unique: [period: 120, timestamp: :scheduled_at]
Or, configure a worker to be unique until it has executed:
use Oban.Worker, unique: [period: 300, states: [:available, :scheduled, :executing]]
Only consider the :url key rather than the entire args:
use Oban.Worker, unique: [fields: [:args, :worker], keys: [:url]]
You can use Oban.Job.states/0 to specify uniqueness across all states, including :discarded:
use Oban.Worker, unique: [period: 300, states: Oban.Job.states()]
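To make the effect of the :keys option concrete, the comparison it implies can be sketched in plain Elixir. The UniqueKeysSketch module is hypothetical and is not how Oban implements uniqueness; it only shows that restricting the comparison to a subset of keys changes which jobs count as duplicates.

```elixir
defmodule UniqueKeysSketch do
  # Hypothetical helper: compares two args maps using only the given keys,
  # mirroring the idea behind the :keys option. Not Oban's implementation.
  def same_args?(args_a, args_b, keys) do
    Map.take(args_a, keys) == Map.take(args_b, keys)
  end
end

existing = %{"url" => "https://example.com", "requested_by" => 1}
incoming = %{"url" => "https://example.com", "requested_by" => 2}

IO.inspect(UniqueKeysSketch.same_args?(existing, incoming, ["url"]), label: "url only")
IO.inspect(UniqueKeysSketch.same_args?(existing, incoming, ["url", "requested_by"]), label: "all keys")
```

Compared on "url" alone the two jobs are duplicates; compared on every key they are not.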
Detecting Unique Conflicts
When unique settings match an existing job, the return value of Oban.insert/2 is still {:ok, job}. However, you can detect a unique conflict by checking the job's :conflict? field. If there was an existing job, the field is true; otherwise it is false.
You can use the :conflict? field to customize responses after insert:
with {:ok, %Job{conflict?: true}} <- Oban.insert(changeset) do
{:error, :job_already_exists}
end
Note that conflicts are only detected for jobs enqueued through Oban.insert/2,3. Jobs enqueued through Oban.insert_all/2 do not use per-job unique configuration.
Replacing Values
In addition to detecting unique conflicts, passing options to replace can update any job field when there is a conflict. Any of the following fields can be replaced per state: args, max_attempts, meta, priority, queue, scheduled_at, tags, worker.
For example, to change the priority and increase max_attempts when there is a conflict with a job in a scheduled state:
BusinessWorker.new(
args,
max_attempts: 5,
priority: 0,
replace: [scheduled: [:max_attempts, :priority]]
)
Another example is bumping the scheduled time on conflict. Either scheduled_at or schedule_in values will work, but the replace option is always scheduled_at.
UrgentWorker.new(args, schedule_in: 1, replace: [scheduled: [:scheduled_at]])
NOTE: If you use this feature to replace a field (e.g. args) in the executing state by doing something like: UniqueWorker.new(new_args, replace: [executing: [:args]]) Oban will update the args, but the job will continue executing with the original value.
Strong Guarantees
Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.
Pruning Historic Jobs
Job stats and queue introspection are built on keeping job rows in the database after they have completed. This allows administrators to review completed jobs and build informative aggregates, at the expense of storage and an unbounded table size. To prevent the oban_jobs table from growing indefinitely, Oban provides active pruning of completed, cancelled and discarded jobs.
By default, the Pruner plugin retains jobs for 60 seconds. You can configure a longer retention period by providing a max_age in seconds to the Pruner plugin.
# Set the max_age for 5 minutes
config :my_app, Oban,
plugins: [{Oban.Plugins.Pruner, max_age: 300}]
...
Caveats & Guidelines
Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.
Pruning is only applied to jobs that are completed, cancelled or discarded. It'll never delete a new job, a scheduled job or a job that failed and will be retried.
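The eligibility rule above can be modelled in memory. This is a sketch of the rule only, not the Pruner plugin; the PruneSketch module and the finished_at field are hypothetical stand-ins for whichever timestamp records when a job reached its final state.

```elixir
defmodule PruneSketch do
  @prunable_states ~w(completed cancelled discarded)

  # Hypothetical model of the rule, not the Pruner plugin: a job is eligible
  # once it is in a final state and has been there longer than max_age seconds.
  def prunable?(%{state: state, finished_at: finished_at}, now, max_age) do
    state in @prunable_states and DateTime.diff(now, finished_at, :second) > max_age
  end
end

now = ~U[2024-01-01 00:10:00Z]
max_age = 300

jobs = [
  %{id: 1, state: "completed", finished_at: ~U[2024-01-01 00:00:00Z]},
  %{id: 2, state: "completed", finished_at: ~U[2024-01-01 00:08:00Z]},
  %{id: 3, state: "retryable", finished_at: nil},
  %{id: 4, state: "discarded", finished_at: ~U[2023-12-31 23:00:00Z]}
]

prunable_ids = for job <- jobs, PruneSketch.prunable?(job, now, max_age), do: job.id
IO.inspect(prunable_ids, label: "eligible for pruning")
```

Job 2 is too recent and job 3 is still awaiting a retry, so only jobs 1 and 4 are eligible.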
Periodic Jobs
Oban's Cron plugin registers workers on a cron-like schedule and enqueues jobs automatically. Periodic jobs are declared as a list of {cron, worker} or {cron, worker, options} tuples:
config :my_app, Oban,
repo: MyApp.Repo,
plugins: [
{Oban.Plugins.Cron,
crontab: [
{"* * * * *", MyApp.MinuteWorker},
{"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
{"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
{"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]},
{"@daily", MyApp.AnotherDailyWorker}
]}
]
The crontab would insert jobs as follows:
- MyApp.MinuteWorker - Inserted once every minute
- MyApp.HourlyWorker - Inserted at the first minute of every hour with custom args
- MyApp.DailyWorker - Inserted at midnight every day with no retries
- MyApp.MondayWorker - Inserted at noon every Monday in the "scheduled" queue
- MyApp.AnotherDailyWorker - Inserted at midnight every day
The crontab format respects all standard rules and has one minute resolution. Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.
Like other jobs, recurring jobs will use the :queue specified by the worker module (or :default if one is not specified).
Cron Expressions
Standard Cron expressions are composed of rules specifying the minutes, hours, days, months and weekdays. Rules for each field are comprised of literal values, wildcards, step values or ranges:
- * - Wildcard, matches any value (0, 1, 2, ...)
- 0 - Literal, matches only itself (only 0)
- */15 - Step, matches any value that is a multiple (0, 15, 30, 45)
- 0-5 - Range, matches any value within the range (0, 1, 2, 3, 4, 5)
- 0-9/2 - Step values can be used in conjunction with ranges (0, 2, 4, 6, 8)
Each part may have multiple rules, where rules are separated by a comma. The allowed values for each field are as follows:
- minute - 0-59
- hour - 0-23
- days - 1-31
- month - 1-12 (or aliases, JAN, FEB, MAR, etc.)
- weekdays - 0-6 (or aliases, SUN, MON, TUE, etc.)
The following Cron extensions are supported:
- @hourly - 0 * * * *
- @daily (as well as @midnight) - 0 0 * * *
- @weekly - 0 0 * * 0
- @monthly - 0 0 1 * *
- @yearly (as well as @annually) - 0 0 1 1 *
- @reboot - Run once at boot across the entire cluster
Some specific examples that demonstrate the full range of expressions:
- 0 * * * * - The first minute of every hour
- */15 9-17 * * * - Every fifteen minutes during standard business hours
- 0 0 * DEC * - Once a day at midnight during December
- 0 7-9,16-18 13 * FRI - Once an hour during both rush hours on Friday the 13th
For more in-depth information see the man documentation for cron and crontab in your system. Alternatively you can experiment with various expressions online at Crontab Guru.
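The rule types described in this section (wildcards, literals, steps, ranges, and comma-separated lists) can be sketched as a small field expander. This is a hypothetical teaching aid, not Oban's cron parser: the CronFieldSketch module is invented here, it handles a single numeric field at a time, and it does not understand aliases such as JAN or MON.

```elixir
defmodule CronFieldSketch do
  # Hypothetical helper, not Oban's parser: expands one cron field into the
  # sorted list of values it matches within the allowed range.
  def expand(field, %Range{first: min, last: max}) do
    field
    |> String.split(",")
    |> Enum.flat_map(&expand_rule(&1, min, max))
    |> Enum.uniq()
    |> Enum.sort()
  end

  # A rule is either a plain range/literal/wildcard or one followed by "/step".
  defp expand_rule(rule, min, max) do
    case String.split(rule, "/") do
      [range] -> Enum.to_list(bounds(range, min, max))
      [range, step] -> Enum.take_every(bounds(range, min, max), String.to_integer(step))
    end
  end

  defp bounds("*", min, max), do: min..max

  defp bounds(range, _min, _max) do
    case String.split(range, "-") do
      [value] -> String.to_integer(value)..String.to_integer(value)
      [first, last] -> String.to_integer(first)..String.to_integer(last)
    end
  end
end

IO.inspect(CronFieldSketch.expand("*/15", 0..59), label: "*/15 (minutes)")
IO.inspect(CronFieldSketch.expand("0-9/2", 0..59), label: "0-9/2 (minutes)")
IO.inspect(CronFieldSketch.expand("9-17", 0..23), label: "9-17 (hours)")
```

Expanding a field this way is a quick check that an expression covers the values you expect before adding it to a crontab.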
Caveats & Guidelines
All schedules are evaluated as UTC unless a different timezone is provided. See Oban.Plugins.Cron for information about configuring a timezone.
Workers can be used for regular and scheduled jobs so long as they accept different arguments.
Duplicate jobs are prevented through transactional locks and unique constraints. Workers that are used for regular and scheduled jobs must not specify unique options less than 60s.
Long running jobs may execute simultaneously if the scheduling interval is shorter than it takes to execute the job. You can prevent overlap by passing custom unique opts in the crontab config:
custom_args = %{scheduled: true}
unique_opts = [
  period: 60 * 60 * 24,
  states: [:available, :scheduled, :executing]
]
config :my_app, Oban,
  repo: MyApp.Repo,
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       {"* * * * *", MyApp.SlowWorker, args: custom_args, unique: unique_opts}
     ]}
  ]
Error Handling
When a job returns an error value, raises an error, or exits during execution the details are recorded within the errors array on the job. When the number of execution attempts is below the configured max_attempts limit, the job will automatically be retried in the future.
The retry delay has an exponential backoff, meaning the job's second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.
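The delays quoted above are consistent with a formula of attempt^4 + 15 seconds. That formula is an inference from those three numbers, not something confirmed here, and it ignores any jitter or version-specific changes; consult Oban.Worker for the actual default. A minimal sketch under that assumption:

```elixir
# Assumption: the delays quoted above fit attempt^4 + 15 seconds. The formula
# is inferred from those numbers and ignores any jitter Oban may apply.
backoff_seconds = fn attempt -> Integer.pow(attempt, 4) + 15 end

delays = Enum.map(1..3, backoff_seconds)
IO.inspect(delays, label: "delay in seconds after attempts 1..3")
```

Under this assumption the delays are 16, 31, and 96 seconds, and 96 seconds is the 1m 36s mentioned for the fourth attempt.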
See the Oban.Worker documentation on "Customizing Backoff" for alternative backoff strategies.
Error Details
Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:
- at - The UTC timestamp when the error occurred
- attempt - The attempt number when the error occurred
- error - A formatted error message and stacktrace
See the Instrumentation docs for an example of integrating with external error reporting systems.
Limiting Retries
By default, jobs are retried up to 20 times. The number of retries is controlled by the max_attempts value, which can be set at the Worker or Job level. For example, to instruct a worker to discard jobs after three failures:
use Oban.Worker, queue: :limited, max_attempts: 3
Limiting Execution Time
By default, individual jobs may execute indefinitely. If this is undesirable you may define a timeout in milliseconds with the timeout/1 callback on your worker module.
For example, to limit a worker's execution time to 30 seconds:
defmodule MyApp.Worker do
use Oban.Worker
@impl Oban.Worker
def perform(_job) do
something_that_may_take_a_long_time()
:ok
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(30)
end
The timeout/1 function accepts an Oban.Job struct, so you can customize the timeout using any job attributes.
Define the timeout value through job args:
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
Define the timeout based on the number of attempts:
def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)
Instrumentation, Error Reporting, and Logging
Oban provides integration with Telemetry, a dispatching library for metrics. It is easy to report Oban metrics to any backend by attaching to :oban events.
Here is a sample unstructured log handler:
defmodule MyApp.ObanLogger do
require Logger
def handle_event([:oban, :job, :start], measure, meta, _) do
Logger.warning("[Oban] :started #{meta.worker} at #{measure.system_time}")
end
def handle_event([:oban, :job, event], measure, meta, _) do
Logger.warning("[Oban] #{event} #{meta.worker} ran in #{measure.duration}")
end
end
Attach the handler to success and failure events in application.ex:
events = [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]]
:telemetry.attach_many("oban-logger", events, &MyApp.ObanLogger.handle_event/4, [])
The Oban.Telemetry module provides a robust structured logger that handles all of Oban's telemetry events. As in the example above, attach it within your application.ex module:
:ok = Oban.Telemetry.attach_default_logger()
For more details on the default structured logger and information on event metadata see docs for the Oban.Telemetry module.
Reporting Errors
Another great use of execution data is error reporting. Here is an example of integrating with Sentry to report job failures:
defmodule MyApp.ErrorReporter do
def attach do
:telemetry.attach(
"oban-errors",
[:oban, :job, :exception],
&__MODULE__.handle_event/4,
[]
)
end
def handle_event([:oban, :job, :exception], measure, meta, _) do
extra =
meta.job
|> Map.take([:id, :args, :meta, :queue, :worker])
|> Map.merge(measure)
Sentry.capture_exception(meta.reason, stacktrace: meta.stacktrace, extra: extra)
end
end
MyApp.ErrorReporter.attach()
You can use exception events to send error reports to Honeybadger, Rollbar, AppSignal or any other application monitoring platform.
Instance and Database Isolation
You can run multiple Oban instances with different prefixes on the same system and have them entirely isolated, provided you give each supervisor a distinct id.
Here we configure our application to start three Oban supervisors using the "public", "special" and "private" prefixes, respectively:
def start(_type, _args) do
children = [
Repo,
Endpoint,
{Oban, name: ObanA, repo: Repo},
{Oban, name: ObanB, repo: Repo, prefix: "special"},
{Oban, name: ObanC, repo: Repo, prefix: "private"}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
Umbrella Apps
If you need to run Oban from an umbrella application where more than one of the child apps need to interact with Oban, you may need to set the :name for each child application that configures Oban.
For example, your umbrella contains two apps: MyAppA and MyAppB. MyAppA is responsible for inserting jobs, while only MyAppB actually runs any queues.
Configure Oban with a custom name for MyAppA:
config :my_app_a, Oban,
name: MyAppA.Oban,
repo: MyApp.Repo
Then configure Oban for MyAppB with a different name:
config :my_app_b, Oban,
name: MyAppB.Oban,
repo: MyApp.Repo,
queues: [default: 10]
Now, use the configured name when calling functions like Oban.insert/2, Oban.insert_all/2, Oban.drain_queue/2, etc., to reference the correct Oban process for the current application.
Oban.insert(MyAppA.Oban, MyWorker.new(%{}))
Oban.insert_all(MyAppB.Oban, multi, :multiname, [MyWorker.new(%{})])
Oban.drain_queue(MyAppB.Oban, queue: :default)
Database Prefixes
Oban supports namespacing through PostgreSQL schemas, also called "prefixes" in Ecto. With prefixes your jobs table can reside outside of your primary schema (usually public) and you can have multiple separate job tables.
To use a prefix you first have to specify it within your migration:
defmodule MyApp.Repo.Migrations.AddPrefixedObanJobsTable do
use Ecto.Migration
def up do
Oban.Migrations.up(prefix: "private")
end
def down do
Oban.Migrations.down(prefix: "private")
end
end
The migration will create the "private" schema and all tables, functions and triggers within that schema. With the database migrated you'll then specify the prefix in your configuration:
config :my_app, Oban,
prefix: "private",
repo: MyApp.Repo,
queues: [default: 10]
Now all jobs are inserted and executed using the private.oban_jobs table. Note that while Oban.insert/2,4 will write jobs to the private.oban_jobs table, you'll need to specify a prefix manually if you insert jobs directly through a repo.
Not only is the oban_jobs table isolated within the schema, but all notification events are also isolated. That means that insert/update events will only dispatch new jobs for their prefix.
Dynamic Repositories
Oban supports Ecto dynamic repositories through the :get_dynamic_repo option. To make this work, you need to run a separate Oban instance for each dynamic repo instance. Most often it's worth bundling each Oban and repo instance under the same supervisor:
def start_repo_and_oban(instance_id) do
children = [
{MyDynamicRepo, name: nil, url: repo_url(instance_id)},
{Oban, name: instance_id, get_dynamic_repo: fn -> repo_pid(instance_id) end}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
The function repo_pid/1 must return the pid of the repo for the given instance. You can use Registry to register the repo (for example in the repo's init/2 callback) and discover it.
If your application exclusively uses dynamic repositories and doesn't specify all credentials upfront, you must implement an init/1 callback in your Ecto Repo. Doing so provides the Postgres notifier with the correct credentials on init, allowing jobs to process as expected.
Ecto Multi-tenancy
If you followed the Ecto guide on setting up multi-tenancy with foreign keys, you need to add an exception for queries originating from Oban. All of Oban's queries have the custom option oban: true to help you identify them in prepare_query/3 or other instrumentation:
# Sample code, only relevant if you followed the Ecto guide on multi tenancy with foreign keys.
defmodule MyApp.Repo do
use Ecto.Repo, otp_app: :my_app
require Ecto.Query
@impl true
def prepare_query(_operation, query, opts) do
cond do
opts[:skip_org_id] || opts[:schema_migration] || opts[:oban] ->
{query, opts}
org_id = opts[:org_id] ->
{Ecto.Query.where(query, org_id: ^org_id), opts}
true ->
raise "expected org_id or skip_org_id to be set"
end
end
end
Types
@type changeset_or_fun() :: Oban.Job.changeset() | Oban.Job.changeset_fun()
@type changeset_wrapper() :: %{ :changesets => Oban.Job.changeset_list(), optional(atom()) => term() }
@type changesets_or_wrapper() :: Oban.Job.changeset_list() | changeset_wrapper()
@type changesets_or_wrapper_or_fun() :: changesets_or_wrapper() | Oban.Job.changeset_list_fun()
@type drain_option() :: {:queue, queue_name()} | {:with_limit, pos_integer()} | {:with_recursion, boolean()} | {:with_safety, boolean()} | {:with_scheduled, boolean() | DateTime.t()}
@type drain_result() :: %{ cancelled: non_neg_integer(), discard: non_neg_integer(), failure: non_neg_integer(), snoozed: non_neg_integer(), success: non_neg_integer() }
@type multi() :: Ecto.Multi.t()
@type multi_name() :: Ecto.Multi.name()
@type name() :: term()
The name of an Oban instance. This is used to identify instances in the internal registry for configuration lookup.
@type option() :: {:dispatch_cooldown, pos_integer()} | {:engine, module()} | {:get_dynamic_repo, nil | (-> pid() | atom())} | {:log, false | Logger.level()} | {:name, name()} | {:node, String.t()} | {:notifier, module()} | {:peer, false | module()} | {:plugins, false | [module() | {module() | Keyword.t()}]} | {:prefix, String.t()} | {:queues, false | [{queue_name(), pos_integer() | Keyword.t()}]} | {:repo, module()} | {:shutdown_grace_period, timeout()} | {:stage_interval, timeout()} | {:testing, :disabled | :inline | :manual}
@type queue_option() :: {:queue, queue_name()} | {:limit, pos_integer()} | {:local_only, boolean()} | {:node, String.t()}
@type queue_state() :: %{ :limit => pos_integer(), :node => binary(), :paused => boolean(), :queue => queue_name(), :running => [pos_integer()], :started_at => DateTime.t(), :updated_at => DateTime.t(), optional(atom()) => any() }
Functions
Creates a facade for Oban functions and automates fetching configuration from the application environment.
Facade modules support configuration via the application environment under an OTP application key. For example, the facade:
defmodule MyApp.Oban do
use Oban, otp_app: :my_app
end
Could be configured with:
config :my_app, Oban, repo: MyApp.Repo
Then you can include MyApp.Oban in your application's supervision tree without passing extra options:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
MyApp.Repo,
MyApp.Oban
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Calling Functions
Facade modules allow you to call Oban functions on instances with custom names, i.e. not Oban, without passing an Oban.name/0 as the first argument.
For example, rather than calling Oban.config/1 you'd call MyOban.config/0:
MyOban.config()
It also makes piping into Oban functions far more convenient:
%{some: :args}
|> MyWorker.new()
|> MyOban.insert()
Merging Configuration
All configuration can be provided through the use macro or application config, and options from the application supersede those passed through use. Configuration is prioritized in order:
- Options passed through use
- Options pulled from the OTP app via Application.get_env/3
- Options passed through a child spec in the supervisor
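Assuming each later source in that list overrides the earlier ones, the merge can be pictured as successive Keyword.merge/2 calls. This is a sketch of the precedence only, not the facade's actual implementation, and all option values below are invented for the example.

```elixir
# Hypothetical option sets for the three sources, in the order listed above.
use_opts = [queues: [default: 5], prefix: "public"]
app_opts = [queues: [default: 10], repo: MyApp.Repo]
child_spec_opts = [prefix: "private"]

# Assumption: later sources win, so they are merged in last.
merged =
  use_opts
  |> Keyword.merge(app_opts)
  |> Keyword.merge(child_spec_opts)

IO.inspect(Keyword.get(merged, :queues), label: "queues")
IO.inspect(Keyword.get(merged, :prefix), label: "prefix")
IO.inspect(Keyword.get(merged, :repo), label: "repo")
```

Here the application config replaces the queues given to use, and the child spec replaces the prefix.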
@spec cancel_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
Cancel many jobs based on a queryable and mark them as cancelled to prevent them from running. Any currently executing jobs are killed while the others are ignored.
If executing jobs happen to fail before cancellation then the state is set to cancelled. However, any that complete successfully will remain completed.
Only jobs with the statuses executing, available, scheduled, or retryable can be cancelled.
Example
Cancel all jobs:
Oban.cancel_all_jobs(Oban.Job)
{:ok, 9}
Cancel all jobs for a specific worker:
Oban.Job
|> Ecto.Query.where(worker: "MyApp.MyWorker")
|> Oban.cancel_all_jobs()
{:ok, 2}
@spec cancel_job(name(), job_or_id :: Oban.Job.t() | integer()) :: :ok
Cancel an executing, available, scheduled or retryable job and mark it as cancelled to prevent it from running. If the job is currently executing it will be killed and otherwise it is ignored.
If an executing job happens to fail before it can be cancelled the state is set to cancelled. However, if it manages to complete successfully then the state will still be completed.
Example
Cancel a job:
Oban.cancel_job(job)
:ok
@spec check_queue(name(), opts :: [{:queue, queue_name()}]) :: queue_state()
Check the current state of a queue producer.
This allows you to introspect on a queue's health by retrieving key attributes of the producer's state; values such as the current limit, the running job ids, and when the producer was started.
Options
- :queue - a string or atom specifying the queue to check, required
Example
Oban.check_queue(queue: :default)
%{
limit: 10,
node: "me@local",
paused: false,
queue: "default",
running: [100, 102],
started_at: ~U[2020-10-07 15:31:00Z],
updated_at: ~U[2020-10-07 15:31:00Z]
}
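The returned map can be used to derive simple health figures. The sketch below is not part of Oban: QueueStats is a hypothetical helper, and it assumes limit is a plain integer and running is a list of job ids, as in the example output above.

```elixir
defmodule QueueStats do
  # A paused producer will not start new jobs, so it has no free slots.
  def free_slots(%{paused: true}), do: 0

  # Otherwise the free capacity is the limit minus the running job count.
  def free_slots(%{limit: limit, running: running}) do
    max(limit - length(running), 0)
  end
end

# A map shaped like the check_queue example output (timestamps omitted).
state = %{limit: 10, node: "me@local", paused: false, queue: "default", running: [100, 102]}

IO.inspect(QueueStats.free_slots(state))
```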
@spec config(name()) :: Oban.Config.t()
Retrieve the Oban.Config struct for a named Oban supervision tree.
Example
Retrieve the default Oban instance config:
%Oban.Config{} = Oban.config()
Retrieve the config for an instance started with a custom name:
%Oban.Config{} = Oban.config(MyCustomOban)
@spec drain_queue(name(), [drain_option()]) :: drain_result()
Synchronously execute all available jobs in a queue.
All execution happens within the current process and, by default, it is guaranteed not to raise an error or exit.
Draining a queue from within the current process is especially useful for testing. Jobs that are enqueued by a process when Ecto is in sandbox mode are only visible to that process. Calling drain_queue/2 allows you to control when the jobs are executed and to wait synchronously for all jobs to complete.
Failures & Retries
Draining a queue uses the same execution mechanism as regular job dispatch. That means that any job failures or crashes are captured and result in a retry. Retries are scheduled in the future with backoff and won't be retried immediately.
By default jobs are executed in safe mode, just as they are in production. Safe mode catches any errors or exits and records the formatted error in the job's errors array. That means exceptions and crashes are not bubbled up to the calling process.
If you expect jobs to fail, would like to track failures, or need to check for specific errors you can pass the with_safety: false flag. See the "Options" section below for more details.
Scheduled Jobs
By default, drain_queue/2 will execute all currently available jobs. To execute scheduled jobs, pass with_scheduled: true, which causes all scheduled jobs to be marked as available beforehand. To run jobs scheduled up to a specific point in time, pass a DateTime instead.
Options
- :queue - a string or atom specifying the queue to drain, required
- :with_limit - the maximum number of jobs to drain at once. When recursion is enabled this is how many jobs are processed per iteration.
- :with_recursion - whether to keep draining a queue repeatedly when jobs insert more jobs
- :with_safety - whether to silently catch errors when draining, defaults to true. When false, raised exceptions or unhandled exits are reraised (unhandled exits are wrapped in Oban.CrashError).
- :with_scheduled - whether to include any scheduled jobs when draining, defaults to false. When true, drains all scheduled jobs. When a DateTime is provided, drains all jobs scheduled up to, and including, that point in time.
Example
Drain a queue with three available jobs, two of which succeed and one of which fails:
Oban.drain_queue(queue: :default)
%{failure: 1, snoozed: 0, success: 2}
Drain a queue including any scheduled jobs:
Oban.drain_queue(queue: :default, with_scheduled: true)
%{failure: 0, snoozed: 0, success: 1}
Drain a queue including jobs scheduled up to a minute from now:
Oban.drain_queue(queue: :default, with_scheduled: DateTime.add(DateTime.utc_now(), 60, :second))
Drain a queue and assert an error is raised:
assert_raise RuntimeError, fn -> Oban.drain_queue(queue: :risky, with_safety: false) end
Drain a queue repeatedly until there aren't any more jobs to run. This is particularly useful for testing jobs that enqueue other jobs:
Oban.drain_queue(queue: :default, with_recursion: true)
%{failure: 1, snoozed: 0, success: 2}
Drain only the top (by scheduled time and priority) five jobs off a queue:
Oban.drain_queue(queue: :default, with_limit: 5)
%{failure: 0, snoozed: 0, success: 1}
Drain a queue recursively, only one job at a time:
Oban.drain_queue(queue: :default, with_limit: 1, with_recursion: true)
%{failure: 0, snoozed: 0, success: 3}
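In tests it is often enough to check that nothing failed or snoozed. The helper below is a hypothetical convenience, not part of Oban; it only assumes the drain result is a map of integer counters like the ones shown above.

```elixir
defmodule DrainResult do
  # True when every counter other than :success is zero.
  def all_succeeded?(result) when is_map(result) do
    result
    |> Map.delete(:success)
    |> Map.values()
    |> Enum.all?(&(&1 == 0))
  end

  # Total number of jobs counted in the drain result.
  def total(result) when is_map(result) do
    result |> Map.values() |> Enum.sum()
  end
end

IO.inspect(DrainResult.all_succeeded?(%{failure: 0, snoozed: 0, success: 3}))
IO.inspect(DrainResult.total(%{failure: 1, snoozed: 0, success: 2}))
```

Working on the map as a whole, rather than on named keys, keeps the helper usable if a result carries additional counters.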
@spec insert(name(), Oban.Job.changeset(), Keyword.t()) :: {:ok, Oban.Job.t()} | {:error, Oban.Job.changeset() | term()}
@spec insert(multi(), multi_name(), changeset_or_fun()) :: multi()
Insert a new job into the database for execution.
This and the other insert variants are the recommended way to enqueue jobs because they support features like unique jobs.
See the section on "Unique Jobs" for more details.
Example
Insert a single job:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}))
Insert a job while ensuring that it is unique within the past 30 seconds:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}, unique: [period: 30]))
Insert a job using a custom timeout:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}), timeout: 10_000)
Insert a job using an alternative instance name:
{:ok, job} = Oban.insert(MyOban, MyApp.Worker.new(%{id: 1}))
@spec insert(name(), multi(), multi_name(), changeset_or_fun(), Keyword.t()) :: multi()
Put a job insert operation into an Ecto.Multi.
Like insert/2, this variant is recommended over Ecto.Multi.insert because it supports all of Oban's features, e.g. unique jobs.
See the section on "Unique Jobs" for more details.
Example
Ecto.Multi.new()
|> Oban.insert("job-1", MyApp.Worker.new(%{id: 1}))
|> Oban.insert("job-2", fn _ -> MyApp.Worker.new(%{id: 2}) end)
|> MyApp.Repo.transaction()
@spec insert!(name(), Oban.Job.changeset(), opts :: Keyword.t()) :: Oban.Job.t()
Similar to insert/3, but raises an Ecto.InvalidChangesetError if the job can't be inserted.
Example
Insert a single job:
job = Oban.insert!(MyApp.Worker.new(%{id: 1}))
Insert a job using a custom timeout:
job = Oban.insert!(MyApp.Worker.new(%{id: 1}), timeout: 10_000)
Insert a job using an alternative instance name:
job = Oban.insert!(MyOban, MyApp.Worker.new(%{id: 1}))
insert_all(name \\ __MODULE__, changesets, opts \\ [])
(since 0.9.0)
@spec insert_all(name() | multi(), changesets_or_wrapper() | multi_name(), Keyword.t() | changesets_or_wrapper_or_fun()) :: [Oban.Job.t()] | multi()
Insert multiple jobs into the database for execution.
There are a few important differences between this function and Ecto.Repo.insert_all/3:
- This function always returns a list rather than a tuple of {count, records}
- This function accepts a list of changesets rather than a list of maps or keyword lists
Error Handling and Rollbacks
If insert_all encounters an issue, the function will raise an error based on your database adapter. This behavior is valuable in conjunction with Ecto.Repo.transaction/2 because it allows for rollbacks.
For example, an invalid changeset raises:
** (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.
🌟 Unique Jobs and Batching
Only the Smart Engine in Oban Pro supports bulk unique jobs and automatic batching. With the basic engine, you must use insert/3 for unique support.
Options
Accepts any of Ecto's "Shared Options" such as timeout and log.
Example
Insert 100 jobs with a single operation:
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all()
Insert with a custom timeout:
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all(timeout: 10_000)
Insert with an alternative instance name:
changesets = Enum.map(1..100, &MyApp.Worker.new(%{id: &1}))
jobs = Oban.insert_all(MyOban, changesets)
insert_all(name, multi, multi_name, changesets, opts)
(since 0.9.0)
@spec insert_all(name(), multi(), multi_name(), changesets_or_wrapper_or_fun(), Keyword.t()) :: multi()
Put an insert_all operation into an Ecto.Multi.
This function supports the same features and has the same caveats as insert_all/2.
Example
Insert job changesets within a multi:
changesets = Enum.map(0..100, &MyApp.Worker.new(%{id: &1}))
Ecto.Multi.new()
|> Oban.insert_all(:jobs, changesets)
|> MyApp.Repo.transaction()
Insert job changesets using a function:
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, user_changeset)
|> Oban.insert_all(:jobs, fn %{user: user} ->
email_job = EmailWorker.new(%{id: user.id})
staff_job = StaffWorker.new(%{id: user.id})
[email_job, staff_job]
end)
|> MyApp.Repo.transaction()
@spec pause_queue(name(), opts :: [queue_option()]) :: :ok
Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.
When shutdown begins all queues are paused.
Options
- :queue - a string or atom specifying the queue to pause, required
- :local_only - whether the queue will be paused only on the local node, default: false
- :node - restrict pausing to a particular node
Note: by default, Oban does not verify that the given queue exists unless :local_only is set to true, because a queue that does not exist locally might still be running on another node.
Example
Pause the default queue:
Oban.pause_queue(queue: :default)
:ok
Pause the default queue, but only on the local node:
Oban.pause_queue(queue: :default, local_only: true)
:ok
Pause the default queue only on a particular node:
Oban.pause_queue(queue: :default, node: "worker.1")
:ok
@spec resume_queue(name(), opts :: [queue_option()]) :: :ok
Resume executing jobs in a paused queue.
Options
- :queue - a string or atom specifying the queue to resume, required
- :local_only - whether the queue will be resumed only on the local node, default: false
- :node - restrict resuming to a particular node
Note: by default, Oban does not verify that the given queue exists unless :local_only is set to true, because a queue that does not exist locally might still be running on another node.
Example
Resume a paused default queue:
Oban.resume_queue(queue: :default)
:ok
Resume the default queue, but only on the local node:
Oban.resume_queue(queue: :default, local_only: true)
:ok
Resume the default queue only on a particular node:
Oban.resume_queue(queue: :default, node: "worker.1")
:ok
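Pausing and resuming are commonly paired around some maintenance step. The following is a sketch rather than an Oban feature: QueueMaintenance and with_paused/3 are hypothetical names, and the oban argument is any module that exposes pause_queue/1 and resume_queue/1 (it defaults to Oban, and can be replaced with a stub in tests).

```elixir
defmodule QueueMaintenance do
  # Pause `queue`, run `fun`, and resume the queue even if `fun` raises.
  # Returns whatever `fun` returns.
  def with_paused(queue, fun, oban \\ Oban) when is_function(fun, 0) do
    :ok = oban.pause_queue(queue: queue)

    try do
      fun.()
    after
      # Always resume, so a crash in `fun` cannot leave the queue paused.
      oban.resume_queue(queue: queue)
    end
  end
end
```

Note that pausing only prevents new jobs from starting; as described for pause_queue, jobs that are already running continue until they finish, so the function passed in may still overlap with them.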
@spec retry_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
Retries all jobs that match on the given queryable.
If no queryable is given, Oban will retry all jobs that aren't currently available or executing. Note that regardless of constraints, it will never retry available or executing jobs.
Example
Retries jobs in any state other than available or executing:
Oban.retry_all_jobs(Oban.Job)
{:ok, 9}
Retries jobs with the retryable state:
Oban.Job
|> Ecto.Query.where(state: "retryable")
|> Oban.retry_all_jobs()
{:ok, 3}
Retries all inactive jobs with priority 0:
Oban.Job
|> Ecto.Query.where(priority: 0)
|> Oban.retry_all_jobs()
{:ok, 5}
@spec retry_job(name(), job_or_id :: Oban.Job.t() | integer()) :: :ok
Sets a job as available, adding attempts if already maxed out. Jobs currently available, executing or scheduled are ignored. The job is scheduled for immediate execution.
Example
Retry a job:
Oban.retry_job(job)
:ok
@spec scale_queue(name(), opts :: [queue_option()]) :: :ok
Scale the concurrency for a queue.
Options
- :queue - a string or atom specifying the queue to scale, required
- :limit - the new concurrency limit, required
- :local_only - whether the queue will be scaled only on the local node, default: false
- :node - restrict scaling to a particular node
In addition, all engine-specific queue options are passed along after validation.
Note: by default, Oban does not verify that the given queue exists unless :local_only is set to true, because a queue that does not exist locally might still be running on another node.
Example
Scale a queue up, triggering immediate execution of queued jobs:
Oban.scale_queue(queue: :default, limit: 50)
:ok
Scale the queue back down, allowing executing jobs to finish:
Oban.scale_queue(queue: :default, limit: 5)
:ok
Scale the queue only on the local node:
Oban.scale_queue(queue: :default, limit: 10, local_only: true)
:ok
Scale the queue on a particular node:
Oban.scale_queue(queue: :default, limit: 10, node: "worker.1")
:ok
@spec start_link([option()]) :: Supervisor.on_start()
Starts an Oban supervision tree linked to the current process.
Options
These options are required; without them the supervisor won't start:
- :repo - specifies the Ecto repo used to insert and retrieve jobs
Primary Options
These options determine what the system does at a high level, i.e. which queues to run:
- :engine - facilitates inserting, fetching, and otherwise managing jobs.
  There are three built-in engines: Oban.Engines.Basic for Postgres databases, Oban.Engines.Lite for SQLite3 databases, and Oban.Engines.Inline for simplified testing (only available for :inline testing mode).
  When the Oban.Engines.Lite engine is used the :notifier and :peer are automatically set to PG and isolated mode, respectively.
  Additional engines, such as Oban Pro's SmartEngine with advanced functionality for Postgres, are also available as an add-on.
  Defaults to the Basic engine for Postgres.
- :log - either false to disable logging or a standard log level (:error, :warning, :info, :debug, etc.). This determines whether queries are logged or not, overriding the repo's configured log level. Defaults to false, where no queries are logged.
- :name - used for supervisor registration, it must be unique across an entire VM instance. Defaults to Oban when no name is provided.
- :node - used to identify the node that the supervision tree is running in. If no value is provided it will use the node name in a distributed system, or the hostname in an isolated node. See "Node Name" below.
- :notifier - used to relay messages between processes, nodes, and the Postgres database.
  There are two built-in notifiers: Oban.Notifiers.Postgres, which uses Postgres PubSub; and Oban.Notifiers.PG, which uses process groups with distributed Erlang. Defaults to the Postgres notifier.
- :peer - used to specify which peer module to use for cluster leadership.
  There are two built-in peers: Oban.Peers.Postgres, which uses table-based leadership through the oban_peers table; and Oban.Peers.Global, which uses global locks through distributed Erlang.
  Leadership can be disabled by setting peer: false, but note that centralized plugins like Cron won't run without leadership.
  Defaults to the Postgres peer.
- :plugins - a list of modules or module/option tuples that are started as children of an Oban supervisor. Any supervisable module is a valid plugin, e.g. a GenServer or an Agent.
- :prefix - the query prefix, or schema, to use for inserting and executing jobs. An oban_jobs table must exist within the prefix. See the "Prefix Support" section in the module documentation for more details.
- :queues - a keyword list where the keys are queue names and the values are the concurrency setting or a keyword list of queue options. For example, setting queues to [default: 10, exports: 5] would start the queues default and exports with a combined concurrency level of 15. The concurrency setting specifies how many jobs each queue will run concurrently.
  Queues accept additional override options to customize their behavior, e.g. by setting paused or dispatch_cooldown for a specific queue.
- :testing - a mode that controls how an instance is configured for testing. When set to :inline or :manual, queues, peers, and plugins are automatically disabled. Defaults to :disabled, no test mode.
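The "combined concurrency" figure mentioned for :queues can be computed from a queue configuration in either its short or expanded form. A small sketch, assuming the expanded form always carries a :limit key; QueueConfig is a hypothetical helper, not an Oban module.

```elixir
defmodule QueueConfig do
  # Short form: `default: 10`.
  def limit(limit) when is_integer(limit), do: limit

  # Expanded form: `events: [limit: 50, paused: true]`.
  def limit(opts) when is_list(opts), do: Keyword.fetch!(opts, :limit)

  # Sum of the concurrency limits across every configured queue.
  def total_concurrency(queues) do
    queues
    |> Enum.map(fn {_name, value} -> limit(value) end)
    |> Enum.sum()
  end
end

IO.inspect(QueueConfig.total_concurrency(default: 10, exports: 5))
IO.inspect(QueueConfig.total_concurrency(default: 10, events: [limit: 50, paused: true]))
```

The total is a ceiling for one running instance with this configuration; a paused queue still counts toward it even though it runs nothing until resumed.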
Twiddly Options
Additional options used to tune system behaviour. These are primarily useful for testing or troubleshooting and don't usually need modification.
- :dispatch_cooldown - the minimum number of milliseconds a producer will wait before fetching and running more jobs. A slight cooldown period prevents a producer from flooding with messages and thrashing the database. The cooldown period directly impacts a producer's throughput: jobs per second for a single queue is calculated by (1000 / cooldown) * limit. For example, with a 5ms cooldown and a queue limit of 25 a single queue can run 5,000 jobs/sec.
  The default is 5ms and the minimum is 1ms, which is likely faster than the database can return new jobs to run.
- :shutdown_grace_period - the amount of time a queue will wait for executing jobs to complete before hard shutdown, specified in milliseconds. The default is 15_000, or 15 seconds.
- :stage_interval - the number of milliseconds between making scheduled jobs available and notifying relevant queues that jobs are available. This is directly tied to the resolution of scheduled or retryable jobs and how frequently the database is checked for jobs to run. To minimize database load, only 5_000 jobs are staged at each interval.
  Only the leader node stages jobs and notifies queues when the :notifier's pubsub notifications are functional. If pubsub messages can't get through then staging switches to a less efficient "local" mode in which all nodes poll for jobs to run.
  Setting the interval to :infinity disables staging entirely. The default is 1_000ms.
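The throughput formula given for :dispatch_cooldown is easy to check by hand. The module below is a hypothetical calculator, not part of Oban, and the figure it returns is a theoretical ceiling that ignores job duration and database latency.

```elixir
defmodule Throughput do
  # Upper bound on jobs per second for a single queue:
  # (1000 / cooldown_ms) * limit
  def max_jobs_per_second(cooldown_ms, limit)
      when is_integer(cooldown_ms) and cooldown_ms >= 1 and is_integer(limit) and limit > 0 do
    1000 / cooldown_ms * limit
  end
end

# A 5ms cooldown with a limit of 25 gives 200 fetches per second of 25 jobs each.
IO.inspect(Throughput.max_jobs_per_second(5, 25))
```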
Example
Start a stand-alone Oban instance:
{:ok, pid} = Oban.start_link(repo: MyApp.Repo, queues: [default: 10])
Start an Oban instance within an application's supervision tree:
def start(_type, _args) do
children = [MyApp.Repo, {Oban, repo: MyApp.Repo, queues: [default: 10]}]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
Start multiple, named Oban supervisors within a supervision tree:
children = [
MyApp.Repo,
{Oban, name: Oban.A, repo: MyApp.Repo, queues: [default: 10]},
{Oban, name: Oban.B, repo: MyApp.Repo, queues: [special: 10]},
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
Start a local Oban instance for SQLite:
{:ok, pid} = Oban.start_link(engine: Oban.Engines.Lite, repo: MyApp.Repo)
Node Name
When the node value hasn't been configured it is generated based on the environment:
- In a distributed system the node name is used
- In a Heroku environment the system environment's DYNO value is used
- Otherwise, the system hostname is used
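The fallback order above can be sketched in plain Elixir. This is an approximation written for illustration, not Oban's own code: NodeName, resolve/4, and current/0 are hypothetical names, and it assumes "distributed" means Node.alive?/0 returns true.

```elixir
defmodule NodeName do
  # Pure decision function: inputs are passed in so the order is easy to test.
  def resolve(distributed?, node_name, dyno, hostname) do
    cond do
      # 1. In a distributed system the node name is used.
      distributed? -> to_string(node_name)
      # 2. On Heroku the DYNO environment variable is used.
      is_binary(dyno) -> dyno
      # 3. Otherwise fall back to the system hostname.
      true -> hostname
    end
  end

  # Gather the inputs from the running system.
  def current do
    {:ok, hostname} = :inet.gethostname()
    resolve(Node.alive?(), node(), System.get_env("DYNO"), to_string(hostname))
  end
end

IO.inspect(NodeName.current())
```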
Start a new supervised queue.
By default this starts a new supervised queue across all nodes running Oban on the same database and prefix. You can pass the option local_only: true if you prefer to start the queue only on the local node.
Options
- :queue - a string or atom specifying the queue to start, required
- :local_only - whether the queue will be started only on the local node, default: false
- :limit - set the concurrency limit, required
- :paused - set whether the queue starts in the "paused" state, optional
In addition, all engine-specific queue options are passed along after validation.
Example
Start the :priority queue with a concurrency limit of 10 across the connected nodes:
Oban.start_queue(queue: :priority, limit: 10)
:ok
Start the :media queue with a concurrency limit of 5 only on the local node:
Oban.start_queue(queue: :media, limit: 5, local_only: true)
:ok
Start the :media queue on a particular node:
Oban.start_queue(queue: :media, limit: 5, node: "worker.1")
:ok
Start the :media queue in a paused state:
Oban.start_queue(queue: :media, limit: 5, paused: true)
:ok
@spec stop_queue(name(), opts :: [queue_option()]) :: :ok
Shutdown a queue's supervision tree and stop running jobs for that queue.
By default this action occurs across all the running nodes. To stop the queue's supervision tree and its running jobs only on the local node, pass the option local_only: true.
The shutdown process pauses the queue first and allows current jobs to exit gracefully, provided they finish within the shutdown limit.
Note: by default, Oban does not verify that the given queue exists unless :local_only is set to true, because a queue that does not exist locally might still be running on another node.
Options
- :queue - a string or atom specifying the queue to stop, required
- :local_only - whether the queue will be stopped only on the local node, default: false
- :node - restrict stopping to a particular node
Example
Stop a running queue on all nodes:
Oban.stop_queue(queue: :default)
:ok
Stop the queue only on the local node:
Oban.stop_queue(queue: :media, local_only: true)
:ok
Stop the queue only on a particular node:
Oban.stop_queue(queue: :media, node: "worker.1")
:ok
Returns the pid of the root Oban process for the given name.
Example
Find the default instance:
Oban.whereis(Oban)
Find a dynamically named instance:
Oban.whereis({:oban, 1})