PhoenixMicro (PhoenixMicro v1.0.0)

Copy Markdown View Source

PhoenixMicro — a production-grade microservices toolkit for Elixir/Phoenix.

Phoenix Microservices, built natively for OTP and the BEAM VM.

Quick start

# 1. Configure in config/config.exs
config :phoenix_micro,
  transport: :rabbitmq,
  consumers: [MyApp.Payments.CreatedConsumer],
  transports: [
    rabbitmq: [url: "amqp://localhost"]
  ]

# 2. Define a consumer
defmodule MyApp.Payments.CreatedConsumer do
  use PhoenixMicro.Consumer

  topic "payments.created"
  concurrency 5
  retry max_attempts: 3

  @impl PhoenixMicro.Consumer
  def handle(message, _ctx) do
    Logger.info("Received payload: #{inspect(message.payload)}")
    :ok
  end
end

# 3. Publish from anywhere
PhoenixMicro.publish("payments.created", %{amount: 100, currency: "USD"})

# 4. RPC — two calling conventions
{:ok, result} = PhoenixMicro.rpc("math.sum", [1, 2, 3])
{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3])

Architecture

PhoenixMicro.Application
   PhoenixMicro.Supervisor (one_for_one)
         Registry
         Transport.Memory
         Transport.* (configured)
         Producer
         RPC
         ConsumerManager (DynamicSupervisor)
               Pipeline (Broadway) per consumer

Summary

Functions

Returns all currently running consumer modules.

Stops a running consumer.

Publishes a message to topic asynchronously (fire-and-forget).

Publishes a batch of [{topic, payload}] tuples.

Publishes to topic synchronously. Returns :ok or {:error, reason}.

Dynamically registers and starts a consumer module.

Synchronous RPC call. Supports two signatures

4-argument RPC: rpc(service, pattern, payload, opts).

Returns the active transport module.

Functions

consumers()

@spec consumers() :: [module()]

Returns all currently running consumer modules.

deregister_consumer(module)

@spec deregister_consumer(module()) :: :ok | {:error, :not_found}

Stops a running consumer.

publish(topic, payload, opts \\ [])

@spec publish(String.t(), term(), keyword()) :: :ok

Publishes a message to topic asynchronously (fire-and-forget).

Options: :transport, :headers, :correlation_id.

publish_batch(messages, opts \\ [])

@spec publish_batch(
  [{String.t(), term()}],
  keyword()
) :: :ok

Publishes a batch of [{topic, payload}] tuples.

publish_sync(topic, payload, opts \\ [])

@spec publish_sync(String.t(), term(), keyword()) :: :ok | {:error, term()}

Publishes to topic synchronously. Returns :ok or {:error, reason}.

register_consumer(module)

@spec register_consumer(module()) :: {:ok, pid()} | {:error, term()}

Dynamically registers and starts a consumer module.

rpc(topic, payload, opts \\ [])

@spec rpc(String.t(), term(), keyword() | term()) :: {:ok, term()} | {:error, term()}

Synchronous RPC call. Supports two signatures:

# topic + payload form
{:ok, result} = PhoenixMicro.rpc("math.sum", [1, 2, 3])

# service + pattern + payload form (spec-compliant)
{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3])

# with options
{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3], timeout: 3_000)

Options: :timeout (ms, default 5000), :retry (attempts, default 0).

rpc(service, pattern, payload, opts)

@spec rpc(String.t(), String.t(), term(), keyword()) ::
  {:ok, term()} | {:error, term()}

4-argument RPC: rpc(service, pattern, payload, opts).

{:ok, result} = PhoenixMicro.rpc("math", "sum", [1, 2, 3], timeout: 3_000)

transport()

@spec transport() :: module()

Returns the active transport module.