PhoenixMicro is designed for fast, deterministic tests. The in-memory transport provides the full pub/sub API with no external services or broker setup, and its helpers let a test block until asynchronously processed messages have arrived instead of relying on sleeps.

Setup

# config/test.exs
# Consumers are registered inside individual tests rather than globally,
# which is why the list is empty here.
config :phoenix_micro, transport: :memory, consumers: []

ExUnit setup pattern

defmodule MyApp.SomeTest do
  # The Memory transport holds shared state, so this module opts out of async runs.
  use ExUnit.Case, async: false

  alias PhoenixMicro.Transport.Memory

  setup :reset_memory_transport

  # Resets the published messages and the DLQ left over from the previous test.
  defp reset_memory_transport(_context) do
    Memory.clear()
    :ok
  end
end

Testing a consumer handler directly

The fastest and most reliable approach — call handle/2 directly, no transport needed:

defmodule MyApp.Payments.CreatedConsumerTest do
  # Nothing here touches shared transport state, so the tests may run concurrently.
  use ExUnit.Case, async: true

  alias MyApp.Payments.CreatedConsumer
  alias PhoenixMicro.Message

  @topic "payments.created"

  @valid_payload %{
    "payment_id" => "pay_001",
    "order_id" => "ord_001",
    "amount_cents" => 4999,
    "currency" => "USD",
    "user_id" => "user_001"
  }

  # v1-style payload: carries a float "amount" and has no "currency" key.
  @v1_payload %{
    "payment_id" => "pay_v1",
    "order_id" => "ord_v1",
    "amount" => 49.99,
    "user_id" => "u1"
  }

  test "handles a valid payment message" do
    assert :ok = deliver(@valid_payload)
  end

  test "returns error for missing required fields" do
    assert {:error, _reason} = deliver(%{"bad" => "data"})
  end

  test "handles v1 payload migration transparently" do
    assert :ok = deliver(@v1_payload)
  end

  # Wraps the payload in a message and hands it straight to the consumer's
  # handle/2 with an empty context, so no transport is involved.
  defp deliver(payload) do
    @topic
    |> Message.new(payload)
    |> CreatedConsumer.handle(%{})
  end
end

Testing publish → consumer integration

Use Memory.messages/0 to assert what was published:

defmodule MyApp.Payments.IntegrationTest do
  # The Memory transport is shared between tests, so this module runs serially.
  use ExUnit.Case, async: false

  alias PhoenixMicro.Transport.Memory

  # Start every test from a cleared transport.
  setup do
    Memory.clear()
    :ok
  end

  test "publishing a payment triggers an inventory check" do
    PhoenixMicro.publish("payments.created", %{
      "payment_id"   => "pay_001",
      "order_id"     => "ord_001",
      "amount_cents" => 4999,
      "currency"     => "USD",
      "user_id"      => "user_001"
    })

    # wait for async consumer to process and publish downstream
    Memory.wait_for_messages("inventory.check", 1, timeout: 2_000)

    msgs = Memory.messages()
    inv_check = Enum.find(msgs, &(&1.topic == "inventory.check"))
    assert inv_check != nil
    assert inv_check.payload["order_id"] == "ord_001"
  end

  test "failed messages go to DLQ after exhausting retries" do
    PhoenixMicro.publish("payments.created", %{"trigger_failure" => true})
    Memory.wait_for_messages("payments.created.dlq", 1, timeout: 5_000)

    # Asserting on the match checks that the DLQ holds exactly one message and,
    # if it does not, reports an ExUnit assertion failure instead of a MatchError.
    assert [dlq_msg] = Memory.dlq_messages()
    assert dlq_msg.metadata[:original_topic] == "payments.created"
  end
end

Testing sagas

defmodule MyApp.OrderSagaTest do
  use ExUnit.Case, async: true

  alias MyApp.OrderSaga

  # Baseline saga input; individual tests override single keys as needed.
  @valid %{
    user_id: "user_001",
    items: [%{product_id: "p1", quantity: 2, price_cents: 1999}],
    payment_method_id: "pm_test",
    user_email: "test@example.com"
  }

  test "completes all steps on success" do
    assert {:ok, result} = OrderSaga.run(@valid)

    assert is_binary(result.order_id)
    assert is_binary(result.reservation_id)
    assert is_binary(result.charge_id)
  end

  test "compensates when items list is empty" do
    input = %{@valid | items: []}

    assert {:compensated, :validate_order, :no_items} = OrderSaga.run(input)
  end

  test "calculates total correctly across items" do
    assert {:ok, result} = OrderSaga.run(@valid)

    # Expected total is the sum of price * quantity over the input items.
    expected_total =
      @valid.items
      |> Enum.map(&(&1.price_cents * &1.quantity))
      |> Enum.sum()

    assert result.total_cents == expected_total
  end
end

Testing schemas

defmodule MyApp.Schemas.PaymentCreatedTest do
  use ExUnit.Case, async: true

  alias MyApp.Schemas.PaymentCreated

  # Has no "currency" key; the default-currency test below relies on that.
  @valid %{
    "payment_id"   => "pay_001",
    "order_id"     => "ord_001",
    "amount_cents" => 4999,
    "user_id"      => "user_001"
  }

  test "validates correct payload" do
    assert {:ok, _v} = PaymentCreated.validate(@valid)
  end

  test "applies default currency" do
    # Asserted match: a validation failure shows up as an ExUnit assertion
    # failure rather than a bare MatchError.
    assert {:ok, v} = PaymentCreated.validate(@valid)
    assert v["currency"] == "USD"
  end

  test "fails on missing required fields" do
    assert {:error, errors} = PaymentCreated.validate(%{})
    assert Keyword.has_key?(errors, :payment_id)
  end

  test "migrates v1 (float) to v2 (cents)" do
    v1 = %{"payment_id" => "p", "order_id" => "o", "amount" => 49.99, "user_id" => "u"}
    m  = PaymentCreated.migrate(1, v1)

    # The float "amount" is expected to be replaced by integer "amount_cents".
    assert m["amount_cents"] == 4999
    refute Map.has_key?(m, "amount")
  end
end

Testing RPC

defmodule MyApp.RPC.MathTest do
  use ExUnit.Case, async: false

  alias PhoenixMicro.Transport.Memory

  setup :stub_math_sum

  test "rpc returns sum" do
    assert {:ok, 6} = PhoenixMicro.rpc("math.sum", [1, 2, 3], timeout: 1_000)
  end

  test "rpc returns timeout when no reply" do
    assert {:error, :timeout} = PhoenixMicro.rpc("ghost", [], timeout: 100)
  end

  # Clears the transport, then subscribes a stand-in handler for "math.sum".
  defp stub_math_sum(_context) do
    Memory.clear()
    Memory.subscribe("math.sum", &reply_with_sum/1, [])
    :ok
  end

  # Stub RPC handler: when the message has a reply_to, publishes the sum of the
  # payload there with the correlation id in a header. Returns :ok either way.
  defp reply_with_sum(msg) do
    if msg.reply_to do
      PhoenixMicro.publish(msg.reply_to, Enum.sum(msg.payload),
        headers: %{"x-correlation-id" => msg.correlation_id}
      )
    end

    :ok
  end
end

Testing middleware

defmodule MyApp.Middleware.TenantContextTest do
  use ExUnit.Case, async: true

  alias MyApp.Middleware.TenantContext
  alias PhoenixMicro.Message

  @tenant_headers %{"x-tenant-id" => "acme"}

  test "sets tenant from header" do
    report_tenant_during_call(Message.new("topic", %{}, headers: @tenant_headers))

    assert_receive {:tenant, "acme"}
  end

  test "uses default tenant when header absent" do
    report_tenant_during_call(Message.new("topic", %{}))

    assert_receive {:tenant, "default"}
  end

  test "clears tenant context after call" do
    message = Message.new("topic", %{}, headers: @tenant_headers)

    TenantContext.call(message, fn _message -> :ok end)

    assert TenantContext.current() == nil
  end

  # Runs the middleware around a callback that reads the current tenant from
  # inside the call and sends it back to the test process as {:tenant, value}.
  defp report_tenant_during_call(message) do
    test_pid = self()

    TenantContext.call(message, fn _message ->
      send(test_pid, {:tenant, TenantContext.current()})
      :ok
    end)
  end
end

Memory transport helpers reference

alias PhoenixMicro.Transport.Memory

# All messages published in this test session
Memory.messages()
#=> [%PhoenixMicro.Message{topic: "payments.created", ...}, ...]

# Messages that reached the DLQ (after max retries exhausted)
Memory.dlq_messages()

# Reset all state — call in setup
Memory.clear()

# Block until N messages arrive on topic (raises on timeout)
Memory.wait_for_messages("payments.created", 1, timeout: 2_000)
Memory.wait_for_messages("payments.created", 3, timeout: 5_000)

# Subscribe directly (for RPC stubs or custom assertions)
# The handler ignores its argument here, so it is named _msg to avoid an
# unused-variable warning.
Memory.subscribe("math.sum", fn _msg -> :ok end, [])

Using Mox for external dependencies

# test/support/mocks.ex
Mox.defmock(MyApp.PaymentsMock, for: MyApp.Payments.Behaviour)

# test/my_app/payments/created_consumer_test.exs
import Mox

setup :verify_on_exit!

test "calls the payments module with correct args" do
  payload = %{
    "payment_id" => "p",
    "order_id" => "o",
    "amount_cents" => 4999,
    "currency" => "USD",
    "user_id" => "u"
  }

  # The stubbed function only has a clause for exactly 4999 and "USD".
  expect(MyApp.PaymentsMock, :process, fn 4999, "USD" -> {:ok, :processed} end)

  message = Message.new("payments.created", payload)

  assert :ok = MyApp.Payments.CreatedConsumer.handle(message, %{})
end

CI configuration

# .github/workflows/ci.yml
- name: Run tests
  # Using the :memory transport, so no broker env vars are needed.
  env:
    MIX_ENV: test
  run: mix test

The in-memory transport means CI requires no external services at all.