# `Pulsar.Broker`
[🔗](https://github.com/efcasado/pulsar-elixir/blob/main/lib/pulsar/broker.ex#L1)

Unified Pulsar broker connection process.

This module combines:
- TCP connection management with reconnection logic
- Protocol handshake and authentication
- Service discovery functionality
- Consumer and producer registration and message routing
- Request/response correlation

Uses `:gen_statem` for state management, with two states:
- `:disconnected` - Not connected to the broker
- `:connected` - Connected and authenticated, ready for operations
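
The two states above can be illustrated with a minimal, self-contained `:gen_statem` sketch. It is not the module's implementation: the module name `ConnectionSketch`, its functions, and the choice of the `:state_functions` callback mode are assumptions made for illustration, and no real socket is involved.

```elixir
defmodule ConnectionSketch do
  # Hypothetical two-state machine; not the actual Pulsar.Broker code.
  @behaviour :gen_statem

  def start_link, do: :gen_statem.start_link(__MODULE__, :ok, [])
  def connect(pid), do: :gen_statem.call(pid, :connect)
  def drop(pid), do: :gen_statem.call(pid, :drop)
  def send_data(pid, payload), do: :gen_statem.call(pid, {:send, payload})

  @impl true
  def callback_mode, do: :state_functions

  @impl true
  def init(:ok), do: {:ok, :disconnected, %{sent: []}}

  # State :disconnected: only :connect is accepted, everything else is rejected.
  def disconnected({:call, from}, :connect, data) do
    {:next_state, :connected, data, [{:reply, from, :ok}]}
  end

  def disconnected({:call, from}, _request, _data) do
    {:keep_state_and_data, [{:reply, from, {:error, :disconnected}}]}
  end

  # State :connected: payloads are accepted until the connection is dropped.
  def connected({:call, from}, {:send, payload}, data) do
    {:keep_state, %{data | sent: [payload | data.sent]}, [{:reply, from, :ok}]}
  end

  def connected({:call, from}, :drop, data) do
    {:next_state, :disconnected, data, [{:reply, from, :ok}]}
  end

  def connected({:call, from}, :connect, _data) do
    {:keep_state_and_data, [{:reply, from, :ok}]}
  end
end
```

In this sketch, `ConnectionSketch.send_data/2` returns `{:error, :disconnected}` until `ConnectionSketch.connect/1` has been called, which mirrors the idea that operations are only served in the `:connected` state.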

Consumer and producer processes are monitored by this broker process
for automatic cleanup when they exit.

# `t`

```elixir
@type t() :: %Pulsar.Broker{
  actions: list(),
  auth: list(),
  buffer: binary(),
  conn_timeout: integer(),
  consumers: %{required(integer()) => {pid(), reference()}},
  host: String.t(),
  name: String.t(),
  pending_bytes: integer(),
  port: integer(),
  prev_backoff: integer(),
  producers: %{required(integer()) => {pid(), reference()}},
  requests: %{required(integer()) => {GenServer.from(), integer()}},
  socket: :gen_tcp.socket() | :ssl.sslsocket() | nil,
  socket_module: :gen_tcp | :ssl,
  socket_opts: list()
}
```

# `connected`

# `disconnected`

# `get_consumers`

```elixir
@spec get_consumers(
  GenServer.server() | String.t(),
  keyword()
) :: %{required(integer()) => pid()}
```

Returns the registered consumers as a map from integer ID to process pid.

Accepts either a broker PID or a broker URL string.
When passing a broker URL, you can optionally specify the client via the `:client` option.

# `get_producers`

```elixir
@spec get_producers(
  GenServer.server() | String.t(),
  keyword()
) :: %{required(integer()) => pid()}
```

Returns the registered producers as a map from integer ID to process pid.

Accepts either a broker PID or a broker URL string.
When passing a broker URL, you can optionally specify the client via the `:client` option.

# `lookup_topic`

```elixir
@spec lookup_topic(GenServer.server(), String.t(), boolean(), timeout()) ::
  {:ok, map()} | {:error, term()}
```

Service discovery: looks up the broker that serves the given topic.

# `partitioned_topic_metadata`

```elixir
@spec partitioned_topic_metadata(GenServer.server(), String.t(), timeout()) ::
  {:ok, map()} | {:error, term()}
```

Service discovery: fetches the partitioned-topic metadata for the given topic.

# `publish_message`

```elixir
@spec publish_message(GenServer.server(), binary()) :: :ok | {:error, term()}
```

Publishes a message to the broker.
The message must already be encoded in the Pulsar binary protocol format.

# `register_consumer`

```elixir
@spec register_consumer(GenServer.server(), integer(), pid()) :: :ok
```

Registers a consumer with this broker and monitors the process.
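
Registration with monitoring can be sketched as follows. This is a simplified illustration, not the module's code: `RegistrySketch` and its functions are hypothetical names, and the state is a bare map shaped like the `consumers` field of `t()` (`id => {pid, monitor_ref}`). When a registered process exits, the `:DOWN` message removes its entry.

```elixir
defmodule RegistrySketch do
  # Hypothetical registry; illustrates monitor-based cleanup only.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, %{})
  def register(server, id, pid), do: GenServer.call(server, {:register, id, pid})
  def entries(server), do: GenServer.call(server, :entries)

  @impl true
  def init(entries), do: {:ok, entries}

  @impl true
  def handle_call({:register, id, pid}, _from, entries) do
    # Monitor the process so that its exit is reported as a :DOWN message.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(entries, id, {pid, ref})}
  end

  def handle_call(:entries, _from, entries) do
    # Expose only id => pid, hiding the monitor references.
    {:reply, Map.new(entries, fn {id, {pid, _ref}} -> {id, pid} end), entries}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, entries) do
    # Drop whichever entry was registered with this monitor reference.
    remaining =
      entries
      |> Enum.reject(fn {_id, {_pid, entry_ref}} -> entry_ref == ref end)
      |> Map.new()

    {:noreply, remaining}
  end
end
```

After `RegistrySketch.register(registry, 1, pid)`, the map returned by `RegistrySketch.entries/1` contains `pid` under key `1` for as long as that process is alive.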

# `register_producer`

```elixir
@spec register_producer(GenServer.server(), integer(), pid()) :: :ok
```

Registers a producer with this broker and monitors the process.

# `send_command`

```elixir
@spec send_command(
  GenServer.server(),
  struct()
) :: :ok | {:error, term()}
```

Sends a command to the broker without expecting a response.

# `send_request`

```elixir
@spec send_request(GenServer.server(), struct(), timeout()) ::
  {:ok, term()} | {:error, term()}
```

Sends a command to the broker and expects a response.
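
One common way to pair a response with its request is to store the caller under a request ID and reply once a message carrying that ID arrives. The sketch below illustrates that pattern under stated assumptions: `CorrelationSketch`, the `transport` function, and the `{:response, id, result}` message shape are hypothetical, and the sketch stores only the caller, whereas the `requests` field of `t()` holds an additional integer whose meaning is not documented here. Timeout handling is left to `GenServer.call/3`.

```elixir
defmodule CorrelationSketch do
  # Hypothetical request/response correlation; not the Pulsar.Broker code.
  use GenServer

  # `transport` is a 2-arity function (request_id, command) standing in for
  # the socket write. A response is expected to come back to this process
  # later as {:response, request_id, result}.
  def start_link(transport), do: GenServer.start_link(__MODULE__, transport)

  def send_request(server, command, timeout \\ 5_000) do
    GenServer.call(server, {:request, command}, timeout)
  end

  @impl true
  def init(transport) do
    {:ok, %{transport: transport, next_id: 1, requests: %{}}}
  end

  @impl true
  def handle_call({:request, command}, from, state) do
    id = state.next_id
    state.transport.(id, command)
    # Remember who asked, and reply only when the matching response arrives.
    requests = Map.put(state.requests, id, from)
    {:noreply, %{state | next_id: id + 1, requests: requests}}
  end

  @impl true
  def handle_info({:response, id, result}, state) do
    case Map.pop(state.requests, id) do
      {nil, _requests} ->
        # Unknown or already answered request ID: ignore it.
        {:noreply, state}

      {from, requests} ->
        GenServer.reply(from, {:ok, result})
        {:noreply, %{state | requests: requests}}
    end
  end
end
```

With an echoing transport such as `fn id, command -> send(self(), {:response, id, {:echo, command}}) end`, a call to `CorrelationSketch.send_request(pid, :ping)` blocks until the response with the same ID has been handled.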

# `start_link`

```elixir
@spec start_link(
  String.t(),
  keyword()
) :: {:ok, pid()} | :ignore | {:error, term()}
```

Starts a broker connection process.

The broker URL must have the form `<scheme>://<host>[:<port>]`, where `scheme` is either
`pulsar` or `pulsar+ssl`. The port is optional and defaults to `6650` for `pulsar` and
`6651` for `pulsar+ssl`.
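
The URL rules above can be sketched with the standard library's `URI.parse/1`. This is an illustration of the documented format only: `BrokerUrlSketch` and its return shape are hypothetical and may differ from how the library parses URLs internally.

```elixir
defmodule BrokerUrlSketch do
  # Hypothetical helper; default ports are taken from the text above.
  @default_ports %{"pulsar" => 6650, "pulsar+ssl" => 6651}

  def parse(url) when is_binary(url) do
    %URI{scheme: scheme, host: host, port: port} = URI.parse(url)

    case Map.fetch(@default_ports, scheme) do
      {:ok, default_port} when is_binary(host) and host != "" ->
        # Use the explicit port when given, else the scheme's default.
        {:ok, %{scheme: scheme, host: host, port: port || default_port}}

      _ ->
        {:error, :invalid_broker_url}
    end
  end
end
```

For example, `BrokerUrlSketch.parse("pulsar+ssl://broker.example.com")` resolves the port to `6651`, while a URL with an explicit port such as `pulsar://localhost:7000` keeps that port.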

# `stop`

```elixir
@spec stop(GenServer.server(), term(), timeout()) :: :ok
```

Gracefully stops the broker by closing all consumers/producers first.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
