View Source Chunk Worker

🌟 This worker is available through Oban.Pro

A Chunk worker processes jobs "broadway style", in groups based on size or a timeout, but with the robust error handling semantics of Oban. Chunking operates at the worker level, allowing many chunks to run in parallel within the same queue.

Usage

To declare a Chunk worker you use Oban.Pro.Workers.Chunk rather than the basic Oban.Worker. Let's define a worker that sends SMS messages in chunks, rather than individually:

defmodule MyApp.ChunkWorker do
  use Oban.Pro.Workers.Chunk, queue: :messages, size: 100, timeout: 1000

  @impl true
  def process([_ | _] = jobs) do
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

    :ok
  end
end

Notice that we declared a size and a timeout along with a queue for the worker. If size or timeout are omitted they fall back to their defaults, 100 for size and 1000ms for timeout. To process larger batches less frequently, we can increase both values:

use Oban.Pro.Workers.Chunk, size: 500, timeout: :timer.seconds(5)

Now chunks will run with up to 500 jobs or every 5 seconds, whichever comes first.

Like other Pro workers, we define a process/1 callback rather than perform/1. The Chunk worker's process/1 is a little different, as it receives a list of jobs rather than a single struct. Fittingly, the expected return values are different as well.

Chunk Results and Error Handling

Chunk worker's result type is tailored to handling multiple jobs at once. For reference, here are the types and callback definition for process/1:

@type jobs :: [Job.t()]
@type reason :: term()
@type result ::
        :ok
        | {:ok, term()}
        | {:error, reason(), jobs()}
        | {:discard, reason(), jobs()}
        | [{:error, {reason(), jobs()}} | {:discard, {reason(), jobs()}}]

@callback process(jobs()) :: result()

The result types allow you to succeed an entire chunk or selectively fail parts of it. Here are each of the result options explained:

  • :ok — The chunk succeeded and all jobs can be marked complete
  • {:ok, result} — Like :ok, but with a result for testing
  • {:error, reason, jobs} — One or more jobs in the chunk failed and may be retried, any unlisted jobs are a success
  • {:discard, reason, jobs} — One or more jobs in the chunk should be discarded, any unlisted jobs are a success
  • [{:error, {reason, jobs}} | {:discard, {reason, jobs}}] — Retry some jobs and discard some other jobs, leaving any jobs not in either list a success

To illustrate using chunk results let's expand on the message processing example from earlier. We'll extend it to complete the whole batch when all messages are delivered or discard undeliverable messages:

def process([_ | _] = jobs) do
  notifications =
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

  bad_token = fn %{response: response} -> response == :bad_token end

  if Enum.any?(notifications, bad_token) do
    discards =
      notifications
      |> Enum.zip(jobs)
      |> Enum.filter(fn {notification, _job} -> bad_token.(notification) end)
      |> Enum.map(&elem(&1, 1))

    {:discard, :bad_token, discards}
  else
    {:ok, notifications}
  end
end

In the event of an ephemeral crash, like a network issue, the entire batch may be retried if there are additional attempts.

Implementation Notes

  • Chunks are ran by a leader job. When the leader executes it determines whether a complete chunk is available or if enough time has elapsed to run anyhow. If neither case applies then the leader will delay until the timeout elapsed and execute with as many jobs as it can find.

  • Only the leader job may be cancelled as it is the only one tracked by a producer. Cancelling any other jobs in the chunk won't stop the chunk from running.

  • Jobs are processed as a chunk and are tracked individually. If a single job keeps erroring between batches it will transition to discarded by itself.