GRPC.Stream (gRPC v0.11.4)
View SourceProvides a Flow-based abstraction layer for building gRPC streaming pipelines in Elixir.
This module allows you to consume gRPC request streams as Flow pipelines with support for
backpressure via GenStage. You can also produce gRPC responses by materializing a Flow
back into the gRPC stream.
Capabilities
- Transforms an incoming gRPC request stream into a
Flowwith backpressure. - Emits messages back into the gRPC response stream using
run_with/3. - Supports joining with external producers (e.g., RabbitMQ, Kafka) for unbounded or fan-in stream sources.
- Offers composable functional operators (
map/2,filter/2,flat_map/2, etc.) on the stream.
Example: Bidirectional Streaming
defmodule MyGRPCService do
use GRPC.Server, service: MyService.Service
def route_chat(input, materializer) do
GRPC.Stream.from(input, max_demand: 10)
|> GRPC.Stream.map(fn note -> process_note(note) end)
|> GRPC.Stream.run_with(materializer)
end
defp process_note(note), do: %Response{message: "Received"}
endExample: Joining with an External Producer
When integrating with external unbounded sources (e.g., message queues),
you can pass a running GenStage producer using the :join_with option:
defmodule MyGRPCService do
use GRPC.Server, service: MyService.Service
def stream_events(input, materializer) do
{:ok, pid} = MyApp.RabbitMQ.Producer.start_link([])
GRPC.Stream.from(input, join_with: pid, max_demand: 10)
|> GRPC.Stream.map(&handle_event/1)
|> GRPC.Stream.run_with(materializer)
end
defp handle_event({_, msg}), do: msg
defp handle_event(event), do: %MyGRPC.Event{data: inspect(event)}
end
Summary
Functions: Creation
Converts a gRPC input into a Flow pipeline with backpressure support.
Converts a single gRPC request into a Flow pipeline with support for backpressure.
This is useful for unary gRPC requests where you want to use the Flow API.
Functions: Materializers
Executes the underlying Flow for a unary stream.
Executes the flow and emits responses into the provided gRPC server stream.
Functions: Transformers
Filters the stream using the given predicate function.
Applies a function to each entry and concatenates the resulting lists.
Applies a function to each stream item.
Intercepts and transforms error tuples or unexpected exceptions that occur within a gRPC stream pipeline.
Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.
Partitions the stream to allow grouping of items by key or condition.
Reduces items in the stream using an accumulator.
Extracts the underlying Flow pipeline from a GRPC.Stream.
Emits only distinct items from the stream. See uniq_by/2 for more information.
Emits only unique items as determined by the result of the given function.
Functions: Actions
Sends a request to an external process and awaits a response.
Same as ask/3, but raises an exception on failure.
Applies a side-effect function to each element of the stream without altering its values.
Functions
Retrieves HTTP/2 headers from a GRPC.Server.Stream.
Functions: Creation
Converts a gRPC input into a Flow pipeline with backpressure support.
Parameters
input: A gRPC request stream (struct, enumerable, or ElixirStream).
Options
:join_with— An optional externalGenStageproducer to merge with the gRPC input.:dispatcher— Specifies theFlowdispatcher (defaults toGenStage.DemandDispatcher).:propagate_context- Iftrue, the context from thematerializeris propagated to theFlow.:materializer- The%GRPC.Server.Stream{}struct representing the current gRPC stream context.
And any other options supported by Flow.
Returns
A GRPC.Stream struct that represents the transformed stream.
Example
flow = GRPC.Stream.from(request, max_demand: 50)
Converts a single gRPC request into a Flow pipeline with support for backpressure.
This is useful for unary gRPC requests where you want to use the Flow API.
Parameters
input: The single gRPC message to convert into a Flow.
Options
:join_with- An optional additional producer stage PID to include in the Flow.:dispatcher- An optionalGenStagedispatcher to use in the underlyingFlow. Defaults toGenStage.DemandDispatcher.:propagate_context- Iftrue, the context from thematerializeris propagated to theFlow.:materializer- The%GRPC.Server.Stream{}struct representing the current gRPC stream context.
And any other options supported by Flow.
Returns
- A
GRPC.Streamthat emits the single gRPC message under demand.
Example
flow = GRPC.Stream.unary(request, max_demand: 5)
Functions: Materializers
@spec run(stream :: t()) :: :noreply
Executes the underlying Flow for a unary stream.
The response will be emitted automatically to the provided
:materializer (set to a GRPC.Server.Stream) for the single resulting
item in the materialized enumerable.
The stream argument must be initialized as a :unary stream with
a :materializer set.
Example
def say_unary_hello(request, mat) do
GRPC.Stream.unary(request, materializer: mat)
|> GRPC.Stream.map(fn
%HelloReply{} = reply ->
%HelloReply{message: "[Reply] message"}
{:error, _reason} ->
GRPC.RPCError.exception(message: "[Error] Something bad happened")
end)
|> GRPC.Stream.run()
end
Executes the flow and emits responses into the provided gRPC server stream.
Parameters
flow: AGRPC.Streamstruct containing the flow to be executed.stream: AGRPC.Server.Streamto which responses are sent.
Options
:dry_run— Iftrue, responses are not sent (used for testing or inspection).
Returns
:okif the stream was processed successfully.
Example
def say_bid_stream_hello(request, materializer) do
output_stream =
Stream.repeatedly(fn ->
%HelloReply{message: "I'm the Server ;)"}
end)
GRPC.Stream.from(request, join_with: output_stream)
|> GRPC.Stream.map(fn
%HelloRequest{} = _hello ->
%HelloReply{message: "Welcome Sr!"}
{:error, _reason} ->
GRPC.RPCError.exception(message: "[Error] Something bad happened")
end)
|> GRPC.Stream.run_with(materializer)
end
Functions: Transformers
Filters the stream using the given predicate function.
The filter function is applied concurrently to the stream entries, so it shouldn't rely on execution order.
@spec flat_map(t(), (term() -> Enumerable.t())) :: t()
Applies a function to each entry and concatenates the resulting lists.
Useful for emitting multiple messages for each input.
Applies a function to each stream item.
Intercepts and transforms error tuples or unexpected exceptions that occur within a gRPC stream pipeline.
map_error/3 allows graceful handling or recovery from errors produced by previous
operators (e.g. map/2, flat_map/2) or from validation logic applied to incoming data.
The provided handler/1 function receives the error reason (or the exception struct) like:
{:error, reason} -> failure
{:error, {:exception, exception}} -> failure due to exception
{:error, {kind, reason}} -> failure due to throw or exitAnd can either:
- Return a new error tuple — e.g.
{:error, new_reason}— to re-emit a modified error. - Return any other value to recover from the failure and continue the pipeline.
This makes it suitable for both input validation and capturing unexpected runtime errors in stream transformations.
Parameters
stream— The input stream orFlowpipeline.func— A function that takes an error reason or exception and returns either a new value or an error tuple.
Returns
- A new stream where all error tuples and raised exceptions are processed by
func/1.
Examples
iex> GRPC.Stream.from([1, 2])
...> |> GRPC.Stream.map(fn
...> 2 -> raise "boom"
...> x -> x
...> end)
...> |> GRPC.Stream.map_error(fn
...> {:error, {:exception, _reason}} ->
...> {:error, GRPC.RPCError.exception(message: "Validation or runtime error")}
...> end)In this example:
- The call to
GRPC.Stream.map/2raises an exception for value2. map_error/3catches the error and wraps it in aGRPC.RPCErrorstruct with a custom message.- The pipeline continues execution, transforming errors into structured responses.
Notes
map_error/3is lazy and only executes when the stream is materialized (viaGRPC.Stream.run/1orGRPC.Stream.run_with/3).Use this operator to implement robust error recovery, input validation, or to normalize exceptions from downstream Flow stages into well-defined gRPC errors.
Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.
Partitions the stream to allow grouping of items by key or condition.
Use this before stateful operations such as reduce/3.
Note
Excessive use of partitioning can impact performance and memory usage. Only partition when required for correctness or performance. See https://hexdocs.pm/flow/Flow.html#module-partitioning for more details.
Reduces items in the stream using an accumulator.
Parameters
acc_funinitializes the accumulator,reducer_funupdates it for each input.
Note
See https://hexdocs.pm/flow/Flow.html#reduce/3 for more details.
Extracts the underlying Flow pipeline from a GRPC.Stream.
Raises an ArgumentError if the Flow has not been initialized.
Returns
A Flow pipeline.
Emits only distinct items from the stream. See uniq_by/2 for more information.
Emits only unique items as determined by the result of the given function.
Note
This function requires care when used for unbounded flows. For more information see https://hexdocs.pm/flow/Flow.html#uniq_by/2
Functions: Actions
@spec ask(t(), pid() | atom(), non_neg_integer()) :: t() | {:error, :timeout | :process_not_alive}
Sends a request to an external process and awaits a response.
If target is a PID, a message in the format {:request, item, from} is sent, and a reply
in the format {:response, msg} is expected.
If target is an atom we will try to locate the process through Process.whereis/1.
Parameters
stream: TheGRPC.Streampipeline.target: Target process PID or atom name.timeout: Timeout in milliseconds (defaults to5000).
@spec ask!(t(), pid() | atom(), non_neg_integer()) :: t()
Same as ask/3, but raises an exception on failure.
Caution
This version propagates errors via raised exceptions, which can crash the Flow worker and halt the pipeline.
Prefer ask/3 for production usage unless failure should abort the stream.
Applies a side-effect function to each element of the stream without altering its values.
The effect/2 function is useful for performing imperative or external actions
(such as logging, sending messages, collecting metrics, or debugging)
while preserving the original stream data.
It behaves like Enum.each/2, but returns the stream itself so it can continue in the pipeline.
Examples
iex> parent = self()
iex> stream =
...> GRPC.Stream.from([1, 2, 3])
...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end)
...> |> GRPC.Stream.to_flow()
...> |> Enum.to_list()
iex> assert_receive {:seen, 2}
iex> assert_receive {:seen, 4}
iex> assert_receive {:seen, 6}
iex> stream
[1, 2, 3]In this example, the effect/2 function sends a message to the current process for each element in the stream, but the resulting stream values remain unchanged.
Parameters
stream— The inputGRPC.Stream.effect_fun— A function that receives each item and performs a side effect (e.g. IO.inspect/1, Logger.info/1, send/2, etc.).
Notes
- This function is lazy — the
effect_funwill only run once the stream is materialized (e.g. viaGRPC.Stream.run/1orGRPC.Stream.run_with/3). - The use of
effect/2ensures that the original item is returned unchanged, enabling seamless continuation of the pipeline.
Types
Functions
@spec get_headers(GRPC.Server.Stream.t()) :: map()
Retrieves HTTP/2 headers from a GRPC.Server.Stream.
Client Note
To receive headers on the client side, use the :return_headers option. See GRPC.Stub.