Pro Worker
🌟 This worker is available through Oban.Pro
The Oban.Pro.Worker is a replacement for Oban.Worker with expanded capabilities such as encryption, enforced structure, and output recording. In addition, because Batch, Chunk, and Workflow workers are based on the Pro worker, you can use all of the advanced options there as well.
Usage
Using Oban.Pro.Worker is identical to using Oban.Worker, with a few additional options. All of the basic options such as queue, priority, and unique are still available along with more advanced options.
To create a basic Pro worker, point use at Oban.Pro.Worker and define a process/1 callback:
```elixir
defmodule MyApp.Worker do
  use Oban.Pro.Worker

  @impl true
  def process(%Job{} = job) do
    # Do stuff with the job
  end
end
```
If you have existing workers that you'd like to convert, you only need to change the use definition and replace perform/1 with process/1.
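For example, converting a hypothetical existing worker is just those two changes (MyApp.MailWorker and deliver/1 are assumed names, shown side by side for illustration):

```elixir
# Before: a standard Oban worker
defmodule MyApp.MailWorker do
  use Oban.Worker, queue: :mailers

  @impl true
  def perform(%Oban.Job{args: args}), do: deliver(args)
end

# After: the same logic on the Pro worker
defmodule MyApp.MailWorker do
  use Oban.Pro.Worker, queue: :mailers

  @impl true
  def process(%Oban.Job{args: args}), do: deliver(args)
end
```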
Without any of the advanced Pro features there isn't any difference between the basic and Pro workers, so let's take a look!
Typespecs
📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding options.
Structured Jobs
```elixir
@type structured :: [keys: [atom()], required: [atom()]]
```
Structured workers help you catch typos within your jobs by validating keys on insert and enforcing keys during execution. They also automatically generate a struct for compile-time checks and friendly dot access.
Defining a Worker
On a structured worker, the keys and required options determine which keys are allowed at all, and which are required. A notable benefit is that the args passed to process/1 are converted into a struct:
```elixir
defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker, structured: [keys: [:a, :b, :c], required: [:a, :c]]

  @impl true
  def process(%Job{args: %__MODULE__{a: a, c: c} = args}) do
    # Use the matched keys or access them on args
  end
end
```
The keys option is mandatory, but required is optional. If you provide a list of required keys, they must be a subset of the full keys list.
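To see the enforcement in action, here's a sketch of inserting a job for a structured worker like the one above (the argument values are arbitrary):

```elixir
# Keys from the declared list are accepted; :b is optional here
%{a: 1, c: 3}
|> MyApp.StructuredWorker.new()
|> Oban.insert()

# A typo such as %{a: 1, cc: 3} is rejected at insert time,
# instead of slipping into the database and failing later.
```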
Recorded Jobs
```elixir
@type recorded :: true | [to: atom(), limit: pos_integer()]
```
Sometimes the output of a job is just as important as any side effects. When that's the case, you can use the recorded option to stash a job's output back into the job itself. Results are compressed and safely encoded for retrieval later, either manually, in a batch callback, or in a downstream workflow job.
Defining a Worker
```elixir
defmodule MyApp.RecordedWorker do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: args}) do
    # Do your typical work here.
  end
end
```
If your process function returns an {:ok, value} tuple, it is recorded. Any other value, i.e. a plain :ok, an error, or a snooze, is ignored.
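In practice that means returning the value you want stashed. A sketch, where fetch_report/1 stands in for your real work:

```elixir
def process(%Job{args: %{"id" => id}}) do
  # fetch_report/1 is a hypothetical helper
  report = fetch_report(id)

  # The tuple's second element is compressed, encoded, and stored on the job
  {:ok, report}
end
```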
The example above uses recorded: true to opt into recording with the defaults. That means an output limit of 32kb after compression and encoding—anything larger than the configured limit will return an error tuple. If you expect larger results (and you want them stored in the database) you can override the limit. For example, to set the limit to 64kb instead:
```elixir
use Oban.Pro.Worker, recorded: [limit: 64_000]
```
Retrieving Results
```elixir
@spec fetch_recorded(job :: Job.t()) :: {:ok, term()} | {:error, :missing}
```
Any worker that uses the recorded option has a fetch_recorded/1 function injected automatically. That's the ticket to extracting recorded results. If a job has run and recorded a value, it will return an {:ok, result} tuple:
```elixir
job = MyApp.Repo.get(Oban.Job, job_id)

case MyApp.RecordedWorker.fetch_recorded(job) do
  {:ok, result} ->
    # Use the result

  {:error, :missing} ->
    # Nothing recorded yet
end
```
Encrypted Jobs
```elixir
@type encrypted :: [key: mfa()]
```
Some applications have strong regulations around the storage of personal information. For example, medical records, financial details, social security numbers, or other data that should never leak. The encrypted option lets you store all job data at rest with encryption so sensitive data can't be seen.
Defining a Worker
Encryption is handled transparently as jobs are inserted and executed. All you need to do is flag the worker as encrypted and configure it to fetch a secret key:
```elixir
defmodule MyApp.SensitiveWorker do
  use Oban.Pro.Worker, encrypted: [key: {module, fun, args}]

  @impl true
  def process(%Job{args: args}) do
    # Args are decrypted, use them as you normally would
  end
end
```
Now job args are encrypted before insertion into the database and decrypted when the job runs.
Generating Keys
Encryption requires a 32 byte, Base64 encoded key. You can generate one with the :crypto and Base modules:
```elixir
key = 32 |> :crypto.strong_rand_bytes() |> Base.encode64()
```
The result will look something like this: "w7xGJClzEh1pbWuq6zsZfKfwdINu2VIkgCe3IO0hpsA=".
While it's possible to use the generated key in your worker directly, that defeats the purpose of encrypting sensitive data because anybody with access to the codebase can read the encryption key. That's why it is highly recommended that you use an MFA to retrieve the key dynamically at runtime. For example, here's how you could pull the key from the Application environment:
```elixir
use Oban.Pro.Worker, encrypted: [key: {Application, :fetch_env!, [:my_app, :enc_key]}]
```
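For that MFA to resolve at runtime, the key has to live in the application environment. A minimal sketch, assuming an OBAN_ENC_KEY environment variable and the app name :my_app:

```elixir
# config/runtime.exs
# Fail fast at boot if the key isn't present in the environment
config :my_app, :enc_key, System.fetch_env!("OBAN_ENC_KEY")
```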
Implementation Details
- Erlang's crypto module is used with the aes_256_ctr cipher for encryption. Encoding and decoding stacktraces are pruned to prevent leaking the private key or initialization vector.
- Only args are encrypted; meta is kept as plaintext. You can use that to your advantage for uniqueness, but be careful not to put anything sensitive in meta.
- Error messages and stacktraces aren't encrypted and are stored as plaintext. Be careful not to expose sensitive data when raising errors.
- Args are encrypted at rest as well as in Oban Web. You won't be able to view or search encrypted args in the Web dashboard.
- Uniqueness works for encrypted jobs, but not for arguments, because the same args are encrypted differently every time. Favor meta over args to enforce uniqueness for encrypted jobs.
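Building on that last point, here's a sketch of routing a unique identifier through meta instead of args (the field names and unique options are assumptions based on Oban's standard uniqueness settings):

```elixir
# The encrypted args differ on every insert, so dedupe on the
# plaintext meta instead
%{ssn: "123-45-6789"}
|> MyApp.SensitiveWorker.new(
  meta: %{account_id: 123},
  unique: [fields: [:worker, :meta], keys: [:account_id]]
)
|> Oban.insert()
```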