KafkaEx.Support.Retry (kafka_ex v1.0.0-rc.1)
Unified retry logic with exponential backoff for KafkaEx operations.
This module provides:
- Exponential backoff calculation with optional cap
- Generic retry wrapper function
- Error classifiers for common Kafka error patterns
Usage
# Simple retry with defaults (3 retries, 100ms base delay)
Retry.with_retry(fn -> some_operation() end)

# Custom retry configuration
Retry.with_retry(
  fn -> some_operation() end,
  max_retries: 5,
  base_delay_ms: 200,
  max_delay_ms: 5000,
  retryable?: &Retry.transient_error?/1
)

# Just calculate backoff delay
delay = Retry.backoff_delay(attempt, 100, 5000)
Summary
Functions
Calculate exponential backoff delay.
Check if error is safe to retry for commit operations.
Check if error is safe to retry for consumer group operations.
Check if error is a coordinator-related error that may resolve with retry.
Check if error is a leadership-related error requiring metadata refresh.
Check if error is safe to retry for produce operations.
Check if error is a transient error that may resolve with retry.
Execute a function with retry and exponential backoff.
Types
@type retry_opts() :: [
  max_retries: non_neg_integer(),
  base_delay_ms: non_neg_integer(),
  max_delay_ms: non_neg_integer() | :infinity,
  retryable?: (error() -> boolean()),
  on_retry: (error(), non_neg_integer(), non_neg_integer() -> :ok) | nil
]
Functions
@spec backoff_delay(non_neg_integer(), non_neg_integer(), non_neg_integer() | :infinity) :: non_neg_integer()
Calculate exponential backoff delay.
Parameters
- attempt - Zero-based attempt number (0 = first retry)
- base_ms - Base delay in milliseconds
- max_ms - Maximum delay cap (default: :infinity)
Examples
iex> Retry.backoff_delay(0, 100)
100
iex> Retry.backoff_delay(1, 100)
200
iex> Retry.backoff_delay(2, 100)
400
iex> Retry.backoff_delay(10, 100, 5000)
5000
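The examples above are consistent with a delay of base_ms * 2^attempt, clamped to max_ms. A minimal sketch of that calculation follows. The module name BackoffSketch is hypothetical, and the library's actual implementation may differ (for example, it might add jitter).

```elixir
defmodule BackoffSketch do
  # Hypothetical stand-in for illustration; not the library's source.
  # Doubles the base delay on each attempt, then applies the optional cap.
  def delay(attempt, base_ms, max_ms \\ :infinity)
      when is_integer(attempt) and attempt >= 0 and is_integer(base_ms) and base_ms >= 0 do
    cap(base_ms * Integer.pow(2, attempt), max_ms)
  end

  # :infinity means "no cap".
  defp cap(raw, :infinity), do: raw
  defp cap(raw, max_ms) when is_integer(max_ms), do: min(raw, max_ms)
end

IO.inspect(BackoffSketch.delay(0, 100))        # 100
IO.inspect(BackoffSketch.delay(2, 100))        # 400
IO.inspect(BackoffSketch.delay(10, 100, 5000)) # 5000 (100 * 1024, capped)
```

Without a cap the delay grows quickly (attempt 10 with a 100 ms base is already over 100 seconds), which is why passing max_ms is usually worthwhile.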
Check if error is safe to retry for commit operations.
Commits are idempotent so we can safely retry on transient errors.
Check if error is safe to retry for consumer group operations.
Follows the Java client pattern (KAFKA-6829): retryable errors include coordinator errors, transient errors, and UNKNOWN_TOPIC_OR_PARTITION.
Check if error is a coordinator-related error that may resolve with retry.
These errors typically occur during consumer group operations when the coordinator is unavailable or changing.
Check if error is a leadership-related error requiring metadata refresh.
These errors indicate the client has stale partition leadership information. Safe to retry for ALL request types including produce.
Check if error is safe to retry for produce operations.
Produce retries are only safe for leadership errors where we know the message wasn't written. Timeout errors are NOT safe because the message may have been written but the response lost.
Note: For truly idempotent produces, set enable.idempotence=true on the Kafka producer (requires Kafka 0.11+).
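To illustrate the distinction, here is a sketch of a classifier that retries a produce only on leadership-type errors. The module name and the error atoms are assumptions, modeled on the Kafka protocol error names NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE, and REQUEST_TIMED_OUT. The set of errors the library actually matches may differ.

```elixir
defmodule ProduceClassifierSketch do
  # Hypothetical error atoms, named after Kafka protocol error codes.
  @leadership_errors [:not_leader_for_partition, :leader_not_available]

  # A leadership error means the broker rejected the write,
  # so resending cannot create a duplicate message.
  def safe_to_retry?(error) when error in @leadership_errors, do: true

  # Everything else, including timeouts, is treated as unsafe:
  # the write may have succeeded even though no response arrived.
  def safe_to_retry?(_error), do: false
end

IO.inspect(ProduceClassifierSketch.safe_to_retry?(:not_leader_for_partition)) # true
IO.inspect(ProduceClassifierSketch.safe_to_retry?(:request_timed_out))        # false
```

A function with this shape (error in, boolean out) fits the retryable? option described under with_retry.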
Check if error is a transient error that may resolve with retry.
Includes timeouts, parse errors, connection issues, and coordinator errors.
@spec with_retry((-> retry_result()), retry_opts()) :: retry_result()
Execute a function with retry and exponential backoff.
Options
- :max_retries - Maximum number of retry attempts (default: 3)
- :base_delay_ms - Base delay for exponential backoff (default: 100)
- :max_delay_ms - Maximum delay cap (default: :infinity)
- :retryable? - Function to determine if error is retryable (default: always true)
- :on_retry - Optional callback (error, attempt, delay) -> :ok for logging
Returns
- {:ok, result} on success
- {:error, last_error} after all retries are exhausted or on a non-retryable error
Examples
# Retry any error up to 3 times
Retry.with_retry(fn -> fetch_data() end)

# Only retry specific errors
Retry.with_retry(
  fn -> commit_offset() end,
  retryable?: &Retry.coordinator_error?/1
)
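The options and return values described above suggest a loop along the following lines. This is a self-contained sketch, not the library's source: the RetrySketch module is hypothetical, it assumes the wrapped function returns {:ok, result} or {:error, reason}, and it assumes the attempt number passed to on_retry is zero-based.

```elixir
defmodule RetrySketch do
  # Hypothetical stand-in for illustration; not the library's source.
  def with_retry(fun, opts \\ []) when is_function(fun, 0) do
    config = %{
      max_retries: Keyword.get(opts, :max_retries, 3),
      base_delay_ms: Keyword.get(opts, :base_delay_ms, 100),
      max_delay_ms: Keyword.get(opts, :max_delay_ms, :infinity),
      retryable?: Keyword.get(opts, :retryable?, fn _error -> true end),
      on_retry: Keyword.get(opts, :on_retry)
    }

    attempt(fun, config, 0)
  end

  defp attempt(fun, config, retries_done) do
    %{max_retries: max_retries, retryable?: retryable?, on_retry: on_retry} = config

    case fun.() do
      {:ok, _result} = ok ->
        ok

      {:error, error} ->
        if retries_done < max_retries and retryable?.(error) do
          delay = backoff(retries_done, config.base_delay_ms, config.max_delay_ms)
          # Assumption: the callback receives a zero-based attempt number.
          if on_retry, do: on_retry.(error, retries_done, delay)
          Process.sleep(delay)
          attempt(fun, config, retries_done + 1)
        else
          # Retries exhausted or error not retryable: return the last error.
          {:error, error}
        end
    end
  end

  defp backoff(attempt, base_ms, :infinity), do: base_ms * Integer.pow(2, attempt)
  defp backoff(attempt, base_ms, max_ms), do: min(base_ms * Integer.pow(2, attempt), max_ms)
end

# A function that fails twice with a timeout, then succeeds.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  case Agent.get_and_update(counter, fn n -> {n, n + 1} end) do
    n when n < 2 -> {:error, :timeout}
    _ -> {:ok, :done}
  end
end

IO.inspect(RetrySketch.with_retry(flaky, base_delay_ms: 1))
# {:ok, :done}, reached on the third call
```

Note that under this reading max_retries counts retries, not total calls, so the default of 3 allows up to four invocations of the wrapped function.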