WeaviateEx.Batch (WeaviateEx v0.7.4)

Functions for batch operations in Weaviate.

Batch operations send many objects in a single request, so they are much more efficient than individual operations when dealing with large numbers of objects.

Batch Modes

This module supports three batching modes:

  • Fixed size (default): Simple fixed-size batching
  • Dynamic: Auto-adjusting batch sizes based on server queue depth
  • Rate-limited: Respects vectorizer API rate limits
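
As a rough illustration of what fixed-size batching means, the sketch below splits a list of objects into consecutive chunks of at most batch_size items. FixedSizeSketch is a hypothetical module written for this example; it is not how WeaviateEx implements the mode internally.

```elixir
defmodule FixedSizeSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Splits a list into consecutive batches of at most `batch_size` items.
  def batches(objects, batch_size \\ 100) when is_integer(batch_size) and batch_size > 0 do
    Enum.chunk_every(objects, batch_size)
  end
end

objects =
  for i <- 1..250 do
    %{class: "Article", properties: %{title: "Article #{i}"}}
  end

objects
|> FixedSizeSketch.batches(100)
|> Enum.map(&length/1)
|> IO.inspect(label: "batch sizes")
# batch sizes: [100, 100, 50]
```

Each chunk would then correspond to one batch request, with the last chunk holding whatever remains.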

Context Manager Pattern

Use with_batch/3 for a context-manager style interface that automatically flushes on exit:

{:ok, results} = WeaviateEx.Batch.with_batch(client, [batch_size: 100], fn batch ->
  batch
  |> WeaviateEx.Batch.add_object("Article", %{title: "Article 1"})
  |> WeaviateEx.Batch.add_object("Article", %{title: "Article 2"})
end)

Direct API Examples

# Batch create objects
objects = [
  %{class: "Article", properties: %{title: "Article 1"}},
  %{class: "Article", properties: %{title: "Article 2"}},
  %{class: "Article", properties: %{title: "Article 3"}}
]

{:ok, result} = WeaviateEx.Batch.create_objects(objects)

# Batch delete matching criteria
{:ok, result} = WeaviateEx.Batch.delete_objects(%{
  class: "Article",
  where: %{
    path: ["title"],
    operator: "Equal",
    valueText: "Delete Me"
  }
})

# Batch add references
references = [
  %{
    from: "weaviate://localhost/Article/uuid1/hasAuthor",
    to: "weaviate://localhost/Author/uuid2"
  }
]

{:ok, result} = WeaviateEx.Batch.add_references(references)

# Request a summary separating successes and failures
{:ok, summary} = WeaviateEx.Batch.create_objects(objects, return_summary: true)
summary.statistics

Summary

Functions

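add_object(ctx, collection, properties, opts \\ [])

Add an object to the batch context.

add_reference(ctx, collection, from_uuid, property, to_uuid, opts \\ [])

Add a reference to the batch context.

add_references(references, opts \\ [])

Adds cross-references in batch.

background(client, collection, opts \\ [])

Start a background batch processor.

create_objects(objects, opts \\ [])

Creates multiple objects in a single batch request.

delete_objects(criteria, opts \\ [])

Deletes multiple objects matching the given criteria.

flush(ctx)

Explicitly flush the current batch within a context.

wait_for_vector_indexing(client, collection, opts \\ [])

Wait for all vectors to be indexed after batch insertion.

with_batch(client, opts, fun)

Execute batch operations within a context that automatically flushes on exit.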

Types

batch_context()

@type batch_context() :: %{
  mode: batch_mode(),
  client: WeaviateEx.Client.t(),
  batcher: WeaviateEx.Batch.FixedSize.t() | pid(),
  opts: keyword(),
  results: WeaviateEx.Batch.ErrorTracking.Results.t()
}

batch_mode()

@type batch_mode() :: :fixed | :dynamic | :rate_limited

batch_objects()

@type batch_objects() :: [map()]

batch_references()

@type batch_references() :: [map()]

delete_criteria()

@type delete_criteria() :: map()

Functions

add_object(ctx, collection, properties, opts \\ [])

@spec add_object(batch_context(), String.t(), map(), keyword()) :: batch_context()

Add an object to the batch context.

Used within a with_batch/3 callback.

Options

  • :uuid - Custom UUID for the object
  • :vector - Custom vector for the object
  • :tenant - Tenant name for multi-tenant collections

Examples

Batch.with_batch(client, [], fn batch ->
  batch
  |> Batch.add_object("Article", %{title: "Test"})
  |> Batch.add_object("Article", %{title: "Test 2"}, uuid: "custom-uuid")
end)

add_reference(ctx, collection, from_uuid, property, to_uuid, opts \\ [])

@spec add_reference(
  batch_context(),
  String.t(),
  String.t(),
  String.t(),
  String.t(),
  keyword()
) ::
  batch_context()

Add a reference to the batch context.

Used within a with_batch/3 callback.

Options

  • :tenant - Tenant name for multi-tenant collections

Examples

Batch.with_batch(client, [], fn batch ->
  batch
  |> Batch.add_reference("Article", "uuid-1", "hasAuthor", "author-uuid")
end)

add_references(references, opts \\ [])

@spec add_references(batch_references(), Keyword.t()) :: WeaviateEx.api_response()

Adds cross-references in batch.

Parameters

  • references - List of reference objects
  • opts - Additional options

Reference Format

Each reference should have:

  • :from - Beacon URL of source property (e.g., "weaviate://localhost/Article/uuid/hasAuthor")
  • :to - Beacon URL of target object (e.g., "weaviate://localhost/Author/uuid")
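
The beacon strings above follow a regular shape, so they can be assembled from their parts. The sketch below assumes the "weaviate://localhost" prefix used in this document's examples; BeaconSketch and its functions are hypothetical names, not part of WeaviateEx.

```elixir
defmodule BeaconSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # The host mirrors the "weaviate://localhost" prefix used in the examples.
  @prefix "weaviate://localhost"

  # Beacon of a source property: <prefix>/<Collection>/<uuid>/<property>
  def source(collection, uuid, property) do
    "#{@prefix}/#{collection}/#{uuid}/#{property}"
  end

  # Beacon of a target object: <prefix>/<Collection>/<uuid>
  def target(collection, uuid) do
    "#{@prefix}/#{collection}/#{uuid}"
  end

  # Builds one reference map with the documented :from and :to keys.
  def reference({from_collection, from_uuid, property}, {to_collection, to_uuid}) do
    %{
      from: source(from_collection, from_uuid, property),
      to: target(to_collection, to_uuid)
    }
  end
end

BeaconSketch.reference(
  {"Article", "550e8400-e29b-41d4-a716-446655440000", "hasAuthor"},
  {"Author", "650e8400-e29b-41d4-a716-446655440000"}
)
|> IO.inspect()
```

A list of maps built this way has the shape that add_references/2 expects.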

Examples

references = [
  %{
    from: "weaviate://localhost/Article/550e8400-e29b-41d4-a716-446655440000/hasAuthor",
    to: "weaviate://localhost/Author/650e8400-e29b-41d4-a716-446655440000"
  }
]

{:ok, result} = Batch.add_references(references)

background(client, collection, opts \\ [])

@spec background(WeaviateEx.Client.t(), String.t(), keyword()) ::
  {:ok, pid()} | {:error, term()}

Start a background batch processor.

Unlike the synchronous with_batch/3, this returns immediately and processes objects asynchronously in the background.

Options

  • :batch_size - Objects per batch (default: 100)
  • :concurrent_requests - Max concurrent requests (default: 2)
  • :flush_interval - Auto-flush interval in ms (default: 1000)
  • :on_flush - Callback on each flush completion
  • :on_error - Callback on each error
  • :tenant - Tenant name for multi-tenancy

Examples

{:ok, batcher} = Batch.background(client, "Article",
  batch_size: 100,
  concurrent_requests: 2
)

for article <- articles do
  :ok = Batch.Background.add_object(batcher, article)
end

results = Batch.Background.stop(batcher, flush: true)

create_objects(objects, opts \\ [])

@spec create_objects(batch_objects(), Keyword.t()) :: WeaviateEx.api_response()

Creates multiple objects in a single batch request.

This is much more efficient than creating objects one by one, because all objects travel in one request.

Parameters

  • objects - List of objects to create
  • opts - Additional options

Options

  • :consistency_level - Consistency level for the operation

Object Format

Each object should have:

  • :class - Collection name
  • :id - Optional UUID
  • :properties - Object properties
  • :vector - Optional vector embedding
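
When the source data is a list of plain property maps, a small wrapper can put each one into the object format above. ObjectSketch is a hypothetical helper for illustration; it only sets the optional :id and :vector keys when a value is supplied.

```elixir
defmodule ObjectSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Wraps a properties map in the documented object format.
  def build(class, properties, opts \\ []) do
    %{class: class, properties: properties}
    |> put_optional(:id, Keyword.get(opts, :id))
    |> put_optional(:vector, Keyword.get(opts, :vector))
  end

  # Leave the key out entirely when no value was given.
  defp put_optional(map, _key, nil), do: map
  defp put_optional(map, key, value), do: Map.put(map, key, value)
end

rows = [%{title: "Article 1"}, %{title: "Article 2"}]

objects = Enum.map(rows, &ObjectSketch.build("Article", &1))
IO.inspect(objects)
```

The resulting list can be passed to create_objects/2 as in the example that follows.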

Examples

objects = [
  %{class: "Article", properties: %{title: "Article 1"}},
  %{class: "Article", properties: %{title: "Article 2"}}
]

{:ok, result} = Batch.create_objects(objects)
# result["results"] contains status for each object

delete_objects(criteria, opts \\ [])

@spec delete_objects(delete_criteria(), Keyword.t()) :: WeaviateEx.api_response()

Deletes multiple objects matching the given criteria.

Parameters

  • criteria - Delete criteria including class and where clause
  • opts - Additional options

Criteria Format

  • :class - Collection name (required)
  • :where - Where clause to match objects (required)
  • :output - Output verbosity ("minimal" or "verbose", default: "minimal")
  • :dryRun - If true, only reports what would be deleted without deleting
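
The criteria map can be built programmatically. The sketch below covers only the single-property "Equal" text filter that appears in this document's examples; DeleteCriteriaSketch and its :dry_run option are hypothetical names chosen for the illustration.

```elixir
defmodule DeleteCriteriaSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Builds criteria for an "Equal" text filter on a single property.
  def equal_text(class, property, value, opts \\ []) do
    base = %{
      class: class,
      where: %{path: [property], operator: "Equal", valueText: value}
    }

    # Add :dryRun only when explicitly requested.
    if Keyword.get(opts, :dry_run, false) do
      Map.put(base, :dryRun, true)
    else
      base
    end
  end
end

DeleteCriteriaSketch.equal_text("Article", "status", "draft", dry_run: true)
|> IO.inspect()
```

Running a dry run first and inspecting the result before the real delete is a cautious way to check that the where clause matches what is intended.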

Examples

# Delete all articles with specific title
{:ok, result} = Batch.delete_objects(%{
  class: "Article",
  where: %{
    path: ["title"],
    operator: "Equal",
    valueText: "Delete Me"
  }
})

# Dry run to see what would be deleted
{:ok, result} = Batch.delete_objects(%{
  class: "Article",
  where: %{path: ["status"], operator: "Equal", valueText: "draft"},
  dryRun: true
})

flush(ctx)

Explicitly flush the current batch within a context.

On success, returns {:ok, ctx, results}, where ctx is the updated context and results holds the flushed results.

Examples

Batch.with_batch(client, [], fn batch ->
  batch = Batch.add_object(batch, "Article", %{title: "Test 1"})
  {:ok, batch, _results} = Batch.flush(batch)
  batch = Batch.add_object(batch, "Article", %{title: "Test 2"})
  batch
end)

wait_for_vector_indexing(client, collection, opts \\ [])

@spec wait_for_vector_indexing(WeaviateEx.Client.t(), String.t(), keyword()) ::
  :ok | {:error, term()}

Wait for all vectors to be indexed after batch insertion.

This function polls shard status until all vector queues are empty, indicating that async vectorization is complete. This is useful when you need to ensure all objects are searchable before proceeding.

Parameters

  • client - WeaviateEx.Client instance
  • collection - Collection name to wait for
  • opts - Options

Options

  • :poll_interval - Milliseconds between status checks (default: 1000)
  • :max_failures - Max consecutive failures before error (default: 5)
  • :timeout - Maximum wait time in milliseconds (default: 300000)
  • :shards - Specific shards to monitor (default: all shards)

Examples

# Wait for all shards
:ok = Batch.wait_for_vector_indexing(client, "Article")

# Wait with custom timeout
:ok = Batch.wait_for_vector_indexing(client, "Article", timeout: 60_000)

# Wait for specific shards
:ok = Batch.wait_for_vector_indexing(client, "Article", shards: ["shard-0"])

Returns

  • :ok - All vectors indexed successfully
  • {:error, :timeout} - Timed out waiting for indexing
  • {:error, {:max_failures, reason}} - Too many consecutive failures
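
The examples above match on :ok, which raises a MatchError on any other result. A sketch of handling each documented return value explicitly is shown below; IndexingResultSketch is a hypothetical module, and the final catch-all clause is an assumption covering error terms not listed here.

```elixir
defmodule IndexingResultSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Maps each documented return value to a human-readable message.
  def describe(:ok), do: "all vectors indexed"
  def describe({:error, :timeout}), do: "timed out waiting for indexing"

  def describe({:error, {:max_failures, reason}}) do
    "gave up after repeated failures: #{inspect(reason)}"
  end

  # Assumption: any other error term is reported as-is.
  def describe({:error, other}), do: "unexpected error: #{inspect(other)}"
end

IO.puts(IndexingResultSketch.describe({:error, :timeout}))
# timed out waiting for indexing
```

In application code, the result of wait_for_vector_indexing/3 would be passed to a function like describe/1 or matched in a case expression.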

with_batch(client, opts, fun)

Execute batch operations within a context that automatically flushes on exit.

This provides a Python-like context manager pattern for batch operations. All buffered objects and references are automatically flushed when the callback completes.

Parameters

  • client - WeaviateEx.Client
  • opts - Batch options
  • fun - Callback function receiving the batch context

Options

  • :mode - Batch mode: :fixed (default), :dynamic, or :rate_limited
  • :batch_size - Number of objects per batch (default: 100)
  • :on_flush - Callback function called after each batch flush
  • :on_error - Callback function called on errors
  • :consistency_level - Consistency level for requests

Dynamic Mode Options

  • :min_batch_size - Minimum batch size (default: 10)
  • :max_batch_size - Maximum batch size (default: 1000)
  • :concurrent_requests - Number of concurrent requests (default: 2)

Rate-Limited Mode Options

  • :requests_per_minute - Maximum requests per minute (default: 60)
  • :retry_on_rate_limit - Retry on rate limit errors (default: false)
  • :max_retries - Maximum retry attempts (default: 5)
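
Because each mode has its own option set, it can help to write the defaults out and override only what differs. The sketch below lists some of the defaults documented above; BatchOptsSketch is a hypothetical module, and the resulting keyword list is meant to be passed as the opts argument.

```elixir
defmodule BatchOptsSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Some of the documented defaults for each mode, written out explicitly.
  def defaults(:fixed), do: [mode: :fixed, batch_size: 100]

  def defaults(:dynamic) do
    [mode: :dynamic, min_batch_size: 10, max_batch_size: 1000, concurrent_requests: 2]
  end

  def defaults(:rate_limited) do
    [mode: :rate_limited, requests_per_minute: 60, max_retries: 5]
  end

  # Values in `overrides` replace the defaults for the same key.
  def build(mode, overrides \\ []) do
    Keyword.merge(defaults(mode), overrides)
  end
end

opts = BatchOptsSketch.build(:rate_limited, requests_per_minute: 30)
IO.inspect(Keyword.get(opts, :requests_per_minute))
# 30
```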

Examples

# Simple fixed-size batching
{:ok, results} = Batch.with_batch(client, [batch_size: 100], fn batch ->
  batch
  |> Batch.add_object("Article", %{title: "Test 1"})
  |> Batch.add_object("Article", %{title: "Test 2"})
end)

# Dynamic batching
{:ok, results} = Batch.with_batch(client, [mode: :dynamic], fn batch ->
  Enum.reduce(objects, batch, fn obj, b ->
    Batch.add_object(b, "Article", obj)
  end)
end)

# Rate-limited batching
{:ok, results} = Batch.with_batch(client, [
  mode: :rate_limited,
  requests_per_minute: 30
], fn batch ->
  batch
  |> Batch.add_object("Article", %{title: "Test"})
end)