PhoenixMicro.Transport behaviour (PhoenixMicro v1.0.0)

The behaviour that every transport adapter must implement.

A transport is an OTP process (GenServer or similar) that:

  1. Maintains a connection to the external broker.
  2. Publishes messages to topics/queues/subjects.
  3. Subscribes consumers to topics and delivers messages.
  4. Acknowledges or negatively-acknowledges messages.
  5. Handles reconnection transparently.

Implementing a custom transport

defmodule MyApp.Transport.SQS do
  @behaviour PhoenixMicro.Transport
  use GenServer

  @impl PhoenixMicro.Transport
  def connect(config) do
    # Return {:ok, state}, where state is whatever your GenServer needs.
    # The map below is only a placeholder shape for that state.
    {:ok, %{config: config}}
  end

  # ... implement all callbacks
end

Register it in config:

config :phoenix_micro,
  transport: MyApp.Transport.SQS,
  transports: [{MyApp.Transport.SQS, [region: "us-east-1", ...]}]

Types

config()

@type config() :: keyword()

handler()

@type handler() :: (PhoenixMicro.Message.t() -> :ok | {:error, term()})

publish_opts()

@type publish_opts() :: keyword()

state()

@type state() :: term()

subscription_opts()

@type subscription_opts() :: keyword()

topic()

@type topic() :: String.t()

Callbacks

ack(t, state)

@callback ack(PhoenixMicro.Message.t(), state()) :: :ok

Acknowledges successful processing of a message.

connect(config)

@callback connect(config()) :: {:ok, state()} | {:error, term()}

Establishes a connection to the broker. Called during the transport process's init/1. On success, returns {:ok, state}, and that state is carried as the GenServer state. Returns {:error, reason} if the connection cannot be established.
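
As a rough illustration of that contract, the sketch below shows one way a transport process might call connect/1 from init/1. The module name SketchTransport, the :fail option, and the shape of the state map are hypothetical. Only the {:ok, state} | {:error, term()} return shape comes from the callback spec.

```elixir
defmodule SketchTransport do
  use GenServer

  # Stand-in for the connect/1 callback. A real adapter would open a broker
  # connection here. The :fail option is hypothetical and exists only to
  # exercise the error branch.
  def connect(config) do
    if Keyword.get(config, :fail, false) do
      {:error, :broker_unreachable}
    else
      {:ok, %{config: config, status: :connected}}
    end
  end

  def start_link(config), do: GenServer.start_link(__MODULE__, config)

  @impl GenServer
  def init(config) do
    case connect(config) do
      # The state returned by connect/1 becomes the GenServer state.
      {:ok, state} -> {:ok, state}
      # Abort startup and surface the connection error to the caller.
      {:error, reason} -> {:stop, reason}
    end
  end
end
```

Returning {:stop, reason} from init/1 lets a supervisor decide whether to retry, which keeps retry policy out of the adapter itself. That is one possible design, not necessarily how PhoenixMicro starts its transports.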

disconnect(state)

@callback disconnect(state()) :: :ok

Cleanly closes the connection to the broker.

nack(t, reason, state)

@callback nack(PhoenixMicro.Message.t(), reason :: term(), state()) :: :ok

Negatively acknowledges a message, indicating processing failure. reason is used for logging and DLQ routing.
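
To show how ack/2 and nack/3 relate to a handler's result, here is a minimal sketch that settles a message after the handler runs. The AckSketch module, the settle/3 helper, and the plain map standing in for PhoenixMicro.Message.t() are all assumptions made for illustration.

```elixir
defmodule AckSketch do
  # Hypothetical in-memory stand-ins for the ack/2 and nack/3 callbacks.
  # They notify the calling process instead of talking to a broker.
  def ack(message, _state) do
    send(self(), {:acked, message.id})
    :ok
  end

  def nack(message, reason, _state) do
    send(self(), {:nacked, message.id, reason})
    :ok
  end

  # Runs the handler, then acknowledges on :ok or negatively acknowledges
  # with the handler's error reason.
  def settle(message, handler, state) do
    case handler.(message) do
      :ok -> ack(message, state)
      {:error, reason} -> nack(message, reason, state)
    end
  end
end
```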

publish(topic, t, publish_opts)

@callback publish(topic(), PhoenixMicro.Message.t(), publish_opts()) ::
  :ok | {:error, term()}

Publishes a message to the given topic. publish_opts may include options such as :headers, :partition_key, and :priority.

status(state)

(optional)
@callback status(state()) :: :connected | :disconnected | :reconnecting

Returns the current connection status.

subscribe(topic, handler, subscription_opts)

@callback subscribe(topic(), handler(), subscription_opts()) ::
  {:ok, reference()} | {:error, term()}

Subscribes to the given topic pattern and delivers messages to handler. Returns {:ok, subscription_ref} where the ref can be used to unsubscribe.
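
The subscription reference can be pictured with a small in-memory sketch. SubscriptionSketch, its Agent-backed table, and the deliver/2 helper are hypothetical. The sketch also matches topics by exact string equality, whereas a real adapter would apply its broker's pattern rules.

```elixir
defmodule SubscriptionSketch do
  use Agent

  # Holds a map of subscription ref => {topic, handler}.
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Mirrors the shape of subscribe/3: returns {:ok, ref}.
  def subscribe(topic, handler, _opts \\ [])
      when is_binary(topic) and is_function(handler, 1) do
    ref = make_ref()
    Agent.update(__MODULE__, &Map.put(&1, ref, {topic, handler}))
    {:ok, ref}
  end

  # Mirrors the shape of unsubscribe/2. The state argument is unused here.
  def unsubscribe(ref, _state) when is_reference(ref) do
    Agent.update(__MODULE__, &Map.delete(&1, ref))
    :ok
  end

  # Hypothetical helper: invokes every handler subscribed to the topic and
  # returns the list of handler results.
  def deliver(topic, message) do
    __MODULE__
    |> Agent.get(& &1)
    |> Enum.filter(fn {_ref, {subscribed, _handler}} -> subscribed == topic end)
    |> Enum.map(fn {_ref, {_subscribed, handler}} -> handler.(message) end)
  end
end
```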

unsubscribe(reference, state)

(optional)
@callback unsubscribe(reference(), state()) :: :ok

Unsubscribes a previously established subscription.

Functions

dispatch(message, handler, middlewares \\ [])

@spec dispatch(PhoenixMicro.Message.t(), handler(), [module()]) ::
  :ok | {:error, term()}

Dispatches message through the middleware chain and into the handler. Used internally by transport implementations to invoke consumer handlers.
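
The middleware contract is not documented here, so the following is only a sketch of how such a chain could be composed. It assumes each middleware module exports a hypothetical call(message, next) function, where next continues the chain. DispatchSketch and RejectEmpty are illustrative names, not part of PhoenixMicro.

```elixir
defmodule DispatchSketch do
  # Folds from the right so the first middleware in the list ends up
  # outermost and the handler ends up innermost.
  def dispatch(message, handler, middlewares \\ []) do
    chain =
      List.foldr(middlewares, handler, fn middleware, next ->
        fn msg -> middleware.call(msg, next) end
      end)

    chain.(message)
  end
end

defmodule DispatchSketch.RejectEmpty do
  # Short-circuits the chain when the payload is missing.
  def call(%{payload: nil}, _next), do: {:error, :empty_payload}
  # Otherwise passes the message on to the rest of the chain.
  def call(message, next), do: next.(message)
end
```

With an empty middleware list the fold returns the handler unchanged, so the handler is called directly on the message.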