ExEssentials.Core.Runner (ExEssentials v0.10.1)
View SourceA small flow builder inspired by Ecto.Multi.
ExEssentials.Core.Runner helps you model and execute a flow as a sequence of named steps.
Each step contributes a value to a shared map called changes.
This module is designed to be built first and executed later:
put/3,run/3, andrun_async/3only register steps- execution happens only when calling
finish/1orfinish/2
The execution model is fail-fast:
- steps run in the order they were registered
- when a step returns
{:error, reason}, the flow stops immediately - the error includes the failing step name and the
changesaccumulated so far
Steps
Steps are always registered with a unique name (atom()). Names are used:
- as keys in the resulting
changesmap - to identify the failing step in case of an error
Supported step types:
put/3- seeds a value intochangesrun/3- registers a synchronous step executed in orderrun_async/3- registers an asynchronous step executed concurrentlyrecover/3- registers an error recovery handler that can inject steps when an error occurs
Asynchronous steps
Asynchronous steps are queued as the flow is traversed and are executed concurrently when the flow needs to synchronize results:
- before running a synchronous step (
run/3) - at the end of the flow
Each asynchronous step runs with a snapshot of changes from the moment the async step
is registered during execution.
Return values
finish/1 returns one of these tuples:
{:ok, changes}{:error, step, reason, changes_before}
finish/2 executes the flow and forwards the result tuple to a user function, allowing you to
transform it into any shape you want.
Examples
Basic flow
runner =
ExEssentials.Core.Runner.new(timeout: 5_000)
|> ExEssentials.Core.Runner.put(:value, 1)
|> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)
|> ExEssentials.Core.Runner.run(:triple, fn %{value: v} -> {:ok, v * 3} end)
ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 1, double: 2, triple: 3}}Mixing sync and async
runner =
ExEssentials.Core.Runner.new(timeout: 5_000)
|> ExEssentials.Core.Runner.put(:value, 1)
|> ExEssentials.Core.Runner.run(:sync_step, fn _ -> {:ok, 2} end)
|> ExEssentials.Core.Runner.run_async(:async_step, fn _ -> {:ok, 3} end)
|> ExEssentials.Core.Runner.run(:sum, fn %{value: a, sync_step: b, async_step: c} -> {:ok, a + b + c} end)
ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 1, sync_step: 2, async_step: 3, sum: 6}}Error handling
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:value, 1)
|> ExEssentials.Core.Runner.run(:may_fail, fn _ -> {:error, :boom} end)
|> ExEssentials.Core.Runner.run(:never_runs, fn _ -> {:ok, :ignored} end)
ExEssentials.Core.Runner.finish(runner)
#=> {:error, :may_fail, :boom, %{value: 1}}Transforming the result with finish/2
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:value, 1)
|> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)
ExEssentials.Core.Runner.finish(runner, fn
{:ok, %{double: result}} -> {:ok, result}
{:error, step, reason, _changes} -> {:error, step, reason}
end)
#=> {:ok, 2}
Summary
Functions
Conditionally applies on_true when predicate evaluates to true.
Executes the flow and returns the execution result.
Executes the flow and passes the result to the given function.
Creates a new runner.
Seeds a value into changes under step_name and registers a :put step.
Registers an error recovery handler.
Registers a synchronous step.
Registers an asynchronous step.
Selects and applies a continuation at runtime during finish/1, based on the current changes map.
Applies a continuation function to the runner.
Types
@type finish_result() :: {:ok, changes :: map()} | {:error, step :: atom(), reason :: any(), changes_before :: map()}
Result returned by finish/1.
On success, returns the final changes map.
On error, returns the failing step name, the reason, and the changes accumulated so far.
@type step() :: {:put, step :: atom(), value :: any()} | {:sync, step :: atom(), (map() -> {:ok, any()} | {:error, any()})} | {:async, step :: atom(), (map() -> {:ok, any()} | {:error, any()})} | {:branch, predicate :: (map() -> boolean()), on_true :: (t() -> t())} | {:switch, selector :: (map() -> (t() -> t()))} | {:recover, predicate :: ({step :: atom(), reason :: any(), changes_before :: map()} -> boolean()), on_error :: (runner :: t(), {step :: atom(), reason :: any(), changes_before :: map()} -> t())}
A single registered step in a flow.
Steps are registered during build time and executed during finish/1.
{:put, name, value}- seeds a value{:sync, name, fun}- runs synchronously{:async, name, fun}- runs concurrently{:branch, predicate, on_true}- conditionally injects more steps at runtime{:switch, selector}- injects more steps at runtime based onchanges{:recover, predicate, on_error}- conditionally intercepts errors and injects more steps at runtime
@type t() :: %ExEssentials.Core.Runner{ changes: map(), error_reason: any(), failed?: boolean(), failed_step: step :: atom() | nil, steps: [step()], timeout: integer() }
The runner struct.
The steps field stores the planned execution steps and changes stores seeded values.
Functions
@spec branch( runner :: t(), predicate :: (changes :: map() -> boolean()), on_true :: (runner :: t() -> t()) ) :: t()
Conditionally applies on_true when predicate evaluates to true.
The predicate is evaluated at runtime during finish/1, using the current changes map.
Examples
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:status, :rejected)
|> ExEssentials.Core.Runner.branch(
fn %{status: status} -> status == :rejected end,
fn r -> ExEssentials.Core.Runner.put(r, :compensate, true) end
)
ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{status: :rejected, compensate: true}}
@spec finish(runner :: t()) :: {:ok, changes :: map()} | {:error, step :: atom(), reason :: any(), changes_before :: map()}
Executes the flow and returns the execution result.
Returns:
{:ok, changes}when all steps succeed{:error, step, reason, changes_before}when a step fails
Examples
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:value, 1)
|> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)
ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 1, double: 2}}
@spec finish(runner :: t(), function :: (finish_result() -> any())) :: any()
Executes the flow and passes the result to the given function.
This is useful when you want to normalize the output or extract a single value.
Examples
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:value, 1)
|> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)
ExEssentials.Core.Runner.finish(runner, fn
{:ok, %{double: result}} -> {:ok, result}
{:error, step, reason, _changes} -> {:error, step, reason}
end)
#=> {:ok, 2}
Creates a new runner.
Options
:timeout- timeout (in milliseconds) used when awaiting asynchronous steps.
Examples
iex> ExEssentials.Core.Runner.new().timeout
5000
iex> ExEssentials.Core.Runner.new(timeout: 10_000).timeout
10000
Seeds a value into changes under step_name and registers a :put step.
Values seeded via put/3 are immediately available in runner.changes, which is useful when
composing flows with branch/3 and switch/2.
Raises ArgumentError if the step name has already been used.
Examples
iex> runner = ExEssentials.Core.Runner.new() |> ExEssentials.Core.Runner.put(:a, 1)
iex> runner.changes
%{a: 1}
@spec recover( runner :: t(), predicate :: ({step :: atom(), reason :: any(), changes_before :: map()} -> boolean()), on_error :: (runner :: t(), {step :: atom(), reason :: any(), changes_before :: map()} -> t()) ) :: t()
Registers an error recovery handler.
By default, the runner is fail-fast: when a step returns {:error, reason} the flow stops immediately.
recover/3 lets you intercept an error and inject new steps to continue the flow.
The recovery handler is evaluated only when an error happens.
predicatereceives{step, reason, changes_before}and must returntruewhen the error should be recovered.on_errorreceives the current runner and the same{step, reason, changes_before}tuple and must return a runner containing the steps to be injected.
When a recovery matches, the runner:
- skips steps until the matching
recover/3 - injects the steps returned by
on_error - continues with the remaining steps after the matching
recover/3
If no recovery matches, the runner returns the original error tuple.
Example
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.run(:may_fail, fn _ -> {:error, {:http, 500}} end)
|> ExEssentials.Core.Runner.recover(
fn {_step, reason, _changes} -> match?({:http, 500}, reason) end,
fn r, {_step, reason, _changes} ->
ExEssentials.Core.Runner.run(r, :enqueue_retry, fn _ -> {:ok, {:scheduled, reason}} end)
end
)
|> ExEssentials.Core.Runner.run(:after, fn %{enqueue_retry: v} -> {:ok, v} end)
ExEssentials.Core.Runner.finish(runner)
@spec run( runner :: t(), step_name :: atom(), function :: (changes :: map() -> {:ok, result :: any()} | {:error, reason :: any()}) ) :: t()
Registers a synchronous step.
The given function is executed during finish/1 and receives the current changes map.
The function must return:
{:ok, result}- storesresultunder the step name{:error, reason}- stops the flow with an error
Raises ArgumentError if the step name has already been used.
Examples
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:value, 2)
|> ExEssentials.Core.Runner.run(:square, fn %{value: v} -> {:ok, v * v} end)
ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 2, square: 4}}
@spec run_async( runner :: t(), step_name :: atom(), function :: (changes :: map() -> {:ok, result :: any()} | {:error, reason :: any()}) ) :: t()
Registers an asynchronous step.
Asynchronous steps are executed concurrently during finish/1. Their results are merged into
changes under their step names.
Raises ArgumentError if the step name has already been used.
Examples
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:value, 2)
|> ExEssentials.Core.Runner.run_async(:double, fn %{value: v} -> {:ok, v * 2} end)
|> ExEssentials.Core.Runner.run(:sum, fn %{value: v, double: d} -> {:ok, v + d} end)
ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 2, double: 4, sum: 6}}
Selects and applies a continuation at runtime during finish/1, based on the current changes map.
The selector must return a function that receives the runner.
Examples
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:status, :settled)
|> ExEssentials.Core.Runner.switch(fn
%{status: :settled} -> fn r -> ExEssentials.Core.Runner.put(r, :final, :ok) end
_ -> fn r -> ExEssentials.Core.Runner.put(r, :final, :error) end
end)
ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{status: :settled, final: :ok}}
Applies a continuation function to the runner.
This is a convenience for composing flow-building functions.
Examples
runner =
ExEssentials.Core.Runner.new()
|> ExEssentials.Core.Runner.put(:a, 1)
|> ExEssentials.Core.Runner.then(fn r -> ExEssentials.Core.Runner.put(r, :b, 2) end)
runner.changes
#=> %{a: 1, b: 2}