KafkaEx.Support.Retry (kafka_ex v1.0.0-rc.1)

Unified retry logic with exponential backoff for KafkaEx operations.

This module provides:

  • Exponential backoff calculation with optional cap
  • Generic retry wrapper function
  • Error classifiers for common Kafka error patterns

Usage

alias KafkaEx.Support.Retry

# Simple retry with defaults (3 retries, 100 ms base delay).
# The function must return {:ok, result} or {:error, reason}.
Retry.with_retry(fn -> some_operation() end)

# Custom retry configuration
Retry.with_retry(
  fn -> some_operation() end,
  max_retries: 5,
  base_delay_ms: 200,
  max_delay_ms: 5000,
  retryable?: &Retry.transient_error?/1
)

# Just calculate the backoff delay for a zero-based attempt number
attempt = 2
delay = Retry.backoff_delay(attempt, 100, 5000)

Summary

Functions

backoff_delay/3
Calculate exponential backoff delay.

commit_retryable?/1
Check if error is safe to retry for commit operations.

consumer_group_retryable?/1
Check if error is safe to retry for consumer group operations.

coordinator_error?/1
Check if error is a coordinator-related error that may resolve with retry.

leadership_error?/1
Check if error is a leadership-related error requiring metadata refresh.

produce_retryable?/1
Check if error is safe to retry for produce operations.

transient_error?/1
Check if error is a transient error that may resolve with retry.

with_retry/2
Execute a function with retry and exponential backoff.

Types

@type error() :: atom() | term()
@type retry_opts() :: [
  max_retries: non_neg_integer(),
  base_delay_ms: non_neg_integer(),
  max_delay_ms: non_neg_integer() | :infinity,
  retryable?: (error() -> boolean()),
  on_retry: (error(), non_neg_integer(), non_neg_integer() -> :ok) | nil
]
@type retry_result() :: {:ok, term()} | {:error, error()}

Functions

backoff_delay(attempt, base_ms, max_ms \\ :infinity)

@spec backoff_delay(
  non_neg_integer(),
  non_neg_integer(),
  non_neg_integer() | :infinity
) ::
  non_neg_integer()

Calculate exponential backoff delay.

Parameters

  • attempt - Zero-based attempt number (0 = first retry)
  • base_ms - Base delay in milliseconds
  • max_ms - Maximum delay cap (default: :infinity)

Examples

iex> Retry.backoff_delay(0, 100)
100

iex> Retry.backoff_delay(1, 100)
200

iex> Retry.backoff_delay(2, 100)
400

iex> Retry.backoff_delay(10, 100, 5000)
5000
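The examples above are consistent with delay = base_ms * 2^attempt, capped at max_ms. A minimal standalone sketch of that formula, assuming no jitter and a hard cap applied with min/2; BackoffSketch is a hypothetical name, not the library's source:

```elixir
defmodule BackoffSketch do
  # Hypothetical module: reproduces the documented examples, not the library source.
  # Assumes no jitter and a hard cap applied with min/2.
  def backoff_delay(attempt, base_ms, max_ms \\ :infinity)

  # Uncapped: base_ms * 2^attempt (Integer.pow/2 needs Elixir 1.12+).
  def backoff_delay(attempt, base_ms, :infinity), do: base_ms * Integer.pow(2, attempt)

  # Capped: compute the uncapped delay, then never exceed max_ms.
  def backoff_delay(attempt, base_ms, max_ms) when is_integer(max_ms),
    do: min(backoff_delay(attempt, base_ms, :infinity), max_ms)
end
```

With a cap of 5000 ms and a 100 ms base, the delay doubles through 100, 200, 400, 800, 1600 and 3200 ms, and every attempt from 6 onward is clamped to 5000 ms.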

commit_retryable?(error)

@spec commit_retryable?(error()) :: boolean()

Check if error is safe to retry for commit operations.

Commits are idempotent, so we can safely retry on transient errors.

consumer_group_retryable?(error)

@spec consumer_group_retryable?(error()) :: boolean()

Check if error is safe to retry for consumer group operations.

Follows the Java client pattern (KAFKA-6829): includes coordinator errors, transient errors, and UNKNOWN_TOPIC_OR_PARTITION.
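A minimal sketch of how this classifier might compose with the coordinator and transient checks. Every error atom below, and the module name GroupClassifierSketch, is an assumption for illustration, not KafkaEx's confirmed error vocabulary:

```elixir
defmodule GroupClassifierSketch do
  # Assumed atom names for coordinator errors; KafkaEx's actual terms may differ.
  @coordinator_errors [:coordinator_not_available, :not_coordinator, :coordinator_load_in_progress]
  # Assumed stand-ins for "timeouts, parse errors, connection issues".
  @other_transient_errors [:timeout, :request_timed_out, :parse_error, :closed]

  def coordinator_error?(error), do: error in @coordinator_errors

  # Transient errors also cover coordinator errors.
  def transient_error?(error), do: error in @other_transient_errors or coordinator_error?(error)

  # Consumer group operations additionally retry UNKNOWN_TOPIC_OR_PARTITION.
  def consumer_group_retryable?(:unknown_topic_or_partition), do: true
  def consumer_group_retryable?(error), do: transient_error?(error)
end
```

The extra clause is what separates consumer group retries from plain transient retries: an unknown topic or partition is retried here, but not by transient_error?/1 alone.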

coordinator_error?(arg1)

@spec coordinator_error?(error()) :: boolean()

Check if error is a coordinator-related error that may resolve with retry.

These errors typically occur during consumer group operations when the coordinator is unavailable or changing.

leadership_error?/1

@spec leadership_error?(error()) :: boolean()

Check if error is a leadership-related error requiring metadata refresh.

These errors indicate that the client has stale partition leadership information. They are safe to retry for ALL request types, including produce.

produce_retryable?(error)

@spec produce_retryable?(error()) :: boolean()

Check if error is safe to retry for produce operations.

Produce retries are only safe for leadership errors, where we know the message wasn't written. Timeout errors are NOT safe because the message may have been written but the response was lost.

Note: For truly idempotent produces, set enable.idempotence=true on the Kafka producer (requires Kafka 0.11+).
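The produce rule reduces to "retry only when the broker rejected the write". A minimal sketch, assuming leadership errors arrive as :not_leader_for_partition or :leader_not_available and a timeout as :request_timed_out (all assumed atom names; ProduceClassifierSketch is hypothetical):

```elixir
defmodule ProduceClassifierSketch do
  # Assumed atom names for stale-leadership errors: the broker refused the
  # request, so the message was not written and resending cannot duplicate it.
  @leadership_errors [:not_leader_for_partition, :leader_not_available]

  def leadership_error?(error), do: error in @leadership_errors

  # Timeouts are deliberately excluded: the write may have succeeded even
  # though the response never arrived, so a retry could duplicate the message.
  def produce_retryable?(error), do: leadership_error?(error)
end
```

A timed-out produce is therefore surfaced to the caller instead of being resent silently.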

transient_error?/1

@spec transient_error?(error()) :: boolean()

Check if error is a transient error that may resolve with retry.

Includes timeouts, parse errors, connection issues, and coordinator errors.

with_retry(fun, opts \\ [])

@spec with_retry((-> retry_result()), retry_opts()) :: retry_result()

Execute a function with retry and exponential backoff.

Options

  • :max_retries - Maximum number of retry attempts (default: 3)
  • :base_delay_ms - Base delay in milliseconds for exponential backoff (default: 100)
  • :max_delay_ms - Maximum delay cap in milliseconds (default: :infinity)
  • :retryable? - Function that decides whether an error is retryable (default: always true)
  • :on_retry - Optional callback (error, attempt, delay) -> :ok invoked on each retry, e.g. for logging

Returns

  • {:ok, result} on success
  • {:error, last_error} after all retries exhausted or non-retryable error

Examples

alias KafkaEx.Support.Retry

# Retry any error up to 3 times
Retry.with_retry(fn -> fetch_data() end)

# Only retry specific errors
Retry.with_retry(
  fn -> commit_offset() end,
  retryable?: &Retry.coordinator_error?/1
)
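The options and return values listed above can be read as a simple recursive loop. A minimal standalone sketch of those semantics, assuming attempt 0 is the first retry (as in backoff_delay/3) and that on_retry runs just before each sleep; RetrySketch is a hypothetical name, not the library's source:

```elixir
defmodule RetrySketch do
  # Hypothetical sketch of the documented with_retry/2 options; not the library source.
  # Assumes attempt 0 is the first retry and on_retry runs just before each sleep.
  def with_retry(fun, opts \\ []) do
    config = %{
      max_retries: Keyword.get(opts, :max_retries, 3),
      base_ms: Keyword.get(opts, :base_delay_ms, 100),
      max_ms: Keyword.get(opts, :max_delay_ms, :infinity),
      retryable: Keyword.get(opts, :retryable?, fn _error -> true end),
      on_retry: Keyword.get(opts, :on_retry)
    }

    run(fun, config, 0)
  end

  defp run(fun, config, n) do
    case fun.() do
      {:ok, _result} = ok ->
        ok

      {:error, error} = failure ->
        if n < config.max_retries and config.retryable.(error) do
          delay = backoff(n, config.base_ms, config.max_ms)
          if config.on_retry, do: config.on_retry.(error, n, delay)
          Process.sleep(delay)
          run(fun, config, n + 1)
        else
          # Retries exhausted or error not retryable: return the last error.
          failure
        end
    end
  end

  # Exponential backoff with an optional cap, matching backoff_delay/3's examples.
  defp backoff(n, base_ms, :infinity), do: base_ms * Integer.pow(2, n)
  defp backoff(n, base_ms, max_ms), do: min(backoff(n, base_ms, :infinity), max_ms)
end
```

In this sketch, the defaults (max_retries: 3, base_delay_ms: 100) mean an operation that keeps failing is called four times and sleeps 100 + 200 + 400 = 700 ms in total before {:error, last_error} is returned.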