GRPC.Stream (grpc v0.10.2)
Provides a Flow-based abstraction layer for building gRPC streaming pipelines in Elixir.
This module allows you to consume gRPC request streams as Flow pipelines with support for backpressure via GenStage. You can also produce gRPC responses by materializing a Flow back into the gRPC stream.
Capabilities
- Transforms an incoming gRPC request stream into a Flow with backpressure.
- Emits messages back into the gRPC response stream using run_with/3.
- Supports joining with external producers (e.g., RabbitMQ, Kafka) for unbounded or fan-in stream sources.
- Offers composable functional operators (map/2, filter/2, flat_map/2, etc.) on the stream.
Example: Bidirectional Streaming
defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service

  def route_chat(input, materializer) do
    GRPC.Stream.from(input, max_demand: 10)
    |> GRPC.Stream.map(fn note -> process_note(note) end)
    |> GRPC.Stream.run_with(materializer)
  end

  # Builds the reply; the incoming note itself is ignored in this example.
  defp process_note(_note), do: %Response{message: "Received"}
end
Example: Joining with an External Producer
When integrating with external unbounded sources (e.g., message queues), you can pass a running GenStage producer using the :join_with option:
defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service

  def stream_events(input, materializer) do
    {:ok, pid} = MyApp.RabbitMQ.Producer.start_link([])

    GRPC.Stream.from(input, join_with: pid, max_demand: 10)
    |> GRPC.Stream.map(&handle_event/1)
    |> GRPC.Stream.run_with(materializer)
  end

  defp handle_event({_, msg}), do: msg
  defp handle_event(event), do: %MyGRPC.Event{data: inspect(event)}
end
Summary
Functions
ask/3
Sends a request to an external process and awaits a response.
ask!/3
Same as ask/3, but raises an exception on failure.
filter/2
Filters the stream using the given predicate function.
flat_map/2
Applies a function to each entry and concatenates the resulting lists.
from/2
Converts a gRPC input into a Flow pipeline with backpressure support.
get_headers/1
Retrieves HTTP/2 headers from a GRPC.Server.Stream.
map/2
Applies a function to each stream item.
map_with_context/2
Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.
partition/2
Partitions the stream to allow grouping of items by key or condition.
reduce/3
Reduces items in the stream using an accumulator.
run/1
Executes the underlying Flow for unary streams and emits responses into the provided gRPC server stream.
run_with/3
Executes the flow and emits responses into the provided gRPC server stream.
to_flow/1
Extracts the underlying Flow pipeline from a GRPC.Stream.
unary/2
Converts a single gRPC request into a Flow pipeline with support for backpressure. This is useful for unary gRPC requests where you want to use the Flow API.
uniq/1
Emits only distinct items from the stream. See uniq_by/2 for more information.
uniq_by/2
Emits only unique items as determined by the result of the given function.
Types
Functions
ask/3
Sends a request to an external process and awaits a response.
If target is a PID, a message in the format {:request, item, from} is sent, and a reply in the format {:response, msg} is expected.
If target is an atom, the process is looked up through Process.whereis/1.
Parameters
- stream: The GRPC.Stream pipeline.
- target: Target process PID or atom name.
- timeout: Timeout in milliseconds (defaults to 5000).
Returns
- Updated stream if successful.
- {:error, item, reason} if the request fails or times out.
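The request/response message shapes described for ask/3 can be tried out with plain processes, without any gRPC setup. The following is a minimal sketch, assuming that `from` is the PID the reply should be sent to; `AskResponder` and its upcasing reply are hypothetical names invented for illustration and are not part of the library.

```elixir
defmodule AskResponder do
  # Hypothetical helper: a plain process that answers the message shape
  # documented for ask/3. Assumes `from` is the PID to reply to.
  def start do
    spawn(fn -> loop() end)
  end

  defp loop do
    receive do
      {:request, item, from} ->
        # Reply in the documented {:response, msg} format.
        send(from, {:response, String.upcase(item)})
        loop()
    end
  end
end

# Simulate, by hand, the exchange documented for a single item.
responder = AskResponder.start()
send(responder, {:request, "hello", self()})

reply =
  receive do
    {:response, msg} -> msg
  after
    # Mirrors the documented error shape when no reply arrives in time.
    5000 -> {:error, "hello", :timeout}
  end

IO.inspect(reply)
```

A process written this way could serve as the target of ask/3, referenced either by PID or by a registered name.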
@spec ask!(t(), pid() | atom(), non_neg_integer()) :: t()
Same as ask/3, but raises an exception on failure.
Caution
This version propagates errors via raised exceptions, which can crash the Flow worker and halt the pipeline.
Prefer ask/3 for production usage unless failure should abort the stream.
filter/2
Filters the stream using the given predicate function.
The filter function is applied concurrently to the stream entries, so it shouldn't rely on execution order.
@spec flat_map(t(), (term() -> Enumerable.t())) :: t()
Applies a function to each entry and concatenates the resulting lists.
Useful for emitting multiple messages for each input.
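To illustrate the one-to-many shape, here is a small sketch using `Enum.flat_map/2` from the standard library, which has the same per-item contract as the spec above: the function returns an enumerable and the results are concatenated. The `expand` function and the sample words are made up for this example. In a real pipeline the same function would be handed to flat_map/2 on the stream, where the order of emitted items may differ because entries are processed concurrently.

```elixir
# Hypothetical expansion: each input word becomes one message per character.
expand = fn word -> String.graphemes(word) end

# Sequential stand-in for the stream operator.
messages = Enum.flat_map(["ab", "cd"], expand)
IO.inspect(messages)
```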
from/2
Converts a gRPC input into a Flow pipeline with backpressure support.
Parameters
- input: A gRPC request stream (struct, enumerable, or Elixir Stream).
Options
- :join_with - An optional external GenStage producer to merge with the gRPC input.
- :dispatcher - Specifies the Flow dispatcher (defaults to GenStage.DemandDispatcher).
- :propagate_context - If true, the context from the materializer is propagated to the Flow.
- :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.
And any other options supported by Flow.
Returns
A GRPC.Stream struct that represents the transformed stream.
Example
flow = GRPC.Stream.from(request, max_demand: 50)
@spec get_headers(GRPC.Server.Stream.t()) :: map()
Retrieves HTTP/2 headers from a GRPC.Server.Stream.
Client Note
To receive headers on the client side, use the :return_headers option. See GRPC.Stub.
map/2
Applies a function to each stream item.
map_with_context/2
Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.
partition/2
Partitions the stream to allow grouping of items by key or condition.
Use this before stateful operations such as reduce/3.
Note
Excessive use of partitioning can impact performance and memory usage. Only partition when required for correctness or performance. See https://hexdocs.pm/flow/Flow.html#module-partitioning for more details.
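Why partitioning matters before a stateful step can be shown with a toy model. The sketch below is not how the library or Flow routes events internally; it simply assumes a hash-based assignment, using `:erlang.phash2/2`, to show that equal keys always land in the same bucket, so a later reduction over one bucket sees every item for its keys. `PartitionSketch` and the sample words are hypothetical.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper: assigns each item to one of `stages` buckets
  # based on a hash of its key, so equal keys always share a bucket.
  def assign(items, key_fun, stages) do
    Enum.group_by(items, fn item -> :erlang.phash2(key_fun.(item), stages) end)
  end
end

words = ["apple", "pear", "apple", "fig", "pear"]
buckets = PartitionSketch.assign(words, & &1, 2)

# Every occurrence of a given word ends up in exactly one bucket.
IO.inspect(buckets)
```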
reduce/3
Reduces items in the stream using an accumulator.
Parameters
- acc_fun initializes the accumulator.
- reducer_fun updates it for each input.
Note
See https://hexdocs.pm/flow/Flow.html#reduce/3 for more details.
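The roles of the two functions can be sketched with the standard library alone. This assumes reduce/3 follows the contract of the linked `Flow.reduce/3`, where the accumulator function takes no arguments and the reducer receives the item and the current accumulator. The word-count data is made up, and `Enum.reduce/3` stands in for what a single partition would compute.

```elixir
# acc_fun takes no arguments and returns the initial accumulator.
acc_fun = fn -> %{} end

# reducer_fun receives the current item and the accumulator, and returns
# the updated accumulator. Here it counts occurrences of each word.
reducer_fun = fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end

# Sequential stand-in for the reduction performed inside one partition.
counts = Enum.reduce(["a", "b", "a"], acc_fun.(), reducer_fun)
IO.inspect(counts)
```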
run/1
Executes the underlying Flow for unary streams and emits responses into the provided gRPC server stream.
Parameters
- flow: A GRPC.Stream struct containing the flow to be executed.
- stream: A GRPC.Server.Stream to which responses are sent.
- :dry_run - If true, responses are not sent (used for testing or inspection).
Example
GRPC.Stream.run(request)
run_with/3
Executes the flow and emits responses into the provided gRPC server stream.
Parameters
- flow: A GRPC.Stream struct containing the flow to be executed.
- stream: A GRPC.Server.Stream to which responses are sent.
Options
- :dry_run - If true, responses are not sent (used for testing or inspection).
Returns
- :ok if the stream was processed successfully.
Example
GRPC.Stream.run_with(request, mat)
to_flow/1
Extracts the underlying Flow pipeline from a GRPC.Stream.
Raises an ArgumentError if the Flow has not been initialized.
Returns
A Flow pipeline.
unary/2
Converts a single gRPC request into a Flow pipeline with support for backpressure.
This is useful for unary gRPC requests where you want to use the Flow API.
Parameters
- input: The single gRPC message to convert into a Flow.
Options
- :join_with - An optional additional producer stage PID to include in the Flow.
- :dispatcher - An optional GenStage dispatcher to use in the underlying Flow. Defaults to GenStage.DemandDispatcher.
- :propagate_context - If true, the context from the materializer is propagated to the Flow.
- :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.
And any other options supported by Flow.
Returns
- A GRPC.Stream that emits the single gRPC message under demand.
Example
flow = GRPC.Stream.unary(request, max_demand: 5)
uniq/1
Emits only distinct items from the stream. See uniq_by/2 for more information.
uniq_by/2
Emits only unique items as determined by the result of the given function.
Note
This function requires care when used for unbounded flows. For more information see https://hexdocs.pm/flow/Flow.html#uniq_by/2
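As a rough illustration of key-based uniqueness, the sketch below uses `Enum.uniq_by/2` from the standard library on made-up event maps. Two items count as duplicates when the given function returns the same key for both. Deduplication of this kind generally has to remember the keys it has already seen, which is presumably why unbounded flows call for care.

```elixir
events = [
  %{id: 1, body: "first"},
  %{id: 2, body: "second"},
  %{id: 1, body: "duplicate of first"}
]

# Items are considered equal when the function returns the same key.
unique = Enum.uniq_by(events, fn event -> event.id end)
IO.inspect(Enum.map(unique, & &1.body))
```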