WeaviateEx.Batch.Stream (WeaviateEx v0.7.4)
Stream-based batch insertion with server-side batching.
Provides a high-level API for streaming batch insertions to Weaviate using bidirectional gRPC streaming. This is more efficient than traditional batch APIs for continuous data ingestion, as it maintains a persistent connection and allows the server to manage batching.
Features
- Bidirectional gRPC streaming for high throughput
- Client-side buffering with configurable size
- Auto-flush based on buffer size or time interval
- Server-side batching mode (Weaviate 1.34+)
- Automatic reconnection on stream failure
- Result tracking and error collection
Example
alias WeaviateEx.Batch.Stream
# Create a new stream
{:ok, stream} = Stream.new(client, "Article", buffer_size: 100)
# Add objects - they're buffered locally
{:ok, stream} = Stream.add(stream, %{properties: %{title: "Article 1"}})
{:ok, stream} = Stream.add(stream, %{properties: %{title: "Article 2"}})
# Add many objects at once
objects = Enum.map(1..100, &%{properties: %{title: "Article #{&1}"}})
{:ok, stream} = Stream.add_many(stream, objects)
# Manually flush if needed
{:ok, stream} = Stream.flush(stream)
# Close the stream and get final results
{:ok, results} = Stream.close(stream)
IO.puts("Inserted #{length(results)} objects")
With Multi-tenancy
{:ok, stream} = Stream.new(client, "Article",
  buffer_size: 100,
  tenant: "tenant-a"
)
# All objects will be inserted with the tenant
{:ok, stream} = Stream.add_many(stream, objects)
{:ok, results} = Stream.close(stream)
Types
@type batch_result() :: map()
Batch stream result entries (uuid, beacon, status, error).
@type object() :: WeaviateEx.GRPC.Services.BatchStream.object()
@type state() :: :initialized | :connected | :streaming | :closing | :closed | :error
@type t() :: %WeaviateEx.Batch.Stream{
  buffer: [prepared_object()],
  buffer_size: pos_integer(),
  client: map(),
  collection: String.t(),
  consistency_level: :all | :quorum | :one | nil,
  flush_interval_ms: pos_integer(),
  last_flush_at: DateTime.t() | nil,
  max_reconnect_attempts: pos_integer(),
  reconnect_attempts: non_neg_integer(),
  results: [batch_result()],
  server_side_batching: boolean(),
  state: state(),
  stream_handle: WeaviateEx.GRPC.Services.BatchStream.stream_handle() | nil,
  tenant: String.t() | nil
}
Functions
Adds a single object to the buffer.
The object will be buffered locally until the buffer is full or flush is called. If no UUID is provided, one will be generated.
Example
{:ok, stream} = Stream.add(stream, %{
  properties: %{title: "My Article", content: "..."}
})
Adds multiple objects to the buffer.
UUIDs will be generated for objects that don't have them. Adding more objects than the buffer can hold may trigger multiple flushes.
Example
objects = Enum.map(1..1000, &%{properties: %{index: &1}})
{:ok, stream} = Stream.add_many(stream, objects)
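To make the "multiple flushes" behaviour concrete, here is a minimal sketch of how a list larger than the buffer could be split into flush-sized chunks. ChunkSketch and plan/2 are hypothetical names used only for illustration, and the sketch assumes one flush per full buffer, with a short final chunk staying buffered; the library's actual chunking may differ.

```elixir
defmodule ChunkSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Returns {number_of_full_flushes, objects_left_in_buffer}.
  def plan(objects, buffer_size) do
    {full, rest} =
      objects
      |> Enum.chunk_every(buffer_size)
      |> Enum.split_with(&(length(&1) == buffer_size))

    # `rest` holds at most one short chunk; count its elements.
    leftover = rest |> Enum.map(&length/1) |> Enum.sum()
    {length(full), leftover}
  end
end

objects = Enum.map(1..250, &%{properties: %{index: &1}})
{flushes, leftover} = ChunkSketch.plan(objects, 100)
IO.puts("full flushes: #{flushes}, still buffered: #{leftover}")
```

Under these assumptions, anything still buffered would be sent by the next flush or when the stream is closed.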
Checks if the buffer is full.
@spec close(t()) :: {:ok, [batch_result()]} | {:error, term()}
Closes the stream and returns all results.
Any remaining buffered objects will be flushed before closing.
Example
{:ok, results} = Stream.close(stream)
IO.puts("Inserted #{length(results)} objects")
Opens the gRPC stream connection.
This is called automatically on first flush, but can be called manually to establish the connection early.
@spec error_count(t()) :: non_neg_integer()
Returns the number of failed insertions.
Flushes the current buffer to the server.
This sends all buffered objects through the gRPC stream. If the stream is not yet connected, it will be connected first.
Example
{:ok, stream} = Stream.flush(stream)
@spec get_errors(t()) :: [batch_result()]
Returns only the error results.
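The counting and filtering helpers can be pictured as simple passes over the result list. The sketch below is an illustration only: ResultSketch is a hypothetical module, and it assumes that a failed entry carries a non-nil :error key, which this page does not confirm.

```elixir
defmodule ResultSketch do
  # Hypothetical helpers, not part of WeaviateEx.
  # Assumption: a result map with a non-nil :error value is a failure.
  def errors(results), do: Enum.filter(results, &failed?/1)

  def error_count(results), do: Enum.count(results, &failed?/1)

  def success_count(results), do: length(results) - error_count(results)

  defp failed?(result), do: Map.get(result, :error) != nil
end

results = [
  %{uuid: "a", error: nil},
  %{uuid: "b", error: "invalid property"},
  %{uuid: "c", error: nil}
]

IO.inspect(ResultSketch.errors(results), label: "errors")
IO.inspect(ResultSketch.success_count(results), label: "successes")
```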
Creates a new batch stream for the given collection.
Options
- :buffer_size - Number of objects to buffer before auto-flush (default: 100)
- :flush_interval_ms - Auto-flush interval in milliseconds (default: 1000)
- :server_side_batching - Let Weaviate manage batching (default: true, requires 1.34+)
- :consistency_level - Consistency level (:all, :quorum, :one)
- :tenant - Tenant name for multi-tenancy
- :max_reconnect_attempts - Maximum reconnection attempts (default: 3)
Example
{:ok, stream} = Stream.new(client, "Article",
  buffer_size: 200,
  flush_interval_ms: 2000,
  consistency_level: :quorum
)
@spec pending_count(t()) :: non_neg_integer()
Returns the number of objects currently in the buffer.
@spec prepare_object(t(), object()) :: prepared_object()
Prepares an object for insertion.
Adds collection, generates UUID if missing, and adds tenant if specified.
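The three preparation steps can be sketched as a small pipeline. This is not the library's implementation: PrepareSketch, its function names, and the :collection, :uuid and :tenant keys are assumptions made for illustration, and the UUID is a random version 4 value built with :crypto from the standard library.

```elixir
defmodule PrepareSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # Assumed keys: :collection, :uuid, :tenant.
  def prepare(object, collection, tenant \\ nil) do
    object
    |> Map.put(:collection, collection)
    # Keep a caller-supplied UUID; generate one only when it is missing.
    |> Map.put_new_lazy(:uuid, &uuid4/0)
    |> maybe_put_tenant(tenant)
  end

  defp maybe_put_tenant(object, nil), do: object
  defp maybe_put_tenant(object, tenant), do: Map.put(object, :tenant, tenant)

  # Random (version 4) UUID: 122 random bits plus version and variant bits.
  defp uuid4 do
    <<a::48, _::4, b::12, _::2, c::62>> = :crypto.strong_rand_bytes(16)

    <<a::48, 4::4, b::12, 2::2, c::62>>
    |> Base.encode16(case: :lower)
    |> hyphenate()
  end

  defp hyphenate(
         <<a::binary-size(8), b::binary-size(4), c::binary-size(4), d::binary-size(4),
           e::binary-size(12)>>
       ) do
    Enum.join([a, b, c, d, e], "-")
  end
end

prepared = PrepareSketch.prepare(%{properties: %{title: "Article 1"}}, "Article", "tenant-a")
IO.inspect(prepared)
```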
@spec reconnect(t(), pos_integer()) :: {:ok, t()} | {:error, term()}
Attempts to reconnect the stream after a failure.
Returns an error if max reconnection attempts have been exceeded.
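A bounded retry of this kind can be sketched as a recursive function that gives up once the attempt counter passes the limit. ReconnectSketch, connect_fun and the :max_reconnect_attempts_exceeded reason are hypothetical; the real function works on the stream struct, and its error term is not documented here.

```elixir
defmodule ReconnectSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # `connect_fun` is an assumed zero-arity function returning
  # {:ok, handle} or {:error, reason}.
  def reconnect(connect_fun, max_attempts, attempt \\ 1)

  def reconnect(_connect_fun, max_attempts, attempt) when attempt > max_attempts do
    {:error, :max_reconnect_attempts_exceeded}
  end

  def reconnect(connect_fun, max_attempts, attempt) do
    case connect_fun.() do
      {:ok, handle} -> {:ok, handle, attempt}
      {:error, _reason} -> reconnect(connect_fun, max_attempts, attempt + 1)
    end
  end
end

# A connection that never succeeds exhausts the three allowed attempts.
IO.inspect(ReconnectSketch.reconnect(fn -> {:error, :unavailable} end, 3))
```

A production version would usually wait between attempts, for example with an increasing delay; that is left out to keep the sketch short.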
@spec results_count(t()) :: non_neg_integer()
Returns the total number of results received.
Determines if the stream should auto-flush.
Returns true if the buffer is full or the flush interval has passed.
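The rule above combines a size check with a time check. The sketch below shows one way to express it; FlushSketch and its argument list are hypothetical, and it assumes that a stream which has never flushed (last_flush_at is nil) is triggered by size only, which may not match the library.

```elixir
defmodule FlushSketch do
  # Hypothetical helper, not part of WeaviateEx.
  def should_flush?(buffer, buffer_size, last_flush_at, flush_interval_ms, now \\ DateTime.utc_now()) do
    buffer_full = length(buffer) >= buffer_size

    # Assumption: with no previous flush, only the size check applies.
    interval_passed =
      last_flush_at != nil and
        DateTime.diff(now, last_flush_at, :millisecond) >= flush_interval_ms

    buffer_full or interval_passed
  end
end

last_flush = ~U[2024-01-01 00:00:00.000Z]
now = DateTime.add(last_flush, 1500, :millisecond)

# One buffered object, but 1500 ms have passed against a 1000 ms interval.
IO.inspect(FlushSketch.should_flush?([%{}], 100, last_flush, 1000, now))
```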
@spec success_count(t()) :: non_neg_integer()
Returns the number of successful insertions.
Checks if a state is valid.
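Given the state() type listed under Types, a validity check can be as small as a membership test. StateSketch and valid_state?/1 are hypothetical names for this sketch; only the six atoms come from the type definition.

```elixir
defmodule StateSketch do
  # Hypothetical helper, not part of WeaviateEx.
  # The atoms mirror the documented state() type.
  @states [:initialized, :connected, :streaming, :closing, :closed, :error]

  def valid_state?(state), do: state in @states
end

IO.inspect(StateSketch.valid_state?(:streaming))
IO.inspect(StateSketch.valid_state?(:paused))
```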