WeaviateEx.Batch.Queue (WeaviateEx v0.7.4)
Queue for managing batch operation objects with failure tracking and retry support.
Provides a structured way to manage objects during batch operations, including:
- FIFO pending queue for objects to be inserted
- Failed object tracking with retry counts
- Automatic re-queue with configurable max retries
Examples
alias WeaviateEx.Batch.Queue
# Create a queue and add objects
queue =
  Queue.new()
  |> Queue.enqueue_many(objects)
# Process in batches
{batch, queue} = Queue.dequeue_batch(queue, 100)
# Track failures
queue = Queue.mark_failed(queue, failed_object, "Validation error")
# Re-queue failed objects for retry
queue = Queue.requeue_failed(queue, max_retries: 3)
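The workflow above can be pictured as a loop that drains the pending queue one batch at a time and collects whatever fails. The following is only a sketch under stated assumptions: `DrainSketch`, `drain/4`, and `send_fun` are hypothetical names that are not part of WeaviateEx, and the queue is a plain Erlang `:queue` rather than the library's struct.

```elixir
defmodule DrainSketch do
  # Hypothetical driver, not part of WeaviateEx.
  # Pulls batches of up to `size` objects from an Erlang :queue and passes
  # each batch to `send_fun`, which must return the objects that failed.
  # Returns every failed object in the order the failures were seen.
  def drain(queue, size, send_fun, failed \\ []) do
    if :queue.is_empty(queue) do
      Enum.reverse(failed)
    else
      # :queue.split/2 raises if asked for more items than exist, so clamp.
      n = min(size, :queue.len(queue))
      {front, rest} = :queue.split(n, queue)
      batch_failed = send_fun.(:queue.to_list(front))
      drain(rest, size, send_fun, Enum.reverse(batch_failed) ++ failed)
    end
  end
end

# Five objects; the even-numbered ones are flagged invalid for illustration.
objects = for i <- 1..5, do: %{id: "uuid-#{i}", valid: rem(i, 2) == 1}

failed =
  DrainSketch.drain(:queue.from_list(objects), 2, fn batch ->
    Enum.reject(batch, & &1.valid)
  end)
```

In real use, `send_fun` would wrap the actual batch insert call, and the returned failures would be passed to `Queue.mark_failed/3`.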
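Summary
Functions
clear_failed(queue) - Clears all failed objects.
dequeue_batch(queue, size) - Removes up to size objects from the pending queue.
empty?(queue) - Checks if the queue has no pending objects.
enqueue(queue, object) - Adds an object to the pending queue.
enqueue_many(queue, objects) - Adds multiple objects to the pending queue.
failed_count(queue) - Returns the number of failed objects.
get_failed(queue) - Returns all failed objects.
mark_failed(queue, object, reason) - Marks an object as failed with a reason.
new() - Creates a new empty queue.
pending_count(queue) - Returns the number of pending objects.
requeue_failed(queue, opts) - Moves failed objects back to the pending queue for retry.
stats(queue) - Returns statistics about the queue.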
Types
@type t() :: %WeaviateEx.Batch.Queue{
  failed: [WeaviateEx.Batch.Queue.FailedObject.t()],
  pending: :queue.queue(map()),
  retry_count: %{required(String.t()) => non_neg_integer()}
}
Functions
clear_failed(queue)
Clears all failed objects.
Examples
queue = Queue.clear_failed(queue)
dequeue_batch(queue, size)
@spec dequeue_batch(t(), pos_integer()) :: {[map()], t()}
Removes up to size objects from the pending queue.
Returns a tuple of {objects, updated_queue}.
Examples
{batch, queue} = Queue.dequeue_batch(queue, 100)
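Since the `pending` field is typed as an Erlang `:queue`, one plausible way to take "up to size" objects is `:queue.split/2` with a clamped count. This is a sketch of that idea only, not the library's implementation; `DequeueSketch.take/2` is a hypothetical helper.

```elixir
defmodule DequeueSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Takes up to `size` items from the front of an Erlang :queue,
  # preserving FIFO order, and returns {items, remaining_queue}.
  def take(queue, size) when is_integer(size) and size > 0 do
    # :queue.split/2 raises when size exceeds the queue length, so clamp first.
    n = min(size, :queue.len(queue))
    {front, rest} = :queue.split(n, queue)
    {:queue.to_list(front), rest}
  end
end

pending = :queue.from_list([%{id: "a"}, %{id: "b"}, %{id: "c"}])

# The two oldest objects come out first.
{batch, rest} = DequeueSketch.take(pending, 2)

# Asking for more than remains returns whatever is left.
{last, empty} = DequeueSketch.take(rest, 100)
```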
empty?(queue)
Checks if the queue has no pending objects.
Examples
Queue.empty?(queue)
# => true
enqueue(queue, object)
Adds an object to the pending queue.
Examples
queue = Queue.enqueue(queue, %{id: "uuid-1", properties: %{name: "Test"}})
enqueue_many(queue, objects)
Adds multiple objects to the pending queue.
Examples
queue = Queue.enqueue_many(queue, objects)
failed_count(queue)
@spec failed_count(t()) :: non_neg_integer()
Returns the number of failed objects.
Examples
Queue.failed_count(queue)
# => 2
get_failed(queue)
@spec get_failed(t()) :: [WeaviateEx.Batch.Queue.FailedObject.t()]
Returns all failed objects.
Examples
failed = Queue.get_failed(queue)
mark_failed(queue, object, reason)
Marks an object as failed with a reason.
If the same object (matched by id) is marked as failed again, its retry count is incremented.
Examples
queue = Queue.mark_failed(queue, object, "Validation error")
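The retry bookkeeping described above can be sketched with a map keyed by object id. Everything here is an assumption made for illustration: the `RetryCountSketch` module, the `%{object: ..., reason: ...}` entry shape, and the choice to record 0 on the first failure are hypothetical, and the library may count differently.

```elixir
defmodule RetryCountSketch do
  # Hypothetical state shape: %{failed: [entry], retry_count: %{id => count}}.
  # Not the WeaviateEx implementation.
  def mark_failed(state, %{id: id} = object, reason) do
    entry = %{object: object, reason: reason}
    # Keep a single entry per id: drop any earlier failure for this object.
    failed = [entry | Enum.reject(state.failed, &(&1.object.id == id))]
    # Assumption: the first failure stores 0; each repeat adds one.
    retry_count = Map.update(state.retry_count, id, 0, &(&1 + 1))
    %{state | failed: failed, retry_count: retry_count}
  end
end

state = %{failed: [], retry_count: %{}}
obj = %{id: "uuid-1", properties: %{name: "Test"}}

# Marking the same object twice leaves one failed entry and bumps the count.
state = RetryCountSketch.mark_failed(state, obj, "Validation error")
state = RetryCountSketch.mark_failed(state, obj, "Timeout")
```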
new()
@spec new() :: t()
Creates a new empty queue.
Examples
queue = Queue.new()
pending_count(queue)
@spec pending_count(t()) :: non_neg_integer()
Returns the number of pending objects.
Examples
Queue.pending_count(queue)
# => 5
requeue_failed(queue, opts)
Moves failed objects back to the pending queue for retry.
Only objects with retry_count < max_retries are requeued.
Options
:max_retries - Maximum number of retries allowed (default: 3)
Examples
queue = Queue.requeue_failed(queue, max_retries: 3)
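The `retry_count < max_retries` rule can be illustrated with a partition over the failed list. This is a sketch under assumptions, not the library's code: `RequeueSketch` and the entry shape are hypothetical, and keeping exhausted objects in `failed` is a guess about the intended behavior.

```elixir
defmodule RequeueSketch do
  # Hypothetical state: pending is an Erlang :queue, failed is a list of
  # %{object: map, reason: term}, retry_count maps id => count.
  def requeue_failed(state, opts \\ []) do
    max_retries = Keyword.get(opts, :max_retries, 3)

    # Split failures into those still eligible for retry and those exhausted.
    {retry, exhausted} =
      Enum.split_with(state.failed, fn %{object: %{id: id}} ->
        Map.get(state.retry_count, id, 0) < max_retries
      end)

    # Eligible objects go to the back of the pending queue.
    pending =
      Enum.reduce(retry, state.pending, fn %{object: obj}, q -> :queue.in(obj, q) end)

    # Assumption: exhausted objects stay in the failed list.
    %{state | pending: pending, failed: exhausted}
  end
end

state = %{
  pending: :queue.new(),
  failed: [
    %{object: %{id: "a"}, reason: "Timeout"},
    %{object: %{id: "b"}, reason: "Validation error"}
  ],
  retry_count: %{"a" => 1, "b" => 3}
}

# "a" has 1 retry (below the limit of 3) and is requeued; "b" has used all 3.
state = RequeueSketch.requeue_failed(state, max_retries: 3)
```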
stats(queue)
Returns statistics about the queue.
Examples
stats = Queue.stats(queue)
# => %{pending: 5, failed: 2, total: 7}
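The example output suggests that `total` is the sum of the pending and failed counts. A minimal sketch of that arithmetic, assuming a plain map with an Erlang `:queue` and a list (the `StatsSketch` module is hypothetical):

```elixir
defmodule StatsSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Assumes total = pending + failed, as the example output suggests.
  def stats(%{pending: pending, failed: failed}) do
    p = :queue.len(pending)
    f = length(failed)
    %{pending: p, failed: f, total: p + f}
  end
end

state = %{
  pending: :queue.from_list(Enum.map(1..5, &%{id: "p-#{&1}"})),
  failed: [%{id: "f-1"}, %{id: "f-2"}]
}

result = StatsSketch.stats(state)
```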