View Source GRPC.Stream (grpc v0.10.2)

Provides a Flow-based abstraction layer for building gRPC streaming pipelines in Elixir.

This module allows you to consume gRPC request streams as Flow pipelines with support for backpressure via GenStage. You can also produce gRPC responses by materializing a Flow back into the gRPC stream.

Capabilities

  • Transforms an incoming gRPC request stream into a Flow with backpressure.
  • Emits messages back into the gRPC response stream using run_with/3.
  • Supports joining with external producers (e.g., RabbitMQ, Kafka) for unbounded or fan-in stream sources.
  • Offers composable functional operators (map/2, filter/2, flat_map/2, etc.) on the stream.

Example: Bidirectional Streaming

defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service

  def route_chat(input, materializer) do
    GRPC.Stream.from(input, max_demand: 10)
    |> GRPC.Stream.map(fn note -> process_note(note) end)
    |> GRPC.Stream.run_with(materializer)
  end

  defp process_note(note), do: %Response{message: "Received"}
end

Example: Joining with an External Producer

When integrating with external unbounded sources (e.g., message queues), you can pass a running GenStage producer using the :join_with option:

defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service

  def stream_events(input, materializer) do
    {:ok, pid} = MyApp.RabbitMQ.Producer.start_link([])

    GRPC.Stream.from(input, join_with: pid, max_demand: 10)
    |> GRPC.Stream.map(&handle_event/1)
    |> GRPC.Stream.run_with(materializer)
  end

  defp handle_event({_, msg}), do: msg
  defp handle_event(event), do: %MyGRPC.Event{data: inspect(event)}
end

Summary

Functions

Sends a request to an external process and awaits a response.

Same as ask/3, but raises an exception on failure.

Filters the stream using the given predicate function.

Applies a function to each entry and concatenates the resulting lists.

Converts a gRPC input into a Flow pipeline with backpressure support.

Retrieves HTTP/2 headers from a GRPC.Server.Stream.

Applies a function to each stream item.

Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.

Partitions the stream to allow grouping of items by key or condition.

Reduces items in the stream using an accumulator.

Executes the underlying Flow for unary streams and emits responses into the provided gRPC server stream.

Executes the flow and emits responses into the provided gRPC server stream.

Extracts the underlying Flow pipeline from a GRPC.Stream.

Converts a single gRPC request into a Flow pipeline with support for backpressure. This is useful for unary gRPC requests where you want to use the Flow API.

Emits only distinct items from the stream. See uniq_by/2 for more information.

Emits only unique items as determined by the result of the given function.

Types

@type item() :: any()
@type reason() :: any()
@type t() :: %GRPC.Stream{flow: Flow.t(), metadata: map(), options: Keyword.t()}

Functions

Link to this function

ask(stream, target, timeout \\ 5000)

View Source
@spec ask(t(), pid() | atom(), non_neg_integer()) :: t() | {:error, item(), reason()}

Sends a request to an external process and awaits a response.

If target is a PID, a message in the format {:request, item, from} is sent, and a reply in the format {:response, msg} is expected.

If target is an atom we will try to locate the process through Process.whereis/1.

Parameters

  • stream: The GRPC.Stream pipeline.
  • target: Target process PID or atom name.
  • timeout: Timeout in milliseconds (defaults to 5000).

Returns

  • Updated stream if successful.
  • {:error, item, reason} if the request fails or times out.
Link to this function

ask!(stream, target, timeout \\ 5000)

View Source
@spec ask!(t(), pid() | atom(), non_neg_integer()) :: t()

Same as ask/3, but raises an exception on failure.

Caution

This version propagates errors via raised exceptions, which can crash the Flow worker and halt the pipeline. Prefer ask/3 for production usage unless failure should abort the stream.

@spec filter(t(), (term() -> term())) :: t()

Filters the stream using the given predicate function.

The filter function is applied concurrently to the stream entries, so it shouldn't rely on execution order.

Link to this function

flat_map(stream, flat_mapper)

View Source
@spec flat_map(t(), (term() -> Enumerable.t())) :: t()

Applies a function to each entry and concatenates the resulting lists.

Useful for emitting multiple messages for each input.

@spec from(any(), Keyword.t()) :: t()

Converts a gRPC input into a Flow pipeline with backpressure support.

Parameters

  • input: A gRPC request stream (struct, enumerable, or Elixir Stream).

Options

  • :join_with — An optional external GenStage producer to merge with the gRPC input.
  • :dispatcher — Specifies the Flow dispatcher (defaults to GenStage.DemandDispatcher).
  • :propagate_context - If true, the context from the materializer is propagated to the Flow.
  • :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.

And any other options supported by Flow.

Returns

A GRPC.Stream struct that represents the transformed stream.

Example

flow = GRPC.Stream.from(request, max_demand: 50)
@spec get_headers(GRPC.Server.Stream.t()) :: map()

Retrieves HTTP/2 headers from a GRPC.Server.Stream.

Client Note

To receive headers on the client side, use the :return_headers option. See GRPC.Stub.

@spec map(t(), (term() -> term())) :: t()

Applies a function to each stream item.

Link to this function

map_with_context(stream, mapper)

View Source
@spec map_with_context(t(), (map(), term() -> term())) :: t()

Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.

Link to this function

partition(stream, options \\ [])

View Source
@spec partition(
  t(),
  keyword()
) :: t()

Partitions the stream to allow grouping of items by key or condition.

Use this before stateful operations such as reduce/3.

Note

Excessive use of partitioning can impact performance and memory usage. Only partition when required for correctness or performance. See https://hexdocs.pm/flow/Flow.html#module-partitioning for more details.

Link to this function

reduce(stream, acc_fun, reducer_fun)

View Source
@spec reduce(t(), (-> acc), (term(), acc -> acc)) :: t() when acc: term()

Reduces items in the stream using an accumulator.

Parameters

  • acc_fun initializes the accumulator,
  • reducer_fun updates it for each input.

Note

See https://hexdocs.pm/flow/Flow.html#reduce/3 for more details.

@spec run(t()) :: any()

Executes the underlying Flow for unary streams and emits responses into the provided gRPC server stream.

Parameters

  • flow: A GRPC.Stream struct containing the flow to be executed.
  • stream: A GRPC.Server.Stream to which responses are sent.
  • :dry_run — If true, responses are not sent (used for testing or inspection).

Example

GRPC.Stream.run(request)
Link to this function

run_with(stream, from, opts \\ [])

View Source
@spec run_with(t(), Stream.t(), Keyword.t()) :: :ok

Executes the flow and emits responses into the provided gRPC server stream.

Parameters

Options

  • :dry_run — If true, responses are not sent (used for testing or inspection).

Returns

  • :ok if the stream was processed successfully.

Example

GRPC.Stream.run_with(request, mat)
@spec to_flow(t()) :: Flow.t()

Extracts the underlying Flow pipeline from a GRPC.Stream.

Raises an ArgumentError if the Flow has not been initialized.

Returns

A Flow pipeline.

Link to this function

unary(input, opts \\ [])

View Source
@spec unary(any(), Keyword.t()) :: t()

Converts a single gRPC request into a Flow pipeline with support for backpressure. This is useful for unary gRPC requests where you want to use the Flow API.

Parameters

  • input: The single gRPC message to convert into a Flow.

Options

  • :join_with - An optional additional producer stage PID to include in the Flow.
  • :dispatcher - An optional GenStage dispatcher to use in the underlying Flow. Defaults to GenStage.DemandDispatcher.
  • :propagate_context - If true, the context from the materializer is propagated to the Flow.
  • :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.

And any other options supported by Flow.

Returns

  • A GRPCStream that emits the single gRPC message under demand.

Example

flow = GRPCStream.single(request, max_demand: 5)
@spec uniq(t()) :: t()

Emits only distinct items from the stream. See uniq_by/2 for more information.

@spec uniq_by(t(), (term() -> term())) :: t()

Emits only unique items as determined by the result of the given function.

Note

This function requires care when used for unbounded flows. For more information see https://hexdocs.pm/flow/Flow.html#uniq_by/2