# Fault Tolerance

Snakepit provides fault-tolerance mechanisms for building resilient applications that gracefully handle worker failures and transient errors.

## Overview

| Component | Purpose |
|---|---|
| CircuitBreaker | Prevents cascading failures by stopping calls to failing services |
| RetryPolicy | Configurable retry with exponential backoff and jitter |
| HealthMonitor | Tracks worker crashes within a rolling time window |
| Executor | Convenience wrappers combining fault tolerance patterns |

## Circuit Breaker

The `Snakepit.CircuitBreaker` module implements the circuit breaker pattern: when calls keep failing, the breaker opens and rejects further calls, preventing cascading failures while the downstream service recovers.

### States

| State | Description |
|---|---|
| :closed | Normal operation. All calls allowed. |
| :open | Threshold exceeded. Calls rejected with {:error, :circuit_open}. |
| :half_open | Testing recovery. Limited probe calls allowed. |
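
The transitions can be observed through the API described below. Here is a minimal sketch, assuming a failure is recorded when the wrapped function returns an error tuple; whether the breaker also counts raised exceptions, and exactly when it moves from :open to :half_open, depends on the implementation, so the values shown are illustrative:

```elixir
# Illustrative walkthrough of the state machine (assumed semantics).
{:ok, cb} = Snakepit.CircuitBreaker.start_link(failure_threshold: 2, reset_timeout_ms: 1_000)

Snakepit.CircuitBreaker.state(cb) # => :closed

# Two failures reach the threshold and open the circuit.
Snakepit.CircuitBreaker.call(cb, fn -> {:error, :timeout} end)
Snakepit.CircuitBreaker.call(cb, fn -> {:error, :timeout} end)
Snakepit.CircuitBreaker.state(cb) # => :open

# While open, calls are rejected without running the function.
Snakepit.CircuitBreaker.call(cb, fn -> :ok end) # => {:error, :circuit_open}

# After reset_timeout_ms elapses, limited probe calls are allowed.
Process.sleep(1_100)
Snakepit.CircuitBreaker.state(cb) # => :half_open
```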

### Configuration

| Option | Default | Description |
|---|---|---|
| :name | nil | GenServer registration name |
| :failure_threshold | 5 | Failures before opening |
| :reset_timeout_ms | 30000 | Time before half-open |
| :half_open_max_calls | 1 | Probe calls in half-open |

### API

```elixir
# Start circuit breaker
{:ok, cb} = Snakepit.CircuitBreaker.start_link(
  name: :my_cb, failure_threshold: 5, reset_timeout_ms: 30_000
)

# Execute through circuit breaker
case Snakepit.CircuitBreaker.call(cb, fn -> risky_operation() end) do
  {:ok, result} -> handle_success(result)
  {:error, :circuit_open} -> handle_circuit_open()
  {:error, reason} -> handle_error(reason)
end

# Check state and stats
state = Snakepit.CircuitBreaker.state(cb) # => :closed | :open | :half_open
stats = Snakepit.CircuitBreaker.stats(cb)
# => %{state: :closed, failure_count: 2, success_count: 150, failure_threshold: 5}

# Manual reset
Snakepit.CircuitBreaker.reset(cb)
```

### Example

```elixir
defmodule MyApp.ExternalService do
  alias Snakepit.CircuitBreaker

  def start_link do
    CircuitBreaker.start_link(name: :api_cb, failure_threshold: 5, reset_timeout_ms: 30_000)
  end

  def call_api(params) do
    case CircuitBreaker.call(:api_cb, fn -> do_api_call(params) end) do
      {:ok, result} -> {:ok, result}
      {:error, :circuit_open} -> {:ok, get_cached_result(params)}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

## Retry Policies

The `Snakepit.RetryPolicy` module provides configurable retry behavior with exponential backoff and optional jitter.

### RetryPolicy.new/1

| Option | Default | Description |
|---|---|---|
| :max_attempts | 3 | Maximum retry attempts |
| :backoff_ms | [100, 200, 400, 800, 1600] | Backoff delays per attempt |
| :base_backoff_ms | 100 | Base delay for exponential calculation |
| :backoff_multiplier | 2.0 | Multiplier for exponential backoff |
| :max_backoff_ms | 30000 | Maximum backoff cap |
| :jitter | false | Enable random jitter |
| :jitter_factor | 0.25 | Jitter range as fraction of delay |
| :retriable_errors | [:timeout, :unavailable, :connection_refused, :worker_crash] | Errors to retry |

### API

```elixir
policy = Snakepit.RetryPolicy.new(
  max_attempts: 3, backoff_ms: [100, 200, 400], jitter: true,
  retriable_errors: [:timeout, :unavailable]
)

Snakepit.RetryPolicy.should_retry?(policy, attempt)       # More retries available?
Snakepit.RetryPolicy.retry_for_error?(policy, error)      # Is error retriable?
Snakepit.RetryPolicy.backoff_for_attempt(policy, attempt) # Get delay for attempt
```

### Exponential Backoff with Jitter

With jitter enabled, each delay is randomized around its base value: `delay = base_delay ± (base_delay * jitter_factor)`.
For a 100 ms delay with a jitter factor of 0.25, the actual delay falls between 75 ms and 125 ms. Randomizing delays prevents the "thundering herd" problem, where many clients whose calls failed together all retry at exactly the same moment.
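
As a rough sketch of how a delay could be derived from the options above (not Snakepit's actual implementation; the `BackoffSketch` module is hypothetical, and it assumes the policy struct's fields mirror the option names):

```elixir
defmodule BackoffSketch do
  # Hypothetical helper, for illustration only.
  def delay(policy, attempt) do
    base =
      case Enum.at(policy.backoff_ms || [], attempt - 1) do
        # Fall back to exponential growth when no explicit delay is listed.
        nil -> policy.base_backoff_ms * :math.pow(policy.backoff_multiplier, attempt - 1)
        ms -> ms
      end

    capped = base |> round() |> min(policy.max_backoff_ms)

    if policy.jitter do
      # Uniform jitter in [-jitter_factor, +jitter_factor] around the delay.
      spread = capped * policy.jitter_factor
      round(capped + (:rand.uniform() * 2 - 1) * spread)
    else
      capped
    end
  end
end
```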

### Example

```elixir
defmodule MyApp.ResilientClient do
  alias Snakepit.RetryPolicy

  def fetch_with_retry(url) do
    policy = RetryPolicy.new(max_attempts: 3, backoff_ms: [100, 200, 400], jitter: true)
    do_fetch(url, policy, 1)
  end

  defp do_fetch(url, policy, attempt) do
    case HTTPoison.get(url) do
      {:ok, response} ->
        {:ok, response}

      {:error, _} = error ->
        if RetryPolicy.should_retry?(policy, attempt) and
             RetryPolicy.retry_for_error?(policy, error) do
          Process.sleep(RetryPolicy.backoff_for_attempt(policy, attempt))
          do_fetch(url, policy, attempt + 1)
        else
          error
        end
    end
  end
end
```

## Health Monitoring

The `Snakepit.HealthMonitor` module tracks worker crashes within a rolling time window and reports whether a pool is still healthy.

### Configuration

| Option | Default | Description |
|---|---|---|
| :name | required | GenServer registration name |
| :pool | required | Pool name to monitor |
| :max_crashes | 10 | Max crashes before unhealthy |
| :crash_window_ms | 60000 | Rolling window duration |
| :check_interval_ms | 30000 | Cleanup interval |
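
Since both the circuit breaker and the health monitor are started via `start_link/1`, they can typically live in your supervision tree. This is a sketch that assumes both modules follow the standard `child_spec/1` conventions for GenServer-based modules; verify against your Snakepit version:

```elixir
children = [
  # Options mirror the configuration tables above.
  {Snakepit.CircuitBreaker, name: :api_cb, failure_threshold: 5, reset_timeout_ms: 30_000},
  {Snakepit.HealthMonitor, name: :pool_health, pool: :default, max_crashes: 10, crash_window_ms: 60_000}
]

Supervisor.start_link(children, strategy: :one_for_one)
```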

### API

```elixir
{:ok, hm} = Snakepit.HealthMonitor.start_link(
  name: :pool_health, pool: :default, max_crashes: 10, crash_window_ms: 60_000
)

Snakepit.HealthMonitor.record_crash(hm, "worker_1", %{reason: :segfault})
Snakepit.HealthMonitor.healthy?(hm) # => true | false
stats = Snakepit.HealthMonitor.stats(hm)
# => %{pool: :default, total_crashes: 15, crashes_in_window: 3, is_healthy: true}
health = Snakepit.HealthMonitor.worker_health(hm, "worker_1")
# => %{healthy: false, crash_count: 5, last_crash_time: 1703836800000}
```

### Example

```elixir
defmodule MyApp.HealthAwarePool do
  alias Snakepit.HealthMonitor
  # Logger.warning/1 is a macro, so the module must require Logger.
  require Logger

  def execute_with_health_check(pool_name, fun) do
    hm = :"#{pool_name}_health"
    unless HealthMonitor.healthy?(hm), do: Logger.warning("Pool unhealthy")

    case fun.() do
      {:error, {:worker_crash, info}} = error ->
        HealthMonitor.record_crash(hm, info.worker_id, %{reason: info.reason})
        error

      result ->
        result
    end
  end
end
```

## Executor Helpers

The `Snakepit.Executor` module provides convenience wrappers that combine the patterns above.

### execute_with_timeout/2

```elixir
result = Snakepit.Executor.execute_with_timeout(fn -> slow_op() end, timeout_ms: 5000)
# => {:ok, value} | {:error, :timeout}
```

### execute_with_retry/2

```elixir
result = Snakepit.Executor.execute_with_retry(fn -> api_call() end,
  max_attempts: 3, backoff_ms: [100, 200, 400], jitter: true
)
```

### execute_with_protection/3

Combines retry logic with a circuit breaker for defense in depth.

```elixir
{:ok, cb} = Snakepit.CircuitBreaker.start_link(failure_threshold: 5)

result = Snakepit.Executor.execute_with_protection(cb, fn -> risky_op() end,
  max_attempts: 3, backoff_ms: [100, 200, 400]
)
```

### execute_batch/2

Executes multiple functions in parallel.

```elixir
functions = [fn -> fetch(1) end, fn -> fetch(2) end, fn -> fetch(3) end]
results = Snakepit.Executor.execute_batch(functions, timeout_ms: 10_000, max_concurrency: 5)
# => [{:ok, r1}, {:ok, r2}, {:error, :not_found}]
```
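
Since each result is a tagged tuple, partitioning successes from failures is plain Elixir; `results` here is the list returned above:

```elixir
# Split the tagged results into successes and failures.
{oks, errors} = Enum.split_with(results, &match?({:ok, _}, &1))
values = Enum.map(oks, fn {:ok, value} -> value end)
```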

## Combined Protection Example

```elixir
defmodule MyApp.ResilientService do
  alias Snakepit.{CircuitBreaker, Executor}

  def start_link do
    CircuitBreaker.start_link(name: :service_cb, failure_threshold: 5, reset_timeout_ms: 30_000)
  end

  def call_service(params) do
    Executor.execute_with_protection(:service_cb,
      fn -> Snakepit.execute("service", params, timeout: 5000) end,
      max_attempts: 3, backoff_ms: [100, 500, 1000], jitter: true
    )
  end

  def health_status do
    %{
      state: CircuitBreaker.state(:service_cb),
      stats: CircuitBreaker.stats(:service_cb)
    }
  end
end
```

## Telemetry Events

| Event | Measurements | Metadata |
|---|---|---|
| [:snakepit, :circuit_breaker, :opened] | %{failure_count: n} | %{pool: name} |
| [:snakepit, :circuit_breaker, :closed] | %{} | %{pool: name} |
| [:snakepit, :retry, :attempt] | %{attempt: n, delay_ms: ms} | %{} |
| [:snakepit, :retry, :exhausted] | %{attempts: n} | %{last_error: error} |
| [:snakepit, :worker, :crash] | %{} | %{pool: name, worker_id: id} |
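
To consume these events, attach a handler with the standard `:telemetry` API. The handler id is arbitrary and the `IO.inspect` body is just a placeholder for real metrics or logging:

```elixir
# Attach one handler to every fault-tolerance event listed above.
:telemetry.attach_many(
  "myapp-snakepit-fault-tolerance",
  [
    [:snakepit, :circuit_breaker, :opened],
    [:snakepit, :circuit_breaker, :closed],
    [:snakepit, :retry, :attempt],
    [:snakepit, :retry, :exhausted],
    [:snakepit, :worker, :crash]
  ],
  fn event, measurements, metadata, _config ->
    IO.inspect({event, measurements, metadata}, label: "snakepit")
  end,
  nil
)
```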