Pulsar consumer process that communicates with broker processes.
This consumer uses service discovery to find the appropriate broker for the topic and then communicates with that broker process.
Callback Module
The consumer requires a callback module that uses Pulsar.Consumer.Callback,
providing stateful message processing capabilities.
To create a callback module:
    defmodule MyApp.MessageHandler do
      use Pulsar.Consumer.Callback

      def handle_message(%Pulsar.Message{payload: payload}, state) do
        # Process the message
        IO.inspect(payload)
        {:ok, state}
      end
    end

See Pulsar.Consumer.Callback for detailed documentation and examples.
Summary
Functions
Manually acknowledges one or more messages.
Returns a specification to start this module under a supervisor.
Manually negatively acknowledges one or more messages.
Sends a flow command to request more messages from the broker.
Starts a consumer process with explicit parameters.
Gracefully stops a consumer process.
Types
@type t() :: %Pulsar.Consumer{
  broker_monitor: reference(),
  broker_pid: pid(),
  callback_module: module(),
  callback_state: term(),
  chunk_cleanup_interval: term(),
  chunked_message_contexts: %{
    optional(String.t()) => Pulsar.Consumer.ChunkedMessageContext.t()
  },
  client: term(),
  consumer_id: integer(),
  consumer_name: String.t() | nil,
  dead_letter_producer_pid: pid() | nil,
  dead_letter_topic: String.t() | nil,
  durable: term(),
  expire_incomplete_chunked_message_after: non_neg_integer(),
  flow_initial: non_neg_integer(),
  flow_outstanding_permits: non_neg_integer(),
  flow_refill: non_neg_integer(),
  flow_threshold: non_neg_integer(),
  force_create_topic: boolean(),
  initial_position: atom(),
  max_pending_chunked_messages: non_neg_integer(),
  max_redelivery: non_neg_integer() | nil,
  nacked_messages: MapSet.t(),
  read_compacted: term(),
  redelivery_interval: non_neg_integer() | nil,
  schema: Pulsar.Schema.t() | nil,
  schema_version: binary() | nil,
  start_message_id: {non_neg_integer(), non_neg_integer()},
  start_timestamp: non_neg_integer(),
  subscription_name: String.t(),
  subscription_type: String.t(),
  topic: String.t()
}
Functions
@spec ack(
  pid(),
  Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()
  | [Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()]
) :: :ok | {:error, term()}
Manually acknowledges one or more messages.
Use this when your callback returns {:noreply, state} to manually control acknowledgment.
Supports batching multiple message IDs in a single ACK command for better performance.
Parameters
consumer - The consumer process PID
message_ids - A single message ID or a list of message IDs to acknowledge
Examples
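    # Acknowledge a single message
    def handle_message({command, _metadata, _payload, _broker_metadata}, state) do
      message_id = command.message_id

      # Capture the consumer PID before spawning: inside the spawned function,
      # self() would be the new process. This assumes the callback runs in the
      # consumer process, as the nack example's use of self() suggests.
      consumer_pid = self()

      # Process message...
      spawn(fn ->
        # Do async processing
        Pulsar.Consumer.ack(consumer_pid, message_id)
      end)

      {:noreply, state}
    end

    # Acknowledge multiple messages in batch (more efficient)
    Pulsar.Consumer.ack(consumer_pid, [message_id1, message_id2, message_id3])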
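
One way to take advantage of batched acknowledgment is to buffer message IDs and acknowledge them together once enough have accumulated. The sketch below is a minimal illustration under that assumption: the AckBatcher module and its functions are hypothetical and not part of Pulsar.Consumer, and it only decides when a batch is ready rather than talking to a consumer.

```elixir
defmodule AckBatcher do
  @moduledoc """
  Hypothetical helper (not part of Pulsar.Consumer): collects message IDs
  and reports when a full batch is ready to be acknowledged.
  """

  defstruct ids: [], max_size: 3

  @doc "Creates an empty buffer that flushes once `max_size` IDs are queued."
  def new(max_size) when is_integer(max_size) and max_size > 0 do
    %__MODULE__{max_size: max_size}
  end

  @doc """
  Adds a message ID. Returns `{:flush, ids, buffer}` with the IDs in arrival
  order when the batch is full, otherwise `{:cont, buffer}`.
  """
  def add(%__MODULE__{ids: ids, max_size: max_size} = buffer, message_id) do
    ids = [message_id | ids]

    if length(ids) >= max_size do
      # Batch is full: hand the IDs back and start over with an empty buffer.
      {:flush, Enum.reverse(ids), %{buffer | ids: []}}
    else
      {:cont, %{buffer | ids: ids}}
    end
  end
end
```

On a `{:flush, ids, buffer}` result, the caller would pass `ids` to Pulsar.Consumer.ack(consumer_pid, ids) and keep the returned buffer for the next messages.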
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec nack(
  pid(),
  Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()
  | [Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()]
) :: :ok | {:error, term()}
Manually negatively acknowledges one or more messages.
Use this when your callback returns {:noreply, state} to manually control acknowledgment.
Supports batching multiple message IDs in a single NACK for better performance.
The messages will be tracked for redelivery if :redelivery_interval is configured.
When the messages are redelivered and the redelivery count exceeds :max_redelivery,
they will automatically be sent to the dead letter queue (if :dead_letter_policy is configured),
regardless of whether you use manual or automatic acknowledgment.
Parameters
consumer - The consumer process PID
message_ids - A single message ID or a list of message IDs to negatively acknowledge
Examples
    # NACK a single message
    def handle_message({command, _metadata, _payload, _broker_metadata}, state) do
      message_id = command.message_id

      case process_message() do
        :ok -> Pulsar.Consumer.ack(self(), message_id)
        {:error, _reason} -> Pulsar.Consumer.nack(self(), message_id)
      end

      {:noreply, state}
    end

    # NACK multiple messages in batch (more efficient)
    Pulsar.Consumer.nack(consumer_pid, [message_id1, message_id2, message_id3])
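
The redelivery rule described above (a message goes to the dead letter queue once its redelivery count exceeds :max_redelivery) can be pictured as a small decision function. The following is an illustrative model only, not the library's implementation: the RedeliveryModel module and its function names are hypothetical, and it treats a nil max_redelivery as "no dead letter policy configured".

```elixir
defmodule RedeliveryModel do
  @moduledoc """
  Illustrative model only; the names here are hypothetical and this is not
  the library's implementation.
  """

  @doc """
  Decides what happens to a redelivered message. With no dead letter policy
  (`max_redelivery` is nil) the message is always delivered to the callback.
  """
  def route(_redelivery_count, nil), do: :deliver

  def route(redelivery_count, max_redelivery)
      when is_integer(redelivery_count) and redelivery_count > max_redelivery,
      do: :dead_letter

  def route(_redelivery_count, _max_redelivery), do: :deliver

  @doc "Default dead letter topic name, following the `<topic>-<subscription>-DLQ` pattern."
  def default_dead_letter_topic(topic, subscription) do
    "#{topic}-#{subscription}-DLQ"
  end
end
```

With max_redelivery set to 3, a message on its third redelivery is still delivered, and the fourth is routed to the dead letter topic.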
@spec send_flow(pid(), non_neg_integer()) :: :ok | {:error, term()}
Sends a flow command to request more messages from the broker.
Use this function when you've disabled automatic flow control by setting
:flow_initial to 0. This allows you to implement custom flow control,
such as integrating with Broadway's demand mechanism.
Parameters
consumer - The consumer process PID
permits - Number of message permits to request
Examples
    # Start consumer with no automatic flow control
    {:ok, consumer_pid} = Pulsar.start_consumer(
      topic,
      subscription,
      MyCallback,
      flow_initial: 0  # Disable automatic flow
    )

    # Manually request messages based on your own demand
    Pulsar.Consumer.send_flow(consumer_pid, 10)

    # Example with Broadway demand
    def handle_demand(demand, state) do
      Pulsar.Consumer.send_flow(state.consumer, demand)
      # ... rest of logic
    end
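
For comparison, the automatic behavior that manual send_flow/2 calls replace can be modeled as a permit counter driven by :flow_initial, :flow_threshold and :flow_refill. This is a rough sketch under stated assumptions, not the library's code: the FlowModel module is hypothetical, and the trigger condition (request a refill once outstanding permits fall to the threshold or below) is a guess at the semantics. It covers only the automatic case, not :flow_initial set to 0.

```elixir
defmodule FlowModel do
  @moduledoc """
  Illustrative model of permit-based flow control. The trigger condition
  (`outstanding <= threshold`) is an assumption, not the library's code.
  """

  defstruct outstanding: 100, threshold: 50, refill: 50

  @doc "Builds a model from the documented option names and their defaults."
  def new(opts \\ []) do
    %__MODULE__{
      outstanding: Keyword.get(opts, :flow_initial, 100),
      threshold: Keyword.get(opts, :flow_threshold, 50),
      refill: Keyword.get(opts, :flow_refill, 50)
    }
  end

  @doc """
  Records one received message. Returns `{:request, permits, model}` when a
  flow command for `permits` more messages should be sent, else `{:ok, model}`.
  """
  def message_received(%__MODULE__{} = model) do
    # Each delivered message consumes one outstanding permit.
    outstanding = model.outstanding - 1

    if outstanding <= model.threshold do
      {:request, model.refill, %{model | outstanding: outstanding + model.refill}}
    else
      {:ok, %{model | outstanding: outstanding}}
    end
  end
end
```

A `{:request, permits, model}` result corresponds to the point where a consumer with manual flow control would call Pulsar.Consumer.send_flow(consumer_pid, permits) itself.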
Starts a consumer process with explicit parameters.
Parameters
topic - The topic to subscribe to
subscription_name - Name of the subscription
subscription_type - Type of subscription (e.g., :Exclusive, :Shared)
callback_module - Module that uses Pulsar.Consumer.Callback
opts - Additional options:
  :init_args - Arguments passed to the callback module's init/1 function
  :flow_initial - Initial flow permits (default: 100). Set to 0 to disable automatic flow control and use send_flow/2 manually.
  :flow_threshold - Flow permits threshold for automatic refill (default: 50). Ignored when :flow_initial is 0.
  :flow_refill - Flow permits refill amount (default: 50). Ignored when :flow_initial is 0.
  :initial_position - Initial position for the subscription (:latest or :earliest, default: :latest)
  :read_compacted - If true, reads from the compacted view of a compacted topic, i.e. only the latest message for each key, instead of the full message backlog (default: false)
  :name - Name for the consumer on the remote broker (default: nil, no name). If provided, it is visible in the broker's consumer list.
  :redelivery_interval - Interval in milliseconds for redelivering NACKed messages (default: nil, disabled)
  :dead_letter_policy - Dead letter policy configuration (default: nil, disabled):
    :max_redelivery - Maximum number of redeliveries before sending to the dead letter topic (must be >= 1)
    :topic - Dead letter topic (optional, defaults to <topic>-<subscription>-DLQ)
  :max_pending_chunked_messages - Maximum number of concurrent chunked messages to buffer (default: 10)
  :expire_incomplete_chunked_message_after - Timeout in milliseconds for incomplete chunked messages (default: 60_000)
  :chunk_cleanup_interval - Interval in milliseconds for checking expired chunked messages (default: 30_000)
  :startup_delay_ms - Fixed startup delay in milliseconds before consumer initialization (default: 1000, matches broker conn_timeout)
  :startup_jitter_ms - Maximum random startup delay in milliseconds to avoid a thundering herd (default: 1000)
The total startup delay is startup_delay_ms + random(0, startup_jitter_ms), applied on every consumer start/restart.
The default startup_delay_ms matches the broker's conn_timeout to ensure the broker has time to reconnect
before consumers start requesting topic lookups.
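
The delay formula above can be written out as a short function. This is a minimal sketch rather than the library's code: the StartupDelay module is hypothetical, and treating random(0, startup_jitter_ms) as a uniform integer with both ends inclusive is an assumption.

```elixir
defmodule StartupDelay do
  @moduledoc """
  Illustrative sketch of the documented formula; not the library's code.
  """

  @doc """
  Returns a delay in milliseconds: the fixed delay plus a uniformly random
  jitter between 0 and `jitter_ms` inclusive.
  """
  def compute(delay_ms \\ 1000, jitter_ms \\ 1000)
      when is_integer(delay_ms) and delay_ms >= 0 and
             is_integer(jitter_ms) and jitter_ms >= 0 do
    # :rand.uniform(n) returns an integer in 1..n, so shift it down by one
    # to cover 0..jitter_ms.
    delay_ms + :rand.uniform(jitter_ms + 1) - 1
  end
end
```

With the defaults, each start or restart would wait between 1000 and 2000 milliseconds, so consumers restarted together do not all issue topic lookups at the same instant.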
The consumer will automatically use any available broker for service discovery.
@spec stop(GenServer.server(), term(), timeout()) :: :ok
Gracefully stops a consumer process.