ExEssentials.Core.Runner (ExEssentials v0.8.0)

A small, composable flow runner for building step-by-step pipelines with optional asynchronous work.

`ExEssentials.Core.Runner` lets you accumulate results across named steps while keeping a single immutable `changes` map that is passed to each step function.

It supports two kinds of steps:

- **Synchronous steps** via `run/3`, executed immediately and stored in `changes`.
- **Asynchronous steps** via `run_async/3`, executed in a separate `Task` and merged into `changes`
  when the runner is about to continue synchronously (or when `finish/2` is called).

The runner is fail-fast:

- When a step returns `{:error, reason}`, the runner is marked as failed and subsequent calls to
  `put/3`, `run/3`, or `run_async/3` become no-ops.
- Step names must be unique within a flow. Reusing a step name raises an `ArgumentError`.
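
The two fail-fast rules above can be sketched as follows. This is an illustrative snippet that assumes the library is available as a dependency; the step names (`:validate`, `:save`, `:load`) and values are hypothetical.

```elixir
alias ExEssentials.Core.Runner

# A failing step marks the runner as failed, so :save is never executed.
result =
  Runner.new()
  |> Runner.put(:input, "abc")
  |> Runner.run(:validate, fn _changes -> {:error, :invalid} end)
  |> Runner.run(:save, fn _changes -> {:ok, :saved} end)
  |> Runner.finish(fn result -> result end)

# result is {:error, :validate, :invalid, %{input: "abc"}}

# Reusing a step name raises ArgumentError; the exception is captured here
# so the script keeps running.
duplicate_error =
  try do
    Runner.new()
    |> Runner.run(:load, fn _changes -> {:ok, 1} end)
    |> Runner.run(:load, fn _changes -> {:ok, 2} end)

    :no_error
  rescue
    error in ArgumentError -> error
  end
```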

Asynchronous steps are awaited using `Task.yield_many/2` with the configured `:timeout` (in milliseconds). Tasks that do not respond within the timeout are shut down and their results are not merged.
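
To make the await-then-shut-down pattern concrete, here is a minimal sketch using only the standard library. It is not the runner's actual implementation, only an illustration of how `Task.yield_many/2` and `Task.shutdown/2` are commonly combined; the 100 ms timeout and the task bodies are made up for the example.

```elixir
tasks = [
  Task.async(fn -> {:ok, :fast} end),
  Task.async(fn ->
    Process.sleep(1_000)
    {:ok, :slow}
  end)
]

results =
  tasks
  |> Task.yield_many(100)
  |> Enum.map(fn {task, result} ->
    # yield_many/2 returns nil for tasks that have not replied in time.
    # Task.shutdown/2 stops such a task and returns nil unless a reply
    # arrived while shutting down.
    result || Task.shutdown(task, :brutal_kill)
  end)

# results is [{:ok, {:ok, :fast}}, nil]
```

The first task replies well within the timeout, so its return value is wrapped in `{:ok, value}`. The second is still sleeping after 100 ms, so it is shut down and contributes `nil`.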

When you are done building the flow, call `finish/2` to await any remaining async work and receive a final result in the shape of either `{:ok, changes}` or `{:error, failed_step, reason, changes_before_error}`.
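
Putting the pieces together, a complete flow might look like the sketch below. It assumes the library is available as a dependency; the step names and the data (`:user_id`, `:profile`, `:orders`, `:greeting`) are hypothetical and stand in for real lookups.

```elixir
alias ExEssentials.Core.Runner

result =
  Runner.new(timeout: 2_000)
  |> Runner.put(:user_id, 42)
  # Each async step is spawned in its own Task.
  |> Runner.run_async(:profile, fn ch -> {:ok, %{id: ch.user_id, name: "Ada"}} end)
  |> Runner.run_async(:orders, fn ch -> {:ok, [{ch.user_id, :order_1}]} end)
  # run/3 awaits pending async steps first, so :profile is available here.
  |> Runner.run(:greeting, fn ch -> {:ok, "Hello, " <> ch.profile.name} end)
  |> Runner.finish(fn result -> result end)

# result is
# {:ok,
#  %{
#    user_id: 42,
#    profile: %{id: 42, name: "Ada"},
#    orders: [{42, :order_1}],
#    greeting: "Hello, Ada"
#  }}
```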

Summary

Functions

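finish(runner, function)

Finalizes the flow and returns the output of `function`.

new(opts \\ [])

Creates a new runner.

put(runner, step_name, value)

Inserts a value into the runner under `step_name`.

run(runner, step_name, function)

Executes a synchronous step and stores its result in `changes`.

run_async(runner, step_name, function)

Spawns an asynchronous step as a `Task`.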

Types

step()

@type step() :: {:sync, atom(), any()} | {:async, atom(), Task.t()}

t()

@type t() :: %ExEssentials.Core.Runner{
  async_tasks: [Task.t()],
  changes: map(),
  error_reason: any(),
  failed?: boolean(),
  failed_step: atom(),
  steps: [step()],
  timeout: integer()
}

Functions

finish(runner, function)

@spec finish(
  runner :: t(),
  function :: (changes :: {:ok, map()} | {:error, atom(), any(), map()} -> any())
) ::
  any()

Finalizes the flow and returns the output of `function`.

This function awaits any pending async tasks (subject to the runner's `:timeout`) and then calls `function` with one of the following results:

- `{:ok, changes}` when all executed steps completed successfully.
- `{:error, failed_step, reason, changes_before_error}` when a step failed.

`changes_before_error` contains only the accumulated values up to (but not including) the failing step.

## Examples

  iex> ExEssentials.Core.Runner.new()
  ...> |> ExEssentials.Core.Runner.put(:a, 1)
  ...> |> ExEssentials.Core.Runner.run(:b, fn ch -> {:ok, ch.a + 1} end)
  ...> |> ExEssentials.Core.Runner.finish(fn result -> result end)
  {:ok, %{a: 1, b: 2}}

  iex> ExEssentials.Core.Runner.new()
  ...> |> ExEssentials.Core.Runner.put(:a, 1)
  ...> |> ExEssentials.Core.Runner.run(:b, fn _ch -> {:error, :nope} end)
  ...> |> ExEssentials.Core.Runner.run(:c, fn _ch -> {:ok, :never_runs} end)
  ...> |> ExEssentials.Core.Runner.finish(fn result -> result end)
  {:error, :b, :nope, %{a: 1}}
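
Because `finish/2` returns whatever `function` returns, the callback is a natural place to translate both result shapes into a value the caller wants. The sketch below assumes the library is available as a dependency; the `handle` function and the `:price` and `:total` steps are hypothetical.

```elixir
alias ExEssentials.Core.Runner

# A multi-clause anonymous function that handles both result shapes.
handle = fn
  {:ok, changes} ->
    {:ok, changes.total}

  {:error, step, reason, _changes_before_error} ->
    {:error, "#{step} failed: #{inspect(reason)}"}
end

success =
  Runner.new()
  |> Runner.put(:price, 10)
  |> Runner.run(:total, fn ch -> {:ok, ch.price * 2} end)
  |> Runner.finish(handle)

failure =
  Runner.new()
  |> Runner.run(:total, fn _ch -> {:error, :no_price} end)
  |> Runner.finish(handle)

# success is {:ok, 20}
# failure is {:error, "total failed: :no_price"}
```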

new(opts \\ [])

@spec new(opts :: Keyword.t()) :: t()

Creates a new runner.

Options:

- `:timeout` - the timeout (in milliseconds) used when awaiting async steps (default: `5000`).

## Examples

  iex> runner = ExEssentials.Core.Runner.new(timeout: 1_000)
  iex> runner.timeout
  1000

put(runner, step_name, value)

@spec put(runner :: t(), step_name :: atom(), value :: any()) :: t()

Inserts a value into the runner's `changes` under the key `step_name`.

This is useful for seeding the flow with inputs or precomputed values.

If the runner is already marked as failed, this function returns the runner unchanged.

## Examples

  iex> runner = ExEssentials.Core.Runner.new()
  iex> runner = ExEssentials.Core.Runner.put(runner, :user_id, 123)
  iex> runner.changes
  %{user_id: 123}
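
When several inputs need to be seeded at once, one option is to fold over them with `Enum.reduce/3`. This is only a sketch that assumes the library is available as a dependency; the `inputs` keyword list is hypothetical.

```elixir
alias ExEssentials.Core.Runner

inputs = [user_id: 123, locale: "en"]

# Each {key, value} pair becomes one put/3 call on the accumulated runner.
runner =
  Enum.reduce(inputs, Runner.new(), fn {key, value}, acc ->
    Runner.put(acc, key, value)
  end)

# runner.changes is %{user_id: 123, locale: "en"}
```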

run(runner, step_name, function)

@spec run(
  runner :: t(),
  step_name :: atom(),
  function :: (changes :: map() ->
                 {:ok, result :: any()} | {:error, reason :: any()})
) :: t()

Executes a synchronous step and stores its result in `changes`.

Before running the step, the runner will await any pending async tasks so that their results are available to the step.

The step function receives the current `changes` map and must return either:

- `{:ok, result}` to continue the flow, storing `result` under `step_name`.
- `{:error, reason}` to fail the flow, marking the runner as failed and recording `step_name` as `failed_step` and `reason` as `error_reason`; nothing is added to `changes`.

Step names must be unique; reusing the same `step_name` raises an `ArgumentError`.

If the runner is already marked as failed, this function returns the runner unchanged.

## Examples

  iex> runner = ExEssentials.Core.Runner.new()
  iex> runner = ExEssentials.Core.Runner.put(runner, :a, 2)
  iex> runner = ExEssentials.Core.Runner.run(runner, :b, fn changes -> {:ok, changes.a * 3} end)
  iex> runner.changes
  %{a: 2, b: 6}

  iex> runner = ExEssentials.Core.Runner.new()
  iex> runner = ExEssentials.Core.Runner.run(runner, :step, fn _changes -> {:error, :boom} end)
  iex> {runner.failed?, runner.failed_step, runner.error_reason}
  {true, :step, :boom}
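
Step functions do not have to be anonymous. Since each one takes the `changes` map and returns `{:ok, result}` or `{:error, reason}`, named functions can be passed with the capture operator. The sketch below assumes the library is available as a dependency; the `Checkout` module, its functions, and the limit of 100 are hypothetical.

```elixir
defmodule Checkout do
  alias ExEssentials.Core.Runner

  # Each step pattern-matches only the keys it needs from changes.
  def total(%{items: items}), do: {:ok, Enum.sum(items)}

  def check_limit(%{total: total}) when total <= 100, do: {:ok, :within_limit}
  def check_limit(%{total: total}), do: {:error, {:over_limit, total}}

  def run(items) do
    Runner.new()
    |> Runner.put(:items, items)
    |> Runner.run(:total, &total/1)
    |> Runner.run(:limit, &check_limit/1)
    |> Runner.finish(fn result -> result end)
  end
end

within = Checkout.run([10, 20])
over = Checkout.run([80, 40])

# within is {:ok, %{items: [10, 20], total: 30, limit: :within_limit}}
# over is {:error, :limit, {:over_limit, 120}, %{items: [80, 40], total: 120}}
```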

run_async(runner, step_name, function)

@spec run_async(
  runner :: t(),
  step_name :: atom(),
  function :: (changes :: map() ->
                 {:ok, result :: any()} | {:error, reason :: any()})
) :: t()

Spawns an asynchronous step as a `Task`.

The step function receives the current `changes` map as it exists at the time the task is spawned. The task result is merged into `changes` the next time the runner needs to synchronize (on `run/3` or `finish/2`).

If the runner is already marked as failed, this function returns the runner unchanged.

Step names must be unique; reusing the same `step_name` raises an `ArgumentError`.
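
The snapshot behaviour means an async step only sees values that were in `changes` when it was spawned. The sketch below illustrates this under the assumption that the library is available as a dependency; the step names `:seen` and `:b` are hypothetical.

```elixir
alias ExEssentials.Core.Runner

result =
  Runner.new()
  |> Runner.put(:a, 1)
  # The task captures changes as %{a: 1} at spawn time.
  |> Runner.run_async(:seen, fn ch -> {:ok, Map.keys(ch)} end)
  # :b is added after the task was spawned, so the task never sees it.
  |> Runner.run(:b, fn _ch -> {:ok, 2} end)
  |> Runner.finish(fn result -> result end)

# Following the description above, result should be
# {:ok, %{a: 1, b: 2, seen: [:a]}}
```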

Notes on timeouts:

- Async tasks are awaited with the runner's configured `:timeout`.
- If a task does not respond within the timeout, it is shut down and its result is not merged.
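
A short timeout combined with a slow task is an easy way to observe this. The snippet below is a sketch that assumes the library is available as a dependency; the 50 ms timeout, the 500 ms sleep, and the `:slow` step name are made up. The notes above only state that the late result is not merged, so no particular return value is asserted here.

```elixir
alias ExEssentials.Core.Runner

result =
  Runner.new(timeout: 50)
  |> Runner.run_async(:slow, fn _changes ->
    # Takes far longer than the runner's 50 ms timeout.
    Process.sleep(500)
    {:ok, :late}
  end)
  |> Runner.finish(fn result -> result end)

# Inspect the outcome; per the notes above, :slow should not appear in changes.
IO.inspect(result, label: "finish/2 result")
```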

## Examples

  iex> runner = ExEssentials.Core.Runner.new(timeout: 5_000)
  iex> runner = ExEssentials.Core.Runner.put(runner, :base, 10)
  iex> runner = ExEssentials.Core.Runner.run_async(runner, :double, fn changes -> {:ok, changes.base * 2} end)
  iex> runner = ExEssentials.Core.Runner.run(runner, :plus_one, fn changes -> {:ok, changes.base + 1} end)
  iex> Map.take(runner.changes, [:base, :double, :plus_one])
  %{base: 10, double: 20, plus_one: 11}