PhoenixMicro's RPC system provides synchronous request/reply over any transport. No HTTP, no gRPC — the same message broker you use for pub/sub handles RPC.
How it works
- Caller generates a
correlation_idand a uniquereply_toinbox topic - Caller publishes the request to the target topic with
reply_toandcorrelation_idset - Handler consumer publishes a response to
message.reply_to - Caller's RPC GenServer matches the
correlation_idand returns{:ok, result} - No reply within
timeout→{:error, :timeout}
Making RPC calls
# Topic form — simplest
{:ok, result} = PhoenixMicro.rpc("math.sum", [1, 2, 3])
# Service + pattern form
{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3])
# With options
{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3],
timeout: 5_000, # ms (default: 5_000)
retry: 2 # retries on timeout (default: 0)
)
# Handle all cases
case PhoenixMicro.rpc("payments", "charge", %{amount: 100}) do
{:ok, charge} -> process(charge)
{:error, :timeout} -> handle_timeout()
{:error, reason} -> handle_error(reason)
endImplementing an RPC handler
The handler consumer must publish a reply to message.reply_to:
defmodule MyApp.RPC.MathConsumer do
use PhoenixMicro.Consumer
topic "math.sum"
concurrency 20
@impl PhoenixMicro.Consumer
def handle(%PhoenixMicro.Message{} = message, _ctx) do
result = Enum.sum(message.payload)
if message.reply_to do
PhoenixMicro.publish(message.reply_to, result,
headers: %{"x-correlation-id" => message.correlation_id}
)
end
:ok
end
endPattern routing
PhoenixMicro.rpc("service", "pattern", payload) publishes to "service.pattern".
Route multiple patterns in a single consumer with wildcards:
defmodule MyApp.RPC.MathRouter do
use PhoenixMicro.Consumer
topic "math.*" # handles math.sum, math.multiply, math.max, etc.
concurrency 20
@impl PhoenixMicro.Consumer
def handle(%PhoenixMicro.Message{} = message, _ctx) do
pattern = Map.get(message.headers, "x-pattern", "")
result = dispatch(pattern, message.payload)
if message.reply_to do
PhoenixMicro.publish(message.reply_to, result,
headers: %{"x-correlation-id" => message.correlation_id}
)
end
:ok
end
defp dispatch("sum", nums), do: Enum.sum(nums)
defp dispatch("multiply", nums), do: Enum.reduce(nums, 1, &*/2)
defp dispatch("max", nums), do: Enum.max(nums)
defp dispatch(unknown, _payload), do: {:error, "unknown pattern: #{unknown}"}
endTimeouts and retries
# Default: 5 second timeout, no retry
PhoenixMicro.rpc("slow.service", payload)
# Tight timeout for fast SLA-bound services
PhoenixMicro.rpc("inventory.check", payload, timeout: 500)
# Retry on timeout — for idempotent operations only
PhoenixMicro.rpc("idempotent.lookup", payload, timeout: 2_000, retry: 3)Warning: Only use
retryfor truly idempotent operations. If the handler already processed the request but the reply was lost in transit, retrying will process it again.
Circuit-breaking RPC callers
Wrap RPC calls in a circuit breaker to prevent timeout cascades:
alias PhoenixMicro.Middleware.CircuitBreaker
case CircuitBreaker.call("payments_rpc_fuse", fn ->
PhoenixMicro.rpc("payments", "charge", payload, timeout: 2_000)
end, threshold: 5, reset_timeout_ms: 30_000) do
{:ok, charge} -> charge
{:error, :circuit_open} -> {:error, :payments_unavailable}
{:error, :timeout} -> {:error, :payments_slow}
{:error, reason} -> {:error, reason}
endTelemetry events
| Event | Measurements | Metadata |
|---|---|---|
[:phoenix_micro, :rpc, :request] | %{count: 1} | %{topic: t, correlation_id: id} |
[:phoenix_micro, :rpc, :response] | %{duration: ns} | %{topic: t, correlation_id: id} |
[:phoenix_micro, :rpc, :timeout] | %{count: 1} | %{topic: t, correlation_id: id} |
Testing RPC
# config/test.exs
config :phoenix_micro, transport: :memory, consumers: []
defmodule MyApp.RPC.MathTest do
use ExUnit.Case, async: false
alias PhoenixMicro.Transport.Memory
setup do
Memory.clear()
# Register a fake RPC handler
Memory.subscribe("math.sum", fn msg ->
if msg.reply_to do
PhoenixMicro.publish(msg.reply_to, Enum.sum(msg.payload),
headers: %{"x-correlation-id" => msg.correlation_id}
)
end
:ok
end, [])
:ok
end
test "returns sum of numbers" do
assert {:ok, 6} = PhoenixMicro.rpc("math.sum", [1, 2, 3], timeout: 1_000)
end
test "returns timeout when no handler replies" do
assert {:error, :timeout} = PhoenixMicro.rpc("ghost.service", [], timeout: 100)
end
endBest practices
- Keep RPC calls under 1 second. Blocking the calling process for longer creates backpressure and head-of-line blocking.
- Prefer events for workflows. Use RPC for lookups (user profile, inventory check) and use pub/sub + sagas for multi-step business operations.
- Set realistic timeouts. The default 5 s is conservative. Most services should respond in under 100 ms. Match the timeout to your P99 latency + buffer.
- Never retry mutations via RPC. Only retry reads or truly idempotent writes.