GRPC.Stream (gRPC v0.11.4)

View Source

Provides a Flow-based abstraction layer for building gRPC streaming pipelines in Elixir.

This module allows you to consume gRPC request streams as Flow pipelines with support for backpressure via GenStage. You can also produce gRPC responses by materializing a Flow back into the gRPC stream.

Capabilities

  • Transforms an incoming gRPC request stream into a Flow with backpressure.
  • Emits messages back into the gRPC response stream using run_with/3.
  • Supports joining with external producers (e.g., RabbitMQ, Kafka) for unbounded or fan-in stream sources.
  • Offers composable functional operators (map/2, filter/2, flat_map/2, etc.) on the stream.

Example: Bidirectional Streaming

defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service

  def route_chat(input, materializer) do
    GRPC.Stream.from(input, max_demand: 10)
    |> GRPC.Stream.map(fn note -> process_note(note) end)
    |> GRPC.Stream.run_with(materializer)
  end

  defp process_note(note), do: %Response{message: "Received"}
end

Example: Joining with an External Producer

When integrating with external unbounded sources (e.g., message queues), you can pass a running GenStage producer using the :join_with option:

defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service

  def stream_events(input, materializer) do
    {:ok, pid} = MyApp.RabbitMQ.Producer.start_link([])

    GRPC.Stream.from(input, join_with: pid, max_demand: 10)
    |> GRPC.Stream.map(&handle_event/1)
    |> GRPC.Stream.run_with(materializer)
  end

  defp handle_event({_, msg}), do: msg
  defp handle_event(event), do: %MyGRPC.Event{data: inspect(event)}
end

Summary

Functions: Creation

Converts a gRPC input into a Flow pipeline with backpressure support.

Converts a single gRPC request into a Flow pipeline with support for backpressure. This is useful for unary gRPC requests where you want to use the Flow API.

Functions: Materializers

Executes the underlying Flow for a unary stream.

Executes the flow and emits responses into the provided gRPC server stream.

Functions: Transformers

Filters the stream using the given predicate function.

Applies a function to each entry and concatenates the resulting lists.

Applies a function to each stream item.

Intercepts and transforms error tuples or unexpected exceptions that occur within a gRPC stream pipeline.

Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.

Partitions the stream to allow grouping of items by key or condition.

Reduces items in the stream using an accumulator.

Extracts the underlying Flow pipeline from a GRPC.Stream.

Emits only distinct items from the stream. See uniq_by/2 for more information.

Emits only unique items as determined by the result of the given function.

Functions: Actions

Sends a request to an external process and awaits a response.

Same as ask/3, but raises an exception on failure.

Applies a side-effect function to each element of the stream without altering its values.

Functions

Retrieves HTTP/2 headers from a GRPC.Server.Stream.

Functions: Creation

from(input, opts \\ [])

@spec from(any(), Keyword.t()) :: t()

Converts a gRPC input into a Flow pipeline with backpressure support.

Parameters

  • input: A gRPC request stream (struct, enumerable, or Elixir Stream).

Options

  • :join_with — An optional external GenStage producer to merge with the gRPC input.
  • :dispatcher — Specifies the Flow dispatcher (defaults to GenStage.DemandDispatcher).
  • :propagate_context - If true, the context from the materializer is propagated to the Flow.
  • :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.

And any other options supported by Flow.

Returns

A GRPC.Stream struct that represents the transformed stream.

Example

flow = GRPC.Stream.from(request, max_demand: 50)

unary(input, opts \\ [])

@spec unary(any(), Keyword.t()) :: t()

Converts a single gRPC request into a Flow pipeline with support for backpressure. This is useful for unary gRPC requests where you want to use the Flow API.

Parameters

  • input: The single gRPC message to convert into a Flow.

Options

  • :join_with - An optional additional producer stage PID to include in the Flow.
  • :dispatcher - An optional GenStage dispatcher to use in the underlying Flow. Defaults to GenStage.DemandDispatcher.
  • :propagate_context - If true, the context from the materializer is propagated to the Flow.
  • :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.

And any other options supported by Flow.

Returns

  • A GRPC.Stream that emits the single gRPC message under demand.

Example

flow = GRPC.Stream.unary(request, max_demand: 5)

Functions: Materializers

run(stream)

@spec run(stream :: t()) :: :noreply

Executes the underlying Flow for a unary stream.

The response will be emitted automatically to the provided :materializer (set to a GRPC.Server.Stream) for the single resulting item in the materialized enumerable.

The stream argument must be initialized as a :unary stream with a :materializer set.

Example

def say_unary_hello(request, mat) do
  GRPC.Stream.unary(request, materializer: mat)
  |> GRPC.Stream.map(fn
    %HelloReply{} = reply ->
      %HelloReply{message: "[Reply] message"}

    {:error, _reason} ->
      GRPC.RPCError.exception(message: "[Error] Something bad happened")
  end)
  |> GRPC.Stream.run()
end

run_with(stream, from, opts \\ [])

@spec run_with(t(), Stream.t(), Keyword.t()) :: :ok

Executes the flow and emits responses into the provided gRPC server stream.

Parameters

Options

  • :dry_run — If true, responses are not sent (used for testing or inspection).

Returns

  • :ok if the stream was processed successfully.

Example

def say_bid_stream_hello(request, materializer) do
  output_stream =
    Stream.repeatedly(fn ->
      %HelloReply{message: "I'm the Server ;)"}
    end)

  GRPC.Stream.from(request, join_with: output_stream)
  |> GRPC.Stream.map(fn
    %HelloRequest{} = _hello ->
      %HelloReply{message: "Welcome Sr!"}

    {:error, _reason} ->
      GRPC.RPCError.exception(message: "[Error] Something bad happened")
  end)
  |> GRPC.Stream.run_with(materializer)
end

Functions: Transformers

filter(stream, filter)

@spec filter(t(), (term() -> term())) :: t()

Filters the stream using the given predicate function.

The filter function is applied concurrently to the stream entries, so it shouldn't rely on execution order.

flat_map(stream, flat_mapper)

@spec flat_map(t(), (term() -> Enumerable.t())) :: t()

Applies a function to each entry and concatenates the resulting lists.

Useful for emitting multiple messages for each input.

map(stream, mapper)

@spec map(t(), (term() -> term())) :: t()

Applies a function to each stream item.

map_error(stream, func)

Intercepts and transforms error tuples or unexpected exceptions that occur within a gRPC stream pipeline.

map_error/3 allows graceful handling or recovery from errors produced by previous operators (e.g. map/2, flat_map/2) or from validation logic applied to incoming data.

The provided handler/1 function receives the error reason (or the exception struct) like:

{:error, reason} -> failure
{:error, {:exception, exception}} -> failure due to exception
{:error, {kind, reason}} -> failure due to throw or exit

And can either:

  • Return a new error tuple — e.g. {:error, new_reason} — to re-emit a modified error.
  • Return any other value to recover from the failure and continue the pipeline.

This makes it suitable for both input validation and capturing unexpected runtime errors in stream transformations.

Parameters

  • stream — The input stream or Flow pipeline.
  • func — A function that takes an error reason or exception and returns either a new value or an error tuple.

Returns

  • A new stream where all error tuples and raised exceptions are processed by func/1.

Examples

iex> GRPC.Stream.from([1, 2])
...> |> GRPC.Stream.map(fn
...>   2 -> raise "boom"
...>   x -> x
...> end)
...> |> GRPC.Stream.map_error(fn
...>   {:error, {:exception, _reason}} ->
...>     {:error, GRPC.RPCError.exception(message: "Validation or runtime error")}
...> end)

In this example:

  • The call to GRPC.Stream.map/2 raises an exception for value 2.
  • map_error/3 catches the error and wraps it in a GRPC.RPCError struct with a custom message.
  • The pipeline continues execution, transforming errors into structured responses.

Notes

  • map_error/3 is lazy and only executes when the stream is materialized (via GRPC.Stream.run/1 or GRPC.Stream.run_with/3).

  • Use this operator to implement robust error recovery, input validation, or to normalize exceptions from downstream Flow stages into well-defined gRPC errors.

map_with_context(stream, mapper)

@spec map_with_context(t(), (map(), term() -> term())) :: t()

Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.

partition(stream, options \\ [])

@spec partition(
  t(),
  keyword()
) :: t()

Partitions the stream to allow grouping of items by key or condition.

Use this before stateful operations such as reduce/3.

Note

Excessive use of partitioning can impact performance and memory usage. Only partition when required for correctness or performance. See https://hexdocs.pm/flow/Flow.html#module-partitioning for more details.

reduce(stream, acc_fun, reducer_fun)

@spec reduce(t(), (-> acc), (term(), acc -> acc)) :: t() when acc: term()

Reduces items in the stream using an accumulator.

Parameters

  • acc_fun initializes the accumulator,
  • reducer_fun updates it for each input.

Note

See https://hexdocs.pm/flow/Flow.html#reduce/3 for more details.

to_flow(stream)

@spec to_flow(t()) :: Flow.t()

Extracts the underlying Flow pipeline from a GRPC.Stream.

Raises an ArgumentError if the Flow has not been initialized.

Returns

A Flow pipeline.

uniq(stream)

@spec uniq(t()) :: t()

Emits only distinct items from the stream. See uniq_by/2 for more information.

uniq_by(stream, fun)

@spec uniq_by(t(), (term() -> term())) :: t()

Emits only unique items as determined by the result of the given function.

Note

This function requires care when used for unbounded flows. For more information see https://hexdocs.pm/flow/Flow.html#uniq_by/2

Functions: Actions

ask(stream, target, timeout \\ 5000)

@spec ask(t(), pid() | atom(), non_neg_integer()) ::
  t() | {:error, :timeout | :process_not_alive}

Sends a request to an external process and awaits a response.

If target is a PID, a message in the format {:request, item, from} is sent, and a reply in the format {:response, msg} is expected.

If target is an atom we will try to locate the process through Process.whereis/1.

Parameters

  • stream: The GRPC.Stream pipeline.
  • target: Target process PID or atom name.
  • timeout: Timeout in milliseconds (defaults to 5000).

ask!(stream, target, timeout \\ 5000)

@spec ask!(t(), pid() | atom(), non_neg_integer()) :: t()

Same as ask/3, but raises an exception on failure.

Caution

This version propagates errors via raised exceptions, which can crash the Flow worker and halt the pipeline. Prefer ask/3 for production usage unless failure should abort the stream.

effect(stream, effect_fun)

@spec effect(t(), (term() -> any())) :: t()

Applies a side-effect function to each element of the stream without altering its values.

The effect/2 function is useful for performing imperative or external actions (such as logging, sending messages, collecting metrics, or debugging) while preserving the original stream data.

It behaves like Enum.each/2, but returns the stream itself so it can continue in the pipeline.

Examples

iex> parent = self()
iex> stream =
...>   GRPC.Stream.from([1, 2, 3])
...>   |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end)
...>   |> GRPC.Stream.to_flow()
...>   |> Enum.to_list()
iex> assert_receive {:seen, 2}
iex> assert_receive {:seen, 4}
iex> assert_receive {:seen, 6}
iex> stream
[1, 2, 3]

In this example, the effect/2 function sends a message to the current process for each element in the stream, but the resulting stream values remain unchanged.

Parameters

  • stream — The input GRPC.Stream.
  • effect_fun — A function that receives each item and performs a side effect (e.g. IO.inspect/1, Logger.info/1, send/2, etc.).

Notes

  • This function is lazy — the effect_fun will only run once the stream is materialized (e.g. via GRPC.Stream.run/1 or GRPC.Stream.run_with/3).
  • The use of effect/2 ensures that the original item is returned unchanged, enabling seamless continuation of the pipeline.

Types

item()

@type item() :: any()

reason()

@type reason() :: any()

t()

@type t() :: %GRPC.Stream{flow: Flow.t(), metadata: map(), options: Keyword.t()}

Functions

get_headers(stream)

@spec get_headers(GRPC.Server.Stream.t()) :: map()

Retrieves HTTP/2 headers from a GRPC.Server.Stream.

Client Note

To receive headers on the client side, use the :return_headers option. See GRPC.Stub.