Batch Worker

🌟 This worker is available through Oban.Pro

A Batch worker links the execution of many jobs as a group and runs optional callbacks after the jobs are processed. This allows your application to coordinate the execution of tens, hundreds, or thousands of jobs in parallel. It is built as an ergonomic abstraction on top of the standard Oban.Worker.

Using and Configuring

While Batch workers are built within your application, they all rely on the BatchManager plugin from Oban.Pro. The BatchManager is responsible for tracking the execution of jobs within a batch and reliably enqueuing callback jobs.

Start by adding BatchManager to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.BatchManager],
  ...

With the BatchManager plugin set to run, we can create our batch workers. Let's define a worker that delivers daily emails in a large batch:

defmodule MyApp.EmailBatch do
  use Oban.Pro.Workers.Batch, queue: :mailers

  @impl true
  def process(%Job{args: %{"email" => email}}) do
    MyApp.Mailer.daily_update(email)
  end
end

Note that we define a process/1 callback instead of perform/1 because perform/1 is used behind the scenes to coordinate regular execution and callbacks within the same worker. The process/1 function receives an Oban.Job struct, just like perform/1 would, and it should return the same accepted values, i.e. :ok, {:ok, value}, {:error, error}, etc.
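For instance, assuming MyApp.Mailer.daily_update/1 returns an ok/error tuple (that return shape is an assumption for illustration), process/1 could pass a failure reason straight through so Oban retries the job as usual:

@impl true
def process(%Job{args: %{"email" => email}}) do
  # Translate the mailer result into a standard Oban return value.
  case MyApp.Mailer.daily_update(email) do
    {:ok, _reply} -> :ok
    {:error, reason} -> {:error, reason}
  end
end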

The process/1 function above only looks for an "email" key in the job args, but a "batch_id" is also available. We'll modify the function to extract the batch_id as well:

def process(%Job{args: %{"batch_id" => batch_id, "email" => email}}) do
  {:ok, reply} = MyApp.Mailer.daily_update(email)

  track_delivery(batch_id, reply)

  :ok
end

Now the hypothetical track_delivery/2 function will store the delivery details for retrieval later, possibly by one of our handler callbacks.
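The implementation of track_delivery/2 is up to your application. As a purely illustrative sketch, it might persist each reply keyed by the batch_id (MyApp.Repo and MyApp.Delivery are hypothetical names here):

# Hypothetical sketch — MyApp.Delivery stands in for an Ecto schema with
# :batch_id and :reply fields used to stash results for later aggregation.
defp track_delivery(batch_id, reply) do
  MyApp.Repo.insert!(%MyApp.Delivery{batch_id: batch_id, reply: inspect(reply)})
end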

Inserting Batches

The preferred way to create batches is to call new_batch/1,2 with a list of job args and, optionally, a list of options:

[%{email: "foo@example.com"}, %{email: "bar@example.com"}]
|> MyApp.EmailBatch.new_batch()
|> Oban.insert_all()

# Generate the batch with options

[%{email: "foo@example.com"}, %{email: "bar@example.com"}]
|> MyApp.EmailBatch.new_batch(schedule_in: 60, priority: 1, max_attempts: 3)
|> Oban.insert_all()

The new_batch/1,2 function generates a list of job changesets and automatically injects a unique batch_id into each job's args. A Batch worker is a regular Oban.Worker under the hood, which means you can use new/2 to insert jobs as well, provided you supply a known batch_id in each job's args yourself.
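For example, assuming the batch_id is already known, a single job could be appended to the batch with new/2 (the id below is illustrative):

# Append one more job to an existing batch — the batch_id must be supplied manually.
%{email: "baz@example.com", batch_id: "previously-generated-batch-id"}
|> MyApp.EmailBatch.new()
|> Oban.insert()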

Handler Callbacks

After jobs in the batch are processed, the BatchManager may insert callback jobs for the same worker. There are three batch handler callbacks that your worker may define:

  • handle_attempted — called after all jobs in the batch were attempted at least once, regardless of whether they succeeded or not.

  • handle_completed — called after all jobs in the batch have a completed state. This handler may never be called if one or more jobs keep failing or any are discarded.

  • handle_discarded — called after any jobs in the batch have a discarded state.

Each handler callback receives a single Oban.Job struct with the batch_id as an argument and should return :ok. The callbacks are executed as separate isolated jobs, so they may be retried or discarded like any other job.

Here we'll implement each of the optional handler callbacks and have them print out the batch status along with the batch_id:

defmodule MyApp.BatchWorker do
  use Oban.Pro.Workers.Batch

  @impl true
  def handle_attempted(%Job{args: %{"batch_id" => batch_id}}) do
    IO.puts("Attempted #{batch_id}")
  end

  @impl true
  def handle_completed(%Job{args: %{"batch_id" => batch_id}}) do
    IO.puts("Completed #{batch_id}")
  end

  @impl true
  def handle_discarded(%Job{args: %{"batch_id" => batch_id}}) do
    IO.puts("Discarded #{batch_id}")
  end

  ...
end

The callbacks receive the same batch_id as the regular job invocations, which enables patterns such as map/reduce.
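For example, a handle_completed/1 callback could "reduce" over the batch by querying the jobs that share its batch_id. The sketch below assumes import Ecto.Query and a MyApp.Repo; the query details are illustrative, not prescribed by Oban.Pro:

# Count the completed jobs whose args carry this batch_id (requires `import Ecto.Query`).
@impl true
def handle_completed(%Job{args: %{"batch_id" => batch_id}}) do
  completed =
    Oban.Job
    |> where([j], j.state == "completed")
    |> where([j], fragment("?->>'batch_id' = ?", j.args, ^batch_id))
    |> MyApp.Repo.aggregate(:count, :id)

  IO.puts("Batch #{batch_id} finished with #{completed} completed jobs")

  :ok
end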

Generating Batch IDs

By default a batch_id is generated as a version 4 random UUID. This is more than sufficient to ensure that batches are unique between workers and nodes for any period of time. However, if you require more control you can override batch_id generation at the worker level or pass a value directly to the new_batch/2 function.

To customize the batch_id for a particular worker, override the gen_id/0 callback:

defmodule MyApp.BatchWorker do
  use Oban.Pro.Workers.Batch

  # Generate a Base64-encoded string from 24 random bytes instead of a UUID
  @impl true
  def gen_id do
    24
    |> :crypto.strong_rand_bytes()
    |> Base.encode64()
  end

  ...
end

The gen_id/0 callback is suited for random, non-deterministic id generation. If you'd prefer to use a deterministic id instead, you can pass the batch_id in as an option to new_batch/2:

MyApp.BatchWorker.new_batch(list_of_args, batch_id: "custom-batch-id")

Using this technique, you can verify the batch_id in tests or append to the batch manually after it was originally created. For example, you can add to a batch that is scheduled for the future:

batch_id = "daily-batch-#{Date.utc_today()}"
midnight =
  Date.utc_today()
  |> NaiveDateTime.new(~T[11:59:59])
  |> elem(1)
  |> DateTime.from_naive!("Etc/UTC")

# Create the initial batch
initial_args
|> MyApp.BatchWorker.new_batch(batch_id: batch_id, schedule_at: midnight)
|> Oban.insert_all()

# Add items to the batch later in the day
%{batch_id: batch_id, other_arg: "other"}
|> MyApp.BatchWorker.new(schedule_at: midnight)
|> Oban.insert()

When the jobs in the batch execute at midnight, they'll all be tracked together.
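Because the batch_id here is deterministic, a test could also assert that the jobs were enqueued with Oban.Testing (a sketch that assumes the test case calls use Oban.Testing, repo: MyApp.Repo):

# Asserts that at least one job for today's batch was enqueued.
assert_enqueued(
  worker: MyApp.BatchWorker,
  args: %{batch_id: "daily-batch-#{Date.utc_today()}"}
)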

Implementation Notes

  • Callback jobs are only enqueued if your worker defines the corresponding callback, e.g. a worker that only defines handle_attempted/1 will only have a callback for that event.

  • Callback jobs are unique, with an infinite uniqueness period.

  • The BatchManager uses debouncing to minimize queries and reduce overall load on your database.