Extep (Extep v0.3.1)
View SourceA tiny and friendly step runner for Elixir pipelines.
Extep is a simple and dependency-free utility that helps you compose Elixir pipelines using a shared context. It's useful for building multi-step workflows that can gracefully halt or error along the way. Extep is an implementation of the Railway-oriented programming and was inspired by Ecto.Multi and Sage.
Example
iex> params = %{user_id: 1, plan: "super-power-plus"}
iex> Extep.new(%{params: params})
...> |> Extep.run(:params, &validate_params/1)
...> |> Extep.async(:user, &fetch_user/1)
...> |> Extep.async(:items, &fetch_items/1)
...> |> Extep.return(&create_subscription/1, label_error: true)
{:ok, %{id: 123, object: "subscription", user_id: 1, items: [%{code: "item1"}, %{code: "item2"}]}}
Summary
Functions
Asynchronously runs a checker function on the context without modifying it.
Asynchronously runs a mutator function that updates the context under the given key.
Awaits completion of all pending async tasks and processes their results.
Creates a new Extep struct with an initial context map.
Returns a final result from your pipeline with optional configuration.
Runs a checker function on the context without modifying it.
Runs a mutator function that updates the context with the result under the given key.
Types
Functions
@spec async(t(), context_checker_fun()) :: t()
Asynchronously runs a checker function on the context without modifying it.
This is the async version of run/2. It starts a task to execute the checker function
in parallel while the pipeline continues. The task result is processed when await/1
is called. Like run/2, the context remains unchanged regardless of the function's
return value.
Function Return Values
The async checker function must return the same values as run/2:
:ok- Pipeline continues with unchanged context{:ok, value}- Pipeline continues with unchanged context (value is ignored){:halt, reason}- Pipeline stops with:haltedstatus, reason stored in message{:error, reason}- Pipeline stops with:errorstatus, reason stored in message
Examples
iex> Extep.new(%{user_id: 1})
...> |> Extep.async(fn _ctx -> :ok end)
...> |> Extep.async(fn _ctx -> {:ok, "validated"} end)
...> |> Extep.await()
%Extep{status: :ok, context: %{user_id: 1}, message: nil}See run/2 for detailed behavior and error handling patterns.
@spec async(t(), context_key(), context_mutator_fun()) :: t()
Asynchronously runs a mutator function that updates the context under the given key.
This is the async version of run/3. It starts a task to execute the mutator function
in parallel while the pipeline continues. The task result is processed when await/1
is called, and the context will be updated with the result under the specified key.
Function Return Values
The async mutator function must return the same values as run/3:
{:ok, value}- Pipeline continues,valueis stored undercontext_key{:halt, reason}- Pipeline stops with:haltedstatus, reason stored in message{:error, reason}- Pipeline stops with:errorstatus, reason stored in message
Examples
iex> Extep.new(%{user_id: 1})
...> |> Extep.async(:user, fn ctx -> {:ok, %{id: ctx.user_id, name: "Alice"}} end)
...> |> Extep.async(:items, fn _ctx -> {:ok, [%{code: "item1"}]} end)
...> |> Extep.await()
%Extep{status: :ok, context: %{user_id: 1, user: %{id: 1, name: "Alice"}, items: [%{code: "item1"}]}, message: nil}See run/3 for detailed behavior and error handling patterns.
Awaits completion of all pending async tasks and processes their results.
This function waits for all tasks started by async/2 and async/3 to complete,
then processes their results in the order they were started. If any task returns
an error or halt, the pipeline stops and subsequent task results are ignored.
Options
:timeout— how long to wait (in ms) for all tasks to reply. Defaults to10_000. You can also pass:infinity.This function uses
Task.await_many/2. If the timeout elapses before all tasks reply, the caller exits with{:timeout, {Task, :await_many, ...}}.
Behavior
- If the extep has no pending tasks, returns the extep unchanged
- If the extep status is already
:haltedor:error, shuts down all tasks and returns the extep - For
:okstatus, waits for all tasks and merges their context changes - Stops at the first task that returns an error or halt
Examples
iex> Extep.new(%{user_id: 1})
...> |> Extep.async(:user, fn ctx -> {:ok, %{id: ctx.user_id, name: "Alice"}} end)
...> |> Extep.async(:items, fn _ctx -> {:ok, [%{code: "item1"}]} end)
...> |> Extep.await(timeout: 10)
%Extep{status: :ok, context: %{user_id: 1, user: %{id: 1, name: "Alice"}, items: [%{code: "item1"}]}, message: nil}Note: await/1 is automatically called by run/2, run/3, and return/2 when there are pending tasks.
@spec new() :: t()
Creates a new Extep struct with an empty context.
Examples
iex> Extep.new()
%Extep{status: :ok, context: %{}, message: nil}
Creates a new Extep struct with an initial context map.
Examples
iex> Extep.new(%{foo: "bar"})
%Extep{status: :ok, context: %{foo: "bar"}, message: nil}
@spec return(t(), context_mutator_fun() | context_key(), opts()) :: any()
Returns a final result from your pipeline with optional configuration.
This function extracts the final result from your pipeline based on the pipeline's status:
:okstatus: Executes the given function or returns the context value for the given key:haltedstatus: Returns the halt message (ignores all options):errorstatus: Returns an error tuple, optionally with or without context labels
Parameters
extep- The Extep struct containing your pipeline statefun_or_key- Either a function(context -> result)or an atom key to extract from contextopts- Keyword list of options (defaults to[])
Options
label_error(boolean, default:false):- When
false: Returns clean error messages like{:error, "message"} - When
true: Returns labeled errors like{:error, %{fun_name_or_step_name: "message"}}
- When
Examples
Successful Pipeline with Function
When the pipeline succeeds, the function is executed with the context:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.return(fn ctx -> {:ok, ctx.bar + 2} end)
{:ok, 4}Successful Pipeline with Context Key
When the pipeline succeeds, the value for the given key is returned:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.run(:baz, fn ctx -> {:ok, ctx.bar + 2} end)
...> |> Extep.return(:bar)
{:ok, 2}Error Pipeline with Default Behavior (Clean Errors)
When the pipeline errors and no options are provided, returns clean error messages:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn _ctx -> {:error, "error message"} end)
...> |> Extep.return(:foo)
{:error, "error message"}Error Pipeline with Function and Labeled Errors
When label_error: true is set and a named function causes the error,
the error is labeled with the function name:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.return(&return_error_tuple/1, label_error: true)
{:error, %{return_error_tuple: "error message"}}Error Pipeline with Anonymous Function and Labeled Errors
When label_error: true is set and an anonymous function causes the error,
the error is labeled with :no_label:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.return(fn _ctx -> {:error, "error message"} end, label_error: true)
{:error, %{no_label: "error message"}}Error Pipeline with Context Key and Labeled Errors
When label_error: true is set and a step with a context key caused the error,
the error is labeled with that step's name:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn _ctx -> {:error, "error message"} end)
...> |> Extep.return(:foo, label_error: true)
{:error, %{bar: "error message"}}Halted Pipeline Returning Function Result
When a function in the pipeline halts with a result, that result is returned directly:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.return(fn _ctx -> {:halt, {:ok, "halt message"}} end)
{:ok, "halt message"}Halted Pipeline Returning Custom Tuple
Halt messages can be any term, not just ok tuples:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.return(fn _ctx -> {:halt, {:cancel, "cancel message"}} end)
{:cancel, "cancel message"}Previously Halted Pipeline
When the pipeline was already halted in a previous step, the halt message is returned regardless of what you're trying to return:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.run(:baz, fn _ctx -> {:halt, {:ok, "halt message"}} end)
...> |> Extep.return(:bar)
{:ok, "halt message"}
@spec run(t(), context_checker_fun()) :: t()
Runs a checker function on the context without modifying it.
This function is useful for validation, checking conditions, or performing side effects without changing the pipeline context. The context remains unchanged regardless of the function's return value.
Function Return Values
The checker function must return one of these values:
:ok- Pipeline continues with unchanged context{:ok, value}- Pipeline continues with unchanged context (value is ignored){:halt, reason}- Pipeline stops with:haltedstatus, reason stored in message{:error, reason}- Pipeline stops with:errorstatus, reason stored in message
Error Message Labeling
When errors occur, the message is stored differently based on the function type:
- Named functions: Error labeled with function name (e.g.,
%{validate_user: "error"}) - Anonymous functions: Error labeled with
:no_label(e.g.,%{no_label: "error"})
Examples
Successful Validation - Simple OK
Basic validation that passes without returning a value:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(fn _ctx -> :ok end)
%Extep{status: :ok, context: %{foo: 1}, message: nil}Successful Validation - OK tuple
Validation that passes and returns an ok tuple (value is ignored, context unchanged):
iex> Extep.new(%{foo: 1})
...> |> Extep.run(fn _ctx -> {:ok, "is valid"} end)
%Extep{status: :ok, context: %{foo: 1}, message: nil}Error with Named Function
When a named function returns an error tuple, the error is labeled with the function name:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(&return_error_tuple/1)
%Extep{status: :error, context: %{foo: 1}, message: %{return_error_tuple: "error message"}}Error with Anonymous Function
When an anonymous function returns an error tuple, the error is labeled with :no_label:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(fn _ctx -> {:error, "error message"} end)
%Extep{status: :error, context: %{foo: 1}, message: %{no_label: "error message"}}Halt Example
When a function returns a halt tuple, the pipeline stops and the halt reason is stored:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(fn _ctx -> {:halt, {:ok, "halt message"}} end)
%Extep{status: :halted, context: %{foo: 1}, message: {:ok, "halt message"}}It's useful when you want to stop the pipeline and return a specific message, like a cancel tuple for an Oban job, for example:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(fn _ctx -> {:halt, {:cancel, "cancel message"}} end)
...> |> Extep.return(fn ctx -> {:ok, ctx.bar + 2} end)
{:cancel, "cancel message"}Pipeline Behavior
- If the Extep struct already has
:haltedor:errorstatus, this function does nothing - Only processes when the current status is
:ok - The context is never modified by this function
@spec run(t(), context_key(), context_mutator_fun()) :: t()
Runs a mutator function that updates the context with the result under the given key.
This function executes a mutator function and stores its result in the context under the
specified key. Unlike run/2, this function is designed to modify the context by adding
or updating values.
Parameters
extep- The current Extep structcontext_key- An atom representing the key where the result will be storedfun- A function that takes the context and returns a result
Function Return Values
The mutator function must return one of these values:
{:ok, value}- Pipeline continues,valueis stored undercontext_key{:halt, reason}- Pipeline stops with:haltedstatus, reason stored in message{:error, reason}- Pipeline stops with:errorstatus, reason stored in message
Note: Unlike run/2, this function does NOT accept plain :ok returns. It requires
a tuple with a value to store in the context.
Error Message Labeling
When errors occur, the message is stored differently based on the function type:
- Named functions: Error labeled with function name (e.g.,
%{fetch_user: "error"}) - Anonymous functions: Error labeled with the context key (e.g.,
%{user: "error"})
Examples
Successful Context Updates
This example shows how values are stored and can be used by subsequent steps:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:foo, fn ctx -> {:ok, ctx.foo + 1} end)
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 2} end)
%Extep{status: :ok, context: %{foo: 2, bar: 4}, message: nil}Explanation:
- First
run/3updates:foofrom 1 to 2 - Second
run/3adds:barwith value 4 (using the updated foo value: 2 + 2) - Both operations succeed, context now contains both updated values
Error with Named Function - Pipeline Stops
When a named function returns an error, the pipeline stops and subsequent steps are skipped:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:foo, &return_error_tuple/1)
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 2} end)
%Extep{status: :error, context: %{foo: 1}, message: %{return_error_tuple: "error message"}}Explanation:
- First
run/3callsreturn_error_tuple/1which returns{:error, "error message"} - Pipeline status becomes
:error, original context is preserved - Error message is labeled with function name (
return_error_tuple) - Second
run/3is skipped because pipeline is already in error state
Error with Anonymous Function - Pipeline Stops
When an anonymous function returns an error, the error is labeled with the context key:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:foo, fn _ctx -> {:error, "error message"} end)
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 2} end)
%Extep{status: :error, context: %{foo: 1}, message: %{foo: "error message"}}Explanation:
- First
run/3tries to update:foobut returns an error - Pipeline status becomes
:error, original context is preserved - Error message is labeled with the context key (
:foo) - Second
run/3is skipped because pipeline is already in error state
Halt with Custom Message - Pipeline Stops
When a function returns halt, the pipeline stops gracefully with the halt message:
iex> Extep.new(%{foo: 1})
...> |> Extep.run(:foo, fn _ctx -> {:halt, {:ok, "halt message"}} end)
...> |> Extep.run(:bar, fn ctx -> {:ok, ctx.foo + 2} end)
%Extep{status: :halted, context: %{foo: 1}, message: {:ok, "halt message"}}Explanation:
- First
run/3returns a halt tuple with{:ok, "halt message"} - Pipeline status becomes
:halted, original context is preserved - Halt message can be any term (here it's
{:ok, "halt message"}) - Second
run/3is skipped because pipeline is already halted
Pipeline Behavior
- If the Extep struct already has
:haltedor:errorstatus, this function does nothing - Only processes when the current status is
:ok - On success, the context is updated with the new value under the specified key
- On failure or halt, the context remains unchanged
- Context keys can be new (adding) or existing (updating)