WeaviateEx.Batch.Stream (WeaviateEx v0.7.4)


Stream-based batch insertion with server-side batching.

Provides a high-level API for streaming batch insertions to Weaviate over bidirectional gRPC streaming. For continuous data ingestion this is more efficient than request/response batch APIs, because it keeps a persistent connection open and lets the server manage batching.

Features

  • Bidirectional gRPC streaming for high throughput
  • Client-side buffering with configurable size
  • Auto-flush based on buffer size or time interval
  • Server-side batching mode (Weaviate 1.34+)
  • Automatic reconnection on stream failure
  • Result tracking and error collection

Example

alias WeaviateEx.Batch.Stream

# Create a new stream
{:ok, stream} = Stream.new(client, "Article", buffer_size: 100)

# Add objects - they're buffered locally
{:ok, stream} = Stream.add(stream, %{properties: %{title: "Article 1"}})
{:ok, stream} = Stream.add(stream, %{properties: %{title: "Article 2"}})

# Add many objects at once
objects = Enum.map(1..100, &%{properties: %{title: "Article #{&1}"}})
{:ok, stream} = Stream.add_many(stream, objects)

# Manually flush if needed
{:ok, stream} = Stream.flush(stream)

# Close the stream and get final results
{:ok, results} = Stream.close(stream)

# results contains both successful and failed entries
IO.puts("Received #{length(results)} results")

With Multi-tenancy

{:ok, stream} = Stream.new(client, "Article",
  buffer_size: 100,
  tenant: "tenant-a"
)

# All objects will be inserted with the tenant
{:ok, stream} = Stream.add_many(stream, objects)
{:ok, results} = Stream.close(stream)

Summary

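Types

  • batch_result() - Batch stream result entries (uuid, beacon, status, error).
  • t()

Functions

  • add(stream, object) - Adds a single object to the buffer.
  • add_many(stream, objects) - Adds multiple objects to the buffer.
  • buffer_full?(stream) - Checks if the buffer is full.
  • close(stream) - Closes the stream and returns all results.
  • connect(stream) - Opens the gRPC stream connection.
  • error_count(stream) - Returns the number of failed insertions.
  • flush(stream) - Flushes the current buffer to the server.
  • get_errors(stream) - Returns only the error results.
  • new(client, collection, opts \\ []) - Creates a new batch stream for the given collection.
  • pending_count(stream) - Returns the number of objects currently in the buffer.
  • prepare_object(stream, object) - Prepares an object for insertion.
  • reconnect(stream, max_attempts \\ 3) - Attempts to reconnect the stream after a failure.
  • results_count(stream) - Returns the total number of results received.
  • should_flush?(stream) - Determines if the stream should auto-flush.
  • success_count(stream) - Returns the number of successful insertions.
  • valid_state?(state) - Checks if a state is valid.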

Types

batch_result()

@type batch_result() :: map()

Batch stream result entries (uuid, beacon, status, error).

object()

prepared_object()

@type prepared_object() :: %{
  :uuid => String.t(),
  :collection => String.t(),
  optional(:tenant) => String.t(),
  optional(:vector) => [float()],
  optional(:vectors) => %{required(String.t()) => [float()]},
  optional(:properties) => map()
}

state()

@type state() :: :initialized | :connected | :streaming | :closing | :closed | :error

t()

@type t() :: %WeaviateEx.Batch.Stream{
  buffer: [prepared_object()],
  buffer_size: pos_integer(),
  client: map(),
  collection: String.t(),
  consistency_level: :all | :quorum | :one | nil,
  flush_interval_ms: pos_integer(),
  last_flush_at: DateTime.t() | nil,
  max_reconnect_attempts: pos_integer(),
  reconnect_attempts: non_neg_integer(),
  results: [batch_result()],
  server_side_batching: boolean(),
  state: state(),
  stream_handle: WeaviateEx.GRPC.Services.BatchStream.stream_handle() | nil,
  tenant: String.t() | nil
}

Functions

add(stream, object)

@spec add(t(), object()) :: {:ok, t()} | {:error, term()}

Adds a single object to the buffer.

The object will be buffered locally until the buffer is full or flush is called. If no UUID is provided, one will be generated.

Example

{:ok, stream} = Stream.add(stream, %{
  properties: %{title: "My Article", content: "..."}
})

add_many(stream, objects)

@spec add_many(t(), [object()]) :: {:ok, t()} | {:error, term()}

Adds multiple objects to the buffer.

UUIDs are generated for objects that don't have one. This may trigger multiple flushes if the number of objects exceeds the buffer size.

Example

objects = Enum.map(1..1000, &%{properties: %{index: &1}})
{:ok, stream} = Stream.add_many(stream, objects)
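To illustrate how a single add_many call can cause several flushes, here is a hypothetical pure-Elixir model of the client-side buffering. AddManySketch is an invented name, and the rule that a flush happens exactly when the buffer reaches buffer_size is an assumption for illustration, not taken from WeaviateEx's source.

```elixir
defmodule AddManySketch do
  # Hypothetical model of client-side buffering, not WeaviateEx's code.
  # Assumption: objects are appended one at a time, and whenever the buffer
  # reaches `buffer_size` it is "flushed" (collected into a batch).
  # Returns {batches_in_flush_order, objects_left_in_buffer}.
  def add_many(buffer, objects, buffer_size) do
    {batches, rest} =
      Enum.reduce(objects, {[], Enum.reverse(buffer)}, fn obj, {batches, rev_buf} ->
        # Prepend for efficiency; the buffer is kept in reverse order here.
        rev_buf = [obj | rev_buf]

        if length(rev_buf) >= buffer_size do
          # Buffer is full: emit it in insertion order and start an empty one.
          {[Enum.reverse(rev_buf) | batches], []}
        else
          {batches, rev_buf}
        end
      end)

    {Enum.reverse(batches), Enum.reverse(rest)}
  end
end
```

Under this model, an empty buffer with buffer_size 100 splits the 1,000 objects from the example above into ten batches of 100, leaving nothing pending.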

buffer_full?(stream)

@spec buffer_full?(t()) :: boolean()

Checks if the buffer is full.

close(stream)

@spec close(t()) :: {:ok, [batch_result()]} | {:error, term()}

Closes the stream and returns all results.

Any remaining buffered objects will be flushed before closing.

Example

{:ok, results} = Stream.close(stream)
# results contains both successful and failed entries
IO.puts("Received #{length(results)} results")

connect(stream)

@spec connect(t()) :: {:ok, t()} | {:error, term()}

Opens the gRPC stream connection.

This is called automatically on the first flush, but you can call it manually to establish the connection early.

error_count(stream)

@spec error_count(t()) :: non_neg_integer()

Returns the number of failed insertions.

flush(stream)

@spec flush(t()) :: {:ok, t()} | {:error, term()}

Flushes the current buffer to the server.

This sends all buffered objects through the gRPC stream. If the stream is not yet connected, it will be connected first.

Example

{:ok, stream} = Stream.flush(stream)
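The local bookkeeping a flush implies (send the buffer, record results, clear the buffer, stamp the flush time) can be sketched as below. FlushSketch and its send_fun argument are hypothetical stand-ins for the real gRPC write; connecting on demand is not modeled, and treating an empty-buffer flush as a no-op and keeping the buffer on error are assumptions.

```elixir
defmodule FlushSketch do
  # Hypothetical model of a flush, not WeaviateEx's implementation.
  # `send_fun` stands in for the gRPC stream write: it receives the buffered
  # objects and returns {:ok, results} or {:error, reason}.

  # Assumption: flushing an empty buffer is a no-op.
  def flush(%{buffer: []} = state, _send_fun), do: {:ok, state}

  def flush(state, send_fun) do
    case send_fun.(state.buffer) do
      {:ok, results} ->
        # Accumulate results, clear the buffer, and record the flush time.
        {:ok,
         %{
           state
           | buffer: [],
             results: state.results ++ results,
             last_flush_at: DateTime.utc_now()
         }}

      {:error, _reason} = error ->
        # Assumption: the buffer is left intact so the caller can retry.
        error
    end
  end
end
```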

get_errors(stream)

@spec get_errors(t()) :: [batch_result()]

Returns only the error results.
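get_errors/1, error_count/1, success_count/1 and results_count/1 all derive from the accumulated results. The sketch below shows one way to compute them over a plain list; ResultsSketch is a hypothetical name, and the rule that a failed entry carries a non-nil :error key is an assumption, since batch_result() is only typed as map().

```elixir
defmodule ResultsSketch do
  # Hypothetical helpers, not WeaviateEx's code.
  # Assumption: a failed entry has a non-nil :error value; the real
  # batch_result() shape may differ.
  def error?(result), do: Map.get(result, :error) != nil

  # Only the failed entries.
  def get_errors(results), do: Enum.filter(results, &error?/1)

  def error_count(results), do: Enum.count(results, &error?/1)

  # Everything that is not an error counts as a success.
  def success_count(results), do: length(results) - error_count(results)

  def results_count(results), do: length(results)
end
```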

new(client, collection, opts \\ [])

@spec new(map(), String.t(), keyword()) :: {:ok, t()} | {:error, term()}

Creates a new batch stream for the given collection.

Options

  • :buffer_size - Number of objects to buffer before auto-flush (default: 100)
  • :flush_interval_ms - Auto-flush interval in milliseconds (default: 1000)
  • :server_side_batching - Let Weaviate manage batching (default: true, requires Weaviate 1.34+)
  • :consistency_level - Consistency level (:all, :quorum, :one)
  • :tenant - Tenant name for multi-tenancy
  • :max_reconnect_attempts - Maximum reconnection attempts (default: 3)

Example

{:ok, stream} = Stream.new(client, "Article",
  buffer_size: 200,
  flush_interval_ms: 2000,
  consistency_level: :quorum
)
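The documented defaults can be mirrored with Keyword.validate!/2 (Elixir 1.13+), which fills in defaults and raises ArgumentError on unknown keys. OptionsSketch is a hypothetical module; it is not how WeaviateEx necessarily parses its options.

```elixir
defmodule OptionsSketch do
  # Hypothetical option handling mirroring the documented defaults.
  # Bare atoms are accepted keys without a default; pairs carry defaults.
  @defaults [
    :consistency_level,
    :tenant,
    buffer_size: 100,
    flush_interval_ms: 1000,
    server_side_batching: true,
    max_reconnect_attempts: 3
  ]

  def normalize(opts) do
    # Raises ArgumentError if opts contains a key not listed in @defaults.
    opts = Keyword.validate!(opts, @defaults)

    unless Keyword.get(opts, :consistency_level) in [nil, :all, :quorum, :one] do
      raise ArgumentError, "invalid :consistency_level"
    end

    opts
  end
end
```

For the example above, normalize(buffer_size: 200, flush_interval_ms: 2000, consistency_level: :quorum) keeps those three values and fills in server_side_batching: true and max_reconnect_attempts: 3.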

pending_count(stream)

@spec pending_count(t()) :: non_neg_integer()

Returns the number of objects currently in the buffer.

prepare_object(stream, object)

@spec prepare_object(t(), object()) :: prepared_object()

Prepares an object for insertion.

Adds the collection name, generates a UUID if one is missing, and adds the tenant if one was specified.
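The three documented steps can be sketched in plain Elixir. PrepareSketch is a hypothetical module; generating a random version-4 UUID from :crypto.strong_rand_bytes/1 and letting the stream's tenant override any per-object tenant are assumptions, not confirmed WeaviateEx behavior.

```elixir
defmodule PrepareSketch do
  # Hypothetical sketch: add collection, generate a UUID if missing,
  # add the tenant if set. Not WeaviateEx's implementation.
  def prepare_object(%{collection: collection, tenant: tenant}, object) do
    object
    |> Map.put(:collection, collection)
    # Keep a caller-supplied :uuid; generate one only when absent.
    |> Map.put_new_lazy(:uuid, &uuid4/0)
    |> maybe_put_tenant(tenant)
  end

  defp maybe_put_tenant(object, nil), do: object
  defp maybe_put_tenant(object, tenant), do: Map.put(object, :tenant, tenant)

  # Random version-4 UUID (RFC 4122 layout) from 16 random bytes:
  # set the version nibble to 4 and the variant bits to 0b10.
  defp uuid4 do
    <<a::48, _::4, b::12, _::2, c::62>> = :crypto.strong_rand_bytes(16)
    hex = Base.encode16(<<a::48, 4::4, b::12, 2::2, c::62>>, case: :lower)

    <<p1::binary-size(8), p2::binary-size(4), p3::binary-size(4), p4::binary-size(4),
      p5::binary-size(12)>> = hex

    Enum.join([p1, p2, p3, p4, p5], "-")
  end
end
```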

reconnect(stream, max_attempts \\ 3)

@spec reconnect(t(), pos_integer()) :: {:ok, t()} | {:error, term()}

Attempts to reconnect the stream after a failure.

Returns an error if the maximum number of reconnection attempts has been exceeded.
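A bounded retry loop of the kind reconnect/2 describes might look like the sketch below. ReconnectSketch and its connect_fun argument (a stand-in for opening the gRPC stream) are hypothetical, and the linear backoff is an illustrative choice, not documented WeaviateEx behavior.

```elixir
defmodule ReconnectSketch do
  # Hypothetical bounded retry, not WeaviateEx's reconnect/2.
  # `connect_fun` returns {:ok, handle} or {:error, reason};
  # `attempts` counts the tries already made.
  def reconnect(connect_fun, max_attempts, attempts \\ 0)

  # Give up once the attempt budget is spent.
  def reconnect(_connect_fun, max_attempts, attempts) when attempts >= max_attempts do
    {:error, {:max_reconnect_attempts_exceeded, attempts}}
  end

  def reconnect(connect_fun, max_attempts, attempts) do
    case connect_fun.() do
      {:ok, handle} ->
        {:ok, handle}

      {:error, _reason} ->
        # Linear backoff before the next try (illustrative only).
        Process.sleep(10 * (attempts + 1))
        reconnect(connect_fun, max_attempts, attempts + 1)
    end
  end
end
```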

results_count(stream)

@spec results_count(t()) :: non_neg_integer()

Returns the total number of results received.

should_flush?(stream)

@spec should_flush?(t()) :: boolean()

Determines if the stream should auto-flush.

Returns true if the buffer is full or if the flush interval has elapsed since the last flush.
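The rule above combines buffer_full?/1 with a time check against last_flush_at. FlushPolicySketch is a hypothetical model of that rule over a plain map; treating a nil last_flush_at as "no time-based flush due yet" is an assumption.

```elixir
defmodule FlushPolicySketch do
  # Hypothetical model of the documented rule, not WeaviateEx's code.
  def buffer_full?(%{buffer: buffer, buffer_size: size}), do: length(buffer) >= size

  # `now` is injectable so the rule can be checked deterministically.
  def should_flush?(stream, now \\ DateTime.utc_now()) do
    buffer_full?(stream) or interval_elapsed?(stream, now)
  end

  # Assumption: before the first flush, only a full buffer triggers one.
  defp interval_elapsed?(%{last_flush_at: nil}, _now), do: false

  defp interval_elapsed?(%{last_flush_at: last, flush_interval_ms: interval}, now) do
    DateTime.diff(now, last, :millisecond) >= interval
  end
end
```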

success_count(stream)

@spec success_count(t()) :: non_neg_integer()

Returns the number of successful insertions.

valid_state?(state)

@spec valid_state?(atom()) :: boolean()

Checks whether the given atom is one of the valid state() values.
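Given the states listed in the state() type, the check reduces to a membership test. StateSketch is a hypothetical module used only to illustrate this; the real function may be implemented differently.

```elixir
defmodule StateSketch do
  # Hypothetical: the states enumerated in the state() type.
  @states [:initialized, :connected, :streaming, :closing, :closed, :error]

  def valid_state?(state) when is_atom(state), do: state in @states
  # Non-atoms can never be valid states.
  def valid_state?(_), do: false
end
```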