Stream Operators

GRPC.Stream provides a functional and composable API for building fault-tolerant pipelines for unary and streaming gRPC calls in Elixir.

This cheatsheet summarizes the main operators available today.

Creating Streams

unary/1

Wraps a single request into a stream (unary RPC type).

iex> GRPC.Stream.unary(request)

from/1

Creates a stream from the input of a streaming RPC. The examples in this cheatsheet also pass plain lists.

iex> GRPC.Stream.from(request)

Transforming Streams

map/2

Transforms each element in the stream.

iex> stream |> GRPC.Stream.map(&process/1)

flat_map/2

Maps each element to a list of elements and flattens the resulting lists into the stream.

iex> GRPC.Stream.from([1, 2])
...> |> GRPC.Stream.flat_map(&[&1, &1])
...> |> GRPC.Stream.to_flow()
...> |> Enum.to_list()
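
To see what this kind of transformation does to its elements, it can be sketched with `Enum.flat_map/2` from the standard library. This is only an analogy, not GRPC.Stream itself: it assumes the mapper returns a list for each element and that those lists are concatenated. The name `duplicate` is made up for illustration.

```elixir
# Standard-library analogy for flat_map: each element maps to a list,
# and the lists are concatenated into a single sequence.
duplicate = fn x -> [x, x] end

flattened = Enum.flat_map([1, 2], duplicate)
IO.inspect(flattened)
# prints [1, 1, 2, 2]
```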

ask/3

Performs an external call for each element: sends it as a request to another process and awaits the response.

iex> pid =
...>   spawn(fn ->
...>     receive do
...>       {:request, :hello, test_pid} ->
...>         send(test_pid, {:response, :world})
...>     end
...>   end)
iex> GRPC.Stream.from([:hello])
...> |> GRPC.Stream.ask(pid)
...> |> GRPC.Stream.to_flow()
...> |> Enum.to_list()
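
The message shapes in the example suggest a simple request/response exchange between processes. The following sketch reproduces one such round trip with plain `send/2` and `receive`, without GRPC.Stream. It is an assumption about the exchange, inferred only from the `{:request, item, from}` and `{:response, value}` tuples shown above. The names `responder` and `reply` and the one-second timeout are illustrative.

```elixir
# Hypothetical responder: answers a single {:request, item, from} message.
responder =
  spawn(fn ->
    receive do
      {:request, :hello, from} -> send(from, {:response, :world})
    end
  end)

# One round trip for one element, as a caller of this protocol might do it
# (assumed from the message shapes in the example, not from library internals).
send(responder, {:request, :hello, self()})

reply =
  receive do
    {:response, value} -> value
  after
    1_000 -> :timeout
  end

IO.inspect(reply)
# prints :world
```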

Filtering, Grouping, and Reducing Streams

filter/2

Filters the stream using the given predicate function.

iex> GRPC.Stream.from([1, 2, 3, 4])
...> |> GRPC.Stream.filter(&(rem(&1, 2) == 0))
...> |> GRPC.Stream.to_flow()
...> |> Enum.to_list()

Effects

effect/2

Applies a side-effect function to each element of the stream without altering its values.

iex> parent = self()
iex> stream =
...>   GRPC.Stream.from([1, 2, 3])
...>   |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x * 2}) end)
...>   |> GRPC.Stream.to_flow()
...>   |> Enum.to_list()
iex> flush()
iex> stream
[1, 2, 3]
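
The "side effect without altering values" behavior can be sketched with `Stream.each/2` from the standard library, which has the same shape. This is an analogy rather than GRPC.Stream code. The variables `values` and `seen` and the 100 ms receive timeout are illustrative choices.

```elixir
parent = self()

# Stream.each/2 runs the function for its side effect and passes each
# element through unchanged, mirroring the behavior described for effect/2.
values =
  [1, 2, 3]
  |> Stream.each(fn x -> send(parent, {:seen, x * 2}) end)
  |> Enum.to_list()

# Collect the messages that the side effect sent to this process.
seen =
  for _ <- values do
    receive do
      {:seen, doubled} -> doubled
    after
      100 -> :none
    end
  end

IO.inspect(values)
# prints [1, 2, 3]
IO.inspect(seen)
# prints [2, 4, 6]
```

The elements come out untouched while the doubled values only appear in the mailbox, which is the separation that makes this kind of operator useful for logging or metrics.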

Error Handling

map_error/2

Intercepts errors and exceptions raised in the pipeline and transforms them, for example into a GRPC.RPCError.

iex> GRPC.Stream.from([1, 2])
...> |> GRPC.Stream.map(fn
...>   2 -> raise "boom"
...>   x -> x
...> end)
...> |> GRPC.Stream.map_error(fn
...>   {:error, %RuntimeError{message: "boom"}} ->
...>     GRPC.RPCError.exception(message: "Validation or runtime error")
...>
...>   msg ->
...>     msg
...> end)
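
The handler shape above (one clause matching an `{:error, exception}` tuple, one clause passing everything else through) can be tried in isolation with plain Elixir. The sketch below assumes that a raising step surfaces as an `{:error, exception}` element, as the pattern in the example suggests. To stay dependency-free it returns a made-up `{:rpc_error, message}` tuple instead of a GRPC.RPCError. `safe_step` and `handler` are hypothetical names.

```elixir
# Hypothetical stand-in for the function passed to map_error/2.
# It returns a plain tuple rather than a GRPC.RPCError to avoid dependencies.
handler = fn
  {:error, %RuntimeError{message: "boom"}} -> {:rpc_error, "Validation or runtime error"}
  other -> other
end

# Assumption: a step that raises is turned into an {:error, exception} element.
safe_step = fn x ->
  try do
    if x == 2, do: raise("boom"), else: x
  rescue
    e -> {:error, e}
  end
end

handled = [1, 2] |> Enum.map(safe_step) |> Enum.map(handler)
IO.inspect(handled)
# prints [1, {:rpc_error, "Validation or runtime error"}]
```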

Running Streams

run/1

Executes the pipeline for a unary RPC.

iex> stream |> GRPC.Stream.run()

run_with/2

Executes the pipeline for streaming RPCs.

iex> stream |> GRPC.Stream.run_with(materializer)