WeaviateEx.Batch (WeaviateEx v0.7.4)
View Source
Functions for batch operations in Weaviate.
Batch operations are much more efficient than individual operations when dealing with large numbers of objects.
Batch Modes
This module supports three batching modes:
- Fixed size (default): Simple fixed-size batching
- Dynamic: Auto-adjusting batch sizes based on server queue depth
- Rate-limited: Respects vectorizer API rate limits
Context Manager Pattern
Use with_batch/3 for a context-manager style interface that automatically
flushes on exit:
{:ok, results} = WeaviateEx.Batch.with_batch(client, [batch_size: 100], fn batch ->
batch
|> WeaviateEx.Batch.add_object("Article", %{title: "Article 1"})
|> WeaviateEx.Batch.add_object("Article", %{title: "Article 2"})
end)
Direct API Examples
# Batch create objects
objects = [
%{class: "Article", properties: %{title: "Article 1"}},
%{class: "Article", properties: %{title: "Article 2"}},
%{class: "Article", properties: %{title: "Article 3"}}
]
{:ok, result} = WeaviateEx.Batch.create_objects(objects)
# Batch delete matching criteria
{:ok, result} = WeaviateEx.Batch.delete_objects(%{
class: "Article",
where: %{
path: ["title"],
operator: "Equal",
valueText: "Delete Me"
}
})
# Batch add references
references = [
%{
from: "weaviate://localhost/Article/uuid1/hasAuthor",
to: "weaviate://localhost/Author/uuid2"
}
]
{:ok, result} = WeaviateEx.Batch.add_references(references)
# Request a summary separating successes and failures
{:ok, summary} = WeaviateEx.Batch.create_objects(objects, return_summary: true)
summary.statistics
Summary
Functions
Add an object to the batch context.
Add a reference to the batch context.
Adds cross-references in batch.
Start a background batch processor.
Creates multiple objects in a single batch request.
Deletes multiple objects matching the given criteria.
Explicitly flush the current batch within a context.
Wait for all vectors to be indexed after batch insertion.
Execute batch operations within a context that automatically flushes on exit.
Types
@type batch_context() :: %{ mode: batch_mode(), client: WeaviateEx.Client.t(), batcher: WeaviateEx.Batch.FixedSize.t() | pid(), opts: keyword(), results: WeaviateEx.Batch.ErrorTracking.Results.t() }
@type batch_mode() :: :fixed | :dynamic | :rate_limited
@type batch_objects() :: [map()]
@type batch_references() :: [map()]
@type delete_criteria() :: map()
Functions
@spec add_object(batch_context(), String.t(), map(), keyword()) :: batch_context()
Add an object to the batch context.
Used within a with_batch/3 callback.
Options
- :uuid - Custom UUID for the object
- :vector - Custom vector for the object
- :tenant - Tenant name for multi-tenant collections
Examples
Batch.with_batch(client, [], fn batch ->
batch
|> Batch.add_object("Article", %{title: "Test"})
|> Batch.add_object("Article", %{title: "Test 2"}, uuid: "custom-uuid")
end)
@spec add_reference( batch_context(), String.t(), String.t(), String.t(), String.t(), keyword() ) :: batch_context()
Add a reference to the batch context.
Used within a with_batch/3 callback.
Options
- :tenant - Tenant name for multi-tenant collections
Examples
Batch.with_batch(client, [], fn batch ->
batch
|> Batch.add_reference("Article", "uuid-1", "hasAuthor", "author-uuid")
end)
@spec add_references(batch_references(), Keyword.t()) :: WeaviateEx.api_response()
Adds cross-references in batch.
Parameters
- references - List of reference objects
- opts - Additional options
Reference Format
Each reference should have:
- :from - Beacon URL of source property (e.g., "weaviate://localhost/Article/uuid/hasAuthor")
- :to - Beacon URL of target object (e.g., "weaviate://localhost/Author/uuid")
Examples
references = [
%{
from: "weaviate://localhost/Article/550e8400-e29b-41d4-a716-446655440000/hasAuthor",
to: "weaviate://localhost/Author/650e8400-e29b-41d4-a716-446655440000"
}
]
{:ok, result} = Batch.add_references(references)
@spec background(WeaviateEx.Client.t(), String.t(), keyword()) :: {:ok, pid()} | {:error, term()}
Start a background batch processor.
Unlike the synchronous with_batch/3, this returns immediately and
processes objects asynchronously in the background.
Options
- :batch_size - Objects per batch (default: 100)
- :concurrent_requests - Max concurrent requests (default: 2)
- :flush_interval - Auto-flush interval in ms (default: 1000)
- :on_flush - Callback on each flush completion
- :on_error - Callback on each error
- :tenant - Tenant name for multi-tenancy
Examples
{:ok, batcher} = Batch.background(client, "Article",
batch_size: 100,
concurrent_requests: 2
)
for article <- articles do
:ok = Batch.Background.add_object(batcher, article)
end
results = Batch.Background.stop(batcher, flush: true)
@spec create_objects(batch_objects(), Keyword.t()) :: WeaviateEx.api_response()
Creates multiple objects in a single batch request.
Much more efficient than creating objects one by one.
Parameters
- objects - List of objects to create
- opts - Additional options
Options
- :consistency_level - Consistency level for the operation
Object Format
Each object should have:
- :class - Collection name
- :id - Optional UUID
- :properties - Object properties
- :vector - Optional vector embedding
Examples
objects = [
%{class: "Article", properties: %{title: "Article 1"}},
%{class: "Article", properties: %{title: "Article 2"}}
]
{:ok, result} = Batch.create_objects(objects)
# result["results"] contains status for each object
@spec delete_objects(delete_criteria(), Keyword.t()) :: WeaviateEx.api_response()
Deletes multiple objects matching the given criteria.
Parameters
- criteria - Delete criteria including class and where clause
- opts - Additional options
Criteria Format
- :class - Collection name (required)
- :where - Where clause to match objects (required)
- :output - Output verbosity ("minimal" or "verbose", default: "minimal")
- :dryRun - If true, only reports what would be deleted without deleting
Examples
# Delete all articles with specific title
{:ok, result} = Batch.delete_objects(%{
class: "Article",
where: %{
path: ["title"],
operator: "Equal",
valueText: "Delete Me"
}
})
# Dry run to see what would be deleted
{:ok, result} = Batch.delete_objects(%{
class: "Article",
where: %{path: ["status"], operator: "Equal", valueText: "draft"},
dryRun: true
})
@spec flush(batch_context()) :: {:ok, batch_context(), WeaviateEx.Batch.ErrorTracking.Results.t()} | {:error, WeaviateEx.Error.t()}
Explicitly flush the current batch within a context.
Returns updated context with flushed results.
Examples
Batch.with_batch(client, [], fn batch ->
batch = Batch.add_object(batch, "Article", %{title: "Test 1"})
{:ok, batch, _results} = Batch.flush(batch)
batch = Batch.add_object(batch, "Article", %{title: "Test 2"})
batch
end)
@spec wait_for_vector_indexing(WeaviateEx.Client.t(), String.t(), keyword()) :: :ok | {:error, term()}
Wait for all vectors to be indexed after batch insertion.
This function polls shard status until all vector queues are empty, indicating that async vectorization is complete. This is useful when you need to ensure all objects are searchable before proceeding.
Parameters
- client - WeaviateEx.Client instance
- collection - Collection name to wait for
- opts - Options
Options
- :poll_interval - Milliseconds between status checks (default: 1000)
- :max_failures - Max consecutive failures before error (default: 5)
- :timeout - Maximum wait time in milliseconds (default: 300000)
- :shards - Specific shards to monitor (default: all shards)
Examples
# Wait for all shards
:ok = Batch.wait_for_vector_indexing(client, "Article")
# Wait with custom timeout
:ok = Batch.wait_for_vector_indexing(client, "Article", timeout: 60_000)
# Wait for specific shards
:ok = Batch.wait_for_vector_indexing(client, "Article", shards: ["shard-0"])
Returns
- :ok - All vectors indexed successfully
- {:error, :timeout} - Timed out waiting for indexing
- {:error, {:max_failures, reason}} - Too many consecutive failures
@spec with_batch(WeaviateEx.Client.t(), keyword(), (batch_context() -> batch_context())) :: {:ok, WeaviateEx.Batch.ErrorTracking.Results.t()} | {:error, WeaviateEx.Error.t()}
Execute batch operations within a context that automatically flushes on exit.
This provides a Python-like context manager pattern for batch operations. All buffered objects and references are automatically flushed when the callback completes.
Parameters
- client - WeaviateEx.Client
- opts - Batch options
- fun - Callback function receiving the batch context
Options
- :mode - Batch mode: :fixed (default), :dynamic, or :rate_limited
- :batch_size - Number of objects per batch (default: 100)
- :on_flush - Callback function called after each batch flush
- :on_error - Callback function called on errors
- :consistency_level - Consistency level for requests
Dynamic Mode Options
- :min_batch_size - Minimum batch size (default: 10)
- :max_batch_size - Maximum batch size (default: 1000)
- :concurrent_requests - Number of concurrent requests (default: 2)
Rate-Limited Mode Options
- :requests_per_minute - Maximum requests per minute (default: 60)
- :retry_on_rate_limit - Retry on rate limit errors (default: false)
- :max_retries - Maximum retry attempts (default: 5)
Examples
# Simple fixed-size batching
{:ok, results} = Batch.with_batch(client, [batch_size: 100], fn batch ->
batch
|> Batch.add_object("Article", %{title: "Test 1"})
|> Batch.add_object("Article", %{title: "Test 2"})
end)
# Dynamic batching
{:ok, results} = Batch.with_batch(client, [mode: :dynamic], fn batch ->
Enum.reduce(objects, batch, fn obj, b ->
Batch.add_object(b, "Article", obj)
end)
end)
# Rate-limited batching
{:ok, results} = Batch.with_batch(client, [
mode: :rate_limited,
requests_per_minute: 30
], fn batch ->
batch
|> Batch.add_object("Article", %{title: "Test"})
end)