WeaviateEx.Batch.RetryQueue (WeaviateEx v0.7.4)

View Source

Manages automatic re-queuing of failed batch objects.

This GenServer tracks failed objects and provides automatic retry with exponential backoff. Objects are tracked by UUID and dropped after exceeding the maximum retry count.

Features

  • Track retry count per object UUID
  • Exponential backoff between retries with jitter
  • Drop objects after max_retries
  • Permanent failure callback for objects that exceed retry limit
  • Drain queue to get all pending retries

Examples

# Start the retry queue
{:ok, pid} = RetryQueue.start_link(
  client: client,
  max_retries: 3,
  base_delay_ms: 1000,
  on_permanent_failure: fn objects ->
    Logger.error("Permanent failures: #{length(objects)}")
  end
)

# Enqueue failed objects for retry
:ok = RetryQueue.enqueue_failed(pid, failed_objects)

# Get retry count for a specific UUID
count = RetryQueue.get_retry_count(pid, "uuid-123")

# Drain all queued objects (for manual processing)
{:ok, objects} = RetryQueue.drain(pid)

# Clear the queue
:ok = RetryQueue.clear(pid)

Summary

Functions

Calculate exponential backoff delay for a given attempt.

Calculate exponential backoff with jitter (±20%).

Returns a specification to start this module under a supervisor.

Clear all queued objects and reset retry counts.

Drain all queued objects for manual processing.

Enqueue failed objects for retry.

Get the retry count for a specific UUID.

Get the current state (for debugging/testing).

Check if an error message indicates a rate limit error.

Check if an error is retryable.

Get the current queue size.

Start the retry queue GenServer.

Types

failed_object()

@type failed_object() :: %{
  :uuid => String.t(),
  :properties => map(),
  :collection => String.t(),
  optional(:vector) => [float()],
  optional(:tenant) => String.t()
}

state()

@type state() :: %{
  client: map(),
  queue: :queue.queue(failed_object()),
  retry_counts: %{required(String.t()) => non_neg_integer()},
  max_retries: pos_integer(),
  base_delay_ms: pos_integer(),
  on_permanent_failure: ([failed_object()] -> any()) | nil
}

Functions

calculate_backoff(attempt, base_delay_ms)

@spec calculate_backoff(non_neg_integer(), pos_integer()) :: pos_integer()

Calculate exponential backoff delay for a given attempt.

Returns delay in milliseconds, capped at max_delay (60 seconds).

calculate_backoff_with_jitter(attempt, base_delay_ms)

@spec calculate_backoff_with_jitter(non_neg_integer(), pos_integer()) :: pos_integer()

Calculate exponential backoff with jitter (±20%).

Jitter helps prevent thundering herd when multiple retries happen simultaneously.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

clear(pid)

@spec clear(GenServer.server()) :: :ok

Clear all queued objects and reset retry counts.

drain(pid)

@spec drain(GenServer.server()) :: {:ok, [failed_object()]}

Drain all queued objects for manual processing.

Returns all pending objects and clears the queue.

enqueue_failed(pid, failed_objects)

@spec enqueue_failed(GenServer.server(), [failed_object()]) :: :ok

Enqueue failed objects for retry.

Objects are tracked by UUID. If an object has already been retried max_retries times, the on_permanent_failure callback is invoked and the object is dropped.

get_retry_count(pid, uuid)

@spec get_retry_count(GenServer.server(), String.t()) :: non_neg_integer()

Get the retry count for a specific UUID.

get_state(pid)

@spec get_state(GenServer.server()) :: map()

Get the current state (for debugging/testing).

rate_limit_error?(message)

@spec rate_limit_error?(String.t()) :: boolean()

Check if an error message indicates a rate limit error.

retryable_error?(arg1)

@spec retryable_error?(map()) :: boolean()

Check if an error is retryable.

Retryable errors include:

  • gRPC status codes: UNAVAILABLE, RESOURCE_EXHAUSTED, ABORTED, DEADLINE_EXCEEDED
  • Rate limit errors (detected by message patterns)

size(pid)

@spec size(GenServer.server()) :: non_neg_integer()

Get the current queue size.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start the retry queue GenServer.

Options

  • :client - WeaviateEx client (required)
  • :max_retries - Maximum retry attempts per object (default: 3)
  • :base_delay_ms - Base delay for exponential backoff (default: 1000)
  • :on_permanent_failure - Callback for objects that exceed max retries