WeaviateEx.Batch.Queue (WeaviateEx v0.7.4)

View Source

Queue for managing batch operation objects with failure tracking and retry support.

Provides a structured way to manage objects during batch operations, including:

  • FIFO pending queue for objects to be inserted
  • Failed object tracking with retry counts
  • Automatic re-queue with configurable max retries

Examples

# Create a queue and add objects
queue = Queue.new()
  |> Queue.enqueue_many(objects)

# Process in batches
{batch, queue} = Queue.dequeue_batch(queue, 100)

# Track failures
queue = Queue.mark_failed(queue, failed_object, "Validation error")

# Re-queue failed objects for retry
queue = Queue.requeue_failed(queue, max_retries: 3)

Summary

Functions

Clears all failed objects.

Removes up to size objects from the pending queue.

Checks if the queue has no pending objects.

Adds an object to the pending queue.

Adds multiple objects to the pending queue.

Returns the number of failed objects.

Returns all failed objects.

Marks an object as failed with a reason.

Creates a new empty queue.

Returns the number of pending objects.

Moves failed objects back to the pending queue for retry.

Returns statistics about the queue.

Types

t()

@type t() :: %WeaviateEx.Batch.Queue{
  failed: [WeaviateEx.Batch.Queue.FailedObject.t()],
  pending: :queue.queue(map()),
  retry_count: %{required(String.t()) => non_neg_integer()}
}

Functions

clear_failed(queue)

@spec clear_failed(t()) :: t()

Clears all failed objects.

Examples

queue = Queue.clear_failed(queue)

dequeue_batch(queue, size)

@spec dequeue_batch(t(), pos_integer()) :: {[map()], t()}

Removes up to size objects from the pending queue.

Returns a tuple of {objects, updated_queue}.

Examples

{batch, queue} = Queue.dequeue_batch(queue, 100)

empty?(queue)

@spec empty?(t()) :: boolean()

Checks if the queue has no pending objects.

Examples

Queue.empty?(queue)
# => true

enqueue(queue, object)

@spec enqueue(t(), map()) :: t()

Adds an object to the pending queue.

Examples

queue = Queue.enqueue(queue, %{id: "uuid-1", properties: %{name: "Test"}})

enqueue_many(queue, objects)

@spec enqueue_many(t(), [map()]) :: t()

Adds multiple objects to the pending queue.

Examples

queue = Queue.enqueue_many(queue, objects)

failed_count(queue)

@spec failed_count(t()) :: non_neg_integer()

Returns the number of failed objects.

Examples

Queue.failed_count(queue)
# => 2

get_failed(queue)

@spec get_failed(t()) :: [WeaviateEx.Batch.Queue.FailedObject.t()]

Returns all failed objects.

Examples

failed = Queue.get_failed(queue)

mark_failed(queue, object, reason)

@spec mark_failed(t(), map(), String.t()) :: t()

Marks an object as failed with a reason.

If the same object (by id) is marked failed again, increments the retry count.

Examples

queue = Queue.mark_failed(queue, object, "Validation error")

new()

@spec new() :: t()

Creates a new empty queue.

Examples

queue = Queue.new()

pending_count(queue)

@spec pending_count(t()) :: non_neg_integer()

Returns the number of pending objects.

Examples

Queue.pending_count(queue)
# => 5

requeue_failed(queue, opts \\ [])

@spec requeue_failed(
  t(),
  keyword()
) :: t()

Moves failed objects back to the pending queue for retry.

Only objects with retry_count < max_retries are requeued.

Options

  • :max_retries - Maximum number of retries allowed (default: 3)

Examples

queue = Queue.requeue_failed(queue, max_retries: 3)

stats(queue)

@spec stats(t()) :: map()

Returns statistics about the queue.

Examples

stats = Queue.stats(queue)
# => %{pending: 5, failed: 2, total: 7}