ExEssentials.Core.Runner (ExEssentials v0.10.1)

A small flow builder inspired by Ecto.Multi.

ExEssentials.Core.Runner helps you model and execute a flow as a sequence of named steps. Each step contributes a value to a shared map called changes.

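This module is designed to be built first and executed later: steps are registered at build time with functions such as put/3, run/3, and run_async/3, and step functions only run when finish/1 or finish/2 is called.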

The execution model is fail-fast:

  • steps run in the order they were registered
  • when a step returns {:error, reason}, the flow stops immediately
  • the error includes the failing step name and the changes accumulated so far
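
The fail-fast rule can be pictured as a fold that halts on the first error. The following is a minimal sketch in plain Elixir, not the library's implementation: FailFastSketch and its {name, fun} step list are hypothetical names introduced only for illustration.

```elixir
defmodule FailFastSketch do
  # Hypothetical helper, not part of ExEssentials: folds over {name, fun}
  # pairs in order and halts at the first {:error, reason}.
  def execute(steps, changes \\ %{}) do
    Enum.reduce_while(steps, {:ok, changes}, fn {name, fun}, {:ok, acc} ->
      case fun.(acc) do
        # Success: store the value under the step name and keep going.
        {:ok, value} -> {:cont, {:ok, Map.put(acc, name, value)}}
        # Failure: stop and report the step name plus the changes so far.
        {:error, reason} -> {:halt, {:error, name, reason, acc}}
      end
    end)
  end
end

steps = [
  double: fn %{value: v} -> {:ok, v * 2} end,
  may_fail: fn _ -> {:error, :boom} end,
  never_runs: fn _ -> {:ok, :ignored} end
]

result = FailFastSketch.execute(steps, %{value: 1})
#=> {:error, :may_fail, :boom, %{value: 1, double: 2}}
```

The :never_runs function is never called, and the error tuple carries only the values produced before :may_fail.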

Steps

Steps are always registered with a unique name (atom()). Names are used:

  • as keys in the resulting changes map
  • to identify the failing step in case of an error

Supported step types:

  • put/3 - seeds a value into changes
  • run/3 - registers a synchronous step executed in order
  • run_async/3 - registers an asynchronous step executed concurrently
  • recover/3 - registers an error recovery handler that can inject steps when an error occurs

Asynchronous steps

Asynchronous steps are queued as the flow is traversed and are executed concurrently when the flow needs to synchronize results:

  • before running a synchronous step (run/3)
  • at the end of the flow

Each asynchronous step runs with a snapshot of changes taken at the moment the step is queued during execution.
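
To illustrate what a snapshot implies, here is a small sketch that uses only the standard Task module rather than the runner itself. It assumes that two asynchronous steps queued back to back behave like two tasks started from the same map, so the second cannot observe the result of the first; the variable names are illustrative only.

```elixir
changes = %{value: 1}

# Queue two tasks. Each closure captures `changes` as it exists right now.
tasks = [
  a: Task.async(fn -> {:ok, changes.value + 1} end),
  b: Task.async(fn -> {:ok, Map.has_key?(changes, :a)} end)
]

# Synchronization point: await every task, then merge each result
# into the map under the task's name.
merged =
  Enum.reduce(tasks, changes, fn {name, task}, acc ->
    {:ok, value} = Task.await(task, 5_000)
    Map.put(acc, name, value)
  end)

merged
#=> %{value: 1, a: 2, b: false}
```

Task :b reports false because its snapshot was taken before :a produced a value. If a step needs the result of an asynchronous step, registering it with run/3 after that step forces the synchronization first.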

Return values

finish/1 returns one of these tuples:

  • {:ok, changes}
  • {:error, step, reason, changes_before}

finish/2 executes the flow and forwards the result tuple to a user function, allowing you to transform it into any shape you want.

Examples

Basic flow

runner =
  ExEssentials.Core.Runner.new(timeout: 5_000)
  |> ExEssentials.Core.Runner.put(:value, 1)
  |> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)
  |> ExEssentials.Core.Runner.run(:triple, fn %{value: v} -> {:ok, v * 3} end)

ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 1, double: 2, triple: 3}}

Mixing sync and async

runner =
  ExEssentials.Core.Runner.new(timeout: 5_000)
  |> ExEssentials.Core.Runner.put(:value, 1)
  |> ExEssentials.Core.Runner.run(:sync_step, fn _ -> {:ok, 2} end)
  |> ExEssentials.Core.Runner.run_async(:async_step, fn _ -> {:ok, 3} end)
  |> ExEssentials.Core.Runner.run(:sum, fn %{value: a, sync_step: b, async_step: c} -> {:ok, a + b + c} end)

ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 1, sync_step: 2, async_step: 3, sum: 6}}

Error handling

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:value, 1)
  |> ExEssentials.Core.Runner.run(:may_fail, fn _ -> {:error, :boom} end)
  |> ExEssentials.Core.Runner.run(:never_runs, fn _ -> {:ok, :ignored} end)

ExEssentials.Core.Runner.finish(runner)
#=> {:error, :may_fail, :boom, %{value: 1}}

Transforming the result with finish/2

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:value, 1)
  |> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)

ExEssentials.Core.Runner.finish(runner, fn
  {:ok, %{double: result}} -> {:ok, result}
  {:error, step, reason, _changes} -> {:error, step, reason}
end)
#=> {:ok, 2}

Summary

Types

finish_result()

Result returned by finish/1.

step()

A single registered step in a flow.

t()

The runner struct.

Functions

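branch(runner, predicate, on_true)

Conditionally applies on_true when predicate evaluates to true.

finish(runner)

Executes the flow and returns the execution result.

finish(runner, function)

Executes the flow and passes the result to the given function.

new(opts \\ [])

Creates a new runner.

put(runner, step_name, value)

Seeds a value into changes under step_name and registers a :put step.

recover(runner, predicate, on_error)

Registers an error recovery handler.

run(runner, step_name, function)

Registers a synchronous step.

run_async(runner, step_name, function)

Registers an asynchronous step.

switch(runner, selector)

Selects and applies a continuation at runtime during finish/1, based on the current changes map.

then(runner, fun)

Applies a continuation function to the runner.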

Types

finish_result()

@type finish_result() ::
  {:ok, changes :: map()}
  | {:error, step :: atom(), reason :: any(), changes_before :: map()}

Result returned by finish/1.

On success, returns the final changes map. On error, returns the failing step name, the reason, and the changes accumulated so far.
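
Because the result is always one of two tuple shapes, callers can pattern match on it directly. The sketch below assumes a hypothetical ResultSketch module (not part of ExEssentials) that turns either shape into a log line; it is called here with hand-written tuples so it runs without the library.

```elixir
defmodule ResultSketch do
  # Hypothetical helper: formats a finish_result()-shaped tuple as a string.
  def describe({:ok, changes}) do
    # List the keys of the final changes map in a stable order.
    "ok: " <> (changes |> Map.keys() |> Enum.sort() |> Enum.join(", "))
  end

  def describe({:error, step, reason, changes_before}) do
    "#{step} failed: #{inspect(reason)} (#{map_size(changes_before)} prior change(s))"
  end
end

ok_line = ResultSketch.describe({:ok, %{value: 1, double: 2}})
#=> "ok: double, value"

error_line = ResultSketch.describe({:error, :may_fail, :boom, %{value: 1}})
#=> "may_fail failed: :boom (1 prior change(s))"
```

A one-argument function of this shape matches the spec of the second argument to finish/2, so it could presumably be passed there as &ResultSketch.describe/1.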

step()

@type step() ::
  {:put, step :: atom(), value :: any()}
  | {:sync, step :: atom(), (map() -> {:ok, any()} | {:error, any()})}
  | {:async, step :: atom(), (map() -> {:ok, any()} | {:error, any()})}
  | {:branch, predicate :: (map() -> boolean()), on_true :: (t() -> t())}
  | {:switch, selector :: (map() -> (t() -> t()))}
  | {:recover,
     predicate :: ({step :: atom(), reason :: any(), changes_before :: map()} ->
                     boolean()),
     on_error :: (runner :: t(),
                  {step :: atom(), reason :: any(), changes_before :: map()} ->
                    t())}

A single registered step in a flow.

Steps are registered during build time and executed during finish/1.

  • {:put, name, value} - seeds a value
  • {:sync, name, fun} - runs synchronously
  • {:async, name, fun} - runs concurrently
  • {:branch, predicate, on_true} - conditionally injects more steps at runtime
  • {:switch, selector} - injects more steps at runtime based on changes
  • {:recover, predicate, on_error} - conditionally intercepts errors and injects more steps at runtime

t()

@type t() :: %ExEssentials.Core.Runner{
  changes: map(),
  error_reason: any(),
  failed?: boolean(),
  failed_step: step :: atom() | nil,
  steps: [step()],
  timeout: integer()
}

The runner struct.

The steps field stores the planned execution steps and changes stores seeded values.

Functions

branch(runner, predicate, on_true)

@spec branch(
  runner :: t(),
  predicate :: (changes :: map() -> boolean()),
  on_true :: (runner :: t() -> t())
) :: t()

Conditionally applies on_true when predicate evaluates to true.

The predicate is evaluated at runtime during finish/1, using the current changes map.

Examples

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:status, :rejected)
  |> ExEssentials.Core.Runner.branch(
    fn %{status: status} -> status == :rejected end,
    fn r -> ExEssentials.Core.Runner.put(r, :compensate, true) end
  )

ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{status: :rejected, compensate: true}}

finish(runner)

@spec finish(runner :: t()) ::
  {:ok, changes :: map()}
  | {:error, step :: atom(), reason :: any(), changes_before :: map()}

Executes the flow and returns the execution result.

Returns:

  • {:ok, changes} when all steps succeed
  • {:error, step, reason, changes_before} when a step fails

Examples

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:value, 1)
  |> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)

ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 1, double: 2}}

finish(runner, function)

@spec finish(runner :: t(), function :: (finish_result() -> any())) :: any()

Executes the flow and passes the result to the given function.

This is useful when you want to normalize the output or extract a single value.

Examples

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:value, 1)
  |> ExEssentials.Core.Runner.run(:double, fn %{value: v} -> {:ok, v * 2} end)

ExEssentials.Core.Runner.finish(runner, fn
  {:ok, %{double: result}} -> {:ok, result}
  {:error, step, reason, _changes} -> {:error, step, reason}
end)
#=> {:ok, 2}

new(opts \\ [])

@spec new(opts :: keyword()) :: t()

Creates a new runner.

Options

  • :timeout - timeout (in milliseconds) used when awaiting asynchronous steps. Defaults to 5000.

Examples

iex> ExEssentials.Core.Runner.new().timeout
5000

iex> ExEssentials.Core.Runner.new(timeout: 10_000).timeout
10000

put(runner, step_name, value)

@spec put(runner :: t(), step_name :: atom(), value :: any()) :: t()

Seeds a value into changes under step_name and registers a :put step.

Values seeded via put/3 are immediately available in runner.changes, which is useful when composing flows with branch/3 and switch/2.

Raises ArgumentError if the step name has already been used.

Examples

iex> runner = ExEssentials.Core.Runner.new() |> ExEssentials.Core.Runner.put(:a, 1)
iex> runner.changes
%{a: 1}

recover(runner, predicate, on_error)

@spec recover(
  runner :: t(),
  predicate :: ({step :: atom(), reason :: any(), changes_before :: map()} ->
                  boolean()),
  on_error :: (runner :: t(),
               {step :: atom(), reason :: any(), changes_before :: map()} ->
                 t())
) :: t()

Registers an error recovery handler.

By default, the runner is fail-fast: when a step returns {:error, reason}, the flow stops immediately. recover/3 lets you intercept an error and inject new steps to continue the flow.

The recovery handler is evaluated only when an error happens.

  • predicate receives {step, reason, changes_before} and must return true when the error should be recovered.
  • on_error receives the current runner and the same {step, reason, changes_before} tuple and must return a runner containing the steps to be injected.

When a recovery matches, the runner:

  • skips steps until the matching recover/3
  • injects the steps returned by on_error
  • continues with the remaining steps after the matching recover/3

If no recovery matches, the runner returns the original error tuple.
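
The skip, inject, and continue rule can be modeled in a few lines of plain Elixir. This is only a sketch under simplifying assumptions and not the library's code: RecoverSketch is a hypothetical module, steps are written as {:run, name, fun} and {:recover, predicate, on_error} tuples, and on_error returns a list of steps instead of a runner.

```elixir
defmodule RecoverSketch do
  # Hypothetical model of the recovery rule, not part of ExEssentials.
  def execute(steps, changes \\ %{})

  def execute([], changes), do: {:ok, changes}

  # A recover entry reached without a pending error does nothing.
  def execute([{:recover, _pred, _on_error} | rest], changes), do: execute(rest, changes)

  def execute([{:run, name, fun} | rest], changes) do
    case fun.(changes) do
      {:ok, value} -> execute(rest, Map.put(changes, name, value))
      {:error, reason} -> recover(rest, {name, reason, changes})
    end
  end

  defp recover(rest, {name, reason, changes} = error) do
    # Skip steps until the first recover entry whose predicate matches.
    case Enum.split_while(rest, &(not matching_recover?(&1, error))) do
      {_skipped, []} ->
        # No recovery matched: return the original error tuple.
        {:error, name, reason, changes}

      {_skipped, [{:recover, _pred, on_error} | remaining]} ->
        # Inject the steps from on_error, then continue with what follows.
        execute(on_error.(error) ++ remaining, changes)
    end
  end

  defp matching_recover?({:recover, pred, _on_error}, error), do: pred.(error)
  defp matching_recover?(_step, _error), do: false
end

steps = [
  {:run, :may_fail, fn _ -> {:error, {:http, 500}} end},
  {:run, :skipped, fn _ -> {:ok, :never} end},
  {:recover, fn {_step, reason, _changes} -> reason == {:http, 500} end,
   fn {_step, reason, _changes} ->
     [{:run, :enqueue_retry, fn _ -> {:ok, {:scheduled, reason}} end}]
   end},
  {:run, :after_recover, fn %{enqueue_retry: v} -> {:ok, v} end}
]

result = RecoverSketch.execute(steps)
#=> {:ok, %{enqueue_retry: {:scheduled, {:http, 500}}, after_recover: {:scheduled, {:http, 500}}}}
```

In this model the :skipped step never runs because it sits between the failing step and the matching recovery entry, and a flow with no matching entry returns the original four-element error tuple.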

Example

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.run(:may_fail, fn _ -> {:error, {:http, 500}} end)
  |> ExEssentials.Core.Runner.recover(
    fn {_step, reason, _changes} -> match?({:http, 500}, reason) end,
    fn r, {_step, reason, _changes} ->
      ExEssentials.Core.Runner.run(r, :enqueue_retry, fn _ -> {:ok, {:scheduled, reason}} end)
    end
  )
  |> ExEssentials.Core.Runner.run(:after, fn %{enqueue_retry: v} -> {:ok, v} end)

ExEssentials.Core.Runner.finish(runner)

run(runner, step_name, function)

@spec run(
  runner :: t(),
  step_name :: atom(),
  function :: (changes :: map() ->
                 {:ok, result :: any()} | {:error, reason :: any()})
) :: t()

Registers a synchronous step.

The given function is executed during finish/1 and receives the current changes map.

The function must return:

  • {:ok, result} - stores result under the step name
  • {:error, reason} - stops the flow with an error

Raises ArgumentError if the step name has already been used.

Examples

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:value, 2)
  |> ExEssentials.Core.Runner.run(:square, fn %{value: v} -> {:ok, v * v} end)

ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 2, square: 4}}

run_async(runner, step_name, function)

@spec run_async(
  runner :: t(),
  step_name :: atom(),
  function :: (changes :: map() ->
                 {:ok, result :: any()} | {:error, reason :: any()})
) :: t()

Registers an asynchronous step.

Asynchronous steps are executed concurrently during finish/1. Their results are merged into changes under their step names.

Raises ArgumentError if the step name has already been used.

Examples

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:value, 2)
  |> ExEssentials.Core.Runner.run_async(:double, fn %{value: v} -> {:ok, v * 2} end)
  |> ExEssentials.Core.Runner.run(:sum, fn %{value: v, double: d} -> {:ok, v + d} end)

ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{value: 2, double: 4, sum: 6}}

switch(runner, selector)

@spec switch(
  runner :: t(),
  selector :: (changes :: map() -> (runner :: t() -> t()))
) :: t()

Selects and applies a continuation at runtime during finish/1, based on the current changes map.

The selector must return a function that receives the runner.

Examples

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:status, :settled)
  |> ExEssentials.Core.Runner.switch(fn
    %{status: :settled} -> fn r -> ExEssentials.Core.Runner.put(r, :final, :ok) end
    _ -> fn r -> ExEssentials.Core.Runner.put(r, :final, :error) end
  end)

ExEssentials.Core.Runner.finish(runner)
#=> {:ok, %{status: :settled, final: :ok}}

then(runner, fun)

@spec then(
  runner :: t(),
  function :: (runner :: t() -> t())
) :: t()

Applies a continuation function to the runner.

This is a convenience for composing flow-building functions.

Examples

runner =
  ExEssentials.Core.Runner.new()
  |> ExEssentials.Core.Runner.put(:a, 1)
  |> ExEssentials.Core.Runner.then(fn r -> ExEssentials.Core.Runner.put(r, :b, 2) end)

runner.changes
#=> %{a: 1, b: 2}