GRPC.Stub (grpc v0.9.0)

A module acting as the interface for a gRPC client.

You can do everything on the client side via GRPC.Stub, including connecting, sending and receiving streaming or non-streaming requests, canceling calls and so on.

A service is needed to define a stub:

defmodule Greeter.Service do
  use GRPC.Service, name: "ping"

  rpc :SayHello, Request, Reply
  rpc :SayGoodbye, stream(Request), stream(Reply)
end

defmodule Greeter.Stub do
  use GRPC.Stub, service: Greeter.Service
end

so that functions say_hello/2 and say_goodbye/1 will be generated for you:

# Unary call
{:ok, reply} = Greeter.Stub.say_hello(channel, request)

# Streaming call
stream = Greeter.Stub.say_goodbye(channel)
GRPC.Stub.send_request(stream, request, end_stream: true)
{:ok, reply_enum} = GRPC.Stub.recv(stream)
replies = Enum.map(reply_enum, fn({:ok, reply}) -> reply end)

Note that streaming calls are very different from unary calls. If the request is streaming, the RPC function accepts only the channel as an argument and returns a GRPC.Client.Stream. You can then send streaming requests one by one via send_request/3 and use recv/1 to receive the reply. If the reply is streaming, recv/1 returns a Stream.

Refer to call/6 for the documentation of your RPC functions.

Types

@type receive_data_return() ::
  {:ok, struct()}
  | {:ok, struct(), map()}
  | {:ok, Enumerable.t()}
  | {:ok, Enumerable.t(), map()}
@type rpc_return() ::
  GRPC.Client.Stream.t() | {:error, GRPC.RPCError.t()} | receive_data_return()

Functions

Cancel a stream in a streaming client.

After that, calls to recv/2 will return a CANCEL error.

connect(addr, opts \\ [])

@spec connect(
  String.t(),
  keyword()
) :: {:ok, GRPC.Channel.t()} | {:error, any()}

Establish a connection with gRPC server and return GRPC.Channel needed for sending requests.

Examples

iex> GRPC.Stub.connect("localhost:50051")
{:ok, channel}

iex> GRPC.Stub.connect("localhost:50051", accepted_compressors: [GRPC.Compressor.Gzip])
{:ok, channel}

iex> GRPC.Stub.connect("/path/to/unix.sock")
{:ok, channel}

Options

  • :cred - a GRPC.Credential used to indicate it's a secure connection. An insecure connection will be created without this option.
  • :adapter - custom client adapter
  • :interceptors - client interceptors
  • :codec - the client will use this to encode and decode binary messages
  • :compressor - the client will use this to compress requests and decompress responses. If this is set, it is also appended to accepted_compressors, so it can be used safely without :accepted_compressors.
  • :accepted_compressors - tells servers which compressors the client accepts; this can be used without :compressor
  • :headers - headers to attach to each request

connect(host, port, opts)

@spec connect(
  String.t() | {:local, String.t()},
  binary() | non_neg_integer(),
  keyword()
) :: {:ok, GRPC.Channel.t()} | {:error, any()}

disconnect(channel)

@spec disconnect(GRPC.Channel.t()) :: {:ok, GRPC.Channel.t()} | {:error, any()}

Disconnects the adapter and frees any resources the adapter is consuming.

end_stream(stream)

@spec end_stream(GRPC.Client.Stream.t()) :: GRPC.Client.Stream.t()

Send END_STREAM frame to end the stream.

The stream will be in half_closed state after this is called.

Examples

iex> stream = GRPC.Stub.send_request(stream, request)
iex> GRPC.Stub.end_stream(stream)

recv(stream, opts \\ [])

@spec recv(
  GRPC.Client.Stream.t(),
  keyword()
) ::
  {:ok, struct()}
  | {:ok, struct(), map()}
  | {:ok, Enumerable.t()}
  | {:ok, Enumerable.t(), map()}
  | {:error, any()}

Receive replies when requests are streaming.

  • If the reply is not streaming, a normal reply struct will be returned.
  • If the reply is streaming, an enumerable Stream will be returned. You can use Enum to fetch further replies or Stream to manipulate it. Each item in the Enumerable is a tuple {:ok, reply} or {:error, error}. When :return_headers is true, the last item in the Enumerable will be {:trailers, map}.
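
Because a streaming reply can mix {:ok, reply}, {:error, error} and {:trailers, map} items, pattern matching only on {:ok, reply} will raise on the other shapes. The following is a minimal sketch of one way to separate them. ReplyCollector and collect/1 are hypothetical names, not part of the grpc library, and the input is stand-in data shaped like the items described above rather than the output of a real call.

```elixir
defmodule ReplyCollector do
  # Hypothetical helper, not part of the grpc library.
  # Splits the items of a reply enumerable into replies, errors and trailers,
  # keeping replies and errors in the order they were received.
  def collect(reply_enum) do
    initial = %{replies: [], errors: [], trailers: nil}

    result =
      Enum.reduce(reply_enum, initial, fn
        {:ok, reply}, acc -> %{acc | replies: [reply | acc.replies]}
        {:error, error}, acc -> %{acc | errors: [error | acc.errors]}
        {:trailers, trailers}, acc -> %{acc | trailers: trailers}
      end)

    %{
      result
      | replies: Enum.reverse(result.replies),
        errors: Enum.reverse(result.errors)
    }
  end
end

# Stand-in items; with a real call these would come from the enumerable
# returned by GRPC.Stub.recv/2.
items = [
  {:ok, :reply_1},
  {:error, :some_error},
  {:ok, :reply_2},
  {:trailers, %{"grpc-status" => "0"}}
]

collected = ReplyCollector.collect(items)
```

Reducing once over the enumerable matters here, since (as described under "Stream behavior") reading the stream consumes it.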

Examples

# Reply is not streaming
{:ok, reply} = GRPC.Stub.recv(stream)

# Reply is streaming
{:ok, ex_stream} = GRPC.Stub.recv(stream)
replies = Enum.map(ex_stream, fn({:ok, reply}) -> reply end)

Options

  • :timeout - request timeout
  • :deadline - the point in time at which the request times out; overrides :timeout
  • :return_headers - when true, headers will be returned.

Stream behavior

We build the Stream struct using Stream.unfold/2.

The unfold function is built in such a way that, for both adapters, the accumulator is a map used to find the connection_stream process, and the next_fun argument is a function that reads directly from the connection_stream that is producing data. Every time next_fun is executed, it reads a chunk of data. This means that next_fun has the side effect of updating the state of the connection_stream process, removing the chunk of data being read from the underlying GenServer's state.

Examples

iex> ex_stream |> Stream.take(1) |> Enum.to_list()
[1]
iex> ex_stream |> Enum.to_list()
[2, 3]
iex> ex_stream |> Enum.to_list()
[]
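
The consuming behavior shown above can be reproduced without a server. The following is a minimal sketch that assumes an Agent as a stand-in for the connection_stream process; the names buffer, pop_chunk and next_fun are illustrative and do not mirror the library's internals.

```elixir
# Stand-in for the connection_stream process: an Agent holding buffered chunks.
{:ok, buffer} = Agent.start_link(fn -> [1, 2, 3] end)

# Removes one chunk from the process state, or reports that nothing is left.
pop_chunk = fn pid ->
  Agent.get_and_update(pid, fn
    [] -> {:empty, []}
    [chunk | rest] -> {{:chunk, chunk}, rest}
  end)
end

# The accumulator only locates the process; the data lives in the process,
# so every call to next_fun has a side effect on its state.
next_fun = fn acc ->
  case pop_chunk.(acc.pid) do
    {:chunk, chunk} -> {chunk, acc}
    :empty -> nil
  end
end

ex_stream = Stream.unfold(%{pid: buffer}, next_fun)

first = ex_stream |> Stream.take(1) |> Enum.to_list()
rest = ex_stream |> Enum.to_list()
empty = ex_stream |> Enum.to_list()
```

Here first, rest and empty are [1], [2, 3] and [] respectively: the same stream value yields different results each time it is enumerated, because the state lives in the process rather than in the stream.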

send_request(stream, request, opts \\ [])

@spec send_request(GRPC.Client.Stream.t(), struct(), keyword()) ::
  GRPC.Client.Stream.t()

Send streaming requests.

The last request can be sent with :end_stream option, or you can call end_stream/1 to send a frame with END_STREAM flag to end the stream.

Options

  • :end_stream - indicates that this is the last request, after which the stream will be in the half_closed state. Default is false.
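
When the full list of requests is known up front, the pattern of sending each request and marking only the last one with :end_stream can be wrapped in a small helper. The following is a sketch under stated assumptions: StreamingHelper and send_all/3 are hypothetical names, not part of the grpc library, and the send function is injectable so the logic can be exercised without a server. By default it delegates to GRPC.Stub.send_request/3, which requires the grpc dependency.

```elixir
defmodule StreamingHelper do
  # Hypothetical helper, not part of the grpc library.
  # Sends every request in order and sets end_stream: true on the last one.
  def send_all(stream, requests, send_fun \\ &GRPC.Stub.send_request/3)

  # Nothing to send: the stream is returned untouched and is NOT ended.
  def send_all(stream, [], _send_fun), do: stream

  # Last request: ask for the stream to be half-closed after it.
  def send_all(stream, [last], send_fun) do
    send_fun.(stream, last, end_stream: true)
  end

  # Any earlier request: send it and continue with the returned stream.
  def send_all(stream, [request | rest], send_fun) do
    stream
    |> send_fun.(request, [])
    |> send_all(rest, send_fun)
  end
end

# Exercising the helper with a recording function instead of a real stream.
recorder = fn stream, request, opts -> stream ++ [{request, opts}] end
sent = StreamingHelper.send_all([], [:a, :b, :c], recorder)
```

With a real stream, the call would be StreamingHelper.send_all(stream, requests), followed by GRPC.Stub.recv/2 to read the reply.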