View Source Chunk Worker
🌟 This worker is available through Oban.Pro
A Chunk
worker processes jobs "broadway style", in groups based on size or a
timeout, but with the robust error handling semantics of Oban. Chunking operates
at the worker level, allowing many chunks to run in parallel within the same
queue.
Usage
To declare a Chunk
worker you use Oban.Pro.Workers.Chunk
rather than the
basic Oban.Worker
. Let's define a worker that sends SMS messages in chunks,
rather than individually:
defmodule MyApp.ChunkWorker do
use Oban.Pro.Workers.Chunk, queue: :messages, size: 100, timeout: 1000
@impl true
def process([_ | _] = jobs) do
jobs
|> Enum.map(& &1.args)
|> MyApp.Messaging.send_batch()
:ok
end
end
Notice that we declared a size
and a timeout
along with a queue
for the
worker. If size
or timeout
are omitted they fall back to their defaults, 100
for size
and 1000ms for timeout
. To process larger batches less frequently,
we can increase both values:
use Oban.Pro.Workers.Chunk, size: 500, timeout: :timer.seconds(5)
Now chunks will run with up to 500 jobs or every 5 seconds, whichever comes first.
Like other Pro workers, we define a process/1
callback rather than
perform/1
. The Chunk
worker's process/1
is a little different, as it
receives a list of jobs rather than a single struct. Fittingly, the expected return
values are different as well.
Chunk Results and Error Handling
Chunk
worker's result type is tailored to handling multiple jobs at once. For
reference, here are the types and callback definition for process/1
:
@type jobs :: [Job.t()]
@type reason :: term()
@type result ::
:ok
| {:ok, term()}
| {:error, reason(), jobs()}
| {:discard, reason(), jobs()}
| [{:error, {reason(), jobs()}} | {:discard, {reason(), jobs()}}]
@callback process(jobs()) :: result()
The result types allow you to succeed an entire chunk or selectively fail parts of it. Here are each of the result options explained:
:ok
— The chunk succeeded and all jobs can be marked complete{:ok, result}
— Like:ok
, but with a result for testing{:error, reason, jobs}
— One or more jobs in the chunk failed and may be retried, any unlisted jobs are a success{:discard, reason, jobs}
— One or more jobs in the chunk should be discarded, any unlisted jobs are a success[{:error, {reason, jobs}} | {:discard, {reason, jobs}}]
— Retry some jobs and discard some other jobs, leaving any jobs not in either list a success
To illustrate using chunk results let's expand on the message processing example from earlier. We'll extend it to complete the whole batch when all messages are delivered or discard undeliverable messages:
def process([_ | _] = jobs) do
notifications =
jobs
|> Enum.map(& &1.args)
|> MyApp.Messaging.send_batch()
bad_token = fn %{response: response} -> response == :bad_token end
if Enum.any?(notifications, bad_token) do
discards =
notifications
|> Enum.zip(jobs)
|> Enum.filter(fn {notification, _job} -> bad_token.(notification) end)
|> Enum.map(&elem(&1, 1))
{:discard, :bad_token, discards}
else
{:ok, notifications}
end
end
In the event of an ephemeral crash, like a network issue, the entire batch may be retried if there are additional attempts.
Implementation Notes
Chunks are ran by a leader job. When the leader executes it determines whether a complete chunk is available or if enough time has elapsed to run anyhow. If neither case applies then the leader will delay until the timeout elapsed and execute with as many jobs as it can find.
Only the leader job may be cancelled as it is the only one tracked by a producer. Cancelling any other jobs in the chunk won't stop the chunk from running.
Jobs are processed as a chunk and are tracked individually. If a single job keeps erroring between batches it will transition to discarded by itself.