WeaviateEx.GRPC.Services.BatchStream (WeaviateEx v0.7.4)
View SourceBidirectional gRPC streaming for batch operations (Weaviate 1.34+).
This module provides low-level gRPC streaming operations for batch object
and reference insertion. For most use cases, prefer WeaviateEx.Batch.Stream
which provides a higher-level API.
Protocol Flow
- Client sends a Start message to initiate the stream
- Server responds with Started
- Client sends Data messages containing objects/references
- Server responds with Acks (acknowledgments) or Results (final results)
- Server may send Backoff to request slower sending
- Client sends Stop to close the stream
- Server responds with ShuttingDown, then Shutdown
Example
alias WeaviateEx.GRPC.Services.BatchStream
# Open stream
{:ok, stream} = BatchStream.open(channel)
# Send start message
:ok = GRPC.Stub.send_request(stream, BatchStream.start_message())
# Wait for started confirmation
{:ok, reply} = GRPC.Stub.recv(stream)
{:started, _} = BatchStream.parse_reply(reply)
# Send data
objects = [%{uuid: "...", collection: "Test", properties: %{}}]
:ok = GRPC.Stub.send_request(stream, BatchStream.data_message(objects, []))
# Receive results
{:ok, reply} = GRPC.Stub.recv(stream)
{:results, results} = BatchStream.parse_reply(reply)
# Close stream
:ok = GRPC.Stub.send_request(stream, BatchStream.stop_message())
Summary
Functions
Builds a BatchObject from a map.
Builds a BatchReference from a map.
Closes the batch stream gracefully.
Converts a consistency level atom to the protobuf enum value.
Creates a Data message containing objects and/or references.
Opens a bidirectional batch stream on the given gRPC channel.
Parses a BatchStreamReply into a more usable format.
Receives results from the batch stream.
Sends objects through an open batch stream.
Sends references through an open batch stream.
Creates a Start message for initiating the batch stream.
Creates a Stop message for gracefully closing the batch stream.
Types
@type batch_ref() :: map()
@type consistency_level() :: :all | :quorum | :one | nil
@type object() :: map()
@type reply() :: {:started, map()} | {:shutdown, map()} | {:shutting_down, map()} | {:backoff, %{batch_size: non_neg_integer() | nil}} | {:acks, %{uuids: [String.t()], beacons: [String.t()]}} | {:results, %{successes: [batch_result()], errors: [batch_result()]}} | {:unknown, map()}
@type stream_handle() :: GRPC.Client.Stream.t()
Functions
Builds a BatchObject from a map.
Builds a BatchReference from a map.
@spec close(stream_handle()) :: :ok | {:error, term()}
Closes the batch stream gracefully.
Sends a stop message and waits for the server to acknowledge shutdown.
@spec consistency_level_to_proto(consistency_level()) :: atom() | nil
Converts a consistency level atom to the protobuf enum value.
Creates a Data message containing objects and/or references.
@spec open( GRPC.Channel.t(), keyword() ) :: {:ok, stream_handle()} | {:error, term()}
Opens a bidirectional batch stream on the given gRPC channel.
Options
:timeout- Stream open timeout in milliseconds (default: 30000):metadata- Additional gRPC metadata headers
Parses a BatchStreamReply into a more usable format.
@spec receive_results(stream_handle(), timeout()) :: {:ok, term()} | {:error, term()}
Receives results from the batch stream.
Returns the next reply from the stream, or an error if the stream is closed or times out.
@spec send_objects(stream_handle(), [object()]) :: stream_handle()
Sends objects through an open batch stream.
@spec send_references(stream_handle(), [batch_ref()]) :: stream_handle()
Sends references through an open batch stream.
Creates a Start message for initiating the batch stream.
Options
:consistency_level- Consistency level (:all, :quorum, :one)
@spec stop_message() :: struct()
Creates a Stop message for gracefully closing the batch stream.