Pulsar client for Elixir.
This module provides a high-level API for interacting with the Elixir client for Apache Pulsar.
Architecture
The library supports multi-client architecture, allowing you to connect to multiple Pulsar clusters simultaneously. Each client maintains its own set of registries and supervisors for brokers, consumers, and producers.
Supervision Tree
Pulsar.Supervisor
└── Client (:default)
├── Registries
│ ├── BrokerRegistry
│ ├── ConsumerRegistry
│ └── ProducerRegistry
│
├── ProducerEpochStore (ETS)
│
├── BrokerSupervisor
│ ├── Broker 1
│ │ ├── monitors: C1, C2, DLQ-P1, P1
│ └── Broker 2
│ ├── monitors: C3, C4
│
├── ConsumerSupervisor
│ ├── ConsumerGroup: my-topic
│ │ └── C1 (with DLQ policy)
│ │ └── DLQ-P1 (linked process)
│ │
│ └── PartitionedConsumer: my-partitioned-topic
│ ├── ConsumerGroup partition-0
│ │ └── C2
│ ├── ConsumerGroup partition-1
│ │ └── C3
│ └── ConsumerGroup partition-2
│ └── C4
│
└── ProducerSupervisor
├── ProducerGroup: my-topic
│ └── P1
│
└── PartitionedProducer: my-partitioned-topic
├── ProducerGroup partition-0
│ └── P2
├── ProducerGroup partition-1
│ └── P3
└── ProducerGroup partition-2
└── P4Consumer/Producer Architecture
Both consumers and producers are managed through supervised processes:
Consumers:
- Regular topics: Consumer groups with configurable process count
- Partitioned topics: PartitionedConsumer supervisor managing consumer groups per partition
- The
start_consumer/4function returns a single PID that can be registered with a name, regardless of partitioning or process count
Producers:
- Regular topics: Producer groups with configurable process count
- Partitioned topics: PartitionedProducer supervisor managing producer groups per partition
- The
start_producer/2function returns a single PID that can be registered with a name, regardless of partitioning or process count
Examples
# Start a broker connection (optional - automatically started by clients)
{:ok, broker_pid} = Pulsar.start_broker("pulsar://other-broker:6650")
# Start a producer
{:ok, producer_pid} = Pulsar.start_producer(
"persistent://public/default/my-topic",
name: :my_producer
)
# Send a message
{:ok, message_id} = Pulsar.send(:my_producer, "Hello, Pulsar!")
# Start a producer for a partitioned topic (single PID returned)
{:ok, producer_pid} = Pulsar.start_producer(
"persistent://public/default/my-partitioned-topic",
name: :my_partitioned_producer
)
# Messages are automatically routed to partitions
{:ok, message_id} = Pulsar.send(:my_partitioned_producer, "Partitioned message!")
# Start a consumer for a regular topic
{:ok, consumer_pid} = Pulsar.start_consumer(
"persistent://public/default/my-topic",
"my-subscription",
MyApp.MessageHandler,
subscription_type: :Exclusive
)
# Start a consumer for a partitioned topic (single PID returned)
{:ok, consumer_pid} = Pulsar.start_consumer(
"persistent://public/default/my-partitioned-topic",
"my-subscription",
MyApp.MessageHandler,
subscription_type: :Shared,
consumer_count: 2 # 2 consumers per partition
)
# Register consumer/producer with custom name
{:ok, consumer_pid} = Pulsar.start_consumer(
"persistent://public/default/my-topic",
"my-subscription",
MyApp.MessageHandler,
name: MyApp.MyConsumer
)
# Stop consumer/producer
Pulsar.stop_consumer(consumer_pid)
Pulsar.stop_producer(producer_pid)
# Or stop by name
Pulsar.stop_consumer(:my_consumer)
Pulsar.stop_producer(:my_producer)
# Lookup by name
{:ok, consumer_pid} = Pulsar.lookup_consumer("my-topic-my-subscription")
{:ok, producer_pid} = Pulsar.lookup_producer(:my_producer)
Summary
Functions
Manually acknowledges one or more messages.
Gets all consumer processes managed by a consumer manager.
Gets all producer processes managed by a producer group.
Looks up an existing broker connection by broker URL.
Looks up a consumer by name.
Looks up a producer group by name.
Manually negatively acknowledges one or more messages.
Sends a message synchronously using a producer group.
Sends a flow command to request more messages from a consumer.
Start the Pulsar application with custom configuration.
Starts a broker connection.
Starts a consumer for a topic (regular or partitioned).
Starts a producer for the given topic.
Stops a broker connection by broker URL.
Stops a consumer (regular or partitioned).
Stops a producer group.
Functions
@spec ack( pid() | String.t(), Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t() | [Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()], keyword() ) :: :ok | {:error, term()}
Manually acknowledges one or more messages.
This is a convenience wrapper around Pulsar.Consumer.ack/2.
Use this when your callback returns {:noreply, state} to manually control acknowledgment.
Supports batching multiple message IDs in a single ACK command for better performance.
Parameters
consumer- The consumer process PID or namemessage_ids- A single message ID or a list of message IDs to acknowledge
Examples
# Acknowledge a single message
Pulsar.ack(consumer_pid, message_id)
# Acknowledge multiple messages in batch (more efficient)
Pulsar.ack(consumer_pid, [message_id1, message_id2, message_id3])
# Can also be used with message structures for Broadway
def ack(consumer, messages) do
message_ids = Enum.map(messages, & &1.message_id)
Pulsar.ack(consumer, message_ids)
end
Gets all consumer processes managed by a consumer manager.
Works with both ConsumerGroup and PartitionedConsumer supervisors. Returns a flat list of consumer process PIDs.
Gets all producer processes managed by a producer group.
Returns a list of producer process PIDs.
Examples
iex> {:ok, group_pid} = Pulsar.start_producer("my-topic", producer_count: 3)
iex> Pulsar.get_producers(group_pid)
[#PID<0.123.0>, #PID<0.124.0>, #PID<0.125.0>]
# By name
iex> Pulsar.get_producers("my-topic-producer")
[#PID<0.123.0>, #PID<0.124.0>, #PID<0.125.0>]
Looks up an existing broker connection by broker URL.
Delegates to Pulsar.Client.lookup_broker/2.
Looks up a consumer by name.
Returns {:ok, consumer_pid} if found, {:error, :not_found} otherwise.
Examples
iex> {:ok, consumer_pid} = Pulsar.start_consumer("my-topic", "my-subscription", MyHandler, name: "my-consumer")
iex> Pulsar.lookup_consumer("my-consumer")
{:ok, consumer_pid}
iex> Pulsar.lookup_consumer("nonexistent")
{:error, :not_found}
Looks up a producer group by name.
Returns {:ok, producer_pid} if found, {:error, :not_found} otherwise.
Examples
iex> {:ok, producer_pid} = Pulsar.start_producer("my-topic", name: "my-producer")
iex> Pulsar.lookup_producer("my-producer")
{:ok, producer_pid}
iex> Pulsar.lookup_producer("nonexistent")
{:error, :not_found}
@spec nack( pid() | String.t(), Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t() | [Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()], keyword() ) :: :ok | {:error, term()}
Manually negatively acknowledges one or more messages.
This is a convenience wrapper around Pulsar.Consumer.nack/2.
Use this when your callback returns {:noreply, state} to manually control acknowledgment.
Supports batching multiple message IDs in a single NACK for better performance.
The messages will be tracked for redelivery if :redelivery_interval is configured.
When the messages are redelivered and the redelivery count exceeds :max_redelivery,
they will automatically be sent to the dead letter queue (if :dead_letter_policy is configured),
regardless of whether you use manual or automatic acknowledgment.
Parameters
consumer- The consumer process PID or namemessage_ids- A single message ID or a list of message IDs to negatively acknowledge
Examples
# NACK a single message
Pulsar.nack(consumer_pid, message_id)
# NACK multiple messages in batch (more efficient)
Pulsar.nack(consumer_pid, [message_id1, message_id2, message_id3])
# Can also be used with message structures for Broadway
def nack(consumer, messages) do
message_ids = Enum.map(messages, & &1.message_id)
Pulsar.nack(consumer, message_ids)
end
@spec send(String.t() | pid(), binary(), keyword()) :: {:ok, Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()} | {:error, term()}
Sends a message synchronously using a producer group.
The producer group must be started first using start_producer/2.
Parameters
producer_group_name- Name of the producer group (from start_producer's :name option, or default "topic-producer")message- Binary message payloadopts- Optional parameters::timeout- Timeout in milliseconds (default: 5000):partition_key- Partition routing key (string):ordering_key- Key for ordering in Key_Shared subscriptions (binary):properties- Custom message metadata as a map (e.g.,%{"trace_id" => "abc"}):event_time- Application event timestamp (DateTime or milliseconds since epoch):deliver_at_time- Absolute delayed delivery time (DateTime or milliseconds since epoch):deliver_after- Relative delayed delivery in milliseconds from now
Return Values
Returns {:ok, message_id_data} on success or {:error, reason} on failure.
Examples
# Basic send
Pulsar.send(producer, "payload")
# With message key for partitioning
Pulsar.send(producer, "payload", key: "user-123")
# With custom properties
Pulsar.send(producer, "payload", properties: %{"trace_id" => "abc"})
# With delayed delivery (60 seconds from now)
Pulsar.send(producer, "payload", deliver_after: 60_000)
@spec send_flow(pid() | String.t(), non_neg_integer()) :: :ok | {:error, term()}
Sends a flow command to request more messages from a consumer.
This is a convenience wrapper around Pulsar.Consumer.send_flow/2.
Use this when you've disabled automatic flow control by setting :flow_initial to 0.
Parameters
consumer- The consumer process PID or namepermits- Number of message permits to request
Examples
# Start consumer with manual flow control
{:ok, consumer} = Pulsar.start_consumer(
topic,
subscription,
MyCallback,
flow_initial: 0
)
# Request 10 messages
Pulsar.send_flow(consumer, 10)
Start the Pulsar application with custom configuration.
Examples
# Start with custom configuration (single client)
{:ok, pid} = Pulsar.start(
host: "pulsar://localhost:6650",
consumers: [
{:my_consumer, [
topic: "my-topic",
subscription_name: "my-subscription",
callback_module: MyConsumerCallback,
subscription_type: :Shared,
startup_delay_ms: 500,
startup_jitter_ms: 1000
]}
]
)
# Start with multiple clients
{:ok, pid} = Pulsar.start(
clients: [
default: [host: "pulsar://localhost:6650"],
cluster_2: [host: "pulsar://other:6650"]
],
consumers: [
{:consumer1, [client: :default, topic: "topic1", ...]},
{:consumer2, [client: :cluster_2, topic: "topic2", ...]}
]
)
# Later, stop it
:ok = Pulsar.stop(pid)
Starts a broker connection.
Delegates to Pulsar.Client.start_broker/2.
@spec start_consumer(String.t(), String.t(), module(), keyword()) :: {:ok, pid()} | {:error, term()}
Starts a consumer for a topic (regular or partitioned).
This is the primary way to consume messages from Pulsar topics. For regular topics, a single consumer group is created. For partitioned topics, a PartitionedConsumer supervisor is created that manages individual consumer groups for each partition.
Parameters
topic- The topic to subscribe to (regular or partitioned)subscription_name- Name of the subscriptioncallback_module- Module that implementsPulsar.Consumer.Callbackbehaviouropts- Optional parameters::subscription_type- Type of subscription (e.g., :Exclusive, :Shared, :Key_Shared, default: :Shared):name- Custom name for the consumer (default: "topic-subscription_name"):consumer_count- Number of consumer processes per topic/partition (default: 1):init_args- Arguments passed to callback module's init/1 function:flow_initial- Initial flow permits (default: 100):flow_threshold- Flow permits threshold for refill (default: 50):flow_refill- Flow permits refill amount (default: 50):initial_position- Initial position for subscription (:latestor:earliest, defaults to:latest)- Other options passed to ConsumerGroup supervisor
Return Values
- For regular topics: PID of the consumer group supervisor
- For partitioned topics: PID of the PartitionedConsumer supervisor that manages all partition groups
Partitioned Topics
When you subscribe to a partitioned topic, the function automatically:
- Queries the broker for partition metadata
- Creates a PartitionedConsumer supervisor
- The PartitionedConsumer creates separate consumer groups for each partition
- Returns a single PID for the PartitionedConsumer supervisor
Consumer Naming and Registry
All consumers are automatically registered in a registry and can be looked up by name:
- Default naming:
"<topic>-<subscription_name>" - Custom naming: Provided via the
:nameoption - Partitioned topics: Individual partition groups are named
"<base_name>-partition-<index>"
This allows you to manage consumers by name without keeping track of PIDs.
Examples
# Regular topic - returns single PID
iex> {:ok, consumer_pid} = Pulsar.start_consumer(
...> "persistent://public/default/my-topic",
...> "my-subscription",
...> MyApp.MessageHandler,
...> subscription_type: :Exclusive
...> )
{:ok, #PID<0.456.0>}
# Partitioned topic
iex> {:ok, consumer_pid} = Pulsar.start_consumer(
...> "persistent://public/default/my-partitioned-topic",
...> "my-subscription",
...> MyApp.MessageHandler,
...> subscription_type: :Shared
...> )
{:ok, #PID<0.456.0>}
# Register with custom name
iex> {:ok, consumer_pid} = Pulsar.start_consumer(
...> "persistent://public/default/my-partitioned-topic",
...> "my-subscription",
...> MyApp.MessageHandler,
...> subscription_type: :Key_Shared,
...> consumer_count: 2,
...> name: MyApp.MyConsumer
...> )
{:ok, #PID<0.456.0>}
Starts a producer for the given topic.
The broker will assign a unique producer name automatically. Returns a single ProducerGroup supervisor PID that manages one or more producer processes.
Parameters
topic- The topic to publish to (required)opts- Optional parameters::name- Custom name for the producer group (default: "<topic>-producer"):producer_count- Number of producer processes in the group (default: 1):access_mode- Producer access mode (default::Shared). Available modes::Shared- Multiple producers can publish on the topic:Exclusive- Only one producer can publish. Other producers get errors immediately.:WaitForExclusive- Wait for exclusive access if another producer is connected:ExclusiveWithFencing- Immediately remove any existing producer
- Other options passed to individual producer processes
Producer Naming and Registry
All producers are automatically registered in a registry and can be looked up by name:
- Default naming:
"<topic>-producer" - Custom naming: Provided via the
:nameoption
This allows you to manage producers by name without keeping track of PIDs.
Producer Access Modes
Access modes control how many producers can publish to a topic simultaneously:
:Shared(default) - Multiple producers can publish to the same topic:Exclusive- Only one producer can be connected. If another tries to connect, it fails immediately:WaitForExclusive- Waits for exclusive access instead of failing:ExclusiveWithFencing- Takes over by immediately disconnecting the existing producer
Examples
# Start producer with default settings (1 producer, shared mode)
iex> {:ok, producer_pid} = Pulsar.start_producer(
...> "persistent://public/default/my-topic"
...> )
{:ok, #PID<0.789.0>}
# Start producer with exclusive access
iex> {:ok, producer_pid} = Pulsar.start_producer(
...> "persistent://public/default/my-topic",
...> access_mode: :Exclusive
...> )
{:ok, #PID<0.789.0>}
# Register with custom name
iex> {:ok, producer_pid} = Pulsar.start_producer(
...> "persistent://public/default/my-topic",
...> name: "my-producer",
...> )
{:ok, #PID<0.789.0>}
Stops a broker connection by broker URL.
Delegates to Pulsar.Client.stop_broker/2.
Stops a consumer (regular or partitioned).
Accepts either:
- A consumer PID (returned by
start_consumer/1) - A consumer ID string (for programmatic access)
For partitioned topics, this stops the PartitionedConsumer supervisor, which automatically stops all partition consumer groups.
Returns :ok if successful, {:error, :not_found} if the consumer doesn't exist.
Examples
# Stop regular consumer
iex> {:ok, consumer_pid} = Pulsar.start_consumer(...)
iex> Pulsar.stop_consumer(consumer_pid)
:ok
# Stop partitioned consumer (stops all partitions)
iex> {:ok, consumer_pid} = Pulsar.start_consumer(...) # partitioned topic
iex> Pulsar.stop_consumer(consumer_pid)
:ok
# Stop by consumer ID string
iex> Pulsar.stop_consumer("my-topic-my-subscription")
:ok
Stops a producer group.
Accepts either:
- A producer group PID (returned by
start_producer/2) - A producer group ID string (for programmatic access)
This stops the ProducerGroup supervisor, which automatically stops all producer processes.
Returns :ok if successful, {:error, :not_found} if the producer doesn't exist.
Examples
# Stop producer group
iex> {:ok, producer_pid} = Pulsar.start_producer(...)
iex> Pulsar.stop_producer(producer_pid)
:ok
# Stop by producer group ID string
iex> Pulsar.stop_producer("my-topic-producer")
:ok