View Source Workflow Worker
๐ This worker is available through Oban.Pro
Workflow
workers compose together with arbitrary dependencies between jobs,
allowing sequential, fan-out and fan-in execution workflows. Workflows are fault
tolerant, can be homogeneous or heterogeneous, and scale horizontally across all
available nodes. Workflows are unique to Obanโonly possible due to historic job
retention.
Usage
You build Workflow
workers within your application. Each worker within a
workflow must use Oban.Pro.Workers.Workflow
rather than Oban.Worker
,
otherwise jobs will execute without any dependency management.
Our example workflow will coordinate archiving an account. To properly archive the account we must perform the following workers:
FinalReceipt
โ generate a final receipt and email it to the account ownerEmailSubscriber
โ email each subscriber on the accountBackupPost
โ up all important posts on the account to external storageDeleteAccount
โ delete the account from the database
We need step 1 to run before step 2 or 3, but both steps 2 and 3 can run in
parallel. After all of the jobs in steps 2 and 3 finish we can finally delete
the account from the database. For demonstration purposes we'll define one of
the workers with a stubbed-in process/1
function:
defmodule MyApp.FinalReceipt do
use Oban.Pro.Workers.Workflow, queue: :default
@impl true
def process(%Job{} = _job) do
# Generate the final receipt.
:ok
end
end
Note that we define a process/1
callback instead of perform/1
because
the perform/1
function coordinates the workflow. The process/1
function
receives an Oban.Job
struct, just like perform/1
and it should return
similar values, i.e. :ok
, {:ok, value}
.
Assuming that you've defined the other workers, we can construct a workflow to enforce sequential execution:
alias MyApp.{BackupPost, DeleteAccount, EmailSubscriber, FinalReceipt}
def archive_account(acc_id) do
FinalReceipt.new_workflow()
|> FinalReceipt.add(:receipt, FinalReceipt.new(%{id: acc_id}))
|> FinalReceipt.add(:email, EmailSubscriber.new(%{id: acc_id}), deps: [:receipt])
|> FinalReceipt.add(:backup, BackupPost.new(%{id: acc_id}), deps: [:backup])
|> FinalReceipt.add(:delete, DeleteAccount.new(%{id: acc_id}), deps: [:backup, :receipt])
|> Oban.insert_all()
end
Here we use new_workflow/0,1
to initialize a new workflow (later on we will
look at how to customize workflow behaviour with various options). Jobs are then
added to the workflow with add/3,4
, where the first argument is an existing
workflow, the second argument is unique name, the third is a job changeset, and
the fourth an optional list of dependencies. Finally, the workflow passes
directly to Oban.insert_all/1,2
, which handles inserting each of the jobs into
the database atomically.
All workers that use Workflow
have new_workflow
and add
functions defined,
so it doesn't matter which module you use to initialize or to add new jobs.
Dependency resolution guarantees that jobs execute in the order receipt -> email -> backup -> delete
, even if one of the jobs fails and needs to retry.
Any job that defines a dependency will wait for each upstream dependency to
complete before it starts.
Recall how our original specification stated that there could be multiple
email
and backup
jobs? The workflow we've built only handles one email or
backup job at a time.
Let's modify the workflow to fan-out to multiple email
and backup
jobs and then fan-in to the final delete
job:
def archive_account_workflow(acc_id) do
subscribers = subscribers_for_account(acc_id)
documents = documents_for_account(acc_id)
email_deps = Enum.map(subscribers, &"email_#{&1.id}")
backup_deps = Enum.map(documents, &"backup_#{&1.id}")
delete_deps = email_deps ++ backup_deps
FinalReceipt.new_workflow()
|> FinalReceipt.add(:receipt, FinalReceipt.new(%{id: acc_id}))
|> add_email_jobs(subscribers)
|> add_backup_jobs(documents)
|> DeleteAccount.add(:delete, DeleteAccount.new(%{id: acc_id}), deps: delete_deps)
end
defp add_email_jobs(workflow, subscribers) do
Enum.reduce(subscribers, workflow, fn %{id: id, email: email}, acc ->
EmailSubscriber.add(acc, "email_#{id}", EmailSubscriber.new(%{email: email}), deps: [:receipt])
end)
end
defp add_backup_jobs(workflow, subscribers) do
Enum.reduce(subscribers, workflow, fn %{id: id}, acc ->
BackupPost.add(acc, "backup_#{id}", BackupPost.new(%{id: id}), deps: [:receipt])
end)
end
Now the workflow will run all of the email
and backup
jobs we need, before
deleting the account. To confirm the flow without inserting and executing jobs
we can visualize it.
Typespecs
๐ In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.
@type name :: atom() | String.t()
@type add_option :: {:deps, [name()]}
@type new_option ::
{:ignore_cancelled, boolean()}
| {:ignore_discarded, boolean()}
| {:waiting_delay, pos_integer()}
| {:waiting_limit, pos_integer()}
| {:waiting_snooze, pos_integer()}
| {:workflow_id, String.t()}
@type append_option :: new_option() | {:check_deps, boolean()}
Creating Workflows
@callback new_workflow([new_option()]) :: Workflow.t()
@callback add(Workflow.t(), name(), Changeset.t(), [add_option()]) :: Workflow.t()
Workflows use conservative defaults for safe, and relatively quick, dependency resolution. You can customize the safety checks, waiting times and resolution intensity by providing a few top-level options:
ignore_cancelled
โ regardcancelled
dependencies as completed rather than halting the workflow. Defaults tofalse
.ignore_discarded
โ regarddiscarded
dependencies as completed rather than halting the workflow. Defaults tofalse
.waiting_delay
โ the number of milliseconds to wait between dependency checks. This value, combined with thewaiting_limit
, dictates how long a job will await upstream dependencies. Defaults to1000ms
.waiting_limit
โ the number of times to retry dependency checks. This value, combined withwaiting_delay
, dictates how long a job will await upstream dependencies. Defaults to10
.waiting_snooze
โ the number of seconds a job will snooze after awaiting executing upstream dependencies. If upstream dependencies arescheduled
orretryable
then the job snoozes until the latest dependency'sscheduled_at
timestamp.
The following example creates a workflow with all of the available options:
workflow = MyWorkflow.new_workflow(
ignore_cancelled: true,
ignore_discarded: true,
waiting_delay: :timer.seconds(5),
waiting_limit: 5,
waiting_snooze: 10
)
Internally, the meta
field stores options for each job. That makes it possible
to set or override workflow options per-job. For example, configure a single job
to ignore cancelled dependencies and another to snooze longer:
MyWorkflow.new_workflow()
|> MyWorkflow.add(:a, MyWorkflow.new(%{}))
|> MyWorkflow.add(:b, MyWorkflow.new(%{}, meta: %{ignore_cancelled: true}), deps: [:a])
|> MyWorkflow.add(:c, MyWorkflow.new(%{}, meta: %{waiting_snooze: 30}), deps: [:b])
Generating Workflow IDs
@callback gen_id() :: String.t()
By default workflow_id
is a version 4 random UUID. This is more than
sufficient to ensure that workflows are unique for any period of time. However,
if you require more control you can override workflow_id
generation at the
worker level, or pass a value directly to the new_workflow/2
function.
To override the workflow_id
for a particular workflow you override the gen_id
callback:
defmodule MyApp.Workflow do
use Oban.Pro.Workers.Workflow
# Generate a 24 character long random string instead
@impl true
def gen_id do
24
|> :crypto.strong_rand_bytes()
|> Base.encode64()
end
...
end
The gen_id/0
callback works for random/non-deterministic id generation. If
you'd prefer to use a deterministic id instead you can pass the workflow_id
in
as an option to new_workflow/2
:
MyApp.Workflow.new_workflow(workflow_id: "custom-id")
Using this technique you can verify the workflow_id
in tests or append to the
workflow manually after it was originally created.
Streaming Workflow Jobs
@callback stream_workflow_jobs(Job.t(), Keyword.t()) :: Enum.t()
The stream_workflow_jobs/1,2
function makes it possible to lazily load all
jobs in a workflow from within your workers. For example, to fetch all of the
completed
jobs in a workflow:
defmodule MyApp.Workflow do
use Oban.Pro.Workers.Workflow
@impl Workflow
def process(%Job{} = job) do
{:ok, workflow_jobs} =
MyApp.Repo.transaction(fn ->
job
|> stream_workflow_jobs()
|> Stream.filter(& &1.state == "completed")
|> Enum.to_list()
end)
do_things_with_jobs(workflow_jobs)
:ok
end
end
Note that streaming is provided by Ecto's Repo.stream
, and it must take place
within a transaction. Using a stream lets you control the number of jobs loaded
from the database, minimizing memory usage for large workflows.
Appending to a Workflow
@callback append_workflow(Job.t() | [Job.t()], [append_option()]) :: Workflow.t()
Sometimes all jobs aren't known when the workflow is created. In that case, you
can add more jobs with optional dependency checking using append_workflow/2
.
An appended workflow starts with one or more jobs, which reuses the original
workflow id, and optionally builds a set of dependencies for checking.
In this example we disable deps checking with check_deps: false
:
defmodule MyApp.WorkflowWorker do
use Oban.Pro.Workers.Workflow
@impl Workflow
def process(%Job{} = job) do
jobs =
job
|> append_workflow(check_deps: false)
|> add(:d, WorkerD.new(%{}), deps: [:a])
|> add(:e, WorkerE.new(%{}), deps: [:b])
|> add(:f, WorkerF.new(%{}), deps: [:c])
|> Oban.insert_all()
{:ok, jobs}
end
end
The new jobs specify deps on preexisting jobs named :a
, :b
, and :c
, but
there isn't any guarantee those jobs actually exist. That could lead to an
incomplete workflow where the new jobs may never complete.
To be safe and check jobs while appending we'll fetch all of the previous jobs and feed them in:
defmodule MyApp.WorkflowWorker do
use Oban.Pro.Workers.Workflow
@impl Workflow
def process(%Job{} = job) do
{:ok, jobs} =
MyApp.Repo.transaction(fn ->
job
|> stream_workflow_jobs()
|> Enum.to_list()
end)
jobs
|> append_workflow()
|> add(:d, WorkerD.new(%{}), deps: [:a])
|> add(:e, WorkerE.new(%{}), deps: [:b])
|> add(:f, WorkerF.new(%{}), deps: [:c])
|> Oban.insert_all()
:ok
end
end
Now there isn't any risk of an incomplete workflow, at the expense of loading some extraneous jobs.
Visualizing Workflows
@callback to_dot(Workflow.t()) :: String.t()
Workflows are a type of Directed Acyclic Graph, also known as a DAG. That means we can describe a workflow as a graph of jobs and dependencies, where execution flows between jobs. By converting the workflow into DOT notation, a standard graph description language, we can render visualizations!
Dot generation relies on libgraph, which is an optional dependency. You'll need to specify it as a dependency before generating dot output:
def deps do
[{:libgraph, "~> 0.7"}]
end
Once you've installed libgraph
, we can use to_dot/1
to convert a workflow.
As with new_workflow
and add
, all workflow workers define a to_dot/1
function that takes a workflow and returns a dot formatted string. For example,
calling to_dot/1
with the account archiving workflow from above:
FinalReceipt.to_dot(archive_account_workflow(123))
Generates the following dot output, where each vertex is a combination of the job's name in the workflow and its worker module:
strict digraph {
"delete (MyApp.DeleteAccount)"
"backup_1 (MyApp.BackupPost)"
"backup_2 (MyApp.BackupPost)"
"backup_3 (MyApp.BackupPost)"
"receipt (MyApp.FinalReceipt)"
"email_1 (MyApp.EmailSubscriber)"
"email_2 (MyApp.EmailSubscriber)"
"backup_1 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"backup_2 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"backup_3 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "backup_1 (MyApp.BackupPost)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "backup_2 (MyApp.BackupPost)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "backup_3 (MyApp.BackupPost)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "email_1 (MyApp.EmailSubscriber)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "email_2 (MyApp.EmailSubscriber)" [weight=1]
"email_1 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"email_2 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
}
Now we can take that dot output and render it using a tool like graphviz. The following example function accepts a workflow and renders it out as an SVG:
defmodule WorkflowRenderer do
alias Oban.Pro.Workers.Workflow
def render(workflow) do
dot_path = "workflow.dot"
svg_path = "workflow.svg"
File.write!(dot_path, Workflow.to_dot(workflow))
System.cmd("dot", ["-T", "svg", "-o", svg_path, dot_path])
end
end
With graphviz installed, that will generate a SVG of the workflow:
Looking at the visualized graph we can clearly see how the workflow starts with
a single render
job, fans-out to multiple email
and backup
jobs, and
finally fans-in to the delete
jobโexactly as we planned!
Implementation Notes
Jobs aren't executed until all upstream dependencies have completed. This includes waiting on retries, scheduled jobs, or snoozing.
Dependency resolution relies on jobs lingering in the database after execution. If your system prunes job dependencies then the workflow may never complete.