Milvex.RPC (milvex v0.10.2)

Low-level gRPC wrapper with consistent error handling.

Provides helper functions for making gRPC calls and converting Milvus proto Status codes and gRPC errors to Splode errors.

Usage

alias Milvex.RPC
alias Milvex.Milvus.Proto.Milvus.MilvusService

# Make a gRPC call with automatic error conversion.
# channel_fn is a zero-arity function that returns the channel (see call/5).
case RPC.call(channel_fn, MilvusService.Stub, :show_collections, request) do
  {:ok, response} -> handle_response(response)
  {:error, error} -> handle_error(error)
end

# Check and convert a Milvus Status
case RPC.check_status(status, "CreateCollection") do
  :ok -> :ok
  {:error, error} -> {:error, error}
end

Summary

Functions

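call(channel_fn, stub_module, method, request, opts \\ [])

Call a gRPC method with automatic error conversion.

check_response_status(arg1, operation)

Checks if a response has an embedded status field and validates it.

check_status(status, operation)

Checks a Milvus Status and converts to an error if not successful.

grpc_error_to_error(rpc_error, operation)

Converts a GRPC.RPCError to a Splode error.

status_to_error(arg1, operation)

Converts a Milvus proto Status to a Splode error.

with_status_check(response, operation)

Extracts the response if status is successful, otherwise returns error.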

Types

rpc_result()

@type rpc_result() :: {:ok, struct()} | {:error, Milvex.Error.t()}

Functions

call(channel_fn, stub_module, method, request, opts \\ [])

@spec call(
  (-> {:ok, GRPC.Channel.t(), map()} | {:error, term()}),
  module(),
  atom(),
  struct(),
  keyword()
) :: rpc_result()

Call a gRPC method with automatic error conversion.

The channel function is called on each retry attempt to obtain a fresh channel, allowing recovery from dead connections.

Parameters

  • channel_fn - A zero-arity function returning {:ok, GRPC.Channel.t(), Config.t()} or {:error, term()}
  • stub_module - The generated gRPC stub module (e.g., MilvusService.Stub)
  • method - The RPC method name as an atom (e.g., :show_collections)
  • request - The request struct
  • opts - Options to pass to the gRPC call (e.g., [timeout: 10_000])

Returns

  • {:ok, response} on success
  • {:error, error} on failure (Connection or Grpc error)

Examples

RPC.call(
  fn -> Connection.get_channel(conn) end,
  MilvusService.Stub,
  :search,
  request,
  timeout: 30_000
)
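To illustrate why call/5 takes a function rather than a channel, here is a minimal sketch of the retry pattern described above. RetrySketch, call_with_retry/3, rpc_fun, and the attempt count are hypothetical names invented for this illustration; the retry conditions and limits that Milvex actually applies are not documented here, so treat this as an approximation of the idea rather than the library's implementation.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of Milvex: tries `rpc_fun` up to `attempts`
  # times and calls `channel_fn` again before every attempt, so a dead
  # channel from a previous attempt is never reused.
  def call_with_retry(channel_fn, rpc_fun, attempts) when attempts > 0 do
    with {:ok, channel, _config} <- channel_fn.(),
         {:ok, response} <- rpc_fun.(channel) do
      {:ok, response}
    else
      # Attempts remain: ask for a fresh channel and try again.
      {:error, _reason} when attempts > 1 ->
        call_with_retry(channel_fn, rpc_fun, attempts - 1)

      # Last attempt failed: surface the error.
      {:error, reason} ->
        {:error, reason}
    end
  end
end

{:ok, counter} = Agent.start_link(fn -> 0 end)

# Stand-in channel source: hands out :channel_1, :channel_2, ... per call.
channel_fn = fn ->
  n = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  {:ok, :"channel_#{n}", %{}}
end

# Stand-in RPC: the first channel is "dead", later channels work.
rpc_fun = fn
  :channel_1 -> {:error, :unavailable}
  channel -> {:ok, %{served_by: channel}}
end

result = RetrySketch.call_with_retry(channel_fn, rpc_fun, 3)
IO.inspect(result)
# => {:ok, %{served_by: :channel_2}}
```

Because the channel is resolved inside the retry loop, the second attempt runs against a channel obtained after the first failure, which is the recovery behavior the description of channel_fn refers to.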

check_response_status(arg1, operation)

@spec check_response_status(map(), String.t()) :: :ok | {:error, Milvex.Error.t()}

Checks if a response has an embedded status field and validates it.

Use this for responses that embed a Status struct rather than returning it directly.

Examples

response = %{status: %Status{code: 0}, collections: [...]}
case RPC.check_response_status(response, "ShowCollections") do
  :ok -> {:ok, response}
  {:error, error} -> {:error, error}
end
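The difference between a directly returned Status and an embedded one can be sketched as follows. EmbeddedStatusSketch is a hypothetical stand-in that uses plain maps instead of the generated proto structs, and the {operation, code} error tuple is invented for the example; the real functions return Milvex errors.

```elixir
defmodule EmbeddedStatusSketch do
  # Stand-in for check_status/2: code 0 means success, anything else fails.
  def check_status(%{code: 0}, _operation), do: :ok
  def check_status(%{code: code}, operation), do: {:error, {operation, code}}

  # Stand-in for check_response_status/2: pull the embedded status out of
  # the response, then reuse the direct check.
  def check_response_status(%{status: status}, operation) do
    check_status(status, operation)
  end
end

# A Status returned directly by the RPC.
direct = EmbeddedStatusSketch.check_status(%{code: 0}, "CreateCollection")

# A response that carries its Status in a :status field (code 1 is arbitrary).
embedded =
  EmbeddedStatusSketch.check_response_status(
    %{status: %{code: 1}, collection_names: []},
    "ShowCollections"
  )

IO.inspect(direct)
# => :ok
IO.inspect(embedded)
# => {:error, {"ShowCollections", 1}}
```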

check_status(status, operation)

@spec check_status(Milvex.Milvus.Proto.Common.Status.t() | nil, String.t()) ::
  :ok | {:error, Milvex.Error.t()}

Checks a Milvus Status and converts to an error if not successful.

Many Milvus RPC calls return a Status struct. Use this function to check if the operation succeeded and convert to a proper error if not.

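Parameters

  • status - The Milvus Status struct to check (the spec also accepts nil)
  • operation - A string describing the operation (for error context)

Returns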

  • :ok if status indicates success (code 0)
  • {:error, error} if status indicates failure

Examples

case RPC.check_status(response.status, "CreateCollection") do
  :ok -> {:ok, :created}
  {:error, error} -> {:error, error}
end
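Because the success value is a bare :ok, status checks compose naturally in a with chain. The sketch below assumes a hypothetical stand-in module, StatusChainSketch, with plain maps in place of Status structs and an invented error tuple; only the :ok / {:error, error} return shape is taken from the documentation above.

```elixir
defmodule StatusChainSketch do
  # Hypothetical stand-in for RPC.check_status/2.
  def check_status(%{code: 0}, _operation), do: :ok
  def check_status(%{code: code}, operation), do: {:error, {operation, code}}

  # Two dependent steps; the first failing status short-circuits the chain
  # and is returned unchanged.
  def create_then_load(create_status, load_status) do
    with :ok <- check_status(create_status, "CreateCollection"),
         :ok <- check_status(load_status, "LoadCollection") do
      {:ok, :ready}
    end
  end
end

ok = StatusChainSketch.create_then_load(%{code: 0}, %{code: 0})
failed = StatusChainSketch.create_then_load(%{code: 0}, %{code: 1})

IO.inspect(ok)
# => {:ok, :ready}
IO.inspect(failed)
# => {:error, {"LoadCollection", 1}}
```

The operation string passed to each check ends up in the error, which makes it possible to tell which step of the chain failed.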

grpc_error_to_error(rpc_error, operation)

@spec grpc_error_to_error(GRPC.RPCError.t(), String.t()) :: Milvex.Errors.Grpc.t()

Converts a GRPC.RPCError to a Splode error.

Parameters

  • rpc_error - The GRPC.RPCError struct
  • operation - A string describing the operation (for error context)

Returns

A Milvex.Errors.Grpc.t() representing the gRPC error.

status_to_error(arg1, operation)

@spec status_to_error(Milvex.Milvus.Proto.Common.Status.t() | nil, String.t()) ::
  Milvex.Errors.Grpc.t()

Converts a Milvus proto Status to a Splode error.

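Parameters

  • status - The Milvus Status struct to convert (the spec also accepts nil)
  • operation - A string describing the operation (for error context)

Returns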

A Milvex.Errors.Grpc.t() representing the status error.

with_status_check(response, operation)

@spec with_status_check(map(), String.t()) ::
  {:ok, map()} | {:error, Milvex.Error.t()}

Extracts the response if status is successful, otherwise returns error.

This is a convenience function that combines status checking with response extraction.

Examples

case RPC.with_status_check(response, "ShowCollections") do
  {:ok, response} -> {:ok, response.collection_names}
  {:error, error} -> {:error, error}
end
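Since the success case returns the response in an {:ok, response} tuple, the result can feed directly into a with expression. The following is a sketch only: WithStatusSketch is a hypothetical stand-in built on plain maps, and its error tuple is invented for illustration rather than being a Milvex error.

```elixir
defmodule WithStatusSketch do
  # Hypothetical stand-in for RPC.with_status_check/2: hand the response
  # back on success, otherwise return an error tagged with the operation.
  def with_status_check(%{status: %{code: 0}} = response, _operation), do: {:ok, response}
  def with_status_check(%{status: %{code: code}}, operation), do: {:error, {operation, code}}

  # Post-processes the response only when its status check passes.
  def sorted_names(response) do
    with {:ok, checked} <- with_status_check(response, "ShowCollections") do
      {:ok, Enum.sort(checked.collection_names)}
    end
  end
end

names =
  WithStatusSketch.sorted_names(%{status: %{code: 0}, collection_names: ["b", "a"]})

failed =
  WithStatusSketch.sorted_names(%{status: %{code: 1}, collection_names: []})

IO.inspect(names)
# => {:ok, ["a", "b"]}
IO.inspect(failed)
# => {:error, {"ShowCollections", 1}}
```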