GRPC.Stream (grpc v0.11.2)
Provides a Flow-based abstraction layer for building gRPC streaming pipelines in Elixir.
This module allows you to consume gRPC request streams as Flow pipelines with support for
backpressure via GenStage. You can also produce gRPC responses by materializing a Flow
back into the gRPC stream.
Capabilities
- Transforms an incoming gRPC request stream into a Flow with backpressure.
- Emits messages back into the gRPC response stream using run_with/3.
- Supports joining with external producers (e.g., RabbitMQ, Kafka) for unbounded or fan-in stream sources.
- Offers composable functional operators (map/2, filter/2, flat_map/2, etc.) on the stream.
Example: Bidirectional Streaming
defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service
  def route_chat(input, materializer) do
    GRPC.Stream.from(input, max_demand: 10)
    |> GRPC.Stream.map(fn note -> process_note(note) end)
    |> GRPC.Stream.run_with(materializer)
  end
  defp process_note(_note), do: %Response{message: "Received"}
end
Example: Joining with an External Producer
When integrating with external unbounded sources (e.g., message queues),
you can pass a running GenStage producer using the :join_with option:
defmodule MyGRPCService do
  use GRPC.Server, service: MyService.Service
  def stream_events(input, materializer) do
    {:ok, pid} = MyApp.RabbitMQ.Producer.start_link([])
    GRPC.Stream.from(input, join_with: pid, max_demand: 10)
    |> GRPC.Stream.map(&handle_event/1)
    |> GRPC.Stream.run_with(materializer)
  end
  defp handle_event({_, msg}), do: msg
  defp handle_event(event), do: %MyGRPC.Event{data: inspect(event)}
end
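The example above does not show MyApp.RabbitMQ.Producer. As a rough sketch of what "a running GenStage producer" means, the script below defines a hypothetical InMemoryProducer that serves events from a list instead of a message queue. It assumes the gen_stage package (the version constraint is an assumption) and reads the producer with GenStage.stream/1 rather than GRPC.Stream, so it says nothing about the event shape GRPC.Stream expects from a joined producer.

```elixir
# Assumption: gen_stage is fetched from Hex; adjust the version to your project.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule InMemoryProducer do
  # Hypothetical stand-in for a queue-backed producer.
  use GenStage

  def start_link(events), do: GenStage.start_link(__MODULE__, events)

  @impl true
  def init(events), do: {:producer, events}

  # Emit at most `demand` events; keep the rest for the next request.
  @impl true
  def handle_demand(demand, events) when demand > 0 do
    {to_emit, rest} = Enum.split(events, demand)
    {:noreply, to_emit, rest}
  end
end

{:ok, pid} = InMemoryProducer.start_link([:a, :b, :c, :d])

# Consume the producer under demand and stop after three events.
first_three =
  [{pid, max_demand: 2}]
  |> GenStage.stream()
  |> Enum.take(3)
```

A pid started this way is the kind of value the :join_with option is described as accepting.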
  Summary
Functions
Sends a request to an external process and awaits a response.
Same as ask/3, but raises an exception on failure.
Filters the stream using the given predicate function.
Applies a function to each entry and concatenates the resulting lists.
Converts a gRPC input into a Flow pipeline with backpressure support.
Retrieves HTTP/2 headers from a GRPC.Server.Stream.
Applies a function to each stream item.
Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.
Partitions the stream to allow grouping of items by key or condition.
Reduces items in the stream using an accumulator.
Executes the underlying Flow for unary streams and emits responses into the provided gRPC server stream.
Executes the flow and emits responses into the provided gRPC server stream.
Extracts the underlying Flow pipeline from a GRPC.Stream.
Converts a single gRPC request into a Flow pipeline with support for backpressure.
This is useful for unary gRPC requests where you want to use the Flow API.
Emits only distinct items from the stream. See uniq_by/2 for more information.
Emits only unique items as determined by the result of the given function.
Types
Functions
Sends a request to an external process and awaits a response.
If target is a PID, a message in the format {:request, item, from} is sent, and a reply
in the format {:response, msg} is expected.
If target is an atom, the process is located through Process.whereis/1.
Parameters
- stream: The GRPC.Stream pipeline.
- target: Target process PID or atom name.
- timeout: Timeout in milliseconds (defaults to 5000).
Returns
- Updated stream if successful.
- {:error, item, reason} if the request fails or times out.
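The message protocol described above can be sketched from the target process's side. The UppercaseResponder module and the :upcaser name below are hypothetical, and the sketch assumes that from is the PID the reply should be sent to. The request is sent by hand here to show the exchange; in a pipeline, ask/3 would send it instead.

```elixir
defmodule UppercaseResponder do
  # Hypothetical target process for the request/response protocol.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok), do: {:ok, nil}

  # Receives {:request, item, from} and replies with {:response, msg}.
  # Assumption: `from` is the PID of the process awaiting the reply.
  @impl true
  def handle_info({:request, item, from}, state) do
    send(from, {:response, String.upcase(item)})
    {:noreply, state}
  end
end

# Register under a name so the target can also be given as an atom.
{:ok, _pid} = UppercaseResponder.start_link(name: :upcaser)
target = Process.whereis(:upcaser)

# Simulate one request by hand and wait for the reply.
send(target, {:request, "hello", self()})

reply =
  receive do
    {:response, msg} -> msg
  after
    5_000 -> :timeout
  end
```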
@spec ask!(t(), pid() | atom(), non_neg_integer()) :: t()
Same as ask/3, but raises an exception on failure.
Caution
This version propagates errors via raised exceptions, which can crash the Flow worker and halt the pipeline.
Prefer ask/3 for production usage unless failure should abort the stream.
Filters the stream using the given predicate function.
The filter function is applied concurrently to the stream entries, so it shouldn't rely on execution order.
@spec flat_map(t(), (term() -> Enumerable.t())) :: t()
Applies a function to each entry and concatenates the resulting lists.
Useful for emitting multiple messages for each input.
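The one-to-many shape can be illustrated with Enum.flat_map/2 from the standard library, used here only as a stand-in for the stream operator. The batch structure and the expand function are made up for illustration.

```elixir
# Each batch expands into zero or more messages.
expand = fn %{ids: ids} -> Enum.map(ids, fn id -> %{id: id} end) end

batches = [%{ids: [1, 2]}, %{ids: []}, %{ids: [3]}]

# The per-batch lists are concatenated into one flat list;
# the empty batch contributes nothing.
messages = Enum.flat_map(batches, expand)
```

A function with the same shape as expand (one input, an enumerable of outputs) is what the @spec above asks for.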
Converts a gRPC input into a Flow pipeline with backpressure support.
Parameters
- input: A gRPC request stream (struct, enumerable, or Elixir Stream).
Options
- :join_with - An optional external GenStage producer to merge with the gRPC input.
- :dispatcher - Specifies the Flow dispatcher (defaults to GenStage.DemandDispatcher).
- :propagate_context - If true, the context from the materializer is propagated to the Flow.
- :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.
And any other options supported by Flow.
Returns
  A GRPC.Stream struct that represents the transformed stream.
Example
flow = GRPC.Stream.from(request, max_demand: 50)
  @spec get_headers(GRPC.Server.Stream.t()) :: map()
Retrieves HTTP/2 headers from a GRPC.Server.Stream.
Client Note
To receive headers on the client side, use the :return_headers option. See GRPC.Stub.
Applies a function to each stream item.
Applies a transformation function to each stream item, passing the context as an additional argument. This is useful for operations that require access to the stream's headers.
Partitions the stream to allow grouping of items by key or condition.
Use this before stateful operations such as reduce/3.
Note
Excessive use of partitioning can impact performance and memory usage. Only partition when required for correctness or performance. See https://hexdocs.pm/flow/Flow.html#module-partitioning for more details.
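To see why partitioning matters before a stateful step, the sketch below routes events by hashing a key, so all events with the same key end up in the same bucket. It uses :erlang.phash2/2 and Enum.group_by/2 as an illustration of hash-based routing in general; the routing actually used by the library may differ, and the event shape is hypothetical.

```elixir
events = [
  %{user: "ana", amount: 5},
  %{user: "bob", amount: 7},
  %{user: "ana", amount: 1},
  %{user: "bob", amount: 3}
]

stages = 4

# Route each event to a partition index derived from its key.
by_partition =
  Enum.group_by(events, fn event -> :erlang.phash2(event.user, stages) end)

# List the partitions that hold at least one event for the given user.
partitions_for = fn user ->
  for {partition, evs} <- by_partition,
      Enum.any?(evs, fn e -> e.user == user end),
      do: partition
end

ana_partitions = partitions_for.("ana")
bob_partitions = partitions_for.("bob")
```

Because each user maps to exactly one partition, a per-partition accumulator sees every event for that user.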
Reduces items in the stream using an accumulator.
Parameters
acc_fun initializes the accumulator; reducer_fun updates it for each input.
Note
See https://hexdocs.pm/flow/Flow.html#reduce/3 for more details.
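The two functions have the following shapes, shown here with Enum.reduce/3 from the standard library as a stand-in for the stream operator. The word-count task is only an illustration. In Flow, each partition typically keeps its own accumulator, which is why partitioning by key usually comes first.

```elixir
# acc_fun takes no arguments and returns the initial accumulator.
acc_fun = fn -> %{} end

# reducer_fun takes the current item and the accumulator, and returns the new accumulator.
reducer_fun = fn word, counts -> Map.update(counts, word, 1, &(&1 + 1)) end

words = ["a", "b", "a"]

counts = Enum.reduce(words, acc_fun.(), reducer_fun)
```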
Executes the underlying Flow for unary streams and emits responses into the provided gRPC server stream.
Parameters
- flow: A GRPC.Stream struct containing the flow to be executed.
- stream: A GRPC.Server.Stream to which responses are sent.
- :dry_run - If true, responses are not sent (used for testing or inspection).
Example
GRPC.Stream.run(request)
  Executes the flow and emits responses into the provided gRPC server stream.
Parameters
- flow: A GRPC.Stream struct containing the flow to be executed.
- stream: A GRPC.Server.Stream to which responses are sent.
Options
- :dry_run - If true, responses are not sent (used for testing or inspection).
Returns
- :ok if the stream was processed successfully.
Example
GRPC.Stream.run_with(request, mat)
  Extracts the underlying Flow pipeline from a GRPC.Stream.
Raises an ArgumentError if the Flow has not been initialized.
Returns
  A Flow pipeline.
Converts a single gRPC request into a Flow pipeline with support for backpressure.
This is useful for unary gRPC requests where you want to use the Flow API.
Parameters
input: The single gRPC message to convert into a Flow.
Options
- :join_with - An optional additional producer stage PID to include in the Flow.
- :dispatcher - An optional GenStage dispatcher to use in the underlying Flow. Defaults to GenStage.DemandDispatcher.
- :propagate_context - If true, the context from the materializer is propagated to the Flow.
- :materializer - The %GRPC.Server.Stream{} struct representing the current gRPC stream context.
And any other options supported by Flow.
Returns
- A GRPC.Stream that emits the single gRPC message under demand.
Example
flow = GRPC.Stream.unary(request, max_demand: 5)
  Emits only distinct items from the stream. See uniq_by/2 for more information.
Emits only unique items as determined by the result of the given function.
Note
This function requires care when used for unbounded flows. For more information see https://hexdocs.pm/flow/Flow.html#uniq_by/2
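One likely reason for that caution is that deduplication has to remember every key it has already seen, so memory grows with the number of distinct keys. The sketch below is not the library's implementation; DedupSketch is a hypothetical module that makes the remembered set visible.

```elixir
defmodule DedupSketch do
  # Returns {kept_events, number_of_keys_remembered}.
  def uniq_by(events, key_fun) do
    {kept, seen} =
      Enum.reduce(events, {[], MapSet.new()}, fn event, {kept, seen} ->
        key = key_fun.(event)

        if MapSet.member?(seen, key) do
          # Duplicate key: drop the event, the set stays the same.
          {kept, seen}
        else
          # New key: keep the event and remember its key.
          {[event | kept], MapSet.put(seen, key)}
        end
      end)

    {Enum.reverse(kept), MapSet.size(seen)}
  end
end

events = [%{id: 1, v: "a"}, %{id: 2, v: "b"}, %{id: 1, v: "c"}]

{kept, remembered} = DedupSketch.uniq_by(events, fn e -> e.id end)
```

On a stream that never ends and keeps producing new keys, the remembered set never shrinks.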