Workflow Worker

🌟 This worker is available through Oban.Pro

Workflow workers compose together with arbitrary dependencies between jobs, allowing sequential, fan-out and fan-in execution workflows. Workflows are fault tolerant, can be homogeneous or heterogeneous, and scale horizontally across all available nodes. Workflows are unique to Obanβ€”only possible due to historic job retention.

Using and Configuring

Though you build Workflow workers within your application, they rely on Oban Pro's WorkflowManager plugin for coordination.

Start by adding WorkflowManager to your Oban plugins in config.exs:

config :my_app, Oban, plugins: [Oban.Pro.Plugins.WorkflowManager]

The WorkflowManager doesn't require any configuration. Once added, you can start creating workflows. Each worker within a workflow must use Oban.Pro.Workers.Workflow rather than Oban.Worker, otherwise jobs will execute without any dependency management.

Our example workflow will coordinate archiving an account. To properly archive the account we must perform the following workers:

  1. FinalReceipt β€” generate a final receipt and email it to the account owner
  2. EmailSubscriber β€” email each subscriber on the account
  3. BackupPost β€” up all important posts on the account to external storage
  4. DeleteAccount β€” delete the account from the database

We need step 1 to run before step 2 or 3, but both steps 2 and 3 can run in parallel. After all of the jobs in steps 2 and 3 finish we can finally delete the account from the database. For demonstration purposes we'll define one of the workers with a stubbed-in process/1 function:

defmodule MyApp.FinalReceipt do
  use Oban.Pro.Workers.Workflow, queue: :default

  @impl true
  def process(%Job{} = _job) do
    # Generate the final receipt.

    :ok
  end
end

Note that we define a process/1 callback instead of perform/1 because the perform/1 function coordinates the workflow. The process/1 function receives an Oban.Job struct, just like perform/1 and it should return similar values, i.e. :ok, {:ok, value}.

Assuming that you've defined the other workers, we can construct a workflow to enforce sequential execution:

alias MyApp.{BackupPost, DeleteAccount, EmailSubscriber, FinalReceipt}

def archive_account(acc_id) do
  FinalReceipt.new_workflow()
  |> FinalReceipt.add(:receipt, FinalReceipt.new(%{id: acc_id}))
  |> FinalReceipt.add(:email, EmailSubscriber.new(%{id: acc_id}), deps: [:receipt])
  |> FinalReceipt.add(:backup, BackupPost.new(%{id: acc_id}), deps: [:backup])
  |> FinalReceipt.add(:delete, DeleteAccount.new(%{id: acc_id}), deps: [:backup, :receipt])
  |> Oban.insert_all()
end

Here we use new_workflow/0,1 to initialize a new workflow (later on we will look at how to customize workflow behaviour with various options). Jobs are then added to the workflow with add/3,4, where the first argument is an existing workflow, the second argument is unique name, the third is a job changeset, and the fourth an optional list of dependencies. Finally, the workflow passes directly to Oban.insert_all/1,2, which handles inserting each of the jobs into the database atomically.

All workers that use Workflow have new_workflow and add functions defined, so it doesn't matter which module you use to initialize or to add new jobs.

Dependency resolution guarantees that jobs execute in the order receipt -> email -> backup -> delete, even if one of the jobs fails and needs to retry. Any job that defines a dependency will wait for each upstream dependency to complete before it starts.

Recall how our original specification stated that there could be multiple email and backup jobs? The workflow we've built only handles one email or backup job at a time.

Let's modify the workflow to fan-out to multiple email and backup jobs and then fan-in to the final delete job:

def archive_account_workflow(acc_id) do
  subscribers = subscribers_for_account(acc_id)
  documents = documents_for_account(acc_id)

  email_deps = Enum.map(subscribers, &"email_#{&1.id}")
  backup_deps = Enum.map(documents, &"backup_#{&1.id}")
  delete_deps = email_deps ++ backup_deps

  FinalReceipt.new_workflow()
  |> FinalReceipt.add(:receipt, FinalReceipt.new(%{id: acc_id}))
  |> add_email_jobs(subscribers)
  |> add_backup_jobs(documents)
  |> DeleteAccount.add(:delete, DeleteAccount.new(%{id: acc_id}), deps: delete_deps)
end

defp add_email_jobs(workflow, subscribers) do
  Enum.reduce(subscribers, workflow, fn %{id: id, email: email}, acc ->
    EmailSubscriber.add("email_#{id}", EmailSubscriber.new(%{email: email}), deps: [:receipt])
  end)
end

defp add_backup_jobs(workflow, subscribers) do
  Enum.reduce(subscribers, workflow, fn %{id: id}, acc ->
    BackupPost.add("backup_#{id}", BackupPost.new(%{id: id}), deps: [:receipt])
  end)
end

Now the workflow will run all of the email and backup jobs we need, before deleting the account. To confirm the flow without inserting and executing jobs we can visualize it.

Visualizing Workflows

Workflows are a type of Directed Acyclic Graph, also known as a DAG. That means we can describe a workflow as a graph of jobs and dependencies, where execution flows between jobs. By converting the workflow into DOT notation, a standard graph description language, we can render visualizations!

Dot generation relies on libgraph, which is an optional dependency. You'll need to specify it as a dependency before generating dot output:

def deps do
  [{:libgraph, "~> 0.7"}]
end

Once you've installed libgraph, we can use to_dot/1 to convert a workflow. As with new_workflow and add, all workflow workers define a to_dot/1 function that takes a workflow and returns a dot formatted string. For example, calling to_dot/1 with the account archiving workflow from above:

FinalReceipt.to_dot(archive_account_workflow(123))

Generates the following dot output, where each vertex is a combination of the job's name in the workflow and its worker module:

strict digraph {
    "delete (MyApp.DeleteAccount)"
    "backup_1 (MyApp.BackupPost)"
    "backup_2 (MyApp.BackupPost)"
    "backup_3 (MyApp.BackupPost)"
    "receipt (MyApp.FinalReceipt)"
    "email_1 (MyApp.EmailSubscriber)"
    "email_2 (MyApp.EmailSubscriber)"
    "backup_1 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "backup_2 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "backup_3 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "backup_1 (MyApp.BackupPost)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "backup_2 (MyApp.BackupPost)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "backup_3 (MyApp.BackupPost)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "email_1 (MyApp.EmailSubscriber)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "email_2 (MyApp.EmailSubscriber)" [weight=1]
    "email_1 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "email_2 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
}

Now we can take that dot output and render it using a tool like graphviz. The following example function accepts a workflow and renders it out as an SVG:

defmodule WorkflowRenderer do
  alias Oban.Pro.Workers.Workflow

  def render(workflow) do
    dot_path = "workflow.dot"
    svg_path = "workflow.svg"

    File.write!(dot_path, Workflow.to_dot(workflow))

    System.cmd("dot", ["-T", "svg", "-o", svg_path, dot_path])
  end
end

With graphviz installed, that will generate a SVG of the workflow:

<g class="graph"><path fill="#fff" stroke="transparent" d="M0 188V0h1381.58v188H0z"/><g class="node" transform="translate(4 184)"><ellipse fill="none" stroke="#000" cx="663.44" cy="-18" rx="122.68" ry="18"/><text text-anchor="middle" x="663.44" y="-14.3" font-family="Times,serif" font-size="14">delete (MyApp.DeleteAccount)</text></g><g class="node" transform="translate(4 184)"><ellipse fill="none" stroke="#000" cx="125.44" cy="-90" rx="125.38" ry="18"/><text text-anchor="middle" x="125.44" y="-86.3" font-family="Times,serif" font-size="14">backup_1 (MyApp.BackupPost)</text></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M214.88-77.36c96.49 12.55 249.69 32.48 349.69 45.5"/><path stroke="#000" d="M565.21-35.31l9.47 4.76-10.37 2.18.9-6.94z"/></g><g class="node" transform="translate(4 184)"><ellipse fill="none" stroke="#000" cx="394.44" cy="-90" rx="125.38" ry="18"/><text text-anchor="middle" x="394.44" y="-86.3" font-family="Times,serif" font-size="14">backup_2 (MyApp.BackupPost)</text></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M452.15-73.98C494.34-63 551.7-48.08 596.01-36.55"/><path stroke="#000" d="M597.03-39.9l8.8 5.91-10.56.87 1.76-6.78z"/></g><g class="node" transform="translate(4 184)"><ellipse fill="none" stroke="#000" cx="663.44" cy="-90" rx="125.38" ry="18"/><text text-anchor="middle" x="663.44" y="-86.3" font-family="Times,serif" font-size="14">backup_3 (MyApp.BackupPost)</text></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M663.44-71.7v25.59"/><path stroke="#000" d="M666.94-46.1l-3.5 10-3.5-10h7z"/></g><g class="node" transform="translate(4 184)"><ellipse fill="none" stroke="#000" cx="663.44" cy="-162" rx="118.08" ry="18"/><text text-anchor="middle" x="663.44" y="-158.3" font-family="Times,serif" font-size="14">receipt (MyApp.FinalReceipt)</text></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M576.7-149.71c-95.97 12.48-250.38 32.57-351.35 45.71"/><path stroke="#000" d="M225.51-100.49l-10.36-2.18 9.46-4.76.9 6.94z"/></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M606.39-146.15c-42.15 10.96-99.7 25.94-144.2 37.52"/><path stroke="#000" d="M462.89-105.2l-10.56-.87 8.8-5.9 1.76 6.77z"/></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M663.44-143.7v25.59"/><path stroke="#000" d="M666.94-118.1l-3.5 10-3.5-10h7z"/></g><g class="node" transform="translate(4 184)"><ellipse fill="none" stroke="#000" cx="944.44" cy="-90" rx="137.28" ry="18"/><text text-anchor="middle" x="944.44" y="-86.3" font-family="Times,serif" font-size="14">email_1 (MyApp.EmailSubscriber)</text></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M722.35-146.33c44.17 11.01 104.81 26.11 151.56 37.76"/><path stroke="#000" d="M874.81-111.95l8.86 5.81-10.55.98 1.69-6.79z"/></g><g class="node" transform="translate(4 184)"><ellipse fill="none" stroke="#000" cx="1236.44" cy="-90" rx="137.28" ry="18"/><text text-anchor="middle" x="1236.44" y="-86.3" font-family="Times,serif" font-size="14">email_2 (MyApp.EmailSubscriber)</text></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M752.66-150.1c101.88 12.45 268.46 32.8 377.11 46.07"/><path stroke="#000" d="M1130.27-107.5l9.5 4.69-10.35 2.26.85-6.95z"/></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M883.47-73.81c-44.37 11.05-104.5 26.03-150.69 37.54"/><path stroke="#000" d="M733.4-32.82l-10.55-.98 8.85-5.81 1.7 6.79z"/></g><g class="edge" transform="translate(4 184)"><path fill="none" stroke="#000" d="M1139.89-77.2L764.77-31.38"/><path stroke="#000" d="M765.05-27.89l-10.35-2.26 9.5-4.69.85 6.95z"/></g></g>

Looking at the visualized graph we can clearly see how the workflow starts with a single render job, fans-out to multiple email and backup jobs, and finally fans-in to the delete jobβ€”exactly as we planned!

Workflow Options

Workflows use conservative defaults for safe, and relatively quick, dependency resolution. You can customize the safety checks, waiting times and resolution intensity by providing a few top-level options:

  • ignore_cancelled β€” regard cancelled dependencies as completed rather than halting the workflow. Defaults to false.

  • ignore_discarded β€” regard discarded dependencies as completed rather than halting the workflow. Defaults to false.

  • waiting_delay β€” the number of milliseconds to wait between dependency checks. This value, combined with the waiting_limit, dictates how long a job will await upstream dependencies. Defaults to 1000ms.

  • waiting_limit β€” the number of times to retry dependency checks. This value, combined with waiting_delay, dictates how long a job will await upstream dependencies. Defaults to 10.

  • waiting_snooze β€” the number of seconds a job will snooze after awaiting executing upstream dependencies. If upstream dependencies are scheduled or retryable then the job snoozes until the latest dependency's scheduled_at timestamp.

The following example creates a workflow with all of the available options:

workflow = MyWorkflow.new(
  ignore_cancelled: true,
  ignore_discarded: true,
  waiting_delay: :timer.seconds(5),
  waiting_limit: 5,
  waiting_snooze: 10
)

Internally, the meta field stores options for each job. That makes it possible to set or override workflow options per-job. For example, configure a single job to ignore cancelled dependencies and another to snooze longer:

MyWorkflow.new()
|> MyWorkflow.add(:a, MyWorkflow.new(%{}))
|> MyWorkflow.add(:b, MyWorkflow.new(%{}, meta: %{ignore_cancelled: true}), deps: [:a])
|> MyWorkflow.add(:c, MyWorkflow.new(%{}, meta: %{waiting_snooze: 30}), deps: [:b])

Generating Workflow IDs

By default workflow_id is a version 4 random UUID. This is more than sufficient to ensure that workflows are unique for any period of time. However, if you require more control you can override workflow_id generation at the worker level, or pass a value directly to the new_workflow/2 function.

To override the workflow_id for a particular workflow you override the gen_id callback:

defmodule MyApp.Workflow do
  use Oban.Pro.Workers.Workflow

  # Generate a 24 character long random string instead
  @impl true
  def gen_id do
    24
    |> :crypto.strong_rand_bytes()
    |> Base.encode64()
  end

  ...
end

The gen_id/0 callback works for random/non-deterministic id generation. If you'd prefer to use a deterministic id instead you can pass the workflow_id in as an option to new_workflow/2:

MyApp.Workflow.new_workflow(workflow_id: "custom-id")

Using this technique you can verify the workflow_id in tests or append to the workflow manually after it was originally created.

Implementation Notes

  • Jobs aren't executed until all upstream dependencies have completed. This includes waiting on retries, scheduled jobs or snoozing.

  • Dependency resolution relies on jobs lingering in the database after execution. If your system prunes job dependencies then the workflow may never complete.