Batch Worker
🌟 This worker is available through Oban.Pro
A Batch worker links the execution of many jobs as a group and runs optional callbacks after jobs are processed. This allows your application to coordinate the execution of tens, hundreds, or thousands of jobs in parallel. It is built as an ergonomic abstraction on top of the standard Oban.Worker.
Using and Configuring
While Batch workers are built within your application, they all rely on the BatchManager plugin from Oban.Pro. The BatchManager is responsible for tracking the execution of jobs within a batch and reliably enqueuing callback jobs.
Start by adding BatchManager to your list of Oban plugins in config.exs:
config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.BatchManager],
  ...
With the BatchManager plugin set to run, we can create our batch workers. Let's define a worker that delivers daily emails in a large batch:
defmodule MyApp.EmailBatch do
  use Oban.Pro.Workers.Batch, queue: :mailers

  @impl true
  def process(%Job{args: %{"email" => email}}) do
    MyApp.Mailer.daily_update(email)
  end
end
Note that we define a process/1 callback instead of perform/1 because perform/1 is used behind the scenes to coordinate regular execution and callbacks within the same worker. The process/1 function receives an Oban.Job struct, just like perform/1 would, and it should return the same accepted values, i.e. :ok, {:ok, value}, {:error, error}, etc.
The process/1 function above only looks for an "email" key in the job args, but a "batch_id" is also available. We'll modify the function to extract the batch_id as well:
def process(%Job{args: %{"batch_id" => batch_id, "email" => email}}) do
  {:ok, reply} = MyApp.Mailer.daily_update(email)

  track_delivery(batch_id, reply)

  :ok
end
Now the hypothetical track_delivery/2 function will store the delivery details for retrieval later, possibly by one of our handler callbacks.
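As a rough illustration, track_delivery/2 might persist one row per delivery keyed by the batch_id. The MyApp.Deliveries.Delivery schema below is purely hypothetical and not part of Oban.Pro; any storage that can be queried by batch_id later would work just as well:

# Hypothetical helper: records a delivery so a handler callback can aggregate
# results for the whole batch later. Assumes a MyApp.Deliveries.Delivery Ecto
# schema with :batch_id and :details fields.
defp track_delivery(batch_id, reply) do
  MyApp.Repo.insert!(%MyApp.Deliveries.Delivery{
    batch_id: batch_id,
    details: inspect(reply)
  })
end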
Inserting Batches
The preferred way to create batches is to call new_batch/1,2 with a list of job args and a list of options:
[%{email: "foo@example.com"}, %{email: "bar@example.com"}]
|> MyApp.EmailBatch.new_batch()
|> Oban.insert_all()

# Generate the batch with options
[%{email: "foo@example.com"}, %{email: "bar@example.com"}]
|> MyApp.EmailBatch.new_batch(schedule_in: 60, priority: 1, max_attempts: 3)
|> Oban.insert_all()
The new_batch/1,2 function generates a list of job changesets and automatically injects a unique batch_id into each job's args. A Batch worker is a regular Oban.Worker under the hood, which means you can use new/2 to insert jobs as well, provided you use a deterministic batch_id.
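For example, assuming you already know the batch's id, a single extra job can be appended through the standard new/2 and Oban.insert/1 flow (the "nightly-report" id here is only a placeholder):

# Sketch: insert one more job into an existing batch by reusing its batch_id.
%{email: "baz@example.com", batch_id: "nightly-report"}
|> MyApp.EmailBatch.new()
|> Oban.insert()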
Handler Callbacks
After jobs in the batch are processed, the BatchManager may insert callback jobs for the same worker. There are three batch handler callbacks that your worker may define:

handle_attempted — called after all jobs in the batch were attempted at least once, regardless of whether they succeeded or not.
handle_completed — called after all jobs in the batch have a completed state. This handler may never be called if one or more jobs keep failing or any are discarded.
handle_discarded — called after any jobs in the batch have a discarded state.
Each handler callback receives a single Oban.Job struct with the batch_id as an argument and should return :ok. The callbacks are executed as separate, isolated jobs, so they may be retried or discarded like any other job.
Here we'll implement each of the optional handler callbacks and have them print out the batch status along with the batch_id:
defmodule MyApp.BatchWorker do
  use Oban.Pro.Workers.Batch

  @impl true
  def handle_attempted(%Job{args: %{"batch_id" => batch_id}}) do
    IO.puts("Attempted #{batch_id}")
  end

  @impl true
  def handle_completed(%Job{args: %{"batch_id" => batch_id}}) do
    IO.puts("Completed #{batch_id}")
  end

  @impl true
  def handle_discarded(%Job{args: %{"batch_id" => batch_id}}) do
    IO.puts("Discarded #{batch_id}")
  end

  ...
end
The callbacks receive the same batch_id as the regular job invocations, which enables patterns such as map/reduce, as sketched below.
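A minimal sketch of that reduce step, reusing the hypothetical MyApp.Deliveries.Delivery storage from the track_delivery/2 example above (the schema and the counting query are assumptions, not part of Oban.Pro):

@impl true
def handle_completed(%Job{args: %{"batch_id" => batch_id}}) do
  import Ecto.Query

  # Reduce: count the per-job results that process/1 recorded for this batch.
  delivered =
    MyApp.Repo.aggregate(
      from(d in MyApp.Deliveries.Delivery, where: d.batch_id == ^batch_id),
      :count
    )

  IO.puts("Batch #{batch_id} completed with #{delivered} deliveries")

  :ok
end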
Generating Batch IDs
By default a batch_id is generated as a version 4 random UUID. This is more than sufficient to ensure that batches are unique between workers and nodes for any period of time. However, if you require more control you can override batch_id generation at the worker level or pass a value directly to the new_batch/2 function.
To override the batch_id for a particular worker you override the gen_id callback:
defmodule MyApp.BatchWorker do
  use Oban.Pro.Workers.Batch

  # Generate a 24 character long random string instead
  @impl true
  def gen_id do
    24
    |> :crypto.strong_rand_bytes()
    |> Base.encode64()
  end

  ...
end
The gen_id/0 callback is suited for random/non-deterministic id generation. If you'd prefer to use a deterministic id instead, you can pass the batch_id in as an option to new_batch/2:
MyApp.BatchWorker.new_batch(list_of_args, batch_id: "custom-batch-id")
Using this technique you can verify the batch_id in tests or append to the batch manually after it was originally created. For example, you can add to a batch that is scheduled for the future:
batch_id = "daily-batch-#{Date.utc_today()}"

midnight =
  Date.utc_today()
  |> NaiveDateTime.new(~T[23:59:59])
  |> elem(1)
  |> DateTime.from_naive!("Etc/UTC")

# Create the initial batch
initial_args
|> MyApp.BatchWorker.new_batch(batch_id: batch_id, schedule_at: midnight)
|> Oban.insert_all()

# Add items to the batch later in the day
%{batch_id: batch_id, other_arg: "other"}
|> MyApp.BatchWorker.new(schedule_at: midnight)
|> Oban.insert()
When jobs in the batch execute later that day at midnight they'll all be tracked together.
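As for verifying the batch_id in tests, a deterministic id makes assertions straightforward. Here is a rough sketch using the Oban.Testing helpers, assuming the usual use Oban.Testing, repo: MyApp.Repo setup in your test case module:

test "every job in the daily batch carries the deterministic batch_id" do
  batch_id = "daily-batch-#{Date.utc_today()}"

  [%{email: "foo@example.com"}, %{email: "bar@example.com"}]
  |> MyApp.BatchWorker.new_batch(batch_id: batch_id)
  |> Oban.insert_all()

  # all_enqueued/1 returns the inserted Oban.Job structs for inspection.
  for job <- all_enqueued(worker: MyApp.BatchWorker) do
    assert job.args["batch_id"] == batch_id
  end
end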
Implementation Notes
Callback jobs are only enqueued if your worker defines the corresponding callback, e.g. a worker that only defines handle_attempted/1 will only have a callback for that event.
Callback jobs are unique, with an infinite uniqueness period.
The BatchManager uses debouncing to minimize queries and reduce overall load on your database.