PhoenixMicro's RPC system provides synchronous request/reply over any transport. No HTTP, no gRPC — the same message broker you use for pub/sub handles RPC.

How it works

  1. Caller generates a correlation_id and a unique reply_to inbox topic
  2. Caller publishes the request to the target topic with reply_to and correlation_id set
  3. Handler consumer publishes a response to message.reply_to
  4. Caller's RPC GenServer matches the correlation_id and returns {:ok, result}
  5. No reply within timeout → {:error, :timeout}
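
The five steps above can be sketched with plain Elixir processes. This is only an illustration under stated assumptions, not PhoenixMicro's implementation: RpcSketch, call/3 and start_handler/1 are hypothetical names, the handler pid stands in for the target topic, and the caller's own mailbox stands in for the reply_to inbox.

```elixir
defmodule RpcSketch do
  # Steps 1, 2, 4 and 5: generate a correlation id, send the request with a
  # reply address, then wait for the matching reply or give up.
  def call(handler, payload, timeout \\ 5_000) do
    correlation_id = Integer.to_string(System.unique_integer([:positive]))
    send(handler, {:request, payload, self(), correlation_id})

    receive do
      # The pin (^) accepts only the reply that carries our correlation id.
      {:reply, ^correlation_id, result} -> {:ok, result}
    after
      timeout -> {:error, :timeout}
    end
  end

  # Step 3: a handler that sends fun.(payload) back to the reply address.
  def start_handler(fun) do
    spawn(fn -> loop(fun) end)
  end

  defp loop(fun) do
    receive do
      {:request, payload, reply_to, correlation_id} ->
        send(reply_to, {:reply, correlation_id, fun.(payload)})
        loop(fun)
    end
  end
end

handler = RpcSketch.start_handler(&Enum.sum/1)
RpcSketch.call(handler, [1, 2, 3], 1_000)
```

Because the caller matches on the correlation id, a late reply to an earlier, timed-out request is never mistaken for the answer to a newer one.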

Making RPC calls

# Topic form — simplest
{:ok, result} = PhoenixMicro.rpc("math.sum", [1, 2, 3])

# Service + pattern form
{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3])

# With options
{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3],
  timeout: 5_000,  # ms (default: 5_000)
  retry:   2       # retries on timeout (default: 0)
)

# Handle all cases
case PhoenixMicro.rpc("payments", "charge", %{amount: 100}) do
  {:ok, charge}          -> process(charge)
  {:error, :timeout}     -> handle_timeout()
  {:error, reason}       -> handle_error(reason)
end

Implementing an RPC handler

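The handler consumer must publish its reply to message.reply_to and pass the request's correlation_id back in the x-correlation-id header so the caller can match the reply to the request: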

defmodule MyApp.RPC.MathConsumer do
  use PhoenixMicro.Consumer

  topic "math.sum"
  concurrency 20

  @impl PhoenixMicro.Consumer
  def handle(%PhoenixMicro.Message{} = message, _ctx) do
    result = Enum.sum(message.payload)

    if message.reply_to do
      PhoenixMicro.publish(message.reply_to, result,
        headers: %{"x-correlation-id" => message.correlation_id}
      )
    end

    :ok
  end
end
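
Every RPC handler repeats the same "reply only if reply_to is set" step, so it can be factored into a small helper. The sketch below is hypothetical and not part of PhoenixMicro: ReplySketch and reply/3 are invented names, and the publish argument is assumed to be any 3-arity function shaped like the PhoenixMicro.publish(topic, payload, opts) calls used in this guide.

```elixir
defmodule ReplySketch do
  # Fire-and-forget message: there is nowhere to reply to, so do nothing.
  def reply(%{reply_to: nil}, _result, _publish), do: :ok

  # RPC request: publish the result to the caller's inbox and echo the
  # correlation id so the caller can match it.
  def reply(%{reply_to: reply_to, correlation_id: id}, result, publish) do
    publish.(reply_to, result, headers: %{"x-correlation-id" => id})
    :ok
  end
end
```

Inside handle/2 this would be called as ReplySketch.reply(message, result, &PhoenixMicro.publish/3). Passing the publish function in also makes the helper easy to exercise in tests without a broker.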

Pattern routing

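PhoenixMicro.rpc("service", "pattern", payload) publishes to "service.pattern". To route multiple patterns in a single consumer, subscribe with a wildcard topic and dispatch on the pattern name, which this example reads from the x-pattern header: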

defmodule MyApp.RPC.MathRouter do
  use PhoenixMicro.Consumer

  topic "math.*"       # handles math.sum, math.multiply, math.max, etc.
  concurrency 20

  @impl PhoenixMicro.Consumer
  def handle(%PhoenixMicro.Message{} = message, _ctx) do
    pattern = Map.get(message.headers, "x-pattern", "")
    result  = dispatch(pattern, message.payload)

    if message.reply_to do
      PhoenixMicro.publish(message.reply_to, result,
        headers: %{"x-correlation-id" => message.correlation_id}
      )
    end

    :ok
  end

  defp dispatch("sum",      nums),  do: Enum.sum(nums)
  defp dispatch("multiply", nums),  do: Enum.reduce(nums, 1, &*/2)
  defp dispatch("max",      nums),  do: Enum.max(nums)
  defp dispatch(unknown, _payload), do: {:error, "unknown pattern: #{unknown}"}
end

Timeouts and retries

# Default: 5 second timeout, no retry
PhoenixMicro.rpc("slow.service", payload)

# Tight timeout for fast SLA-bound services
PhoenixMicro.rpc("inventory.check", payload, timeout: 500)

# Retry on timeout — for idempotent operations only
PhoenixMicro.rpc("idempotent.lookup", payload, timeout: 2_000, retry: 3)

Warning: Only use retry for truly idempotent operations. If the handler already processed the request but the reply was lost in transit, retrying will process it again.
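
The warning above can be made concrete with a small simulation. This is a sketch, not how PhoenixMicro implements retry: RetrySketch and with_retry/2 are hypothetical names, and the "handler" is a local function whose first reply is treated as lost.

```elixir
defmodule RetrySketch do
  # `attempt` is a zero-arity function returning {:ok, result} or
  # {:error, reason}. Only {:error, :timeout} is retried.
  def with_retry(attempt, retries) when retries >= 0 do
    case attempt.() do
      {:error, :timeout} when retries > 0 -> with_retry(attempt, retries - 1)
      other -> other
    end
  end
end

# Counts how many times the simulated handler applied its side effect.
{:ok, charges} = Agent.start_link(fn -> 0 end)

lossy_charge = fn ->
  # The side effect runs on every attempt, even when the reply is lost.
  n = Agent.get_and_update(charges, fn c -> {c + 1, c + 1} end)
  if n == 1, do: {:error, :timeout}, else: {:ok, :charged}
end

result = RetrySketch.with_retry(lossy_charge, 2)
applied = Agent.get(charges, & &1)
# result is {:ok, :charged}, yet applied is 2: the charge ran twice.
```

The caller sees a single success, while the handler performed the operation twice. Retrying is safe only when a second execution has no additional effect.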

Circuit-breaking RPC callers

Wrap RPC calls in a circuit breaker to prevent timeout cascades:

alias PhoenixMicro.Middleware.CircuitBreaker

result =
  CircuitBreaker.call(
    "payments_rpc_fuse",
    fn -> PhoenixMicro.rpc("payments", "charge", payload, timeout: 2_000) end,
    threshold: 5,
    reset_timeout_ms: 30_000
  )

case result do
  {:ok, charge}           -> charge
  {:error, :circuit_open} -> {:error, :payments_unavailable}
  {:error, :timeout}      -> {:error, :payments_slow}
  {:error, reason}        -> {:error, reason}
end
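
To show what a breaker does with options like threshold and reset_timeout_ms, here is a deliberately simplified sketch. It is not PhoenixMicro.Middleware.CircuitBreaker: BreakerSketch is a hypothetical module, and it assumes that threshold counts consecutive failures and that reset_timeout_ms is the cool-down before calls are let through again.

```elixir
defmodule BreakerSketch do
  def start_link(opts) do
    state = %{
      failures: 0,
      opened_at: nil,
      threshold: Keyword.get(opts, :threshold, 5),
      reset_timeout_ms: Keyword.get(opts, :reset_timeout_ms, 30_000)
    }

    Agent.start_link(fn -> state end)
  end

  # `fun` must return {:ok, value} or {:error, reason}.
  def call(breaker, fun) do
    if open?(breaker) do
      # Fail fast instead of waiting for yet another timeout.
      {:error, :circuit_open}
    else
      result = fun.()
      record(breaker, result)
      result
    end
  end

  # The circuit stays open until the cool-down has elapsed.
  defp open?(breaker) do
    Agent.get(breaker, fn
      %{opened_at: nil} -> false
      %{opened_at: t, reset_timeout_ms: ms} -> now() - t < ms
    end)
  end

  # A success closes the circuit and clears the failure count.
  defp record(breaker, {:ok, _}) do
    Agent.update(breaker, &%{&1 | failures: 0, opened_at: nil})
  end

  # A failure counts toward the threshold and (re)opens the circuit at it.
  defp record(breaker, {:error, _}) do
    Agent.update(breaker, fn s ->
      failures = s.failures + 1
      opened_at = if failures >= s.threshold, do: now(), else: s.opened_at
      %{s | failures: failures, opened_at: opened_at}
    end)
  end

  defp now, do: System.monotonic_time(:millisecond)
end
```

While the circuit is open, callers get {:error, :circuit_open} immediately, so a slow dependency cannot tie up every calling process for the full RPC timeout.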

Telemetry events

Event                               Measurements      Metadata
[:phoenix_micro, :rpc, :request]    %{count: 1}       %{topic: t, correlation_id: id}
[:phoenix_micro, :rpc, :response]   %{duration: ns}   %{topic: t, correlation_id: id}
[:phoenix_micro, :rpc, :timeout]    %{count: 1}       %{topic: t, correlation_id: id}
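
A handler for these events could look like the sketch below. RpcTelemetrySketch is a hypothetical module. handle_event/4 takes the four arguments that :telemetry passes to attached handlers, and it assumes, as the table states, that duration is in nanoseconds. It returns strings instead of logging so that the example stays free of side effects.

```elixir
defmodule RpcTelemetrySketch do
  # Reply received: report the round-trip time in whole milliseconds.
  def handle_event([:phoenix_micro, :rpc, :response], %{duration: ns}, meta, _config) do
    ms = System.convert_time_unit(ns, :nanosecond, :millisecond)
    "rpc #{meta.topic} (#{meta.correlation_id}) replied in #{ms} ms"
  end

  # No reply arrived within the timeout.
  def handle_event([:phoenix_micro, :rpc, :timeout], _measurements, meta, _config) do
    "rpc #{meta.topic} (#{meta.correlation_id}) timed out"
  end

  # Any other event is ignored.
  def handle_event(_event, _measurements, _meta, _config), do: :ignored
end
```

With the :telemetry package available, the handler would typically be attached once at application start, for example with :telemetry.attach_many("rpc-sketch", [[:phoenix_micro, :rpc, :response], [:phoenix_micro, :rpc, :timeout]], &RpcTelemetrySketch.handle_event/4, nil). A real handler would log or emit a metric rather than return a string.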

Testing RPC

# config/test.exs
config :phoenix_micro, transport: :memory, consumers: []

defmodule MyApp.RPC.MathTest do
  use ExUnit.Case, async: false

  alias PhoenixMicro.Transport.Memory

  setup do
    Memory.clear()

    # Register a fake RPC handler
    Memory.subscribe("math.sum", fn msg ->
      if msg.reply_to do
        PhoenixMicro.publish(msg.reply_to, Enum.sum(msg.payload),
          headers: %{"x-correlation-id" => msg.correlation_id}
        )
      end
      :ok
    end, [])

    :ok
  end

  test "returns sum of numbers" do
    assert {:ok, 6} = PhoenixMicro.rpc("math.sum", [1, 2, 3], timeout: 1_000)
  end

  test "returns timeout when no handler replies" do
    assert {:error, :timeout} = PhoenixMicro.rpc("ghost.service", [], timeout: 100)
  end
end

Best practices

  • Keep RPC calls under 1 second. Blocking the calling process for longer creates backpressure and head-of-line blocking.
  • Prefer events for workflows. Use RPC for lookups (user profile, inventory check) and use pub/sub + sagas for multi-step business operations.
  • Set realistic timeouts. The default 5 s is conservative: most services should respond in under 100 ms. Set the timeout to your measured P99 latency plus a safety buffer.
  • Never retry mutations via RPC. Only retry reads or truly idempotent writes.
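
The "P99 latency + buffer" rule can be turned into a small calculation. This is a sketch under stated assumptions: TimeoutSketch is a hypothetical module, the percentile uses the nearest-rank method, and the default 50% buffer is an arbitrary illustrative choice, not a PhoenixMicro recommendation.

```elixir
defmodule TimeoutSketch do
  # Nearest-rank percentile of a non-empty list of latency samples (ms).
  # `p` is an integer percentage from 1 to 100.
  def percentile(samples, p) when is_integer(p) and p in 1..100 do
    sorted = Enum.sort(samples)
    # Integer form of ceil(p / 100 * n), which avoids float rounding.
    rank = div(p * length(sorted) + 99, 100)
    Enum.at(sorted, rank - 1)
  end

  # P99 plus a proportional buffer, rounded up to whole milliseconds.
  def suggested_timeout(samples, buffer_ratio \\ 0.5) do
    p99 = percentile(samples, 99)
    ceil(p99 * (1 + buffer_ratio))
  end
end

# Latencies of 1..100 ms: P99 is 99 ms, so the suggested timeout is 149 ms.
TimeoutSketch.suggested_timeout(Enum.to_list(1..100))
```

The result would be passed as the timeout: option of PhoenixMicro.rpc. It is worth recomputing from fresh measurements whenever the target service's latency profile changes.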