ExResilience.Pipeline (ex_resilience v0.4.0)

Composable pipeline of resilience layers.

A pipeline is an ordered list of layers that wrap a function call inside-out. The first layer added is the outermost wrapper.

Execution Order

Given layers [bulkhead, circuit_breaker, retry], execution is:

Bulkhead -> Circuit Breaker -> Retry -> function

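Each layer wraps the next: a call passes through the bulkhead first, then the circuit breaker, then retry, before reaching the function. The function's result travels back in the reverse order, so retry sees it first, then the circuit breaker, then the bulkhead.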
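The wrapping order can be illustrated without the library. The following is a minimal sketch in plain Elixir, not ExResilience's implementation: `WrapSketch`, `layer/2`, and `wrap/2` are hypothetical names, and each "layer" only logs when a call enters and leaves it.

```elixir
defmodule WrapSketch do
  # Hypothetical stand-in for a resilience layer: records when the call
  # enters and leaves, then delegates to the next function in the chain.
  def layer(label, log) do
    fn next ->
      fn ->
        Agent.update(log, &[{:enter, label} | &1])
        result = next.()
        Agent.update(log, &[{:exit, label} | &1])
        result
      end
    end
  end

  # Folds from the right so the first layer in the list ends up outermost.
  def wrap(layers, fun) do
    List.foldr(layers, fun, fn layer, inner -> layer.(inner) end)
  end
end

{:ok, log} = Agent.start_link(fn -> [] end)

layers = Enum.map([:bulkhead, :circuit_breaker, :retry], &WrapSketch.layer(&1, log))
wrapped = WrapSketch.wrap(layers, fn -> :ok end)

result = wrapped.()

# Events are prepended as they happen, so reverse to get chronological order.
events = log |> Agent.get(& &1) |> Enum.reverse()
```

In this sketch `events` lists the layers entered from bulkhead down to retry and exited in the opposite order, which matches the description above.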

Examples

pipeline =
  ExResilience.Pipeline.new(:my_pipeline)
  |> ExResilience.Pipeline.add(:bulkhead, max_concurrent: 5)
  |> ExResilience.Pipeline.add(:circuit_breaker, failure_threshold: 3)
  |> ExResilience.Pipeline.add(:retry, max_attempts: 2)

Summary

Functions

__using__(opts)

Generates a supervised pipeline module.

add(pipeline, layer, opts \\ [])

Adds a layer to the pipeline.

call(pipeline, fun)

Executes fun through the pipeline layers.

child_name(pipeline_name, layer)

Returns the child process name for a layer in this pipeline.

new(name)

Creates a new pipeline with the given name.

start(pipeline)

Starts all GenServer-backed layers in the pipeline.

with_classifier(pipeline, classifier)

Sets the error classifier for all layers in the pipeline that support it.

Types

layer()

@type layer() :: {atom(), keyword()}

t()

@type t() :: %ExResilience.Pipeline{layers: [layer()], name: atom()}

Functions

__using__(opts)

(macro)

Generates a supervised pipeline module.

A module that calls use ExResilience.Pipeline gets:

  • child_spec/1 -- returns a supervisor child spec
  • start_link/1 -- starts the pipeline supervisor
  • pipeline/0 -- returns the built %Pipeline{} struct
  • call/1 -- executes a function through the pipeline
  • call/2 -- executes a function through the pipeline (second arg reserved for future options)

Options

  • :name -- pipeline name atom. Defaults to the module name converted to an atom.
  • Layer keys -- any of [:bulkhead, :circuit_breaker, :retry, :rate_limiter, :coalesce, :hedge, :chaos, :fallback, :cache]. Each takes a keyword list of options for that layer. Layers are added in the order specified.
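Layer order follows from keyword lists being ordered. As a minimal illustration in plain Elixir (an assumption about how such options could be separated, not the macro's actual implementation), the name can be split from the layer entries while keeping their order:

```elixir
opts = [
  name: :my_service,
  bulkhead: [max_concurrent: 10],
  circuit_breaker: [failure_threshold: 5],
  retry: [max_attempts: 3]
]

# Split off the pipeline name; what remains is the ordered layer list.
{name, layers} = Keyword.pop(opts, :name)

# Keyword lists keep insertion order, so this mirrors the order written above.
layer_order = Keyword.keys(layers)
```

Swapping two layer keys in `opts` would swap them in `layer_order` as well, which is why the position of each key matters.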

Examples

defmodule MyApp.Resilience do
  use ExResilience.Pipeline,
    name: :my_service,
    bulkhead: [max_concurrent: 10],
    circuit_breaker: [failure_threshold: 5],
    retry: [max_attempts: 3]
end

# In your supervision tree:
children = [MyApp.Resilience]
Supervisor.start_link(children, strategy: :one_for_one)

# Then call through the pipeline:
MyApp.Resilience.call(fn -> HTTPClient.get(url) end)

add(pipeline, layer, opts \\ [])

@spec add(t(), atom(), keyword()) :: t()

Adds a layer to the pipeline.

Layers are executed in the order they are added (outermost first).

Supported Layers

call(pipeline, fun)

@spec call(t(), (-> term())) :: term()

Executes fun through the pipeline layers.

Layers must be started first via start/1 or individually.
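A short usage sketch combining new/1, add/3, start/1, and call/2 as documented on this page. It assumes ex_resilience is available as a dependency; the anonymous function and its return value are placeholders.

```elixir
alias ExResilience.Pipeline

pipeline =
  Pipeline.new(:my_pipeline)
  |> Pipeline.add(:circuit_breaker, failure_threshold: 3)
  |> Pipeline.add(:retry, max_attempts: 2)

# Start the process-backed layers before the first call.
{:ok, _pids} = Pipeline.start(pipeline)

# The zero-arity function here is a placeholder for real work.
result = Pipeline.call(pipeline, fn -> {:ok, :pong} end)
```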

child_name(pipeline_name, layer)

@spec child_name(atom(), atom()) :: atom()

Returns the child process name for a layer in this pipeline.

new(name)

@spec new(atom()) :: t()

Creates a new pipeline with the given name.

The name is used as a prefix for child process names and in telemetry metadata.

start(pipeline)

@spec start(t()) :: {:ok, [pid()]}

Starts all GenServer-backed layers in the pipeline.

Returns {:ok, pids} where pids is a list of started process pids. Layers that don't require a process (like retry) are skipped.

with_classifier(pipeline, classifier)

@spec with_classifier(t(), module()) :: t()

Sets the error classifier for all layers in the pipeline that support it.

Layers that already have an :error_classifier option are not overwritten. Only :circuit_breaker, :retry, and :fallback layers are affected.

Examples

alias ExResilience.Pipeline

pipeline =
  Pipeline.new(:my_pipe)
  |> Pipeline.add(:circuit_breaker, failure_threshold: 3)
  |> Pipeline.add(:retry, max_attempts: 2)
  |> Pipeline.with_classifier(MyApp.Classifier)
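The "not overwritten" rule can be sketched over plain {layer, opts} tuples. This is a hypothetical re-implementation for illustration only (`ClassifierSketch` and `put_classifier/2` are invented names, and `Custom` is a placeholder module), assuming the behaviour is as described above:

```elixir
defmodule ClassifierSketch do
  # Layers this page lists as affected by with_classifier/2.
  @supported [:circuit_breaker, :retry, :fallback]

  # Adds :error_classifier to supported layers unless one is already set.
  def put_classifier(layers, classifier) do
    Enum.map(layers, fn
      {layer, opts} when layer in @supported ->
        {layer, Keyword.put_new(opts, :error_classifier, classifier)}

      other ->
        other
    end)
  end
end

layers = [
  bulkhead: [max_concurrent: 5],
  circuit_breaker: [failure_threshold: 3, error_classifier: Custom],
  retry: [max_attempts: 2]
]

updated = ClassifierSketch.put_classifier(layers, MyApp.Classifier)
```

In this sketch the retry layer picks up MyApp.Classifier, the circuit breaker keeps its existing Custom classifier, and the bulkhead is left untouched.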