View Source Pro Worker

🌟 This worker is available through Oban.Pro

The Oban.Pro.Worker is a replacement for Oban.Worker with expanded capabilities such as encryption, enforced structure, and output recording. In addition, because Batch, Chunk, and Workflow workers are based on the Pro worker, you can use all of the advanced options there as well.

Usage

Using Oban.Pro.Worker is identical to using Oban.Worker, with a few additional options. All of the basic options such as queue, priority, and unique are still available along with more advanced options.

To create a basic Pro worker point use at Oban.Pro.Worker and define a process/1 callback:

def MyApp.Worker do
  use Oban.Pro.Worker

  @impl true
  def process(%Job{} = job) do
    # Do stuff with the job
  end
end

If you have existing workers that you'd like to convert you only need to change the use definition and replace perform/1 with process/1.

Without any of the advanced Pro features there isn't any difference between the basic and pro workers—so let's take a look!

Typespecs

📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding options.

Structured Jobs

@type structured :: [keys: [atom()], required: [atom()]]

Structured workers help you catch typos within your jobs by validating keys on insert and enforcing keys during execution. They also automatically generate a struct for compile-time checks and friendly dot access.

Defining a Worker

On a structured worker the keys and required options determine which keys are allowed at all, and which are required. A notable benefit is that the args passed to process/1 are converted into a struct:

defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker, structured: [keys: [:a, :b, :c], required: [:a, :c]]

  def process(%Job{args: %__MODULE__{a: a, c: c} = args}) do
    # Use the matched keys or access them on args
  end
end

The keys option is mandatory, but required is optional. If you provide a list of required keys they must be a subset of the full keys list.

Recorded Jobs

@type recorded :: true | [to: atom(), limit: pos_integer()]

Sometimes the output of a job is just as important as any side effects. When that's the case, you can use the recorded option to stash a job's output back into the job itself. Results are compressed and safely encoded for retrieval later, either manually, in a batch callback, or a in downstream workflow job.

Defining a Worker

defmodule MyApp.RecordedWorker do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: args}) do
    # Do your typical work here.
  end
end

If your process function returns an {:ok, value} tuple, it is recorded. Any other value, i.e. an plain :ok, error, or snooze, is ignored.

The example above uses recorded: true to opt into recording with the defaults. That means an output limit of 32kb after compression and encoding—anything larger than the configured limit will return an error tuple. If you expect larger results (and you want them stored in the database) you can override the limit. For example, to set the limit to 64kb instead:

use Oban.Pro.Worker, recorded: [limit: 64_000]

Retrieving Results

@spec fetch_recorded(job :: Job.t()) :: {:ok, term()} | {:error, :missing}

Any worker that uses the recorded option has a fetch_recorded/1 function injected automatically. That's the ticket to extracting recorded results. If a job has ran and recorded a value, it will return an {:ok, result} tuple:

job = MyApp.Repo.get(Oban.Job, job_id)

case MyApp.RecordedWorker.fetch_recorded(job) do
  {:ok, result} ->
    # Use the result

  {:error, :missing} ->
    # Nothing recorded yet
end

Encrypted Jobs

@type encrypted :: [key: mfa()]

Some applications have strong regulations around the storage of personal information. For example, medical records, financial details, social security numbers, or other data that should never leak. The encrypted option lets you store all job data at rest with encryption so sensitive data can't be seen.

Defining a Worker

Encryption is handled transparently as jobs are inserted and executed. All you need to do is flag the worker as encrypted and configure it to fetch a secret key:

defmodule MyApp.SensitiveWorker do
  use Oban.Pro.Worker, encrypted: [key: {module, fun, args}]

  @impl true
  def process(%Job{args: args}) do
    # Args are decrypted, use them as you normally would
  end
end

Now job args are encrypted before insertion into the database and decrypted when the job runs.

Generating Keys

Encryption requires a 32 byte, Base 64 encoded key. You can generate one with the :crypto and Base modules:

key = 32 |> :crypto.strong_rand_bytes() |> Base.encode64()

The result will look something like this "w7xGJClzEh1pbWuq6zsZfKfwdINu2VIkgCe3IO0hpsA=".

While it's possible to use the generated key in your worker directly, that defeats the purpose of encrypting sensitive data because anybody with access to the codebase can read the encryption key. That's why it is highly recommended that you use an MFA to retrieve the key dynamically at runtime. For example, here's how you could pull the key from the Application environment:

use Oban.Pro.Worker, encrypted: [key: {Application, :fetch_key!, [:enc_key]}]

Implementation Details

  • Erlang's crypto module is used with the aes_256_ctr cipher for encryption.

  • Encoding and decoding stacktraces are pruned to prevent leaking the private key or initialization vector.

  • Only args are encrypted, meta is kept as plaintext. You can use that to your advantage for uniqueness, but be careful not to put anything sensitive in meta.

  • Error messages and stacktraces aren't encrypted and are stored as plaintext. Be careful not to expose sensitive data when raising errors.

  • Args are encrypted at rest as well as in Oban Web. You won't be able to view or search encrypted args in the Web dashboard.

  • Uniqueness works for encrypted jobs, but not for arguments because the same args are encrypted differently every time. Favor meta over args to enforce uniqueness for encrypted jobs.