ExEssentials.Core.Runner (ExEssentials v0.8.0)
A small, composable flow runner for building step-by-step pipelines with optional asynchronous work.
ExEssentials.Core.Runner lets you accumulate results across named steps while keeping a single
immutable changes map that is passed to each step function.
It supports two kinds of steps:
- **Synchronous steps** via `run/3`, executed immediately and stored in `changes`.
- **Asynchronous steps** via `run_async/3`, executed in a separate `Task` and merged into `changes`
when the runner is about to continue synchronously (or when `finish/2` is called).

The runner is fail-fast:
- When a step returns `{:error, reason}`, the runner is marked as failed and subsequent calls to
`put/3`, `run/3`, or `run_async/3` become no-ops.
- Step names must be unique within a flow. Reusing a step name raises an `ArgumentError`.

Asynchronous steps are awaited using `Task.yield_many/2` with the configured `:timeout` (in milliseconds).
Tasks that do not respond within the timeout are shut down and their results are not merged.
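The timeout behaviour described above can be sketched with plain `Task` calls. This is a minimal illustration and not the library's implementation: the module `AwaitSketch` and the function `await_named/2` are hypothetical names, and using `:brutal_kill` for the shutdown is an assumption.

```elixir
defmodule AwaitSketch do
  # Hypothetical helper (not part of ExEssentials): awaits named tasks and
  # keeps only the results that arrive within `timeout` milliseconds.
  def await_named(named_tasks, timeout) do
    # Map each task reference back to its step name.
    names = Map.new(named_tasks, fn {name, task} -> {task.ref, name} end)

    named_tasks
    |> Enum.map(fn {_name, task} -> task end)
    |> Task.yield_many(timeout)
    |> Enum.reduce(%{}, fn {task, outcome}, acc ->
      # `outcome` is nil when the task has not replied yet, so shut it down.
      # Assumption: :brutal_kill; the library may shut tasks down differently.
      case outcome || Task.shutdown(task, :brutal_kill) do
        {:ok, value} -> Map.put(acc, Map.fetch!(names, task.ref), value)
        _timed_out_or_exited -> acc
      end
    end)
  end
end

tasks = [
  fast: Task.async(fn -> 1 + 1 end),
  slow:
    Task.async(fn ->
      Process.sleep(500)
      :too_late
    end)
]

# Only :fast replies within 100 ms; :slow is shut down and left out.
merged = AwaitSketch.await_named(tasks, 100)
```

In this sketch a timed-out task simply contributes nothing to the merged map, which mirrors the documented rule that late results are not merged.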
When you are done building the flow, call `finish/2` to await any remaining async work and receive a
final result in the shape of either `{:ok, changes}` or `{:error, failed_step, reason, changes_before_error}`.
Functions
@spec finish( runner :: t(), function :: (changes :: {:ok, map()} | {:error, atom(), any(), map()} -> any()) ) :: any()
Finalizes the flow and returns the output of `function`.
This function awaits any pending async tasks (subject to the runner's `:timeout`) and then calls
`function` with one of the following results:
- `{:ok, changes}` when all executed steps completed successfully.
- `{:error, failed_step, reason, changes_before_error}` when a step failed. `changes_before_error` contains only the accumulated values up to (but not including) the failing step.
## Examples
    iex> ExEssentials.Core.Runner.new()
    ...> |> ExEssentials.Core.Runner.put(:a, 1)
    ...> |> ExEssentials.Core.Runner.run(:b, fn ch -> {:ok, ch.a + 1} end)
    ...> |> ExEssentials.Core.Runner.finish(fn result -> result end)
    {:ok, %{a: 1, b: 2}}

    iex> ExEssentials.Core.Runner.new()
    ...> |> ExEssentials.Core.Runner.put(:a, 1)
    ...> |> ExEssentials.Core.Runner.run(:b, fn _ch -> {:error, :nope} end)
    ...> |> ExEssentials.Core.Runner.run(:c, fn _ch -> {:ok, :never_runs} end)
    ...> |> ExEssentials.Core.Runner.finish(fn result -> result end)
    {:error, :b, :nope, %{a: 1}}
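The function given to `finish/2` does not have to be an identity function. As a hedged sketch, a handler can pattern-match on the two documented result shapes; the module `FinishHandlerSketch` and its `handle/1` function are hypothetical names used only for illustration.

```elixir
defmodule FinishHandlerSketch do
  # Hypothetical handler matching the two result shapes documented for finish/2.
  def handle({:ok, changes}), do: {:ok, changes}

  def handle({:error, failed_step, reason, changes_before_error}) do
    # List the steps that completed before the failure, in a stable order.
    completed = changes_before_error |> Map.keys() |> Enum.sort()

    {:error,
     "step #{inspect(failed_step)} failed with #{inspect(reason)} after #{inspect(completed)}"}
  end
end

# Feeding it the error tuple from the second example above:
message = FinishHandlerSketch.handle({:error, :b, :nope, %{a: 1}})
# message is {:error, "step :b failed with :nope after [:a]"}
```

Such a handler could be passed as `&FinishHandlerSketch.handle/1` in place of `fn result -> result end`.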
Creates a new runner.
Options:
- `:timeout` - the timeout (in milliseconds) used when awaiting async steps (default: `5000`).

## Examples
    iex> runner = ExEssentials.Core.Runner.new(timeout: 1_000)
    iex> runner.timeout
    1000
Inserts a value into the runner's `changes` under `step_name`.
This is useful for seeding the flow with inputs or precomputed values.
If the runner is already marked as failed, this function returns the runner unchanged.
## Examples
    iex> runner = ExEssentials.Core.Runner.new()
    iex> runner = ExEssentials.Core.Runner.put(runner, :user_id, 123)
    iex> runner.changes
    %{user_id: 123}
@spec run( runner :: t(), step_name :: atom(), function :: (changes :: map() -> {:ok, result :: any()} | {:error, reason :: any()}) ) :: t()
Executes a synchronous step and stores its result in `changes`.
Before running the step, the runner awaits any pending async tasks so that their results are available to the step.
The step function receives the current `changes` map and must return either:
- `{:ok, result}` to continue the flow, storing `result` under `step_name`.
- `{:error, reason}` to fail the flow, recording `reason` for `step_name` (available as `failed_step` and `error_reason` on the runner) and marking the runner as failed.

Step names must be unique; reusing the same `step_name` raises an `ArgumentError`.
If the runner is already marked as failed, this function returns the runner unchanged.
## Examples
    iex> runner = ExEssentials.Core.Runner.new()
    iex> runner = ExEssentials.Core.Runner.put(runner, :a, 2)
    iex> runner = ExEssentials.Core.Runner.run(runner, :b, fn changes -> {:ok, changes.a * 3} end)
    iex> runner.changes
    %{a: 2, b: 6}

    iex> runner = ExEssentials.Core.Runner.new()
    iex> runner = ExEssentials.Core.Runner.run(runner, :step, fn _changes -> {:error, :boom} end)
    iex> {runner.failed?, runner.failed_step, runner.error_reason}
    {true, :step, :boom}
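The fail-fast contract of synchronous steps can be approximated without the library. The sketch below uses `Enum.reduce_while/3` over a list of steps, which is a different technique from the runner's pipe-based API and is shown only to illustrate the semantics; `FailFastSketch` and `run_steps/2` are hypothetical names.

```elixir
defmodule FailFastSketch do
  # Hypothetical illustration: runs `{name, fun}` steps in order and stops
  # at the first `{:error, reason}`, returning the changes gathered so far.
  def run_steps(steps, initial \\ %{}) do
    Enum.reduce_while(steps, {:ok, initial}, fn {name, fun}, {:ok, changes} ->
      # Mirror the documented rule that step names must be unique.
      if Map.has_key?(changes, name) do
        raise ArgumentError, "step #{inspect(name)} is already defined"
      end

      case fun.(changes) do
        {:ok, result} -> {:cont, {:ok, Map.put(changes, name, result)}}
        {:error, reason} -> {:halt, {:error, name, reason, changes}}
      end
    end)
  end
end

result =
  FailFastSketch.run_steps(
    a: fn _changes -> {:ok, 1} end,
    b: fn _changes -> {:error, :nope} end,
    c: fn _changes -> {:ok, :never_runs} end
  )

# result is {:error, :b, :nope, %{a: 1}}; step :c is never called.
```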
@spec run_async( runner :: t(), step_name :: atom(), function :: (changes :: map() -> {:ok, result :: any()} | {:error, reason :: any()}) ) :: t()
Spawns an asynchronous step as a Task.
The step function receives the current `changes` map as it exists at the time the task is spawned.
The task result is merged into `changes` the next time the runner needs to synchronize (on `run/3`
or `finish/2`).
If the runner is already marked as failed, this function returns the runner unchanged.
Step names must be unique; reusing the same `step_name` raises an `ArgumentError`.
Notes on timeouts:
- Async tasks are awaited with the runner's configured `:timeout`.
- If a task does not respond within the timeout, it is shut down and its result is not merged.

## Examples
    iex> runner = ExEssentials.Core.Runner.new(timeout: 5_000)
    iex> runner = ExEssentials.Core.Runner.put(runner, :base, 10)
    iex> runner = ExEssentials.Core.Runner.run_async(runner, :double, fn changes -> {:ok, changes.base * 2} end)
    iex> runner = ExEssentials.Core.Runner.run(runner, :plus_one, fn changes -> {:ok, changes.base + 1} end)
    iex> Map.take(runner.changes, [:base, :double, :plus_one])
    %{base: 10, double: 20, plus_one: 11}
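The statement that an async step sees `changes` as it existed when the task was spawned follows from Elixir's immutable data. The sketch below shows the same effect with a bare `Task`, independent of the runner; the variable names are illustrative only.

```elixir
changes = %{base: 10}

# The closure captures the map as it exists right now.
task = Task.async(fn -> {Map.keys(changes), changes.base * 2} end)

# Rebinding `changes` afterwards builds a new map; the task's copy is unaffected.
changes = Map.put(changes, :plus_one, 11)

{seen_keys, double} = Task.await(task)
# seen_keys is [:base] and double is 20, while `changes` now also has :plus_one.
```

One consequence is that an async step cannot rely on results from steps added after it was spawned; work that needs those results belongs in a later `run/3` step.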