WeaviateEx.Batch.Dynamic (WeaviateEx v0.7.4)
Dynamic batch processor that auto-adjusts batch sizes based on server queue depth.
This GenServer accumulates objects and references, then sends them in batches to Weaviate. The batch size is dynamically adjusted based on the server's queue depth to optimize throughput.
Features
- Auto-adjusting batch sizes (10-1000 range by default)
- Concurrent batch sending
- Automatic flushing when batch size is reached
- Error tracking for failed objects/references
- Graceful shutdown with final flush
Examples
# Start a dynamic batcher
{:ok, batcher} = Dynamic.start(client: client)
# Add objects
Dynamic.add_object(batcher, "Article", %{title: "Test"})
Dynamic.add_object(batcher, "Article", %{title: "Test 2"}, uuid: "custom-uuid")
# Add references
Dynamic.add_reference(batcher, "Article", "uuid-1", "hasAuthor", "author-uuid")
# Manually flush
{:ok, results} = Dynamic.flush(batcher)
# Stop and get final results
{:ok, final_results} = Dynamic.stop(batcher)
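The auto-flush and callback options described under `start/1` can be combined into a fire-and-forget pipeline. A minimal sketch, assuming a `client` already built with `WeaviateEx.Client` (the flush/error handlers here just log):

```elixir
# Batcher that flushes itself every 50 buffered objects.
{:ok, batcher} =
  Dynamic.start(
    client: client,
    batch_size: 50,
    auto_flush: true,
    on_flush: fn results -> IO.inspect(results, label: "flushed") end,
    on_error: fn error -> IO.inspect(error, label: "batch error") end
  )

for i <- 1..500 do
  Dynamic.add_object(batcher, "Article", %{title: "Article #{i}"})
end

# Any remainder is flushed on stop.
{:ok, _final_results} = Dynamic.stop(batcher)
```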
Summary
Functions
Add an object to the batch buffer.
Add a reference to the batch buffer.
Returns a specification to start this module under a supervisor.
Flush all buffered objects and references to Weaviate.
Get current server batch statistics.
Get the current state of the batcher.
Report the current queue size to adjust batch sizing.
Start a new dynamic batcher.
Stop the batcher, flushing any remaining objects.
Types
@type batch_stats() :: %{
        queue_length: non_neg_integer(),
        rate_per_second: float(),
        failed_count: non_neg_integer()
      }
@type state() :: %{
        client: WeaviateEx.Client.t(),
        batch_size: pos_integer(),
        min_batch_size: pos_integer(),
        max_batch_size: pos_integer(),
        concurrent_requests: pos_integer(),
        auto_flush: boolean(),
        objects_buffer: [batch_object()],
        references_buffer: [batch_reference()],
        queue_size: non_neg_integer(),
        results: WeaviateEx.Batch.ErrorTracking.Results.t(),
        on_flush: (WeaviateEx.Batch.ErrorTracking.Results.t() -> any()) | nil,
        on_error: (WeaviateEx.Error.t() -> any()) | nil,
        consistency_level: String.t() | nil,
        monitor_server_stats: boolean(),
        poll_interval: pos_integer(),
        poll_timer_ref: reference() | nil,
        auto_retry: boolean(),
        retry_queue_pid: pid() | nil,
        on_permanent_failure: ([map()] -> any()) | nil
      }
Functions
Add an object to the batch buffer.
Options
- :uuid - Custom UUID for the object
- :vector - Custom vector for the object
- :tenant - Tenant name for multi-tenant collections
Examples
Dynamic.add_object(batcher, "Article", %{title: "Test"})
Dynamic.add_object(batcher, "Article", %{title: "Test"}, uuid: "custom-uuid")
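The `:vector` and `:tenant` options listed above can be supplied together with `:uuid`. A sketch (the vector values and tenant name are illustrative placeholders):

```elixir
# Pre-computed vector plus tenant routing for a multi-tenant collection.
Dynamic.add_object(batcher, "Article", %{title: "Tenant-scoped"},
  uuid: "custom-uuid",
  vector: [0.1, 0.2, 0.3],
  tenant: "tenant-a"
)
```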
Add a reference to the batch buffer.
Options
- :tenant - Tenant name for multi-tenant collections
Examples
Dynamic.add_reference(batcher, "Article", "uuid-1", "hasAuthor", "author-uuid")
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec flush(pid()) :: {:ok, WeaviateEx.Batch.ErrorTracking.Results.t()} | {:error, WeaviateEx.Error.t()}
Flush all buffered objects and references to Weaviate.
Returns aggregated results including any errors.
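Per the @spec above, `flush/1` returns either the aggregated `Results` struct or an error. A minimal handling sketch (the struct is only inspected here, since its fields are not shown in this section):

```elixir
case Dynamic.flush(batcher) do
  {:ok, results} ->
    # Aggregated results, including any failed objects/references.
    IO.inspect(results, label: "flush results")

  {:error, %WeaviateEx.Error{} = error} ->
    IO.inspect(error, label: "flush failed")
end
```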
@spec get_server_batch_stats(WeaviateEx.Client.t()) :: {:ok, batch_stats()} | {:error, term()}
Get current server batch statistics.
Polls the /v1/nodes endpoint to retrieve batch queue information,
useful for monitoring and dynamic batch sizing.
Examples
{:ok, stats} = Dynamic.get_server_batch_stats(client)
# => %{queue_length: 42, rate_per_second: 150.5, failed_count: 0}
Returns
- {:ok, batch_stats()} - Current batch statistics
- {:error, term()} - Error if request fails
Get the current state of the batcher.
Useful for debugging and testing.
@spec report_queue_size(pid(), non_neg_integer()) :: :ok
Report the current queue size to adjust batch sizing.
Called internally or externally to inform the batcher about server load.
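Together with `get_server_batch_stats/1`, this enables external backpressure control when `:monitor_server_stats` is disabled. A hedged sketch of a manual polling loop (the 5-second interval mirrors the default `:poll_interval`):

```elixir
# Periodically poll the server and feed queue depth back to the batcher.
defmodule QueuePoller do
  def poll(client, batcher) do
    case Dynamic.get_server_batch_stats(client) do
      {:ok, %{queue_length: depth}} ->
        Dynamic.report_queue_size(batcher, depth)

      {:error, _reason} ->
        # Skip this cycle; the batcher keeps its current size.
        :ok
    end

    Process.sleep(5_000)
    poll(client, batcher)
  end
end
```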
Start a new dynamic batcher.
Options
- :client - WeaviateEx.Client (required)
- :batch_size - Initial batch size (default: 100)
- :min_batch_size - Minimum batch size (default: 10)
- :max_batch_size - Maximum batch size (default: 1000)
- :concurrent_requests - Number of concurrent requests (default: 2)
- :auto_flush - Automatically flush when batch size is reached (default: false)
- :name - Optional name for process registration
- :on_flush - Callback function called after each flush
- :on_error - Callback function called on errors
- :consistency_level - Consistency level for requests
- :monitor_server_stats - Poll server for batch stats to adjust sizing (default: false)
- :poll_interval - Interval in ms between server stat polls (default: 5000)
- :auto_retry - Automatically re-queue failed objects (default: true)
- :max_retries - Maximum retry attempts per object (default: 3)
- :retry_delay_ms - Base delay for retry backoff in ms (default: 1000)
- :on_permanent_failure - Callback for objects that exceed max_retries
Examples
{:ok, batcher} = Dynamic.start(client: client, batch_size: 50)
# With server stats monitoring
{:ok, batcher} = Dynamic.start(client: client, monitor_server_stats: true)
# With auto-retry configuration
{:ok, batcher} = Dynamic.start(
client: client,
auto_retry: true,
max_retries: 5,
on_permanent_failure: fn objects ->
Logger.error("Permanent failures: #{length(objects)}")
end
)
@spec stop(pid()) :: {:ok, WeaviateEx.Batch.ErrorTracking.Results.t()}
Stop the batcher, flushing any remaining objects.
Returns final aggregated results.