Opus.Pipeline (Opus v0.8.4) View Source

Defines a pipeline.

A pipeline defines a single entry point function to start running the defined stages.

A simple pipeline module can be defined as:

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :to_integer, with: &:erlang.binary_to_integer/1
  step :double, with: & &1 * 2
end

# Invoke it with:
ArithmeticPipeline.call "42"
# => {:ok, 84}

The pipeline can be run calling a call/1 function which is defined by Opus.Pipeline. Pipelines are intended to have a single parameter and always return a tagged tuple {:ok, value} | {:error, error}. A stage returning {:error, error} halts the pipeline. The error value is an Opus.PipelineError struct which contains useful information to detect where the error was caused and why.

Stage Definition

The following types of stages can be defined:

Stage Options

  • :with: The function to call to fulfill this stage. It can be an Atom referring to a public function of the module, an anonymous function or a function reference.
  • :if: Makes a stage conditional, it can be either an Atom referring to a public function of the module, an anonymous function or a function reference. For the stage to be executed, the condition must return true. When the stage is skipped, the input is forwarded to the next step if there's one.
  • :unless: The opposite of the :if option, executes the step only when the callback function returns false.
  • :raise: A list of exceptions to not rescue. Defaults to false which converts all exceptions to {:error, %Opus.PipelineError{}} values halting the pipeline.
  • :error_message: An error message to replace the original error when a stage fails. It can be a String or Atom, which will be used directly in place of the original message, or an anonymous function, which receives the input of the failed stage and must return the error message to be used.
  • :retry_times: How many times to retry a failing stage, before halting the pipeline.
  • :retry_backoff: A backoff function to provide delay values for retries. It can be an Atom referring to a public function in the module, an anonymous function or a function reference. It must return an Enumerable.t yielding at least as many numbers as the retry_times.
  • :instrument?: A boolean which defaults to true. Set to false to skip instrumentation for a stage.

Exception Handling

All exceptions are converted to {:error, exception} tuples by default. You may let a stage raise an exception by providing the :raise option to a stage as follows:

defmodule ArithmeticPipeline do
  use Opus.Pipeline

  step :to_integer, &:erlang.binary_to_integer/1, raise: [ArgumentError]
end

Stage Filtering

You can select the stages of a pipeline to run using call/2 with the :except and :only options. Example:

# Runs only the stage with the :validate_params name
CreateUserPipeline.call(params, only: [:validate_params]
# Runs all the stages except the selected ones
CreateUserPipeline.call(params, except: :send_notification)

Link to this section Summary

Link to this section Types

Specs

opts() :: [only: [atom()], except: [atom()]]

Specs

result() :: {:ok, any()} | {:error, Opus.PipelineError.t()}

Link to this section Functions

Link to this macro

check(name, opts \\ [])

View Source (macro)
Link to this macro

link(name, opts \\ [])

View Source (macro)
Link to this macro

skip(name, opts \\ [])

View Source (macro)
Link to this macro

step(name, opts \\ [])

View Source (macro)
Link to this macro

tee(name, opts \\ [])

View Source (macro)