Pulsar.Client (Pulsar v2.8.6)

A client represents an isolated Pulsar connection context.

Each client maintains:

  • Separate broker connections
  • Independent consumer/producer registries
  • Isolated broker configuration

Usage

Single Client (Implicit)

When using Pulsar.start/1, a default client is automatically created:

# config.exs
config :pulsar,
  host: "pulsar://localhost:6650",
  consumers: [...]

# Uses implicit :default client
{:ok, consumer} = Pulsar.start_consumer(topic, subscription, MyCallback)

Multiple Clients (Explicit)

You can start multiple clients in your supervision tree:

children = [
  {Pulsar.Client, name: :analytics_client, host: "pulsar://analytics:6650"},
  {Pulsar.Client, name: :events_client, host: "pulsar://events:6650"}
]

# Explicit client usage
{:ok, consumer} = Pulsar.start_consumer(
  topic, subscription, MyCallback,
  client: :analytics_client
)
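As a minimal sketch of how these child specs might be wired into an application, the module below passes them to Supervisor.start_link/2. The names MyApp.Application and MyApp.Supervisor are hypothetical, and the :one_for_one strategy is an assumption rather than something the library prescribes.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; only Pulsar.Client comes from the library.
  use Application

  @impl true
  def start(_type, _args) do
    # Assumed strategy: restart each client independently of the other.
    Supervisor.start_link(children(), strategy: :one_for_one, name: MyApp.Supervisor)
  end

  # One child spec per isolated Pulsar client.
  def children do
    [
      {Pulsar.Client, name: :analytics_client, host: "pulsar://analytics:6650"},
      {Pulsar.Client, name: :events_client, host: "pulsar://events:6650"}
    ]
  end
end
```

Keeping the child list in its own function makes it easy to inspect which clients an application will start before any connection is attempted.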

Summary

Functions

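child_spec(init_arg)

Returns a specification to start this module under a supervisor.

lookup_broker(broker_url, opts \\ [])

Looks up an existing broker connection by broker URL.

random_broker(client_name \\ :default)

Returns a random broker process from the specified client's broker supervisor.

start_broker(broker_url, opts \\ [])

Starts a broker connection.

start_link(opts)

Starts a client with the given options.

stop(client_name, opts \\ [])

Stops a client and all its resources gracefully.

stop_broker(broker_url, opts \\ [])

Stops a broker connection by broker URL.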

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

lookup_broker(broker_url, opts \\ [])

@spec lookup_broker(
  String.t(),
  keyword()
) :: {:ok, pid()} | {:error, :not_found}

Looks up an existing broker connection by broker URL.

Returns {:ok, broker_pid} if found, {:error, :not_found} otherwise.
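The two return shapes can be handled with pattern matching. The sketch below is illustrative only: BrokerLookupExample and its functions are hypothetical helpers, and only Pulsar.Client.lookup_broker/1 is taken from this page.

```elixir
defmodule BrokerLookupExample do
  # Hypothetical helper module, not part of the Pulsar library.

  # Looks up the broker and turns the result into a human-readable message.
  def describe(broker_url) do
    broker_url
    |> Pulsar.Client.lookup_broker()
    |> format(broker_url)
  end

  # One clause per documented return value.
  def format({:ok, pid}, url) when is_pid(pid) do
    "broker for #{url} is #{inspect(pid)}"
  end

  def format({:error, :not_found}, url) do
    "no broker connection for #{url}"
  end
end
```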

random_broker(client_name \\ :default)

@spec random_broker(atom()) :: pid() | nil

Returns a random broker process from the specified client's broker supervisor.

Defaults to the :default client if no client is specified.

This is useful for operations that need any broker from a client (e.g., service discovery).
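Because the spec allows nil, callers may want to normalize the result before using it. The following is a sketch under assumptions: AnyBrokerExample and the :no_broker reason are hypothetical, and the exact conditions under which the library returns nil are not stated on this page.

```elixir
defmodule AnyBrokerExample do
  # Hypothetical wrapper, not part of the Pulsar library.

  # Fetches any broker for the client and converts nil into an error tuple.
  def fetch(client_name \\ :default) do
    client_name
    |> Pulsar.Client.random_broker()
    |> wrap()
  end

  # :no_broker is an illustrative reason chosen for this sketch.
  def wrap(nil), do: {:error, :no_broker}
  def wrap(pid) when is_pid(pid), do: {:ok, pid}
end
```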

start_broker(broker_url, opts \\ [])

@spec start_broker(
  String.t(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

Starts a broker connection.

If a broker for the given URL already exists, returns the existing broker. Otherwise, starts a new broker connection with the provided options.

Returns {:ok, broker_pid} if successful, {:error, reason} otherwise.
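Since an existing broker is returned instead of a duplicate being started, the call can be used as an "ensure started" step. The sketch below applies it to a list of URLs and stops at the first failure. BrokerPoolExample is a hypothetical helper, and the injectable start_fun argument is an assumption added so the logic can be exercised without a live broker.

```elixir
defmodule BrokerPoolExample do
  # Hypothetical helper, not part of the Pulsar library.

  # Ensures a broker connection exists for every URL.
  # Returns {:ok, pids} in input order, or {:error, {url, reason}} on the first failure.
  def ensure_all(urls, start_fun \\ &Pulsar.Client.start_broker/1) do
    urls
    |> Enum.reduce_while({:ok, []}, fn url, {:ok, pids} ->
      case start_fun.(url) do
        {:ok, pid} -> {:cont, {:ok, [pid | pids]}}
        {:error, reason} -> {:halt, {:error, {url, reason}}}
      end
    end)
    |> case do
      {:ok, pids} -> {:ok, Enum.reverse(pids)}
      error -> error
    end
  end
end
```

With the default argument, BrokerPoolExample.ensure_all(["pulsar://localhost:6650"]) would call Pulsar.Client.start_broker/1 for each URL.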

start_link(opts)

Starts a client with the given options.

Options

  • :name - Required. The name of the client (atom)
  • :host - Required. Bootstrap broker URL
  • :auth - Optional. Authentication configuration
  • :conn_timeout - Optional. Connection timeout (default: 5000)
  • :socket_opts - Optional. Socket options
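As an illustration of the required and optional keys, the sketch below checks an option list before it is handed to start_link/1. ClientOptsExample and the :missing_options error shape are hypothetical. The library may perform its own validation, and filling in :conn_timeout here only mirrors the documented default.

```elixir
defmodule ClientOptsExample do
  # Hypothetical pre-flight check, not part of the Pulsar library.
  @required [:name, :host]

  # Merges caller options over the documented :conn_timeout default
  # and reports any required keys that are absent.
  def build(overrides) do
    opts = Keyword.merge([conn_timeout: 5_000], overrides)

    case Enum.reject(@required, &Keyword.has_key?(opts, &1)) do
      [] -> {:ok, opts}
      missing -> {:error, {:missing_options, missing}}
    end
  end
end
```

A caller could then pass the validated list on, for example Pulsar.Client.start_link(opts) after matching {:ok, opts}.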

stop(client_name, opts \\ [])

@spec stop(
  atom(),
  keyword()
) :: :ok

Stops a client and all its resources gracefully.

This stops all producers, consumers, brokers, and the client supervisor.

Options

  • :timeout - Maximum time to wait for shutdown, in milliseconds (default: 5000)

Examples

Pulsar.Client.stop(:my_client)

stop_broker(broker_url, opts \\ [])

@spec stop_broker(
  String.t(),
  keyword()
) :: :ok | {:error, :not_found}

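Stops a broker connection by broker URL.

Returns :ok if the broker was stopped, {:error, :not_found} if no broker connection exists for the given URL.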