WeaviateEx.GRPC.Services.BatchStream (WeaviateEx v0.7.4)

Bidirectional gRPC streaming for batch operations (Weaviate 1.34+).

This module provides low-level gRPC streaming operations for inserting batches of objects and references. For most use cases, prefer WeaviateEx.Batch.Stream, which provides a higher-level API.

Protocol Flow

  1. Client sends a Start message to initiate the stream
  2. Server responds with Started
  3. Client sends Data messages containing objects/references
  4. Server responds with Acks (acknowledgments) or Results (final results)
  5. Server may send Backoff to request slower sending
  6. Client sends Stop to close the stream
  7. Server responds with ShuttingDown, then Shutdown
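
The flow above can be read as a small client-side state machine. The sketch below is not part of WeaviateEx: the module name ProtocolSketch and the phase names (:starting, :streaming, :stopping, :closed) are hypothetical, and it only assumes that replies arrive as the tagged tuples described by the reply() type. It also assumes the client moves itself to :stopping when it sends Stop, and that acks or results may still arrive in that phase.

```elixir
defmodule ProtocolSketch do
  # Hypothetical helper (not a WeaviateEx API): given the current phase and
  # one parsed reply, returns the phase the client should be in next.

  @type phase :: :starting | :streaming | :stopping | :closed

  @spec next_phase(phase(), {atom(), map()}) :: {:ok, phase()} | {:error, term()}

  # Step 2: the server confirms the Start message.
  def next_phase(:starting, {:started, _}), do: {:ok, :streaming}

  # Steps 4 and 5: acks, results, and backoff keep the stream open.
  def next_phase(:streaming, {tag, _}) when tag in [:acks, :results, :backoff],
    do: {:ok, :streaming}

  # Assumption: results for in-flight data may still arrive after Stop.
  def next_phase(:stopping, {tag, _}) when tag in [:acks, :results],
    do: {:ok, :stopping}

  # Step 7: the server announces shutdown, then completes it.
  def next_phase(:stopping, {:shutting_down, _}), do: {:ok, :stopping}
  def next_phase(:stopping, {:shutdown, _}), do: {:ok, :closed}

  # Anything else is out of order for the current phase.
  def next_phase(phase, reply), do: {:error, {:unexpected_reply, phase, reply}}
end
```

Keeping the transitions in one pure function makes out-of-order replies visible as explicit errors instead of failed pattern matches deep inside a receive loop.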

Example

alias WeaviateEx.GRPC.Services.BatchStream

# Open stream
{:ok, stream} = BatchStream.open(channel)

# Send start message (GRPC.Stub.send_request returns the stream, not :ok)
stream = GRPC.Stub.send_request(stream, BatchStream.start_message())

# Wait for started confirmation
{:ok, reply} = GRPC.Stub.recv(stream)
{:started, _} = BatchStream.parse_reply(reply)

# Send data: a list of objects and an empty list of references
objects = [%{uuid: "...", collection: "Test", properties: %{}}]
stream = GRPC.Stub.send_request(stream, BatchStream.data_message(objects, []))

# Receive results
{:ok, reply} = GRPC.Stub.recv(stream)
{:results, results} = BatchStream.parse_reply(reply)

# Send stop message to close the stream
stream = GRPC.Stub.send_request(stream, BatchStream.stop_message())

Summary

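Functions

build_batch_object(obj)
  Builds a BatchObject from a map.

build_batch_reference(ref)
  Builds a BatchReference from a map.

close(stream)
  Closes the batch stream gracefully.

consistency_level_to_proto(atom)
  Converts a consistency level atom to the protobuf enum value.

data_message(objects, references)
  Creates a Data message containing objects and/or references.

open(channel, opts \\ [])
  Opens a bidirectional batch stream on the given gRPC channel.

parse_reply(batch_stream_reply)
  Parses a BatchStreamReply into a more usable format.

receive_results(stream, timeout \\ 30000)
  Receives results from the batch stream.

send_objects(stream, objects)
  Sends objects through an open batch stream.

send_references(stream, references)
  Sends references through an open batch stream.

start_message(opts \\ [])
  Creates a Start message for initiating the batch stream.

stop_message()
  Creates a Stop message for gracefully closing the batch stream.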

Types

batch_ref()

@type batch_ref() :: map()

batch_result()

@type batch_result() :: %{
  uuid: String.t() | nil,
  beacon: String.t() | nil,
  status: :success | :error,
  error: String.t() | nil
}
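
As an illustration of how maps of this shape might be consumed, the sketch below groups failed results by error message. ResultSketch is a hypothetical helper, not part of WeaviateEx. It assumes that object results are identified by :uuid and reference results by :beacon, which the type suggests but does not state.

```elixir
defmodule ResultSketch do
  # Hypothetical helper (not a WeaviateEx API). Works on plain maps shaped
  # like batch_result().

  # Groups failed results by error message so repeated failures are easy to
  # spot. Each entry is identified by its :uuid, falling back to its :beacon.
  @spec errors_by_message([map()]) :: %{optional(String.t()) => [String.t() | nil]}
  def errors_by_message(results) do
    results
    |> Enum.filter(fn result -> result.status == :error end)
    |> Enum.group_by(
      fn result -> result.error || "unknown error" end,
      fn result -> result.uuid || result.beacon end
    )
  end
end

# Illustrative data only; the identifiers are placeholders.
results = [
  %{uuid: "a", beacon: nil, status: :success, error: nil},
  %{uuid: "b", beacon: nil, status: :error, error: "invalid property"},
  %{uuid: nil, beacon: "beacon-1", status: :error, error: "invalid property"}
]

grouped = ResultSketch.errors_by_message(results)
# grouped is %{"invalid property" => ["b", "beacon-1"]}
```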

consistency_level()

@type consistency_level() :: :all | :quorum | :one | nil

object()

@type object() :: map()

reply()

@type reply() ::
  {:started, map()}
  | {:shutdown, map()}
  | {:shutting_down, map()}
  | {:backoff, %{batch_size: non_neg_integer() | nil}}
  | {:acks, %{uuids: [String.t()], beacons: [String.t()]}}
  | {:results, %{successes: [batch_result()], errors: [batch_result()]}}
  | {:unknown, map()}
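
Because every reply is a tagged tuple, a caller can fold over a sequence of them with plain pattern matching. The following is a minimal sketch under that assumption. ReplySketch and the keys of its summary map are hypothetical names chosen for illustration, not part of WeaviateEx.

```elixir
defmodule ReplySketch do
  # Hypothetical reducer over parsed replies (not a WeaviateEx API).

  @initial %{acked: 0, succeeded: 0, failed: 0, last_backoff_batch_size: nil}

  # Folds a list of reply() tuples into running counts.
  def summarize(replies), do: Enum.reduce(replies, @initial, &step/2)

  # Acks list the uuids and beacons the server has accepted so far.
  defp step({:acks, %{uuids: uuids, beacons: beacons}}, acc),
    do: %{acc | acked: acc.acked + length(uuids) + length(beacons)}

  # Results carry the final per-item outcomes.
  defp step({:results, %{successes: successes, errors: errors}}, acc),
    do: %{
      acc
      | succeeded: acc.succeeded + length(successes),
        failed: acc.failed + length(errors)
    }

  # Remember the most recent batch size carried by a Backoff reply.
  defp step({:backoff, %{batch_size: size}}, acc),
    do: %{acc | last_backoff_batch_size: size}

  # :started, :shutting_down, :shutdown, and :unknown carry no counts.
  defp step({_tag, _payload}, acc), do: acc
end

summary =
  ReplySketch.summarize([
    {:started, %{}},
    {:acks, %{uuids: ["a", "b"], beacons: []}},
    {:backoff, %{batch_size: 50}},
    {:results,
     %{
       successes: [%{uuid: "a", beacon: nil, status: :success, error: nil}],
       errors: [%{uuid: "b", beacon: nil, status: :error, error: "boom"}]
     }}
  ])
```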

stream_handle()

@type stream_handle() :: GRPC.Client.Stream.t()

Functions

build_batch_object(obj)

@spec build_batch_object(object()) :: struct()

Builds a BatchObject from a map.

build_batch_reference(ref)

@spec build_batch_reference(batch_ref()) :: struct()

Builds a BatchReference from a map.

close(stream)

@spec close(stream_handle()) :: :ok | {:error, term()}

Closes the batch stream gracefully.

Sends a stop message and waits for the server to acknowledge shutdown.

consistency_level_to_proto(atom)

@spec consistency_level_to_proto(consistency_level()) :: atom() | nil

Converts a consistency level atom to the protobuf enum value.

data_message(objects, references)

@spec data_message([object()], [batch_ref()]) :: struct()

Creates a Data message containing objects and/or references.
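
When the input is large, a caller will usually want to split it before building Data messages. The sketch below shows one way to do that with Enum.chunk_every/2. ChunkSketch and its default size of 100 are hypothetical and illustrative, not library values. Using the batch_size from a Backoff reply as the chunk size is likewise an assumption about how a caller might react to that reply.

```elixir
defmodule ChunkSketch do
  # Hypothetical helper (not a WeaviateEx API).

  # Arbitrary illustrative default, not a value taken from the library.
  @default_size 100

  # Splits items into lists of at most batch_size elements. A nil or
  # non-positive batch_size falls back to the default.
  def chunks(items, batch_size \\ nil) do
    size =
      if is_integer(batch_size) and batch_size > 0,
        do: batch_size,
        else: @default_size

    Enum.chunk_every(items, size)
  end
end

batches = ChunkSketch.chunks([1, 2, 3, 4, 5], 2)
# batches is [[1, 2], [3, 4], [5]]
```

Each resulting chunk could then be passed as the objects argument of data_message/2, with an empty list for references.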

open(channel, opts \\ [])

@spec open(
  GRPC.Channel.t(),
  keyword()
) :: {:ok, stream_handle()} | {:error, term()}

Opens a bidirectional batch stream on the given gRPC channel.

Options

  • :timeout - Stream open timeout in milliseconds (default: 30000)
  • :metadata - Additional gRPC metadata headers

parse_reply(batch_stream_reply)

@spec parse_reply(struct()) :: reply()

Parses a BatchStreamReply into a more usable format.

receive_results(stream, timeout \\ 30000)

@spec receive_results(stream_handle(), timeout()) :: {:ok, term()} | {:error, term()}

Receives results from the batch stream.

Returns the next reply from the stream, or an error if the stream is closed or times out.
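
A typical consumer calls a function like this in a loop until the server reports shutdown. The sketch below takes the receive step as a function argument so that it stays self-contained. DrainSketch is hypothetical and not part of WeaviateEx, and it assumes each {:ok, reply} already holds a parsed reply() tuple. If receive_results/2 returns the raw BatchStreamReply instead, the reply would need to go through parse_reply/1 first.

```elixir
defmodule DrainSketch do
  # Hypothetical loop (not a WeaviateEx API). `next` is any zero-arity
  # function returning {:ok, reply} or {:error, reason}.

  # Collects replies in arrival order until a :shutdown reply or an error.
  def drain(next, acc \\ []) when is_function(next, 0) do
    case next.() do
      {:ok, {:shutdown, _} = reply} ->
        {:ok, Enum.reverse([reply | acc])}

      {:ok, reply} ->
        drain(next, [reply | acc])

      {:error, reason} ->
        # Return what was received before the failure alongside the reason.
        {:error, reason, Enum.reverse(acc)}
    end
  end
end

# A stand-in for the stream: an Agent that hands out canned replies.
{:ok, agent} =
  Agent.start_link(fn ->
    [
      {:ok, {:results, %{successes: [], errors: []}}},
      {:ok, {:shutting_down, %{}}},
      {:ok, {:shutdown, %{}}}
    ]
  end)

next = fn ->
  Agent.get_and_update(agent, fn
    [head | tail] -> {head, tail}
    [] -> {{:error, :closed}, []}
  end)
end

{:ok, replies} = DrainSketch.drain(next)
```

With a real stream, the receive step might be written as fn -> BatchStream.receive_results(stream) end, subject to the assumption above about the shape of the reply.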

send_objects(stream, objects)

@spec send_objects(stream_handle(), [object()]) :: stream_handle()

Sends objects through an open batch stream.

send_references(stream, references)

@spec send_references(stream_handle(), [batch_ref()]) :: stream_handle()

Sends references through an open batch stream.

start_message(opts \\ [])

@spec start_message(keyword()) :: struct()

Creates a Start message for initiating the batch stream.

Options

  • :consistency_level - Consistency level (:all, :quorum, :one)

stop_message()

@spec stop_message() :: struct()

Creates a Stop message for gracefully closing the batch stream.