Opus.Pipeline (Opus v0.8.4) View Source
Defines a pipeline.
A pipeline defines a single entry point function to start running the defined stages.
A simple pipeline module can be defined as:
defmodule ArithmeticPipeline do
use Opus.Pipeline
step :to_integer, with: &:erlang.binary_to_integer/1
step :double, with: & &1 * 2
end
# Invoke it with:
ArithmeticPipeline.call "42"
# => {:ok, 84}
The pipeline can be run calling a call/1
function which is defined by Opus.Pipeline
.
Pipelines are intended to have a single parameter and always return a tagged tuple {:ok, value} | {:error, error}
.
A stage returning {:error, error}
halts the pipeline. The error value is an Opus.PipelineError
struct which
contains useful information to detect where the error was caused and why.
Stage Definition
The following types of stages can be defined:
step
- seeOpus.Pipeline.Stage.Step
tee
- seeOpus.Pipeline.Stage.Tee
check
- seeOpus.Pipeline.Stage.Check
link
- seeOpus.Pipeline.Stage.Link
skip
- seeOpus.Pipeline.Stage.Skip
Stage Options
:with
: The function to call to fulfill this stage. It can be an Atom referring to a public function of the module, an anonymous function or a function reference.:if
: Makes a stage conditional, it can be either an Atom referring to a public function of the module, an anonymous function or a function reference. For the stage to be executed, the condition must returntrue
. When the stage is skipped, the input is forwarded to the next step if there's one.:unless
: The opposite of the:if
option, executes the step only when the callback function returnsfalse
.:raise
: A list of exceptions to not rescue. Defaults tofalse
which converts all exceptions to{:error, %Opus.PipelineError{}}
values halting the pipeline.:error_message
: An error message to replace the original error when a stage fails. It can be a String or Atom, which will be used directly in place of the original message, or an anonymous function, which receives the input of the failed stage and must return the error message to be used.:retry_times
: How many times to retry a failing stage, before halting the pipeline.:retry_backoff
: A backoff function to provide delay values for retries. It can be an Atom referring to a public function in the module, an anonymous function or a function reference. It must return anEnumerable.t
yielding at least as many numbers as theretry_times
.:instrument?
: A boolean which defaults totrue
. Set tofalse
to skip instrumentation for a stage.
Exception Handling
All exceptions are converted to {:error, exception}
tuples by default.
You may let a stage raise an exception by providing the :raise
option to a stage as follows:
defmodule ArithmeticPipeline do
use Opus.Pipeline
step :to_integer, &:erlang.binary_to_integer/1, raise: [ArgumentError]
end
Stage Filtering
You can select the stages of a pipeline to run using call/2
with the :except
and :only
options.
Example:
# Runs only the stage with the :validate_params name
CreateUserPipeline.call(params, only: [:validate_params]
# Runs all the stages except the selected ones
CreateUserPipeline.call(params, except: :send_notification)
Link to this section Summary
Link to this section Types
Specs
Specs
result() :: {:ok, any()} | {:error, Opus.PipelineError.t()}