WeaviateEx.Batch.RetryQueue (WeaviateEx v0.7.4)
Manages automatic re-queuing of failed batch objects.
This GenServer tracks failed objects and provides automatic retry with exponential backoff. Objects are tracked by UUID and dropped after exceeding the maximum retry count.
Features
- Track retry count per object UUID
- Exponential backoff between retries with jitter
- Drop objects after max_retries
- Permanent failure callback for objects that exceed retry limit
- Drain queue to get all pending retries
Examples
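# Alias the module and require Logger (Logger.error/1 is a macro)
alias WeaviateEx.Batch.RetryQueue
require Logger

# Start the retry queue
{:ok, pid} =
  RetryQueue.start_link(
    client: client,
    max_retries: 3,
    base_delay_ms: 1000,
    on_permanent_failure: fn objects ->
      Logger.error("Permanent failures: #{length(objects)}")
    end
  )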
# Enqueue failed objects for retry
:ok = RetryQueue.enqueue_failed(pid, failed_objects)
# Get retry count for a specific UUID
count = RetryQueue.get_retry_count(pid, "uuid-123")
# Drain all queued objects (for manual processing)
{:ok, objects} = RetryQueue.drain(pid)
# Clear the queue
:ok = RetryQueue.clear(pid)
Summary
Functions
calculate_backoff/2: Calculate exponential backoff delay for a given attempt.
calculate_backoff_with_jitter/2: Calculate exponential backoff with jitter (±20%).
child_spec/1: Returns a specification to start this module under a supervisor.
clear/1: Clear all queued objects and reset retry counts.
drain/1: Drain all queued objects for manual processing.
enqueue_failed/2: Enqueue failed objects for retry.
get_retry_count/2: Get the retry count for a specific UUID.
get_state/1: Get the current state (for debugging/testing).
Check if an error message indicates a rate limit error.
Check if an error is retryable.
size/1: Get the current queue size.
start_link/1: Start the retry queue GenServer.
Types
@type state() :: %{
  client: map(),
  queue: :queue.queue(failed_object()),
  retry_counts: %{required(String.t()) => non_neg_integer()},
  max_retries: pos_integer(),
  base_delay_ms: pos_integer(),
  on_permanent_failure: ([failed_object()] -> any()) | nil
}
Functions
@spec calculate_backoff(non_neg_integer(), pos_integer()) :: pos_integer()
Calculate exponential backoff delay for a given attempt.
Returns delay in milliseconds, capped at max_delay (60 seconds).
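The exact formula is not stated here. As a minimal sketch, assuming the delay doubles with each attempt starting from attempt 0 (base_delay_ms * 2^attempt) and is capped at 60 seconds, the calculation could look like the following. The module name BackoffSketch and the doubling factor are assumptions for illustration, not the library's implementation.

```elixir
defmodule BackoffSketch do
  # Cap taken from the "60 seconds" mentioned in the docs.
  @max_delay_ms 60_000

  # Assumed formula: base_delay_ms * 2^attempt, capped at @max_delay_ms.
  def calculate_backoff(attempt, base_delay_ms)
      when is_integer(attempt) and attempt >= 0 and
             is_integer(base_delay_ms) and base_delay_ms > 0 do
    # Clamp the exponent so very large attempt numbers do not build huge integers.
    exponent = min(attempt, 30)
    delay = base_delay_ms * Integer.pow(2, exponent)
    min(delay, @max_delay_ms)
  end
end

# Delays for attempts 0..7 with a 1000 ms base
delays = for attempt <- 0..7, do: BackoffSketch.calculate_backoff(attempt, 1000)
IO.inspect(delays)
# prints [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

With a 1000 ms base, the cap takes effect from attempt 6 onward under these assumptions.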
@spec calculate_backoff_with_jitter(non_neg_integer(), pos_integer()) :: pos_integer()
Calculate exponential backoff with jitter (±20%).
Jitter helps prevent thundering herd when multiple retries happen simultaneously.
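To illustrate the ±20% jitter, here is a hedged sketch that scales a capped exponential delay by a uniformly random factor between 0.8 and 1.2. The module name JitterSketch, the doubling formula, and the choice to apply jitter after the cap are all assumptions. The real function may order these steps differently.

```elixir
defmodule JitterSketch do
  @max_delay_ms 60_000

  # Assumed approach: compute the capped delay, then scale it by a random
  # factor drawn uniformly from [0.8, 1.2).
  def calculate_backoff_with_jitter(attempt, base_delay_ms)
      when is_integer(attempt) and attempt >= 0 and
             is_integer(base_delay_ms) and base_delay_ms > 0 do
    base = min(base_delay_ms * Integer.pow(2, min(attempt, 30)), @max_delay_ms)
    # :rand.uniform/0 returns a float in [0.0, 1.0).
    factor = 0.8 + :rand.uniform() * 0.4
    # Keep the result a positive integer number of milliseconds.
    max(1, round(base * factor))
  end
end

# Each call lands somewhere between 3200 and 4800 ms for attempt 2, base 1000.
IO.inspect(JitterSketch.calculate_backoff_with_jitter(2, 1000))
```

Because the jitter is applied after the cap in this sketch, a jittered delay can exceed 60 seconds by up to 20%.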
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec clear(GenServer.server()) :: :ok
Clear all queued objects and reset retry counts.
@spec drain(GenServer.server()) :: {:ok, [failed_object()]}
Drain all queued objects for manual processing.
Returns all pending objects and clears the queue.
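Since the state type stores pending objects in an Erlang :queue, draining can be pictured as converting the queue to a list and replacing it with an empty one. This is only a sketch on a plain map, not the GenServer callback itself. DrainSketch and the %{uuid: ...} object shape are hypothetical, and whether the real drain also touches retry counts is not stated here.

```elixir
defmodule DrainSketch do
  # Returns the pending objects in FIFO order and a state with an empty queue.
  def drain(state) do
    objects = :queue.to_list(state.queue)
    {objects, %{state | queue: :queue.new()}}
  end
end

pending = :queue.from_list([%{uuid: "a"}, %{uuid: "b"}])
{objects, drained} = DrainSketch.drain(%{queue: pending, retry_counts: %{"a" => 1}})
IO.inspect(length(objects))
IO.inspect(:queue.is_empty(drained.queue))
```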
@spec enqueue_failed(GenServer.server(), [failed_object()]) :: :ok
Enqueue failed objects for retry.
Objects are tracked by UUID. If an object has already been retried
max_retries times, the on_permanent_failure callback is invoked
and the object is dropped.
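The bookkeeping described above can be sketched as a pure function over the state map. This is an illustration under assumptions, not the library's code: EnqueueSketch is a hypothetical module, failed objects are assumed to be maps with a :uuid key, and deleting the counter of a dropped object is a guess.

```elixir
defmodule EnqueueSketch do
  # Splits incoming objects into "retry again" and "permanently failed",
  # updating per-UUID retry counts along the way.
  def enqueue_failed(state, objects) do
    {retry, dropped, counts} =
      Enum.reduce(objects, {[], [], state.retry_counts}, fn obj, {retry, dropped, counts} ->
        count = Map.get(counts, obj.uuid, 0)

        if count >= state.max_retries do
          # Retry budget spent: drop the object and forget its counter.
          {retry, [obj | dropped], Map.delete(counts, obj.uuid)}
        else
          {[obj | retry], dropped, Map.put(counts, obj.uuid, count + 1)}
        end
      end)

    dropped = Enum.reverse(dropped)

    # Invoke the callback once with all permanently failed objects, if any.
    if dropped != [] and is_function(state.on_permanent_failure, 1) do
      state.on_permanent_failure.(dropped)
    end

    # Append the retryable objects to the queue in their original order.
    queue = Enum.reduce(Enum.reverse(retry), state.queue, fn obj, q -> :queue.in(obj, q) end)
    %{state | queue: queue, retry_counts: counts}
  end
end

state = %{
  queue: :queue.new(),
  retry_counts: %{"a" => 3},
  max_retries: 3,
  on_permanent_failure: fn objs -> IO.puts("dropped #{length(objs)}") end
}

# "a" has already used its 3 retries and is dropped; "b" is queued with count 1.
state = EnqueueSketch.enqueue_failed(state, [%{uuid: "a"}, %{uuid: "b"}])
IO.inspect(state.retry_counts)
```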
@spec get_retry_count(GenServer.server(), String.t()) :: non_neg_integer()
Get the retry count for a specific UUID.
@spec get_state(GenServer.server()) :: map()
Get the current state (for debugging/testing).
Check if an error message indicates a rate limit error.
Check if an error is retryable.
Retryable errors include:
- gRPC status codes: UNAVAILABLE, RESOURCE_EXHAUSTED, ABORTED, DEADLINE_EXCEEDED
- Rate limit errors (detected by message patterns)
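A rough sketch of this classification follows. The four status codes come from the list above. Everything else is an assumption: the module name RetrySketch, the error shape (a map with :status and :message keys), and the message patterns used to spot rate limiting are illustrative and may not match what the library checks.

```elixir
defmodule RetrySketch do
  # Status codes listed in the documentation.
  @retryable_statuses ~w(UNAVAILABLE RESOURCE_EXHAUSTED ABORTED DEADLINE_EXCEEDED)
  # Hypothetical patterns; the real detection patterns are not documented here.
  @rate_limit_patterns ["rate limit", "too many requests", "429"]

  # True when the message contains one of the assumed rate-limit patterns.
  def rate_limit_message?(message) when is_binary(message) do
    downcased = String.downcase(message)
    Enum.any?(@rate_limit_patterns, &String.contains?(downcased, &1))
  end

  def rate_limit_message?(_), do: false

  # Retryable if the status is in the list, or the message looks rate limited.
  def retryable?(%{status: status}) when status in @retryable_statuses, do: true
  def retryable?(%{message: message}), do: rate_limit_message?(message)
  def retryable?(_), do: false
end

IO.inspect(RetrySketch.retryable?(%{status: "UNAVAILABLE"}))
IO.inspect(RetrySketch.retryable?(%{status: "NOT_FOUND", message: "object not found"}))
```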
@spec size(GenServer.server()) :: non_neg_integer()
Get the current queue size.
@spec start_link(keyword()) :: GenServer.on_start()
Start the retry queue GenServer.
Options
- :client - WeaviateEx client (required)
- :max_retries - Maximum retry attempts per object (default: 3)
- :base_delay_ms - Base delay for exponential backoff (default: 1000)
- :on_permanent_failure - Callback for objects that exceed max retries