View Source GRPC.Stub (grpc v0.9.0)
A module acting as the interface for gRPC client.
You can do everything in the client side via GRPC.Stub
, including connecting,
sending/receiving streaming or non-streaming requests, canceling calls and so on.
A service is needed to define a stub:
defmodule Greeter.Service do
use GRPC.Service, name: "ping"
rpc :SayHello, Request, Reply
rpc :SayGoodbye, stream(Request), stream(Reply)
end
defmodule Greeter.Stub do
use GRPC.Stub, service: Greeter.Service
end
so that functions say_hello/2
and say_goodbye/1
will be generated for you:
# Unary call
{:ok, reply} = Greeter.Stub.say_hello(channel, request)
# Streaming call
stream = Greeter.Stub.say_goodbye(channel)
GRPC.Stub.send_request(stream, request, end_stream: true)
{:ok, reply_enum} = GRPC.Stub.recv(stream)
replies = Enum.map(reply_enum, fn({:ok, reply}) -> reply end)
Note that streaming calls are very different with unary calls. If request is
streaming, the RPC function only accepts channel as argument and returns a
GRPC.Client.Stream
. You can send streaming requests one by one via send_request/3
,
then use recv/1
to receive the reply. And if the reply is streaming, recv/1
returns a Stream
.
You can refer to call/6
for doc of your RPC functions.
Summary
Functions
Cancel a stream in a streaming client.
Establish a connection with gRPC server and return GRPC.Channel
needed for
sending requests.
Disconnects the adapter and frees any resources the adapter is consuming
Send END_STREAM frame to end the stream.
Receive replies when requests are streaming.
Send streaming requests.
Types
@type receive_data_return() :: {:ok, struct()} | {:ok, struct(), map()} | {:ok, Enumerable.t()} | {:ok, Enumerable.t(), map()}
@type rpc_return() :: GRPC.Client.Stream.t() | {:error, GRPC.RPCError.t()} | receive_data_return()
Functions
Cancel a stream in a streaming client.
After that, callings to recv/2
will return a CANCEL error.
@spec connect( String.t(), keyword() ) :: {:ok, GRPC.Channel.t()} | {:error, any()}
Establish a connection with gRPC server and return GRPC.Channel
needed for
sending requests.
Examples
iex> GRPC.Stub.connect("localhost:50051")
{:ok, channel}
iex> GRPC.Stub.connect("localhost:50051", accepted_compressors: [GRPC.Compressor.Gzip])
{:ok, channel}
iex> GRPC.Stub.connect("/paht/to/unix.sock")
{:ok, channel}
Options
:cred
- aGRPC.Credential
used to indicate it's a secure connection. An insecure connection will be created without this option.:adapter
- custom client adapter:interceptors
- client interceptors:codec
- client will use this to encode and decode binary message:compressor
- the client will use this to compress requests and decompress responses. If this is set, accepted_compressors will be appended also, so this can be used safely without:accepted_compressors
.:accepted_compressors
- tell servers accepted compressors, this can be used without:compressor
:headers
- headers to attach to each request
@spec connect( String.t() | {:local, String.t()}, binary() | non_neg_integer(), keyword() ) :: {:ok, GRPC.Channel.t()} | {:error, any()}
@spec disconnect(GRPC.Channel.t()) :: {:ok, GRPC.Channel.t()} | {:error, any()}
Disconnects the adapter and frees any resources the adapter is consuming
@spec end_stream(GRPC.Client.Stream.t()) :: GRPC.Client.Stream.t()
Send END_STREAM frame to end the stream.
The stream will be in half_closed state after this is called.
Examples
iex> stream = GRPC.Stub.send_request(stream, request)
iex> GRPC.Stub.end_stream(stream)
@spec recv( GRPC.Client.Stream.t(), keyword() ) :: {:ok, struct()} | {:ok, struct(), map()} | {:ok, Enumerable.t()} | {:ok, Enumerable.t(), map()} | {:error, any()}
Receive replies when requests are streaming.
- If the reply is not streaming, a normal reply struct will be returned
- If the reply is streaming, a enumerable
Stream
will be returned. You can useEnum
to fetch further replies orStream
to manipulate it. Each item in theEnumerable
is a tuple{:ok, reply}
or{:error, error}
. When:return_headers
is true, the last item in theEnumerable
will be{:trailers, map}
Examples
# Reply is not streaming
{:ok, reply} = GRPC.Stub.recv(stream)
# Reply is streaming
{:ok, ex_stream} = GRPC.Stub.recv(stream)
replies = Enum.map(ex_stream, fn({:ok, reply}) -> reply end)
Options
:timeout
- request timeout:deadline
- when the request is timeout, will override timeout:return_headers
- when true, headers will be returned.
Stream behavior
We build the Stream struct using Stream.unfold/2
.
The unfold function is built in such a way that - for both adapters - the accumulator is a map used to find the
connection_stream
process and the next_fun
argument is a function that reads directly from the connection_stream
that is producing data.
Every time we execute next_fun
we read a chunk of data. This means that next_fun
will have the side effect of updating the state of the connection_stream
process, removing the chunk of data that's being read from the underlying GenServer
's state.
Examples
iex> ex_stream |> Stream.take(1) |> Enum.to_list()
[1]
iex> ex_stream |> Enum.to_list()
[2, 3]
iex> ex_stream |> Enum.to_list()
[]
@spec send_request(GRPC.Client.Stream.t(), struct(), keyword()) :: GRPC.Client.Stream.t()
Send streaming requests.
The last request can be sent with :end_stream
option, or you can call end_stream/1
to send a frame with END_STREAM flag to end the stream.
Options
:end_stream
- indicates it's the last one request, then the stream will be in half_closed state. Default is false.