View Source Oban (Oban v2.14.1)
Oban is a robust job processing library which uses PostgreSQL or SQLite3 for storage and coordination.
installation
Installation
See the installation guide for details on installing and configuring Oban in your application.
requirements
Requirements
Oban requires Elixir 1.12+, Erlang 22+, and PostgreSQL 12.0+ or SQLite3 3.37.0+.
testing
Testing
Find testing setup, helpers, and strategies in the testing guide.
oban-web-pro
Oban Web+Pro
A web dashboard for managing Oban, along with an official set of extensions, plugins, and workers are available as licensed packages:
Learn more about prices and licensing Oban Web+Pro at getoban.pro.
running-with-sqlite3
Running with SQLite3
Oban ships with engines for PostgreSQL and SQLite3. Both engines support the same core functionality for a single node, while the Postgres engine is more advanced and designed to run in a distributed environment.
Running with SQLite3 requires adding ecto_sqlite3 to your app's dependencies
and setting the Oban.Engines.Lite engine:
config :my_app, Oban,
engine: Oban.Engines.Lite,
queues: [default: 10],
repo: MyApp.RepoPlease note that SQLite3 may not be suitable for high-concurrency systems or for systems that need to handle large amounts of data. If you expect your background jobs to generate high loads, it would be better to use a more robust database solution that supports horizontal scalability, like Postgres.
configuring-queues
Configuring Queues
Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:
config :my_app, Oban,
queues: [default: 10, mailers: 20, events: 50, media: 5],
repo: MyApp.RepoYou may also use an expanded form to configure queues with individual overrides:
queues: [
default: 10,
events: [limit: 50, paused: true]
]The events queue will now start in a paused state, which means it won't
process anything until Oban.resume_queue/2 is called to start it.
There isn't a limit to the number of queues or how many jobs may execute concurrently in each queue. Some additional guidelines:
Caveats & Guidelines
Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (i.e. database connections) to handle the concurrent load.
Queue limits are local (per-node), not global (per-cluster). For example, running a queue with a local limit of one on three separate nodes is effectively a global limit of three. If you require a global limit you must restrict the number of nodes running a particular queue.
Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.
Be careful how many concurrent jobs make expensive system calls (i.e. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out commands.
defining-workers
Defining Workers
Worker modules do the work of processing a job. At a minimum they must define a
perform/1 function, which is called with an %Oban.Job{} struct.
Note that the args field of the job struct will always have string keys,
regardless of the key type when the job was enqueued. The args are stored as
json and the serialization process automatically stringifies all keys.
Define a worker to process jobs in the events queue:
defmodule MyApp.Business do
use Oban.Worker, queue: :events
@impl Oban.Worker
def perform(%Oban.Job{args: %{"id" => id} = args}) do
model = MyApp.Repo.get(MyApp.Business.Man, id)
case args do
%{"in_the" => "business"} ->
IO.inspect(model)
%{"vote_for" => vote} ->
IO.inspect([vote, model])
_ ->
IO.inspect(model)
end
:ok
end
endThe use macro also accepts options to customize max attempts, priority, tags,
and uniqueness:
defmodule MyApp.LazyBusiness do
use Oban.Worker,
queue: :events,
priority: 3,
max_attempts: 3,
tags: ["business"],
unique: [period: 30]
@impl Oban.Worker
def perform(_job) do
# do business slowly
:ok
end
endSuccessful jobs should return :ok or an {:ok, value} tuple. The value
returned from perform/1 is used to control whether the job is treated as a
success, a failure, cancelled or deferred for retrying later.
See the Oban.Worker docs for more details on failure conditions and
Oban.Telemetry for details on job reporting.
enqueueing-jobs
Enqueueing Jobs
Jobs are simply Ecto structs and are enqueued by inserting them into the
database. For convenience and consistency all workers provide a new/2
function that converts an args map into a job changeset suitable for insertion:
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()The worker's defaults may be overridden by passing options:
%{id: 1, vote_for: "none of the above"}
|> MyApp.Business.new(queue: :special, max_attempts: 5)
|> Oban.insert()Unique jobs can be configured in the worker, or when the job is built:
%{email: "brewster@example.com"}
|> MyApp.Mailer.new(unique: [period: 300, fields: [:queue, :worker])
|> Oban.insert()Job priority can be specified using an integer from 0 to 3, with 0 being the default and highest priority:
%{id: 1}
|> MyApp.Backfiller.new(priority: 2)
|> Oban.insert()Any number of tags can be added to a job dynamically, at the time it is inserted:
id = 1
%{id: id}
|> MyApp.OnboardMailer.new(tags: ["mailer", "record-#{id}"])
|> Oban.insert()Multiple jobs can be inserted in a single transaction:
Ecto.Multi.new()
|> Oban.insert(:b_job, MyApp.Business.new(%{id: 1}))
|> Oban.insert(:m_job, MyApp.Mailer.new(%{email: "brewser@example.com"}))
|> Repo.transaction()Occasionally you may need to insert a job for a worker that exists in another
application. In that case you can use Oban.Job.new/2 to build the changeset
manually:
%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> Oban.insert()Oban.insert/2,4 is the preferred way of inserting jobs as it provides some of
Oban's advanced features (i.e., unique jobs). However, you can use your
application's Repo.insert/2 function if necessary.
See Oban.Job.new/2 for a full list of job options.
scheduling-jobs
Scheduling Jobs
Jobs may be scheduled down to the second any time in the future:
%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()Jobs may also be scheduled at a specific datetime in the future:
%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()Scheduling is always in UTC. You'll have to shift timestamps in other zones to UTC before scheduling:
%{id: 1}
|> MyApp.Business.new(scheduled_at: DateTime.shift_zone!(datetime, "Etc/UTC"))
|> Oban.insert()Caveats & Guidelines
Usually, scheduled job management operates in global mode and notifies queues
of available jobs via PubSub to minimize database load. However, when PubSub
isn't available, staging switches to a local mode where each queue polls
independently.
Local mode is less efficient and will only happen if you're running in an
environment where neither Postgres nor PG notifications work. That situation
should be rare and limited to the following conditions:
- Running with a connection pooler, i.e.,
pg_bouncer, in transaction mode. - Running without clustering, i.e., without Distributed Erlang
If both of those criteria apply and PubSub notifications won't work, then
staging will switch to polling in local mode.
prioritizing-jobs
Prioritizing Jobs
Normally, all available jobs within a queue are executed in the order they were
scheduled. You can override the normal behavior and prioritize or de-prioritize
a job by assigning a numerical priority.
Priorities from 0-3 are allowed, where 0 is the highest priority and 3 is the lowest.
The default priority is 0, unless specified all jobs have an equally high priority.
All jobs with a higher priority will execute before any jobs with a lower priority. Within a particular priority jobs are executed in their scheduled order.
unique-jobs
Unique Jobs
The unique jobs feature lets you specify constraints to prevent enqueueing
duplicate jobs. Uniqueness is based on a combination of args, queue,
worker, state and insertion time. It is configured at the worker or job
level using the following options:
:period— The number of seconds until a job is no longer considered duplicate. You should always specify a period, otherwise Oban will default to 60 seconds.:infinitycan be used to indicate the job be considered a duplicate as long as jobs are retained.:fields— The fields to compare when evaluating uniqueness. The available fields are:args,:queueand:worker, by default all three are used.:keys— A specific subset of the:argsto consider when comparing against historic jobs. This allows a job with multiple key/value pairs in the args to be compared using only a subset of them.:states— The job states that are checked for duplicates. The available states are:available,:scheduled,:executing,:retryable,:completed,:cancelledand:discarded. By default all states except for:discardedand:cancelledare checked, which prevents duplicates even if the previous job has been completed.
For example, configure a worker to be unique across all fields and states for 60 seconds:
use Oban.Worker, unique: [period: 60]Configure the worker to be unique only by :worker and :queue:
use Oban.Worker, unique: [fields: [:queue, :worker], period: 60]Or, configure a worker to be unique until it has executed:
use Oban.Worker, unique: [period: 300, states: [:available, :scheduled, :executing]]Only consider the :url key rather than the entire args:
use Oban.Worker, unique: [fields: [:args, :worker], keys: [:url]]You can use Oban.Job.states/0 to specify uniqueness across all states,
including :discarded:
use Oban.Worker, unique: [period: 300, states: Oban.Job.states()]Detecting Unique Conflicts
When unique settings match an existing job, the return value of Oban.insert/2
is still {:ok, job}. However, you can detect a unique conflict by checking the
jobs' :conflict? field. If there was an existing job, the field is true;
otherwise it is false.
You can use the :conflict? field to customize responses after insert:
with {:ok, %Job{conflict?: true}} <- Oban.insert(changeset) do
{:error, :job_already_exists}
endNote that conflicts are only detected for jobs enqueued through Oban.insert/2,3.
Jobs enqueued through Oban.insert_all/2 do not use per-job unique
configuration.
Replacing Values
In addition to detecting unique conflicts, passing options to replace can
update any job field when there is a conflict. Any of the following fields can
be replaced per state: args, max_attempts, meta, priority, queue,
scheduled_at, tags, worker.
For example, to change the priority and increase max_attempts when there is
a conflict with a job in a scheduled state:
BusinessWorker.new(
args,
max_attempts: 5,
priority: 0,
replace: [scheduled: [:max_attempts, :priority]]
)Another example is bumping the scheduled time on conflict. Either scheduled_at
or schedule_in values will work, but the replace option is always
scheduled_at.
UrgentWorker.new(args, schedule_in: 1, replace: [scheduled: [:scheduled_at]])NOTE: If you use this feature to replace a field (e.g. args) in the
executing state by doing something like: UniqueWorker.new(new_args, replace: [executing: [:args]]) Oban will update the args, but the job will continue
executing with the original value.
Strong Guarantees
Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.
pruning-historic-jobs
Pruning Historic Jobs
Job stats and queue introspection are built on keeping job rows in the database
after they have completed. This allows administrators to review completed jobs
and build informative aggregates, at the expense of storage and an unbounded
table size. To prevent the oban_jobs table from growing indefinitely, Oban
provides active pruning of completed, cancelled and discarded jobs.
By default, the Pruner plugin retains jobs for 60 seconds. You can configure a
longer retention period by providing a max_age in seconds to the Pruner
plugin.
# Set the max_age for 5 minutes
config :my_app, Oban,
plugins: [{Oban.Plugins.Pruner, max_age: 300}]
...Caveats & Guidelines
Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.
Pruning is only applied to jobs that are
completed,cancelledordiscarded. It'll never delete a new job, a scheduled job or a job that failed and will be retried.
periodic-jobs
Periodic Jobs
Oban's Cron plugin registers workers a cron-like schedule and enqueues jobs
automatically. Periodic jobs are declared as a list of {cron, worker} or
{cron, worker, options} tuples:
config :my_app, Oban,
repo: MyApp.Repo,
plugins: [
{Oban.Plugins.Cron,
crontab: [
{"* * * * *", MyApp.MinuteWorker},
{"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
{"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
{"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]},
{"@daily", MyApp.AnotherDailyWorker}
]}
]The crontab would insert jobs as follows:
MyApp.MinuteWorker— Inserted once every minuteMyApp.HourlyWorker— Inserted at the first minute of every hour with custom argsMyApp.DailyWorker— Inserted at midnight every day with no retriesMyApp.MondayWorker— Inserted at noon every Monday in the "scheduled" queueMyApp.AnotherDailyWorker— Inserted at midnight every day with no retries
The crontab format respects all standard rules and has one minute resolution. Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.
Like other jobs, recurring jobs will use the :queue specified by the worker
module (or :default if one is not specified).
cron-expressions
Cron Expressions
Standard Cron expressions are composed of rules specifying the minutes, hours, days, months and weekdays. Rules for each field are comprised of literal values, wildcards, step values or ranges:
*— Wildcard, matches any value (0, 1, 2, ...)0— Literal, matches only itself (only 0)*/15— Step, matches any value that is a multiple (0, 15, 30, 45)0-5— Range, matches any value within the range (0, 1, 2, 3, 4, 5)0-9/2- Step values can be used in conjunction with ranges (0, 2, 4, 6, 8)
Each part may have multiple rules, where rules are separated by a comma. The allowed values for each field are as follows:
minute— 0-59hour— 0-23days— 1-31month— 1-12 (or aliases,JAN,FEB,MAR, etc.)weekdays— 0-6 (or aliases,SUN,MON,TUE, etc.)
The following Cron extensions are supported:
@hourly—0 * * * *@daily(as well as@midnight) —0 0 * * *@weekly—0 0 * * 0@monthly—0 0 1 * *@yearly(as well as@annually) —0 0 1 1 *@reboot— Run once at boot across the entire cluster
Some specific examples that demonstrate the full range of expressions:
0 * * * *— The first minute of every hour*/15 9-17 * * *— Every fifteen minutes during standard business hours0 0 * DEC *— Once a day at midnight during December0 7-9,4-6 13 * FRI— Once an hour during both rush hours on Friday the 13th
For more in depth information see the man documentation for cron and crontab
in your system. Alternatively you can experiment with various expressions
online at Crontab Guru.
Caveats & Guidelines
All schedules are evaluated as UTC unless a different timezone is provided. See
Oban.Plugins.Cronfor information about configuring a timezone.Workers can be used for regular and scheduled jobs so long as they accept different arguments.
Duplicate jobs are prevented through transactional locks and unique constraints. Workers that are used for regular and scheduled jobs must not specify
uniqueoptions less than60s.Long running jobs may execute simultaneously if the scheduling interval is shorter than it takes to execute the job. You can prevent overlap by passing custom
uniqueopts in the crontab config:custom_args = %{scheduled: true} unique_opts = [ period: 60 * 60 * 24, states: [:available, :scheduled, :executing] ] config :my_app, Oban, repo: MyApp.Repo, plugins: [ {Oban.Plugins.Cron, crontab: [ {"* * * * *", MyApp.SlowWorker, args: custom_args, unique: unique_opts} ]} ]
error-handling
Error Handling
When a job returns an error value, raises an error, or exits during execution the
details are recorded within the errors array on the job. When the number of
execution attempts is below the configured max_attempts limit, the job will
automatically be retried in the future.
The retry delay has an exponential backoff, meaning the job's second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.
See the Oban.Worker documentation on "Customizing Backoff" for alternative
backoff strategies.
error-details
Error Details
Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:
atThe UTC timestamp when the error occurred atattemptThe attempt number when the error occurrederrorA formatted error message and stacktrace
See the Instrumentation docs for an example of integrating with external error reporting systems.
limiting-retries
Limiting Retries
By default, jobs are retried up to 20 times. The number of retries is controlled
by the max_attempts value, which can be set at the Worker or Job level. For
example, to instruct a worker to discard jobs after three failures:
use Oban.Worker, queue: :limited, max_attempts: 3
limiting-execution-time
Limiting Execution Time
By default, individual jobs may execute indefinitely. If this is undesirable you
may define a timeout in milliseconds with the timeout/1 callback on your
worker module.
For example, to limit a worker's execution time to 30 seconds:
def MyApp.Worker do
use Oban.Worker
@impl Oban.Worker
def perform(_job) do
something_that_may_take_a_long_time()
:ok
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(30)
endThe timeout/1 function accepts an Oban.Job struct, so you can customize the
timeout using any job attributes.
Define the timeout value through job args:
def timeout(%_{args: %{"timeout" => timeout}}), do: timeoutDefine the timeout based on the number of attempts:
def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)
instrumentation-error-reporting-and-logging
Instrumentation, Error Reporting, and Logging
Oban provides integration with Telemetry, a dispatching library for
metrics. It is easy to report Oban metrics to any backend by attaching to
:oban events.
Here is an example of a sample unstructured log handler:
defmodule MyApp.ObanLogger do
require Logger
def handle_event([:oban, :job, :start], measure, meta, _) do
Logger.warn("[Oban] :started #{meta.worker} at #{measure.system_time}")
end
def handle_event([:oban, :job, event], measure, meta, _) do
Logger.warn("[Oban] #{event} #{meta.worker} ran in #{measure.duration}")
end
endAttach the handler to success and failure events in application.ex:
events = [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]]
:telemetry.attach_many("oban-logger", events, &MyApp.ObanLogger.handle_event/4, [])The Oban.Telemetry module provides a robust structured logger that handles all
of Oban's telemetry events. As in the example above, attach it within your
application.ex module:
:ok = Oban.Telemetry.attach_default_logger()For more details on the default structured logger and information on event
metadata see docs for the Oban.Telemetry module.
reporting-errors
Reporting Errors
Another great use of execution data is error reporting. Here is an example of integrating with Sentry to report job failures:
defmodule ErrorReporter do
def handle_event([:oban, :job, :exception], measure, meta, _) do
extra =
meta.job
|> Map.take([:id, :args, :meta, :queue, :worker])
|> Map.merge(measure)
Sentry.capture_exception(meta.reason, stacktrace: meta.stacktrace, extra: extra)
end
end
:telemetry.attach(
"oban-errors",
[:oban, :job, :exception],
&ErrorReporter.handle_event/4,
[]
)You can use exception events to send error reports to Honeybadger, Rollbar, AppSignal or any other application monitoring platform.
instance-and-database-isolation
Instance and Database Isolation
You can run multiple Oban instances with different prefixes on the same system and have them entirely isolated, provided you give each supervisor a distinct id.
Here we configure our application to start three Oban supervisors using the "public", "special" and "private" prefixes, respectively:
def start(_type, _args) do
children = [
Repo,
Endpoint,
{Oban, name: ObanA, repo: Repo},
{Oban, name: ObanB, repo: Repo, prefix: "special"},
{Oban, name: ObanC, repo: Repo, prefix: "private"}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
umbrella-apps
Umbrella Apps
If you need to run Oban from an umbrella application where more than one of
the child apps need to interact with Oban, you may need to set the :name for
each child application that configures Oban.
For example, your umbrella contains two apps: MyAppA and MyAppB. MyAppA is
responsible for inserting jobs, while only MyAppB actually runs any queues.
Configure Oban with a custom name for MyAppA:
config :my_app_a, Oban,
name: MyAppA.Oban,
repo: MyApp.RepoThen configure Oban for MyAppB with a different name:
config :my_app_b, Oban,
name: MyAppB.Oban,
repo: MyApp.Repo,
queues: [default: 10]Now, use the configured name when calling functions like Oban.insert/2,
Oban.insert_all/2, Oban.drain_queue/2, etc., to reference the correct Oban
process for the current application.
Oban.insert(MyAppA.Oban, MyWorker.new(%{}))
Oban.insert_all(MyAppB.Oban, multi, :multiname, [MyWorker.new(%{})])
Oban.drain_queue(MyAppB.Oban, queue: :default)
database-prefixes
Database Prefixes
Oban supports namespacing through PostgreSQL schemas, also called "prefixes" in Ecto. With prefixes your jobs table can reside outside of your primary schema (usually public) and you can have multiple separate job tables.
To use a prefix you first have to specify it within your migration:
defmodule MyApp.Repo.Migrations.AddPrefixedObanJobsTable do
use Ecto.Migration
def up do
Oban.Migrations.up(prefix: "private")
end
def down do
Oban.Migrations.down(prefix: "private")
end
endThe migration will create the "private" schema and all tables, functions and triggers within that schema. With the database migrated you'll then specify the prefix in your configuration:
config :my_app, Oban,
prefix: "private",
repo: MyApp.Repo,
queues: [default: 10]Now all jobs are inserted and executed using the private.oban_jobs table. Note
that Oban.insert/2,4 will write jobs in the private.oban_jobs table, you'll
need to specify a prefix manually if you insert jobs directly through a repo.
Not only is the oban_jobs table isolated within the schema, but all
notification events are also isolated. That means that insert/update events will
only dispatch new jobs for their prefix.
dynamic-repositories
Dynamic Repositories
Oban supports Ecto dynamic repositories through the
:get_dynamic_repo option. To make this work, you need to run a separate Oban
instance per each dynamic repo instance. Most often it's worth bundling each
Oban and repo instance under the same supervisor:
def start_repo_and_oban(instance_id) do
children = [
{MyDynamicRepo, name: nil, url: repo_url(instance_id)},
{Oban, name: instance_id, get_dynamic_repo: fn -> repo_pid(instance_id) end}
]
Supervisor.start_link(children, strategy: :one_for_one)
endThe function repo_pid/1 must return the pid of the repo for the given
instance. You can use Registry to register the repo (for example in the repo's
init/2 callback) and discover it.
If your application exclusively uses dynamic repositories and doesn't specify
all credentials upfront, you must implement an init/1 callback in your Ecto
Repo. Doing so provides the Postgres notifier with the correct credentials on
init, allowing jobs to process as expected.
Link to this section Summary
Functions
Cancel many jobs based on a queryable and mark them as cancelled to prevent them from running.
Any currently executing jobs are killed while the others are ignored.
Cancel an executing, available, scheduled or retryable job and mark it as cancelled to
prevent it from running. If the job is currently executing it will be killed and otherwise it
is ignored.
Check the current state of a queue producer.
Returns a specification to start this module under a supervisor.
Retrieve the Oban.Config struct for a named Oban supervision tree.
Synchronously execute all available jobs in a queue.
Insert a new job into the database for execution.
Put a job insert operation into an Ecto.Multi.
Similar to insert/3, but raises an Ecto.InvalidChangesetError if the job can't be inserted.
Insert multiple jobs into the database for execution.
Put an insert_all operation into an Ecto.Multi.
Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.
Resume executing jobs in a paused queue.
Retries all jobs that match on the given queryable. Please note that no matter the
queryable constraints, it will never retry available, executing or scheduled jobs.
Sets a job as available, adding attempts if already maxed out. If the job is currently
available, executing or scheduled it will be ignored. The job is scheduled for immediate
execution.
Scale the concurrency for a queue.
Starts an Oban supervision tree linked to the current process.
Start a new supervised queue.
Shutdown a queue's supervision tree and stop running jobs for that queue.
Returns the pid of the root Oban process for the given name.
Link to this section Types
@type changeset_or_fun() :: Oban.Job.changeset() | Oban.Job.changeset_fun()
@type changeset_wrapper() :: %{ :changesets => Oban.Job.changeset_list(), optional(atom()) => term() }
@type changesets_or_wrapper() :: Oban.Job.changeset_list() | changeset_wrapper()
@type changesets_or_wrapper_or_fun() :: changesets_or_wrapper() | Oban.Job.changeset_list_fun()
@type drain_option() :: {:queue, queue_name()} | {:with_limit, pos_integer()} | {:with_recursion, boolean()} | {:with_safety, boolean()} | {:with_scheduled, boolean() | DateTime.t()}
@type drain_result() :: %{ cancelled: non_neg_integer(), discard: non_neg_integer(), failure: non_neg_integer(), snoozed: non_neg_integer(), success: non_neg_integer() }
@type multi() :: Ecto.Multi.t()
@type multi_name() :: Ecto.Multi.name()
@type name() :: term()
@type option() :: {:dispatch_cooldown, pos_integer()} | {:engine, module()} | {:get_dynamic_repo, nil | (() -> pid() | atom())} | {:log, false | Logger.level()} | {:name, name()} | {:node, String.t()} | {:notifier, module()} | {:peer, false | module()} | {:plugins, false | [module() | {module() | Keyword.t()}]} | {:prefix, String.t()} | {:queues, false | [{queue_name(), pos_integer() | Keyword.t()}]} | {:repo, module()} | {:shutdown_grace_period, timeout()} | {:stage_interval, timeout()} | {:testing, :disabled | :inline | :manual}
@type queue_option() :: {:queue, queue_name()} | {:limit, pos_integer()} | {:local_only, boolean()} | {:node, String.t()}
@type queue_state() :: %{ :limit => pos_integer(), :node => binary(), :paused => boolean(), :queue => queue_name(), :running => [pos_integer()], :started_at => DateTime.t(), :updated_at => DateTime.t(), optional(atom()) => any() }
Link to this section Functions
@spec cancel_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
Cancel many jobs based on a queryable and mark them as cancelled to prevent them from running.
Any currently executing jobs are killed while the others are ignored.
If executing jobs happen to fail before cancellation then the state is set to cancelled.
However, any that complete successfully will remain completed.
Only jobs with the statuses executing, available, scheduled, or retryable can be cancelled.
example
Example
Cancel all jobs:
Oban.cancel_all_jobs(Oban.Job)
{:ok, 9}Cancel all jobs for a specific worker:
Oban.Job
|> Ecto.Query.where(worker: "MyApp.MyWorker")
|> Oban.cancel_all_jobs()
{:ok, 2}
@spec cancel_job(name(), job_id :: pos_integer()) :: :ok
Cancel an executing, available, scheduled or retryable job and mark it as cancelled to
prevent it from running. If the job is currently executing it will be killed and otherwise it
is ignored.
If an executing job happens to fail before it can be cancelled the state is set to cancelled.
However, if it manages to complete successfully then the state will still be completed.
example
Example
Cancel a scheduled job with the id 1:
Oban.cancel_job(1)
:ok
@spec check_queue(name(), opts :: [{:queue, queue_name()}]) :: queue_state()
Check the current state of a queue producer.
This allows you to introspect on a queue's health by retrieving key attributes of the producer's
state; values such as the current limit, the running job ids, and when the producer was
started.
options
Options
:queue- a string or atom specifying the queue to check, required
example
Example
Oban.check_queue(queue: :default)
%{
limit: 10,
node: "me@local",
paused: false,
queue: "default",
running: [100, 102],
started_at: ~D[2020-10-07 15:31:00],
updated_at: ~D[2020-10-07 15:31:00]
}
@spec child_spec([option()]) :: Supervisor.child_spec()
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec config(name()) :: Oban.Config.t()
Retrieve the Oban.Config struct for a named Oban supervision tree.
example
Example
Retrieve the default Oban instance config:
%Oban.Config{} = Oban.config()Retrieve the config for an instance started with a custom name:
%Oban.Config{} = Oban.config(MyCustomOban)
@spec drain_queue(name(), [drain_option()]) :: drain_result()
Synchronously execute all available jobs in a queue.
All execution happens within the current process and it is guaranteed not to raise an error or exit.
Draining a queue from within the current process is especially useful for testing. Jobs that are
enqueued by a process when Ecto is in sandbox mode are only visible to that process. Calling
drain_queue/2 allows you to control when the jobs are executed and to wait synchronously for
all jobs to complete.
failures-retries
Failures & Retries
Draining a queue uses the same execution mechanism as regular job dispatch. That means that any job failures or crashes are captured and result in a retry. Retries are scheduled in the future with backoff and won't be retried immediately.
By default jobs are executed in safe mode, just as they are in production. Safe mode catches
any errors or exits and records the formatted error in the job's errors array. That means
exceptions and crashes are not bubbled up to the calling process.
If you expect jobs to fail, would like to track failures, or need to check for specific errors
you can pass the with_safety: false flag. See the "Options" section below for more details.
scheduled-jobs
Scheduled Jobs
By default, drain_queue/2 will execute all currently available jobs. In order to execute
scheduled jobs, you may pass the with_scheduled: true which will cause all scheduled jobs to
be marked as available beforehand. To run jobs scheduled up to a specific point in time, pass
a DateTime instead.
options
Options
:queue- a string or atom specifying the queue to drain, required:with_limit— the maximum number of jobs to drain at once. When recursion is enabled this is how many jobs are processed per-iteration.:with_recursion— whether to keep draining a queue repeatedly when jobs insert more jobs:with_safety— whether to silently catch errors when draining, defaults totrue. Whenfalse, raised exceptions or unhandled exits are reraised (unhandled exits are wrapped inOban.CrashError).:with_scheduled— whether to include any scheduled jobs when draining, defaultfalse. Whentrue, drains all scheduled jobs. When aDateTimeis provided, drains all jobs scheduled up to, and including, that point in time.
example
Example
Drain a queue with three available jobs, two of which succeed and one of which fails:
Oban.drain_queue(queue: :default)
%{failure: 1, snoozed: 0, success: 2}Drain a queue including any scheduled jobs:
Oban.drain_queue(queue: :default, with_scheduled: true)
%{failure: 0, snoozed: 0, success: 1}Drain a queue including jobs scheduled up to a minute:
Oban.drain_queue(queue: :default, with_scheduled: DateTime.add(DateTime.utc_now(), 60, :second))Drain a queue and assert an error is raised:
assert_raise RuntimeError, fn -> Oban.drain_queue(queue: :risky, with_safety: false) endDrain a queue repeatedly until there aren't any more jobs to run. This is particularly useful for testing jobs that enqueue other jobs:
Oban.drain_queue(queue: :default, with_recursion: true)
%{failure: 1, snoozed: 0, success: 2}Drain only the top (by scheduled time and priority) five jobs off a queue:
Oban.drain_queue(queue: :default, with_limit: 5)
%{failure: 0, snoozed: 0, success: 1}Drain a queue recursively, only one job at a time:
Oban.drain_queue(queue: :default, with_limit: 1, with_recursion: true)
%{failure: 0, snoozed: 0, success: 3}
@spec insert(name(), Oban.Job.changeset(), Keyword.t()) :: {:ok, Oban.Job.t()} | {:error, Oban.Job.changeset() | term()}
@spec insert(multi(), multi_name(), changeset_or_fun()) :: multi()
Insert a new job into the database for execution.
This and the other insert variants are the recommended way to enqueue jobs because they
support features like unique jobs.
See the section on "Unique Jobs" for more details.
example
Example
Insert a single job:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}))Insert a job while ensuring that it is unique within the past 30 seconds:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}, unique: [period: 30]))Insert a job using a custom timeout:
{:ok, job} = Oban.insert(MyApp.Worker.new(%{id: 1}), timeout: 10_000)Insert a job using an alternative instance name:
{:ok, job} = Oban.insert(MyOban, MyApp.Worker.new(%{id: 1}))
@spec insert(name(), multi(), multi_name(), changeset_or_fun(), Keyword.t()) :: multi()
Put a job insert operation into an Ecto.Multi.
Like insert/2, this variant is recommended over Ecto.Multi.insert because it supports all of
Oban's features, i.e. unique jobs.
See the section on "Unique Jobs" for more details.
example
Example
Ecto.Multi.new()
|> Oban.insert("job-1", MyApp.Worker.new(%{id: 1}))
|> Oban.insert("job-2", fn _ -> MyApp.Worker.new(%{id: 2}) end)
|> MyApp.Repo.transaction()
@spec insert!(name(), Oban.Job.changeset(), opts :: Keyword.t()) :: Oban.Job.t()
Similar to insert/3, but raises an Ecto.InvalidChangesetError if the job can't be inserted.
example
Example
Insert a single job:
job = Oban.insert!(MyApp.Worker.new(%{id: 1}))Insert a job using a custom timeout:
job = Oban.insert!(MyApp.Worker.new(%{id: 1}), timeout: 10_000)Insert a job using an alternative instance name:
job = Oban.insert!(MyOban, MyApp.Worker.new(%{id: 1}))
insert_all(name \\ __MODULE__, changesets, opts \\ [])
View Source (since 0.9.0)@spec insert_all( name() | multi(), changesets_or_wrapper() | multi_name(), Keyword.t() | changesets_or_wrapper_or_fun() ) :: [Oban.Job.t()] | multi()
Insert multiple jobs into the database for execution.
There are a few important differences between this function and Ecto.Repo.insert_all/3:
- This function always returns a list rather than a tuple of
{count, records} - This function accepts a list of changesets rather than a list of maps or keyword lists
🌟 Unique Jobs and Batching
Only the SmartEngine in Oban Pro supports bulk unique jobs and automatic batching. With the basic engine, you must use
insert/3for unique support.
options
Options
Accepts any of Ecto's "Shared Options" such as timeout and log.
example
Example
Insert 100 jobs with a single operation:
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all()Insert with a custom timeout:
1..100
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all(timeout: 10_000)Insert with an alternative instance name:
changesets = Enum.map(1..100, &MyApp.Worker.new(%{id: &1}))
jobs = Oban.insert_all(MyOban, changesets)
insert_all(name, multi, multi_name, changesets, opts)
View Source (since 0.9.0)@spec insert_all( name(), multi(), multi_name(), changesets_or_wrapper_or_fun(), Keyword.t() ) :: multi()
Put an insert_all operation into an Ecto.Multi.
This function supports the same features and has the same caveats as insert_all/2.
example
Example
Insert job changesets within a multi:
changesets = Enum.map(0..100, &MyApp.Worker.new(%{id: &1}))
Ecto.Multi.new()
|> Oban.insert_all(:jobs, changesets)
|> MyApp.Repo.transaction()Insert job changesets using a function:
Ecto.Multi.new()
|> Ecto.Multi.insert(:user, user_changeset)
|> Oban.insert_all(:jobs, fn %{user: user} ->
email_job = EmailWorker.new(%{id: user.id})
staff_job = StaffWorker.new(%{id: user.id})
[email_job, staff_job]
end)
|> MyApp.Repo.transaction()
@spec pause_queue(name(), opts :: [queue_option()]) :: :ok
Pause a running queue, preventing it from executing any new jobs. All running jobs will remain running until they are finished.
When shutdown begins all queues are paused.
options
Options
:queue- a string or atom specifying the queue to pause, required:local_only- whether the queue will be paused only on the local node, default:false:node- restrict pausing to a particular node
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true as even if the queue does not exist locally, it might be running on
another node.
example
Example
Pause the default queue:
Oban.pause_queue(queue: :default)
:okPause the default queue, but only on the local node:
Oban.pause_queue(queue: :default, local_only: true)
:okPause the default queue only on a particular node:
Oban.pause_queue(queue: :default, node: "worker.1")
:ok
@spec resume_queue(name(), opts :: [queue_option()]) :: :ok
Resume executing jobs in a paused queue.
options
Options
:queue- a string or atom specifying the queue to resume, required:local_only- whether the queue will be resumed only on the local node, default:false:node- restrict resuming to a particular node
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true as even if the queue does not exist locally, it might be running on
another node.
example
Example
Resume a paused default queue:
Oban.resume_queue(queue: :default)
:okResume the default queue, but only on the local node:
Oban.resume_queue(queue: :default, local_only: true)
:okResume the default queue only on a particular node:
Oban.resume_queue(queue: :default, node: "worker.1")
:ok
@spec retry_all_jobs(name(), queryable :: Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
Retries all jobs that match on the given queryable. Please note that no matter the
queryable constraints, it will never retry available, executing or scheduled jobs.
If no queryable is given, Oban will retry all jobs in retryable states.
example
Example
Retries all retryable jobs
Oban.retry_all_jobs(Oban.Job)
{:ok, 9}Retries all retryable jobs with priority 0
Oban.Job
|> Ecto.Query.where(priority: 0)
|> Oban.retry_all_jobs()
{:ok, 5}
@spec retry_job(name :: atom(), job_id :: pos_integer()) :: :ok
Sets a job as available, adding attempts if already maxed out. If the job is currently
available, executing or scheduled it will be ignored. The job is scheduled for immediate
execution.
example
Example
Retry a discarded job with the id 1:
Oban.retry_job(1)
:ok
@spec scale_queue(name(), opts :: [queue_option()]) :: :ok
Scale the concurrency for a queue.
options
Options
:queue- a string or atom specifying the queue to scale, required:limit— the new concurrency limit, required:local_only— whether the queue will be scaled only on the local node, default:false:node- restrict scaling to a particular node
In addition, all engine-specific queue options are passed along after validation.
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true as even if the queue does not exist locally, it might be running on
another node.
example
Example
Scale a queue up, triggering immediate execution of queued jobs:
Oban.scale_queue(queue: :default, limit: 50)
:okScale the queue back down, allowing executing jobs to finish:
Oban.scale_queue(queue: :default, limit: 5)
:okScale the queue only on the local node:
Oban.scale_queue(queue: :default, limit: 10, local_only: true)
:okScale the queue on a particular node:
Oban.scale_queue(queue: :default, limit: 10, node: "worker.1")
:ok
@spec start_link([option()]) :: Supervisor.on_start()
Starts an Oban supervision tree linked to the current process.
options
Options
These options are required; without them the supervisor won't start:
:repo— specifies the Ecto repo used to insert and retrieve jobs
primary-options
Primary Options
These options determine what the system does at a high level, i.e. which queues to run:
:engine— facilitates inserting, fetching, and otherwise managing jobs.There are three built-in engines:
Oban.Engines.Basicfor Postgres databases,Oban.Engines.Litefor SQLite3 databases, andOban.Engines.Inlinefor simplified testing (only available for:inlinetesting mode).When the
Oban.Engines.Liteengine is used the:notifierand:peerare automatically set toPGandisolatedmode, respectively.Additional engines, such as Oban Pro's
SmartEnginewith advanced functionality for Postgres, are also available as an add-on.Defaults to the
Basicengine for Postgres.:log— eitherfalseto disable logging or a standard log level (:error,:warn,:info,:debug). This determines whether queries are logged or not; overriding the repo's configured log level. Defaults tofalse, where no queries are logged.:name— used for supervisor registration, it must be unique across an entire VM instance. Defaults toObanwhen no name is provided.:node— used to identify the node that the supervision tree is running in. If no value is provided it will use thenodename in a distributed system, or thehostnamein an isolated node. See "Node Name" below.:notifier— used to relay messages between processes, nodes, and the Postgres database.There are two built-in notifiers:
Oban.Notifiers.Postgres, which uses Postgres PubSub; andOban.Notifiers.PG, which uses process groups with distributed erlang. Defaults to the Postgres notifier.:peer— used to specify which peer module to use for cluster leadership.There are two built-in peers:
Oban.Peers.Postgres, which uses table-based leadership through theoban_peerstable; andOban.Peers.Global, which uses global locks through distributed Erlang.Leadership can be disabled by setting
peer: false, but note that centralized plugins likeCronwon't run without leadership.Defaults to the
Postgrespeer.:plugins— a list or modules or module/option tuples that are started as children of an Oban supervisor. Any supervisable module is a valid plugin, i.e. aGenServeror anAgent.:prefix— the query prefix, or schema, to use for inserting and executing jobs. Anoban_jobstable must exist within the prefix. See the "Prefix Support" section in the module documentation for more details.:queues— a keyword list where the keys are queue names and the values are the concurrency setting or a keyword list of queue options. For example, setting queues to[default: 10, exports: 5]would start the queuesdefaultandexportswith a combined concurrency level of 15. The concurrency setting specifies how many jobs each queue will run concurrently.Queues accept additional override options to customize their behavior, e.g. by setting
pausedordispatch_cooldownfor a specific queue.:testing— a mode that controls how an instance is configured for testing. When set to:inlineor:manualqueues, peers, and plugins are automatically disabled. Defaults to:disabled, no test mode.
twiddly-options
Twiddly Options
Additional options used to tune system behaviour. These are primarily useful for testing or troubleshooting and don't usually need modification.
:dispatch_cooldown— the minimum number of milliseconds a producer will wait before fetching and running more jobs. A slight cooldown period prevents a producer from flooding with messages and thrashing the database. The cooldown period directly impacts a producer's throughput: jobs per second for a single queue is calculated by(1000 / cooldown) * limit. For example, with a5mscooldown and a queue limit of25a single queue can run 5,000 jobs/sec.The default is
5msand the minimum is1ms, which is likely faster than the database can return new jobs to run.:shutdown_grace_period— the amount of time a queue will wait for executing jobs to complete before hard shutdown, specified in milliseconds. The default is15_000, or 15 seconds.:stage_interval— the number of milliseconds between making scheduled jobs available and notifying relevant queues that jobs are available. This is directly tied to the resolution ofscheduledorretryablejobs and how frequently the database is checked for jobs to run. To minimize database load, only5_000jobs are staged at each interval.Only the leader node stages jobs and notifies queues when the
:notifier'spubsub notifications are functional. If pubusb messages can't get through then staging switches to a less efficient "local" mode in which all nodes poll for jobs to run.Setting the interval to
:infinitydisables staging entirely. The default is1_000ms.
example
Example
Start a stand-alone Oban instance:
{:ok, pid} = Oban.start_link(repo: MyApp.Repo, queues: [default: 10])To start an Oban instance within an application's supervision tree:
def start(_type, _args) do
children = [MyApp.Repo, {Oban, repo: MyApp.Repo, queues: [default: 10]}]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
endStart multiple, named Oban supervisors within a supervision tree:
children = [
MyApp.Repo,
{Oban, name: Oban.A, repo: MyApp.Repo, queues: [default: 10]},
{Oban, name: Oban.B, repo: MyApp.Repo, queues: [special: 10]},
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)Start a local Oban instance for SQLite:
{:ok, pid} = Oban.start_link(engine: Oban.Engines.Lite, repo: MyApp.Repo)
node-name
Node Name
When the node value hasn't been configured it is generated based on the environment:
- In a distributed system the node name is used
- In a Heroku environment the system environment's
DYNOvalue is used - Otherwise, the system hostname is used
Start a new supervised queue.
By default this starts a new supervised queue across all nodes running Oban on the same database
and prefix. You can pass the option local_only: true if you prefer to start the queue only on
the local node.
options
Options
:queue- a string or atom specifying the queue to start, required:local_only- whether the queue will be started only on the local node, default:false:limit- set the concurrency limit, required:paused— set whether the queue starts in the "paused" state, optional
In addition, all engine-specific queue options are passed along after validation.
example
Example
Start the :priority queue with a concurrency limit of 10 across the connected nodes.
Oban.start_queue(queue: :priority, limit: 10)
:okStart the :media queue with a concurrency limit of 5 only on the local node.
Oban.start_queue(queue: :media, limit: 5, local_only: true)
:okStart the :media queue on a particular node.
Oban.start_queue(queue: :media, limit: 5, node: "worker.1")
:okStart the :media queue in a paused state.
Oban.start_queue(queue: :media, limit: 5, paused: true)
:ok
@spec stop_queue(name(), opts :: [queue_option()]) :: :ok
Shutdown a queue's supervision tree and stop running jobs for that queue.
By default this action will occur across all the running nodes. Still, if you prefer to stop the
queue's supervision tree and stop running jobs for that queue only on the local node, you can
pass the option: local_only: true
The shutdown process pauses the queue first and allows current jobs to exit gracefully, provided they finish within the shutdown limit.
Note: by default, Oban does not verify that the given queue exists unless :local_only
is set to true as even if the queue does not exist locally, it might be running on
another node.
options
Options
:queue- a string or atom specifying the queue to stop, required:local_only- whether the queue will be stopped only on the local node, default:false:node- restrict stopping to a particular node
example
Example
Stop a running queue on all nodes:
Oban.stop_queue(queue: :default)
:okStop the queue only on the local node:
Oban.stop_queue(queue: :media, local_only: true)
:okStop the queue only on a particular node:
Oban.stop_queue(queue: :media, node: "worker.1")
:ok
Returns the pid of the root Oban process for the given name.
example
Example
Find the default instance:
Oban.whereis(Oban)Find a dynamically named instance:
Oban.whereis({:oban, 1})