Unified Pulsar broker connection process.
This module combines:
- TCP connection management with reconnection logic
- Protocol handshake and authentication
- Service discovery functionality
- Consumer and producer registration and message routing
- Request/response correlation
Uses gen_statem for robust state management with states:
- :disconnected - Not connected to broker
- :connected - Connected and authenticated, ready for operations
Consumer and producer processes are monitored by this broker process for automatic cleanup when they exit.
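The two-state design above can be illustrated with a minimal gen_statem sketch. This is not the module's actual implementation: ConnStateSketch, its connect_fun argument, the :connection_lost message, and the backoff values are all assumptions made for illustration. It only shows how a :disconnected state can retry with a growing delay until it reaches :connected.

```elixir
defmodule ConnStateSketch do
  # Hypothetical sketch, not part of Pulsar.Broker.
  @behaviour :gen_statem

  # connect_fun is an assumed zero-arity function returning :ok or {:error, reason}.
  def start_link(connect_fun), do: :gen_statem.start_link(__MODULE__, connect_fun, [])

  # Returns the current state name (:disconnected or :connected).
  def state(pid), do: :gen_statem.call(pid, :get_state)

  @impl true
  def callback_mode, do: :handle_event_function

  @impl true
  def init(connect_fun) do
    data = %{connect_fun: connect_fun, backoff: 10}
    # Start disconnected and attempt the first connection right away.
    {:ok, :disconnected, data, [{:next_event, :internal, :connect}]}
  end

  @impl true
  def handle_event(:internal, :connect, :disconnected, data), do: try_connect(data)
  def handle_event(:state_timeout, :reconnect, :disconnected, data), do: try_connect(data)

  # Assumed message standing in for a socket close or error notification.
  def handle_event(:info, :connection_lost, :connected, data) do
    {:next_state, :disconnected, data, [{:next_event, :internal, :connect}]}
  end

  def handle_event({:call, from}, :get_state, state, _data) do
    {:keep_state_and_data, [{:reply, from, state}]}
  end

  # Ignore anything else in this sketch.
  def handle_event(_type, _event, _state, _data), do: :keep_state_and_data

  defp try_connect(data) do
    case data.connect_fun.() do
      :ok ->
        # Reset the backoff once connected.
        {:next_state, :connected, %{data | backoff: 10}}

      {:error, _reason} ->
        # Retry after the current backoff, doubling it up to an assumed cap of 1000 ms.
        next = min(data.backoff * 2, 1_000)
        {:keep_state, %{data | backoff: next}, [{:state_timeout, data.backoff, :reconnect}]}
    end
  end
end

# Usage: a connect function that always succeeds.
{:ok, pid} = ConnStateSketch.start_link(fn -> :ok end)
ConnStateSketch.state(pid)
```

Using a state timeout for retries means the pending retry is cancelled automatically if the state changes for another reason.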
Types
@type t() :: %Pulsar.Broker{
  actions: list(),
  auth: list(),
  buffer: binary(),
  conn_timeout: integer(),
  consumers: %{required(integer()) => {pid(), reference()}},
  host: String.t(),
  name: String.t(),
  pending_bytes: integer(),
  port: integer(),
  prev_backoff: integer(),
  producers: %{required(integer()) => {pid(), reference()}},
  requests: %{required(integer()) => {GenServer.from(), integer()}},
  socket: :gen_tcp.socket() | :ssl.sslsocket() | nil,
  socket_module: :gen_tcp | :ssl,
  socket_opts: list()
}
Functions
@spec get_consumers(GenServer.server() | String.t(), keyword()) :: %{required(integer()) => pid()}
Returns the registered consumers as a map from integer ID to consumer PID.
Accepts either a broker PID or a broker URL string.
When passing a broker URL, you can optionally specify the client via the :client option.
@spec get_producers(GenServer.server() | String.t(), keyword()) :: %{required(integer()) => pid()}
Returns the registered producers as a map from integer ID to producer PID.
Accepts either a broker PID or a broker URL string.
When passing a broker URL, you can optionally specify the client via the :client option.
@spec lookup_topic(GenServer.server(), String.t(), boolean(), timeout()) :: {:ok, map()} | {:error, term()}
Service discovery: looks up the given topic.
@spec partitioned_topic_metadata(GenServer.server(), String.t(), timeout()) :: {:ok, map()} | {:error, term()}
Service discovery: gets the partitioned topic metadata for the given topic.
@spec publish_message(GenServer.server(), binary()) :: :ok | {:error, term()}
Publishes a message to the broker. It expects the message to be already encoded in the Pulsar binary protocol format.
@spec register_consumer(GenServer.server(), integer(), pid()) :: :ok
Registers a consumer with this broker and monitors the process.
@spec register_producer(GenServer.server(), integer(), pid()) :: :ok
Registers a producer with this broker and monitors the process.
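The registration-plus-monitoring idea can be sketched as follows. MonitorRegistrySketch is a hypothetical helper, not part of Pulsar.Broker; it assumes entries are stored as {pid, monitor_ref} under an integer ID, mirroring the consumers and producers fields of t(), and that cleanup happens when the matching :DOWN message arrives.

```elixir
defmodule MonitorRegistrySketch do
  # Hypothetical helper, not part of Pulsar.Broker.

  # Monitors pid and stores {pid, monitor_ref} under the given integer id.
  def register(entries, id, pid) when is_integer(id) and is_pid(pid) do
    ref = Process.monitor(pid)
    Map.put(entries, id, {pid, ref})
  end

  # Drops the entry whose monitor reference matches a :DOWN message.
  def handle_down(entries, ref) when is_reference(ref) do
    entries
    |> Enum.reject(fn {_id, {_pid, entry_ref}} -> entry_ref == ref end)
    |> Map.new()
  end
end

# Usage: the calling process plays the broker role.
consumer = spawn(fn -> receive do :stop -> :ok end end)
entries = MonitorRegistrySketch.register(%{}, 1, consumer)

# The consumer exits, so the monitoring process receives a :DOWN message.
send(consumer, :stop)

_entries =
  receive do
    {:DOWN, ref, :process, _pid, _reason} ->
      MonitorRegistrySketch.handle_down(entries, ref)
  after
    1_000 -> entries
  end
```

Matching on the monitor reference rather than the PID keeps the cleanup tied to one specific registration.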
@spec send_command(GenServer.server(), struct()) :: :ok | {:error, term()}
Sends a command to the broker without expecting a response.
@spec send_request(GenServer.server(), struct(), timeout()) :: {:ok, term()} | {:error, term()}
Sends a command to the broker and expects a response.
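Request/response correlation can be sketched as a map of pending requests keyed by an integer request ID. CorrelationSketch is hypothetical and not the module's implementation; in particular, treating the second tuple element as a deadline in milliseconds is an assumption, since the meaning of the integer in the requests field of t() is not documented here.

```elixir
defmodule CorrelationSketch do
  # Hypothetical sketch, not part of Pulsar.Broker.
  # pending: %{request_id => {caller, deadline_ms}} (deadline is an assumption)

  def new, do: %{next_id: 1, pending: %{}}

  # Allocates a request id and remembers who is waiting for the response.
  def track(state, caller, timeout_ms, now_ms) do
    id = state.next_id
    pending = Map.put(state.pending, id, {caller, now_ms + timeout_ms})
    {id, %{state | next_id: id + 1, pending: pending}}
  end

  # Matches an incoming response to its caller and forgets the request.
  def resolve(state, request_id) do
    case Map.pop(state.pending, request_id) do
      {nil, _pending} -> {:unknown, state}
      {{caller, _deadline}, pending} -> {{:ok, caller}, %{state | pending: pending}}
    end
  end

  # Returns the callers whose deadline has passed and drops their requests.
  def expire(state, now_ms) do
    {expired, live} =
      Enum.split_with(state.pending, fn {_id, {_caller, deadline}} -> deadline <= now_ms end)

    callers = Enum.map(expired, fn {_id, {caller, _deadline}} -> caller end)
    {callers, %{state | pending: Map.new(live)}}
  end
end

# Usage: track a request, then resolve it when the response arrives.
state = CorrelationSketch.new()
{id, state} = CorrelationSketch.track(state, :caller_a, 5_000, 0)
{{:ok, :caller_a}, state} = CorrelationSketch.resolve(state, id)
{:unknown, _state} = CorrelationSketch.resolve(state, id)
```

In a real process the caller would be a GenServer.from() tuple and the reply would be sent to it once the response is resolved.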
Starts a broker connection process.
The target Pulsar broker must be specified in the form <scheme>://<host>[:<port>],
where <scheme> is either pulsar or pulsar+ssl. The port is optional and
defaults to 6650 for pulsar and 6651 for pulsar+ssl.
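The URL form and default ports can be sketched with the standard URI module. BrokerUrlSketch, its return shape, and the :invalid_broker_url error are hypothetical and only illustrate the rule; the library's own parsing may differ.

```elixir
defmodule BrokerUrlSketch do
  # Hypothetical helper, not part of Pulsar.Broker.
  # Default ports taken from the documentation: 6650 (pulsar), 6651 (pulsar+ssl).
  @default_ports %{"pulsar" => 6650, "pulsar+ssl" => 6651}

  def parse(url) when is_binary(url) do
    case URI.parse(url) do
      %URI{scheme: scheme, host: host, port: port}
      when scheme in ["pulsar", "pulsar+ssl"] and is_binary(host) and host != "" ->
        # URI.parse/1 leaves port as nil for schemes it has no default for,
        # so fall back to the Pulsar default for the scheme.
        {:ok, %{scheme: scheme, host: host, port: port || Map.fetch!(@default_ports, scheme)}}

      _other ->
        {:error, :invalid_broker_url}
    end
  end
end

# Usage
BrokerUrlSketch.parse("pulsar://localhost")
BrokerUrlSketch.parse("pulsar+ssl://broker.example.com")
BrokerUrlSketch.parse("pulsar://localhost:7000")
```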
@spec stop(GenServer.server(), term(), timeout()) :: :ok
Gracefully stops the broker by closing all consumers/producers first.