Jido.Workflow (Jido v1.0.0)

Jido.Workflow provides a framework for executing and managing workflows (sequences of Actions) in a distributed system.

This module offers functionality to:

  • Run workflows synchronously or asynchronously
  • Manage timeouts and retries
  • Cancel running workflows
  • Normalize and validate input parameters and context
  • Emit telemetry events for monitoring and debugging

Workflows are defined as modules (Actions) that implement specific callbacks, allowing for a standardized way of defining and executing complex workflows across a distributed system.

Features

  • Synchronous and asynchronous workflow execution
  • Automatic retries with exponential backoff
  • Timeout handling for long-running workflows
  • Parameter and context normalization
  • Comprehensive error handling and reporting
  • Telemetry integration for monitoring and tracing
  • Cancellation of running workflows

Usage

Workflows are executed using the run/4 or run_async/4 functions:

Jido.Workflow.run(MyAction, %{param1: "value"}, %{context_key: "context_value"})

See Jido.Action for how to define an Action.

For asynchronous execution:

async_ref = Jido.Workflow.run_async(MyAction, params, context)
# ... do other work ...
result = Jido.Workflow.await(async_ref)

Integrating with OTP

For correct supervision of async tasks, ensure you start a Task.Supervisor under your application's supervision tree, for example:

def start(_type, _args) do
  children = [
    {Task.Supervisor, name: Jido.Workflow.TaskSupervisor},
    ...
  ]
  Supervisor.start_link(children, strategy: :one_for_one)
end

This way, any async tasks spawned by run_async/4 are supervised by that Task.Supervisor.
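As a rough illustration of what running work under a Task.Supervisor looks like in plain OTP, here is a minimal sketch. It is not Jido's implementation: the module SupervisedTaskSketch, its functions, and the supervisor name Sketch.TaskSupervisor are hypothetical, and only standard-library Task and Task.Supervisor calls are used.

```elixir
defmodule SupervisedTaskSketch do
  # Hypothetical supervisor name, chosen for this sketch only.
  @supervisor Sketch.TaskSupervisor

  # In a real application the supervisor would be a child in the
  # supervision tree; it is started directly here to keep the sketch runnable.
  def start_supervisor do
    Task.Supervisor.start_link(name: @supervisor)
  end

  # Runs `fun` in a supervised task and waits up to `timeout` ms for it.
  def run(fun, timeout \\ 5_000) do
    task = Task.Supervisor.async_nolink(@supervisor, fun)

    # Task.yield/2 returns nil on timeout; Task.shutdown/1 then stops the task.
    case Task.yield(task, timeout) || Task.shutdown(task) do
      {:ok, reply} -> reply
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end

{:ok, _sup} = SupervisedTaskSketch.start_supervisor()
SupervisedTaskSketch.run(fn -> {:ok, %{result: "done"}} end)
```

Because the task is started with async_nolink, a crash in the task does not take down the caller; it shows up as an {:exit, reason} tuple instead.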

Summary

Functions

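await(map, timeout \\ 5000)

Waits for the result of an asynchronous Action execution.

cancel(pid)

Cancels a running asynchronous Action execution.

run(action, params \\ %{}, context \\ %{}, opts \\ [])

Executes an Action synchronously with the given parameters and context.

run_async(action, params \\ %{}, context \\ %{}, opts \\ [])

Executes an Action asynchronously with the given parameters and context.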

Types

action()

@type action() :: module()

async_ref()

@type async_ref() :: %{ref: reference(), pid: pid()}

context()

@type context() :: map()

params()

@type params() :: map()

run_opts()

@type run_opts() :: [{:timeout, non_neg_integer()}]

Functions

await(map, timeout \\ 5000)

@spec await(async_ref(), timeout()) :: {:ok, map()} | {:error, Jido.Error.t()}

Waits for the result of an asynchronous Action execution.

Parameters

  • async_ref: The reference returned by run_async/4.
  • timeout: Maximum time (in ms) to wait for the result (default: 5000).

Returns

  • {:ok, result} if the Action executes successfully.
  • {:error, reason} if an error occurs during execution or if the workflow times out.

Examples

iex> async_ref = Jido.Workflow.run_async(MyAction, %{input: "value"})
iex> Jido.Workflow.await(async_ref, 10_000)
{:ok, %{result: "processed value"}}

iex> async_ref = Jido.Workflow.run_async(SlowAction, %{input: "value"})
iex> Jido.Workflow.await(async_ref, 100)
{:error, %Jido.Error{type: :timeout, message: "Async workflow timed out after 100ms"}}
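To make the timeout behavior concrete, the following sketch shows one way a map shaped like async_ref() could be awaited with a plain receive ... after block. This is an assumption-laden illustration, not Jido's code: AwaitSketch, start/1, and the {ref, result} message format are all hypothetical.

```elixir
defmodule AwaitSketch do
  # Hypothetical stand-in for run_async/4: runs `fun` in a separate process
  # and returns a map with the same keys as async_ref().
  def start(fun) do
    caller = self()
    ref = make_ref()
    pid = spawn(fn -> send(caller, {ref, fun.()}) end)
    %{ref: ref, pid: pid}
  end

  # Waits for the reply tagged with `ref`, or gives up after `timeout` ms
  # and kills the worker so it does not keep running unobserved.
  def await(%{ref: ref, pid: pid}, timeout \\ 5000) do
    receive do
      {^ref, result} -> result
    after
      timeout ->
        Process.exit(pid, :kill)
        {:error, :timeout}
    end
  end
end

fast = AwaitSketch.start(fn -> {:ok, %{result: "processed value"}} end)
AwaitSketch.await(fast, 1_000)
# => {:ok, %{result: "processed value"}}
```

Tagging the reply with the unique reference is what lets the caller ignore unrelated messages in its mailbox while waiting.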

cancel(pid)

@spec cancel(async_ref() | pid()) :: :ok | {:error, Jido.Error.t()}

Cancels a running asynchronous Action execution.

Parameters

  • async_ref: The reference returned by run_async/4, or just the PID of the process to cancel.

Returns

  • :ok if the cancellation was successful.
  • {:error, reason} if the cancellation failed or the input was invalid.

Examples

iex> async_ref = Jido.Workflow.run_async(LongRunningAction, %{input: "value"})
iex> Jido.Workflow.cancel(async_ref)
:ok

iex> Jido.Workflow.cancel("invalid")
{:error, %Jido.Error{type: :invalid_async_ref, message: "Invalid async ref for cancellation"}}
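The three input shapes accepted by cancel (an async_ref map, a bare PID, or anything else) map naturally onto separate function clauses. The sketch below shows that dispatch pattern in plain Elixir. CancelSketch is a hypothetical module, and stopping the process with Process.exit(pid, :shutdown) is an assumption about how a cancellation could work, not a statement about Jido's internals.

```elixir
defmodule CancelSketch do
  # An async_ref-shaped map: unwrap the PID and reuse the PID clause.
  def cancel(%{ref: ref, pid: pid}) when is_reference(ref) and is_pid(pid) do
    cancel(pid)
  end

  # A bare PID: ask the process to shut down.
  def cancel(pid) when is_pid(pid) do
    Process.exit(pid, :shutdown)
    :ok
  end

  # Anything else is rejected with an error tuple.
  def cancel(_other), do: {:error, :invalid_async_ref}
end

worker = spawn(fn -> Process.sleep(:infinity) end)
CancelSketch.cancel(%{ref: make_ref(), pid: worker})
# => :ok
CancelSketch.cancel("invalid")
# => {:error, :invalid_async_ref}
```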

run(action, params \\ %{}, context \\ %{}, opts \\ [])

@spec run(action(), params(), context(), run_opts()) ::
  {:ok, map()} | {:error, Jido.Error.t()}

Executes an Action synchronously with the given parameters and context.

Parameters

  • action: The module implementing the Action behavior.
  • params: A map of input parameters for the Action.
  • context: A map providing additional context for the Action execution.
  • opts: Options controlling the execution:
    • :timeout - Maximum time (in ms) allowed for the Action to complete (default: 5000).
    • :max_retries - Maximum number of retry attempts (default: 1).
    • :backoff - Initial backoff time in milliseconds, doubles with each retry (default: 1000).

Returns

  • {:ok, result} if the Action executes successfully.
  • {:error, reason} if an error occurs during execution.

Examples

iex> Jido.Workflow.run(MyAction, %{input: "value"}, %{user_id: 123})
{:ok, %{result: "processed value"}}

iex> Jido.Workflow.run(MyAction, %{invalid: "input"}, %{}, timeout: 1000)
{:error, %Jido.Error{type: :validation_error, message: "Invalid input"}}
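The :max_retries and :backoff options describe a retry loop whose delay doubles after each failed attempt. The sketch below shows that general pattern in plain Elixir. BackoffSketch and its functions are hypothetical names, and Jido's actual retry logic may differ in detail.

```elixir
defmodule BackoffSketch do
  # Calls `fun` until it returns {:ok, _} or the retries are used up.
  # `backoff_ms` is the initial delay and doubles before each new attempt.
  def retry(fun, max_retries, backoff_ms, attempt \\ 0) do
    case fun.() do
      {:ok, _} = ok ->
        ok

      {:error, _} = error when attempt >= max_retries ->
        error

      {:error, _} ->
        Process.sleep(backoff_ms)
        retry(fun, max_retries, backoff_ms * 2, attempt + 1)
    end
  end

  # Lists the delays (in ms) that retry/4 would sleep through
  # if every attempt failed.
  def delays(max_retries, backoff_ms) do
    backoff_ms
    |> Stream.iterate(&(&1 * 2))
    |> Enum.take(max_retries)
  end
end

BackoffSketch.delays(3, 1000)
# => [1000, 2000, 4000]
```

With the documented defaults (one retry, 1000 ms initial backoff), this pattern would make at most two attempts separated by a single one-second pause.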

run_async(action, params \\ %{}, context \\ %{}, opts \\ [])

@spec run_async(action(), params(), context(), run_opts()) :: async_ref()

Executes an Action asynchronously with the given parameters and context.

This function immediately returns a reference that can be used to await the result or cancel the workflow.

Note: This approach integrates with OTP by spawning tasks under a Task.Supervisor. Make sure {Task.Supervisor, name: Jido.Workflow.TaskSupervisor} is part of your supervision tree.

Parameters

  • action: The module implementing the Action behavior.
  • params: A map of input parameters for the Action.
  • context: A map providing additional context for the Action execution.
  • opts: Options controlling the execution (same as run/4).

Returns

An async_ref map containing:

  • :ref - A unique reference for this async workflow.
  • :pid - The PID of the process executing the Action.

Examples

iex> async_ref = Jido.Workflow.run_async(MyAction, %{input: "value"}, %{user_id: 123})
%{ref: #Reference<0.1234.5678>, pid: #PID<0.234.0>}

iex> result = Jido.Workflow.await(async_ref)
{:ok, %{result: "processed value"}}