The behaviour that every transport adapter must implement.
A transport is an OTP process (GenServer or similar) that:
- Maintains a connection to the external broker.
- Publishes messages to topics/queues/subjects.
- Subscribes consumers to topics and delivers messages.
- Acknowledges or negatively-acknowledges messages.
- Handles reconnection transparently.
Implementing a custom transport
defmodule MyApp.Transport.SQS do
@behaviour PhoenixMicro.Transport
use GenServer
@impl PhoenixMicro.Transport
def connect(config) do
# Return {:ok, state} where state is whatever your GenServer needs
end
# ... implement all callbacks
endRegister it in config:
config :phoenix_micro,
transport: MyApp.Transport.SQS,
transports: [MyApp.Transport.SQS: [region: "us-east-1", ...]]
Summary
Callbacks
Acknowledges successful processing of a message.
Establishes a connection to the broker.
Called during transport process init/1.
Must return {:ok, state} where state is carried in the GenServer state.
Cleanly closes the connection to the broker.
Negatively acknowledges a message, indicating processing failure.
reason is used for logging and DLQ routing.
Publishes a message to the given topic.
opts may include :headers, :partition_key, :priority, etc.
Returns the current connection status.
Subscribes to the given topic pattern and delivers messages to handler.
Returns {:ok, subscription_ref} where the ref can be used to unsubscribe.
Unsubscribes a previously established subscription.
Functions
Dispatches message through the middleware chain and into the handler.
Used internally by transport implementations to invoke consumer handlers.
Types
Callbacks
@callback ack(PhoenixMicro.Message.t(), state()) :: :ok
Acknowledges successful processing of a message.
Establishes a connection to the broker.
Called during transport process init/1.
Must return {:ok, state} where state is carried in the GenServer state.
@callback disconnect(state()) :: :ok
Cleanly closes the connection to the broker.
@callback nack(PhoenixMicro.Message.t(), reason :: term(), state()) :: :ok
Negatively acknowledges a message, indicating processing failure.
reason is used for logging and DLQ routing.
@callback publish(topic(), PhoenixMicro.Message.t(), publish_opts()) :: :ok | {:error, term()}
Publishes a message to the given topic.
opts may include :headers, :partition_key, :priority, etc.
@callback status(state()) :: :connected | :disconnected | :reconnecting
Returns the current connection status.
@callback subscribe(topic(), handler(), subscription_opts()) :: {:ok, reference()} | {:error, term()}
Subscribes to the given topic pattern and delivers messages to handler.
Returns {:ok, subscription_ref} where the ref can be used to unsubscribe.
Unsubscribes a previously established subscription.
Functions
@spec dispatch(PhoenixMicro.Message.t(), handler(), [module()]) :: :ok | {:error, term()}
Dispatches message through the middleware chain and into the handler.
Used internally by transport implementations to invoke consumer handlers.