Pulsar.Broker (Pulsar v2.8.11)


Unified Pulsar broker connection process.

This module combines:

  • TCP connection management with reconnection logic
  • Protocol handshake and authentication
  • Service discovery functionality
  • Consumer and producer registration and message routing
  • Request/response correlation

Uses gen_statem for state management, with two states:

  • :disconnected - Not connected to broker
  • :connected - Connected and authenticated, ready for operations
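
The two states listed above, together with the `connected/3` and `disconnected/3` functions documented on this page, suggest a `:gen_statem` running in `:state_functions` callback mode. The following is a minimal sketch of that pattern, not the module's actual implementation: the module name `BrokerStateSketch` and the `:connect`, `:disconnect`, and `:status` events are assumptions made for illustration only.

```elixir
defmodule BrokerStateSketch do
  # Hypothetical module: a two-state machine in the style described above.
  # It opens no sockets; the events below only illustrate state transitions.
  @behaviour :gen_statem

  def start_link, do: :gen_statem.start_link(__MODULE__, :ok, [])
  def connect(pid), do: :gen_statem.cast(pid, :connect)
  def disconnect(pid), do: :gen_statem.cast(pid, :disconnect)
  def status(pid), do: :gen_statem.call(pid, :status)

  # :state_functions means each state is handled by a function of the same name.
  def callback_mode, do: :state_functions

  # Start in :disconnected with empty data.
  def init(:ok), do: {:ok, :disconnected, %{}}

  # Each state function receives (event_type, event_content, data).
  def disconnected(:cast, :connect, data), do: {:next_state, :connected, data}

  def disconnected({:call, from}, :status, _data),
    do: {:keep_state_and_data, [{:reply, from, :disconnected}]}

  def disconnected(_type, _event, _data), do: :keep_state_and_data

  def connected(:cast, :disconnect, data), do: {:next_state, :disconnected, data}

  def connected({:call, from}, :status, _data),
    do: {:keep_state_and_data, [{:reply, from, :connected}]}

  def connected(_type, _event, _data), do: :keep_state_and_data
end
```

Keeping one function per state makes it explicit which events are accepted in which state; anything unexpected falls through to the catch-all clause and is ignored.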

Consumer and producer processes are monitored by this broker process for automatic cleanup when they exit.

Types

t()

@type t() :: %Pulsar.Broker{
  actions: list(),
  auth: list(),
  buffer: binary(),
  conn_timeout: integer(),
  consumers: %{required(integer()) => {pid(), reference()}},
  host: String.t(),
  name: String.t(),
  pending_bytes: integer(),
  port: integer(),
  prev_backoff: integer(),
  producers: %{required(integer()) => {pid(), reference()}},
  requests: %{required(integer()) => {GenServer.from(), integer()}},
  socket: :gen_tcp.socket() | :ssl.sslsocket() | nil,
  socket_module: :gen_tcp | :ssl,
  socket_opts: list()
}

Functions

connected(arg1, request, broker)

disconnected(event_type, event_data, broker)

get_consumers(broker, opts \\ [])

@spec get_consumers(
  GenServer.server() | String.t(),
  keyword()
) :: %{required(integer()) => pid()}

Returns the registered consumers as a map of consumer ID to consumer PID.

Accepts either a broker PID or a broker URL string. When passing a broker URL, you can optionally specify the client via the :client option.

get_producers(broker, opts \\ [])

@spec get_producers(
  GenServer.server() | String.t(),
  keyword()
) :: %{required(integer()) => pid()}

Returns the registered producers as a map of producer ID to producer PID.

Accepts either a broker PID or a broker URL string. When passing a broker URL, you can optionally specify the client via the :client option.

lookup_topic(broker, topic, authoritative \\ false, timeout \\ 5000)

@spec lookup_topic(GenServer.server(), String.t(), boolean(), timeout()) ::
  {:ok, map()} | {:error, term()}

Service discovery: looks up a topic. The authoritative flag defaults to false and the timeout to 5000 ms.

partitioned_topic_metadata(broker, topic, timeout \\ 5000)

@spec partitioned_topic_metadata(GenServer.server(), String.t(), timeout()) ::
  {:ok, map()} | {:error, term()}

Service discovery: gets the partitioned topic metadata for a topic. The timeout defaults to 5000 ms.

publish_message(broker, encoded_message)

@spec publish_message(GenServer.server(), binary()) :: :ok | {:error, term()}

Publishes a message to the broker. The message must already be encoded in the Pulsar binary protocol format.

register_consumer(broker, consumer_id, consumer_pid)

@spec register_consumer(GenServer.server(), integer(), pid()) :: :ok

Registers a consumer with this broker and monitors the process.

register_producer(broker, producer_id, producer_pid)

@spec register_producer(GenServer.server(), integer(), pid()) :: :ok

Registers a producer with this broker and monitors the process.
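
Registration combined with monitoring can be sketched as follows. This is an illustration of the general pattern, not the broker's code: `MonitorRegistrySketch` and its functions are hypothetical, and only the `%{id => {pid, reference}}` shape is taken from the `consumers` and `producers` fields of `t()`.

```elixir
defmodule MonitorRegistrySketch do
  # Hypothetical registry: stores id => {pid, monitor_ref} and drops an
  # entry automatically when the monitored process exits.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, %{})
  def register(server, id, pid), do: GenServer.call(server, {:register, id, pid})
  def registered(server), do: GenServer.call(server, :registered)

  @impl true
  def init(entries), do: {:ok, entries}

  @impl true
  def handle_call({:register, id, pid}, _from, entries) do
    # Monitoring means this process receives a :DOWN message when pid exits.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(entries, id, {pid, ref})}
  end

  def handle_call(:registered, _from, entries) do
    # Expose only id => pid, hiding the monitor references.
    {:reply, Map.new(entries, fn {id, {pid, _ref}} -> {id, pid} end), entries}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, entries) do
    # Remove whichever entry was registered with this monitor reference.
    remaining =
      entries
      |> Enum.reject(fn {_id, {_pid, entry_ref}} -> entry_ref == ref end)
      |> Map.new()

    {:noreply, remaining}
  end
end
```

Matching on the monitor reference rather than the PID keeps cleanup correct even if the same process was registered under more than one ID.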

send_command(broker, command)

@spec send_command(
  GenServer.server(),
  struct()
) :: :ok | {:error, term()}

Sends a command to the broker without expecting a response.

send_request(broker, command, timeout \\ 5000)

@spec send_request(GenServer.server(), struct(), timeout()) ::
  {:ok, term()} | {:error, term()}

Sends a command to the broker and expects a response.
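
Request/response correlation generally works by tagging each outgoing request with an ID and remembering which caller is waiting on it. The sketch below models only that bookkeeping as pure data. `CorrelationSketch` is hypothetical; in particular, the `requests` field of `t()` stores a `{GenServer.from(), integer()}` tuple whose integer is not documented here, so this sketch stores the caller alone.

```elixir
defmodule CorrelationSketch do
  # Hypothetical pure-data model of request/response correlation.
  # The real broker's request ID scheme and stored values may differ.

  def new, do: %{next_id: 0, requests: %{}}

  # Record a waiting caller and return the request ID to send on the wire.
  def track(%{next_id: id, requests: requests} = state, from) do
    {id, %{state | next_id: id + 1, requests: Map.put(requests, id, from)}}
  end

  # Find the caller for a response; each request ID resolves at most once.
  def resolve(%{requests: requests} = state, request_id) do
    case Map.pop(requests, request_id) do
      {nil, _unchanged} -> {:unknown, state}
      {from, rest} -> {{:ok, from}, %{state | requests: rest}}
    end
  end
end
```

Because entries are removed on resolution, a late or duplicate response for the same ID is reported as `:unknown` instead of being delivered twice.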

start_link(broker_url, opts \\ [])

@spec start_link(
  String.t(),
  keyword()
) :: {:ok, pid()} | :ignore | {:error, term()}

Starts a broker connection process.

The broker URL must have the form <scheme>://<host>[:<port>]. The scheme is either pulsar or pulsar+ssl. The port is optional and defaults to 6650 for pulsar and 6651 for pulsar+ssl.
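
The URL rules above can be illustrated with the standard library's `URI.parse/1`. This is a sketch of the documented format only; `BrokerUrlSketch`, its return shape, and the `:invalid_broker_url` error are assumptions, and `Pulsar.Broker` may parse and validate URLs differently.

```elixir
defmodule BrokerUrlSketch do
  # Hypothetical helper, not part of Pulsar.Broker.
  # Applies the documented defaults: 6650 for pulsar, 6651 for pulsar+ssl.

  def parse(url) when is_binary(url) do
    case URI.parse(url) do
      %URI{scheme: "pulsar", host: host, port: port}
      when is_binary(host) and host != "" ->
        {:ok, %{host: host, port: port || 6650, tls: false}}

      %URI{scheme: "pulsar+ssl", host: host, port: port}
      when is_binary(host) and host != "" ->
        {:ok, %{host: host, port: port || 6651, tls: true}}

      _other ->
        # Unknown scheme or missing host.
        {:error, :invalid_broker_url}
    end
  end
end
```

For example, `BrokerUrlSketch.parse("pulsar://localhost")` yields port 6650, while an explicit port such as `pulsar://localhost:7000` is kept as given.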

stop(broker, reason \\ :normal, timeout \\ :infinity)

@spec stop(GenServer.server(), term(), timeout()) :: :ok

Gracefully stops the broker, closing all registered consumers and producers first.