View Source Jido.Workflow (Jido v1.0.0)
Workflows provide a robust framework for executing and managing workflows (Action sequences) in a distributed system.
This module offers functionality to:
- Run workflows synchronously or asynchronously
- Manage timeouts and retries
- Cancel running workflows
- Normalize and validate input parameters and context
- Emit telemetry events for monitoring and debugging
Workflows are defined as modules (Actions) that implement specific callbacks, allowing for a standardized way of defining and executing complex workflows across a distributed system.
Features
- Synchronous and asynchronous workflow execution
- Automatic retries with exponential backoff
- Timeout handling for long-running workflows
- Parameter and context normalization
- Comprehensive error handling and reporting
- Telemetry integration for monitoring and tracing
- Cancellation of running workflows
Usage
Workflows are executed using the run/4
or run_async/4
functions:
Jido.Workflow.run(MyAction, %{param1: "value"}, %{context_key: "context_value"})
See Jido.Action
for how to define an Action.
For asynchronous execution:
async_ref = Jido.Workflow.run_async(MyAction, params, context)
# ... do other work ...
result = Jido.Workflow.await(async_ref)
Integrating with OTP
For correct supervision of async tasks, ensure you start a Task.Supervisor
under your
application's supervision tree, for example:
def start(_type, _args) do
children = [
{Task.Supervisor, name: Jido.Workflow.TaskSupervisor},
...
]
Supervisor.start_link(children, strategy: :one_for_one)
end
This way, any async tasks spawned by run_async/4
will be supervised by the Task Supervisor.
Summary
Functions
Waits for the result of an asynchronous Action execution.
Cancels a running asynchronous Action execution.
Executes a Action synchronously with the given parameters and context.
Executes a Action asynchronously with the given parameters and context.
Types
Functions
@spec await(async_ref(), timeout()) :: {:ok, map()} | {:error, Jido.Error.t()}
Waits for the result of an asynchronous Action execution.
Parameters
async_ref
: The reference returned byrun_async/4
.timeout
: Maximum time (in ms) to wait for the result (default: 5000).
Returns
{:ok, result}
if the Action executes successfully.{:error, reason}
if an error occurs during execution or if the workflow times out.
Examples
iex> async_ref = Jido.Workflow.run_async(MyAction, %{input: "value"})
iex> Jido.Workflow.await(async_ref, 10_000)
{:ok, %{result: "processed value"}}
iex> async_ref = Jido.Workflow.run_async(SlowAction, %{input: "value"})
iex> Jido.Workflow.await(async_ref, 100)
{:error, %Jido.Error{type: :timeout, message: "Async workflow timed out after 100ms"}}
@spec cancel(async_ref() | pid()) :: :ok | {:error, Jido.Error.t()}
Cancels a running asynchronous Action execution.
Parameters
async_ref
: The reference returned byrun_async/4
, or just the PID of the process to cancel.
Returns
:ok
if the cancellation was successful.{:error, reason}
if the cancellation failed or the input was invalid.
Examples
iex> async_ref = Jido.Workflow.run_async(LongRunningAction, %{input: "value"})
iex> Jido.Workflow.cancel(async_ref)
:ok
iex> Jido.Workflow.cancel("invalid")
{:error, %Jido.Error{type: :invalid_async_ref, message: "Invalid async ref for cancellation"}}
Executes a Action synchronously with the given parameters and context.
Parameters
action
: The module implementing the Action behavior.params
: A map of input parameters for the Action.context
: A map providing additional context for the Action execution.opts
: Options controlling the execution::timeout
- Maximum time (in ms) allowed for the Action to complete (default: 5000).:max_retries
- Maximum number of retry attempts (default: 1).:backoff
- Initial backoff time in milliseconds, doubles with each retry (default: 1000).
Returns
{:ok, result}
if the Action executes successfully.{:error, reason}
if an error occurs during execution.
Examples
iex> Jido.Workflow.run(MyAction, %{input: "value"}, %{user_id: 123})
{:ok, %{result: "processed value"}}
iex> Jido.Workflow.run(MyAction, %{invalid: "input"}, %{}, timeout: 1000)
{:error, %Jido.Error{type: :validation_error, message: "Invalid input"}}
Executes a Action asynchronously with the given parameters and context.
This function immediately returns a reference that can be used to await the result or cancel the workflow.
Note: This approach integrates with OTP by spawning tasks under a Task.Supervisor
.
Make sure {Task.Supervisor, name: Jido.Workflow.TaskSupervisor}
is part of your supervision tree.
Parameters
action
: The module implementing the Action behavior.params
: A map of input parameters for the Action.context
: A map providing additional context for the Action execution.opts
: Options controlling the execution (same asrun/4
).
Returns
An async_ref
map containing:
:ref
- A unique reference for this async workflow.:pid
- The PID of the process executing the Action.
Examples
iex> async_ref = Jido.Workflow.run_async(MyAction, %{input: "value"}, %{user_id: 123})
%{ref: #Reference<0.1234.5678>, pid: #PID<0.234.0>}
iex> result = Jido.Workflow.await(async_ref)
{:ok, %{result: "processed value"}}