Stream Operators
View SourceGRPC.Stream provides a functional and composable API for building
fault-tolerant pipelines for unary and streaming gRPC calls in Elixir.
This cheatsheet summarizes the main operators available today.
Creating Streams
unary/1
Wraps a single request into a stream (unary RPC type).
iex> GRPC.Stream.unary(request)from/1
Creates a stream from an stream RPC.
iex> GRPC.Stream.from(request)Transforming Streams
map/2
Transforms each element in the stream.
iex> stream |> GRPC.Stream.map(&process/1)flat_map/2
Transforms each element in the stream.
iex> GRPC.Stream.from([1, 2])
iex> |> GRPC.Stream.flat_map(&[&1, &1])
iex> |> GRPC.Stream.to_flow()
iex> |> Enum.to_list()ask/3
Performs an external call using a Materializer.
iex> pid =
iex> spawn(fn ->
iex> receive do
iex> {:request, :hello, test_pid} ->
iex> send(test_pid, {:response, :world})
iex> end
iex> end)
iex>
iex> GRPC.Stream.from([:hello])
iex> |> GRPC.Stream.ask(pid)
iex> |> GRPC.Stream.to_flow()
iex> |> Enum.to_list()Filtering, grouping, and reduce Streams
filter/2
Filters the stream using the given predicate function.
iex> GRPC.Stream.from([1, 2, 3, 4])
iex> |> GRPC.Stream.filter(&(rem(&1, 2) == 0))
iex> |> GRPC.Stream.to_flow()
iex> |> Enum.to_list()Effects
effect/2
Applies a side-effect function to each element of the stream without altering its values.
iex> parent = self()
iex> stream =
...> GRPC.Stream.from(request)
...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end)
...> |> GRPC.Stream.to_flow()
...> |> Enum.to_list()
iex> flush()
iex> stream
[1, 2, 3]Error Handling
map_error/2
Intercepts and transforms errors & exceptions.
iex> GRPC.Stream.from([1, 2])
iex> |> GRPC.Stream.map(fn
iex> 2 -> raise "boom"
iex> x -> x
iex> end)
iex> |> GRPC.Stream.map_error(fn
iex> {:error, %RuntimeError{message: "boom"}} ->
iex> GRPC.RPCError.exception(message: "Validation or runtime error")
iex>
iex> msg ->
iex> msg
iex> end)Running Streams
run/1
Executes the pipeline for unary RPC.
iex> stream |> GRPC.Stream.run()run_with/2
Executes the pipeline for stream RPC's.
iex> stream |> GRPC.Stream.run_with(materializer)