WeaviateEx.Batch.Dynamic (WeaviateEx v0.7.4)


Dynamic batch processor that auto-adjusts batch sizes based on the server's batch queue.

This GenServer accumulates objects and references, then sends them in batches to Weaviate. The batch size is dynamically adjusted based on the server's queue depth to optimize throughput.

Features

  • Auto-adjusting batch sizes (10-1000 range by default)
  • Concurrent batch sending
  • Automatic flushing when the batch size is reached (opt-in via :auto_flush, which defaults to false)
  • Error tracking for failed objects/references
  • Graceful shutdown with final flush

Examples

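# Alias the module so the examples can use the short name
alias WeaviateEx.Batch.Dynamic

# Start a dynamic batcher (client is an existing WeaviateEx.Client)
{:ok, batcher} = Dynamic.start(client: client)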

# Add objects
Dynamic.add_object(batcher, "Article", %{title: "Test"})
Dynamic.add_object(batcher, "Article", %{title: "Test 2"}, uuid: "custom-uuid")

# Add references
Dynamic.add_reference(batcher, "Article", "uuid-1", "hasAuthor", "author-uuid")

# Manually flush
{:ok, results} = Dynamic.flush(batcher)

# Stop and get final results
{:ok, final_results} = Dynamic.stop(batcher)

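Summary

Functions

add_object(pid, collection, properties, opts \\ [])
Add an object to the batch buffer.

add_reference(pid, collection, from_uuid, property, to_uuid, opts \\ [])
Add a reference to the batch buffer.

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

flush(pid)
Flush all buffered objects and references to Weaviate.

get_server_batch_stats(client)
Get current server batch statistics.

get_state(pid)
Get the current state of the batcher.

report_queue_size(pid, queue_size)
Report the current queue size to adjust batch sizing.

start(opts)
Start a new dynamic batcher.

stop(pid)
Stop the batcher, flushing any remaining objects.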

Types

batch_object()

@type batch_object() :: %{
  collection: String.t(),
  properties: map(),
  uuid: String.t() | nil,
  vector: [float()] | nil,
  tenant: String.t() | nil
}

batch_reference()

@type batch_reference() :: %{
  collection: String.t(),
  from_uuid: String.t(),
  property: String.t(),
  to_uuid: String.t(),
  tenant: String.t() | nil
}

batch_stats()

@type batch_stats() :: %{
  queue_length: non_neg_integer(),
  rate_per_second: float(),
  failed_count: non_neg_integer()
}

state()

@type state() :: %{
  client: WeaviateEx.Client.t(),
  batch_size: pos_integer(),
  min_batch_size: pos_integer(),
  max_batch_size: pos_integer(),
  concurrent_requests: pos_integer(),
  auto_flush: boolean(),
  objects_buffer: [batch_object()],
  references_buffer: [batch_reference()],
  queue_size: non_neg_integer(),
  results: WeaviateEx.Batch.ErrorTracking.Results.t(),
  on_flush: (WeaviateEx.Batch.ErrorTracking.Results.t() -> any()) | nil,
  on_error: (WeaviateEx.Error.t() -> any()) | nil,
  consistency_level: String.t() | nil,
  monitor_server_stats: boolean(),
  poll_interval: pos_integer(),
  poll_timer_ref: reference() | nil,
  auto_retry: boolean(),
  retry_queue_pid: pid() | nil,
  on_permanent_failure: ([map()] -> any()) | nil
}

Functions

add_object(pid, collection, properties, opts \\ [])

@spec add_object(pid(), String.t(), map(), keyword()) :: :ok

Add an object to the batch buffer.

Options

  • :uuid - Custom UUID for the object
  • :vector - Custom vector for the object
  • :tenant - Tenant name for multi-tenant collections

Examples

Dynamic.add_object(batcher, "Article", %{title: "Test"})
Dynamic.add_object(batcher, "Article", %{title: "Test"}, uuid: "custom-uuid")
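
The options above line up with the optional fields of batch_object(). The sketch below shows one way the arguments could be folded into that map shape. BatchObjectSketch and build/3 are hypothetical names used only for illustration; this is not the library's internal code.

```elixir
defmodule BatchObjectSketch do
  # Hypothetical helper, not part of WeaviateEx: builds a map with the
  # same keys as the documented batch_object() type.
  def build(collection, properties, opts \\ [])
      when is_binary(collection) and is_map(properties) and is_list(opts) do
    %{
      collection: collection,
      properties: properties,
      # Options that are not given end up as nil, matching the type.
      uuid: Keyword.get(opts, :uuid),
      vector: Keyword.get(opts, :vector),
      tenant: Keyword.get(opts, :tenant)
    }
  end
end
```

Calling BatchObjectSketch.build("Article", %{title: "Test"}, uuid: "custom-uuid") yields a map whose :vector and :tenant are nil.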

add_reference(pid, collection, from_uuid, property, to_uuid, opts \\ [])

@spec add_reference(pid(), String.t(), String.t(), String.t(), String.t(), keyword()) ::
  :ok

Add a reference to the batch buffer.

Options

  • :tenant - Tenant name for multi-tenant collections

Examples

Dynamic.add_reference(batcher, "Article", "uuid-1", "hasAuthor", "author-uuid")

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

flush(pid)

@spec flush(pid()) ::
  {:ok, WeaviateEx.Batch.ErrorTracking.Results.t()}
  | {:error, WeaviateEx.Error.t()}

Flush all buffered objects and references to Weaviate.

Returns aggregated results including any errors.

get_server_batch_stats(client)

@spec get_server_batch_stats(WeaviateEx.Client.t()) ::
  {:ok, batch_stats()} | {:error, term()}

Get current server batch statistics.

Polls the /v1/nodes endpoint to retrieve batch queue information, useful for monitoring and dynamic batch sizing.

Examples

{:ok, stats} = Dynamic.get_server_batch_stats(client)
# => %{queue_length: 42, rate_per_second: 150.5, failed_count: 0}

Returns

  • {:ok, batch_stats()} - Current batch statistics
  • {:error, term()} - Error if request fails
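
When :monitor_server_stats is left off, the same feedback loop can presumably be driven by hand: read queue_length from the stats and pass it to report_queue_size/2. The sketch below keeps that wiring independent of the library by taking both steps as functions. QueueFeedbackSketch and sync/2 are hypothetical names.

```elixir
defmodule QueueFeedbackSketch do
  # Hypothetical glue code, not part of WeaviateEx.
  # fetch_stats: zero-arity fun returning {:ok, stats} | {:error, reason}
  # report:      one-arity fun that receives the queue length
  def sync(fetch_stats, report)
      when is_function(fetch_stats, 0) and is_function(report, 1) do
    case fetch_stats.() do
      {:ok, %{queue_length: queue_length}} ->
        # Forward only the queue depth; the other stats are ignored here.
        report.(queue_length)

      {:error, _reason} = error ->
        # Leave the batcher untouched if the stats request failed.
        error
    end
  end
end
```

With the documented functions this would be wired as QueueFeedbackSketch.sync(fn -> Dynamic.get_server_batch_stats(client) end, fn n -> Dynamic.report_queue_size(batcher, n) end).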

get_state(pid)

@spec get_state(pid()) :: state()

Get the current state of the batcher.

Useful for debugging and testing.

report_queue_size(pid, queue_size)

@spec report_queue_size(pid(), non_neg_integer()) :: :ok

Report the current queue size to adjust batch sizing.

Called internally or externally to inform the batcher about server load.
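
This page does not spell out how a reported queue size changes the batch size. As an illustration only, the sketch below uses an assumed policy: halve the batch size when the queue is above a high watermark, double it when the queue is below a low watermark, and clamp the result to the configured minimum and maximum. The watermark options and the BatchSizingSketch module are hypothetical; the actual WeaviateEx policy may differ.

```elixir
defmodule BatchSizingSketch do
  # Assumed policy for illustration; not WeaviateEx's implementation.
  def next_batch_size(current, queue_size, opts \\ [])
      when is_integer(current) and current > 0 and
             is_integer(queue_size) and queue_size >= 0 do
    lower = Keyword.get(opts, :min_batch_size, 10)
    upper = Keyword.get(opts, :max_batch_size, 1000)
    # Hypothetical thresholds; the real module exposes no such options.
    high = Keyword.get(opts, :high_watermark, 500)
    low = Keyword.get(opts, :low_watermark, 50)

    proposed =
      cond do
        # Server is backed up: send smaller batches.
        queue_size > high -> div(current, 2)
        # Server is mostly idle: send larger batches.
        queue_size < low -> current * 2
        # In between: keep the current size.
        true -> current
      end

    # Clamp to the configured range.
    proposed |> max(lower) |> min(upper)
  end
end
```

Under this policy, a batch size of 100 drops to 50 when 600 items are queued and grows to 200 when only 10 are queued.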

start(opts)

@spec start(keyword()) :: {:ok, pid()} | {:error, term()}

Start a new dynamic batcher.

Options

  • :client - WeaviateEx.Client (required)
  • :batch_size - Initial batch size (default: 100)
  • :min_batch_size - Minimum batch size (default: 10)
  • :max_batch_size - Maximum batch size (default: 1000)
  • :concurrent_requests - Number of concurrent requests (default: 2)
  • :auto_flush - Automatically flush when batch size is reached (default: false)
  • :name - Optional name for process registration
  • :on_flush - Callback function called after each flush
  • :on_error - Callback function called on errors
  • :consistency_level - Consistency level for requests
  • :monitor_server_stats - Poll server for batch stats to adjust sizing (default: false)
  • :poll_interval - Interval in ms between server stat polls (default: 5000)
  • :auto_retry - Automatically re-queue failed objects (default: true)
  • :max_retries - Maximum retry attempts per object (default: 3)
  • :retry_delay_ms - Base delay for retry backoff in ms (default: 1000)
  • :on_permanent_failure - Callback for objects that exceed max_retries

Examples

{:ok, batcher} = Dynamic.start(client: client, batch_size: 50)

# With server stats monitoring
{:ok, batcher} = Dynamic.start(client: client, monitor_server_stats: true)

# With auto-retry configuration
# (Logger.error/1 is a macro, so the calling module needs `require Logger`)
{:ok, batcher} = Dynamic.start(
  client: client,
  auto_retry: true,
  max_retries: 5,
  on_permanent_failure: fn objects ->
    Logger.error("Permanent failures: #{length(objects)}")
  end
)
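
The :retry_delay_ms option is described as a base delay for retry backoff, but the exact curve is not stated here. Assuming plain exponential backoff (the delay doubles with every attempt), the waits implied by :max_retries and :retry_delay_ms could be computed as in this sketch. RetryBackoffSketch is a hypothetical module, and the real retry queue may add jitter or use a different formula.

```elixir
defmodule RetryBackoffSketch do
  # Assumption: delay = base_ms * 2^(attempt - 1), with attempt starting at 1.
  def delay_ms(attempt, base_ms \\ 1000)
      when is_integer(attempt) and attempt >= 1 and is_integer(base_ms) do
    base_ms * Integer.pow(2, attempt - 1)
  end

  # Delays for every attempt up to max_retries, in order.
  def schedule(max_retries \\ 3, base_ms \\ 1000)
      when is_integer(max_retries) and max_retries >= 0 do
    # The explicit step of 1 makes the range empty when max_retries is 0.
    for attempt <- 1..max_retries//1, do: delay_ms(attempt, base_ms)
  end
end
```

With the documented defaults (3 retries, 1000 ms base), this assumed schedule waits 1000, 2000, and 4000 ms before the respective attempts.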

stop(pid)

@spec stop(pid()) :: {:ok, WeaviateEx.Batch.ErrorTracking.Results.t()}

Stop the batcher, flushing any remaining objects.

Returns final aggregated results.