Pulsar.Consumer (Pulsar v2.8.6)

Copy Markdown View Source

Pulsar consumer process that communicates with broker processes.

This consumer uses service discovery to find the appropriate broker for the topic and then communicates with that broker process.

Callback Module

The consumer requires a callback module that uses Pulsar.Consumer.Callback, providing stateful message processing capabilities.

To create a callback module:

defmodule MyApp.MessageHandler do
  use Pulsar.Consumer.Callback

  def handle_message(%Pulsar.Message{payload: payload}, state) do
    # Process the message
    IO.inspect(payload)
    {:ok, state}
  end
end

See Pulsar.Consumer.Callback for detailed documentation and examples.

Summary

Functions

Manually acknowledges one or more messages.

Returns a specification to start this module under a supervisor.

Manually negatively acknowledges one or more messages.

Sends a flow command to request more messages from the broker.

Gracefully stops a consumer process.

Types

t()

@type t() :: %Pulsar.Consumer{
  broker_monitor: reference(),
  broker_pid: pid(),
  callback_module: module(),
  callback_state: term(),
  chunk_cleanup_interval: term(),
  chunked_message_contexts: %{
    optional(String.t()) => Pulsar.Consumer.ChunkedMessageContext.t()
  },
  client: term(),
  consumer_id: integer(),
  consumer_name: String.t() | nil,
  dead_letter_producer_pid: pid() | nil,
  dead_letter_topic: String.t() | nil,
  durable: term(),
  expire_incomplete_chunked_message_after: non_neg_integer(),
  flow_initial: non_neg_integer(),
  flow_outstanding_permits: non_neg_integer(),
  flow_refill: non_neg_integer(),
  flow_threshold: non_neg_integer(),
  force_create_topic: boolean(),
  initial_position: atom(),
  max_pending_chunked_messages: non_neg_integer(),
  max_redelivery: non_neg_integer() | nil,
  nacked_messages: MapSet.t(),
  read_compacted: term(),
  redelivery_interval: non_neg_integer() | nil,
  schema: Pulsar.Schema.t() | nil,
  schema_version: binary() | nil,
  start_message_id: {non_neg_integer(), non_neg_integer()},
  start_timestamp: non_neg_integer(),
  subscription_name: String.t(),
  subscription_type: String.t(),
  topic: String.t()
}

Functions

ack(consumer, message_ids)

@spec ack(
  pid(),
  Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()
  | [Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()]
) :: :ok | {:error, term()}

Manually acknowledges one or more messages.

Use this when your callback returns {:noreply, state} to manually control acknowledgment. Supports batching multiple message IDs in a single ACK command for better performance.

Parameters

  • consumer - The consumer process PID
  • message_ids - A single message ID or a list of message IDs to acknowledge

Examples

# Acknowledge a single message
def handle_message({command, _metadata, _payload, _broker_metadata}, state) do
  message_id = command.message_id
  # Process message...
  spawn(fn ->
    # Do async processing
    Pulsar.Consumer.ack(consumer_pid, message_id)
  end)
  {:noreply, state}
end

# Acknowledge multiple messages in batch (more efficient)
Pulsar.Consumer.ack(consumer_pid, [message_id1, message_id2, message_id3])

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

nack(consumer, message_ids)

@spec nack(
  pid(),
  Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()
  | [Pulsar.Protocol.Binary.Pulsar.Proto.MessageIdData.t()]
) :: :ok | {:error, term()}

Manually negatively acknowledges one or more messages.

Use this when your callback returns {:noreply, state} to manually control acknowledgment. Supports batching multiple message IDs in a single NACK for better performance.

The messages will be tracked for redelivery if :redelivery_interval is configured. When the messages are redelivered and the redelivery count exceeds :max_redelivery, they will automatically be sent to the dead letter queue (if :dead_letter_policy is configured), regardless of whether you use manual or automatic acknowledgment.

Parameters

  • consumer - The consumer process PID
  • message_ids - A single message ID or a list of message IDs to negatively acknowledge

Examples

# NACK a single message
def handle_message({command, _metadata, _payload, _broker_metadata}, state) do
  message_id = command.message_id
  case process_message() do
    :ok -> Pulsar.Consumer.ack(self(), message_id)
    {:error, _reason} -> Pulsar.Consumer.nack(self(), message_id)
  end
  {:noreply, state}
end

# NACK multiple messages in batch (more efficient)
Pulsar.Consumer.nack(consumer_pid, [message_id1, message_id2, message_id3])

send_flow(consumer, permits)

@spec send_flow(pid(), non_neg_integer()) :: :ok | {:error, term()}

Sends a flow command to request more messages from the broker.

Use this function when you've disabled automatic flow control by setting :flow_initial to 0. This allows you to implement custom flow control, such as integrating with Broadway's demand mechanism.

Parameters

  • consumer - The consumer process PID
  • permits - Number of message permits to request

Examples

# Start consumer with no automatic flow control
{:ok, consumer_pid} = Pulsar.start_consumer(
  topic,
  subscription,
  MyCallback,
  flow_initial: 0  # Disable automatic flow
)

# Manually request messages based on your own demand
Pulsar.Consumer.send_flow(consumer_pid, 10)

# Example with Broadway demand
def handle_demand(demand, state) do
  Pulsar.Consumer.send_flow(state.consumer, demand)
  # ... rest of logic
end

start_link(topic, subscription_name, subscription_type, callback_module, opts \\ [])

Starts a consumer process with explicit parameters.

Parameters

  • topic - The topic to subscribe to
  • subscription_name - Name of the subscription
  • subscription_type - Type of subscription (e.g., :Exclusive, :Shared)
  • callback_module - Module that uses Pulsar.Consumer.Callback
  • opts - Additional options:
    • :init_args - Arguments passed to callback module's init/1 function
    • :flow_initial - Initial flow permits (default: 100). Set to 0 to disable automatic flow control and use send_flow/2 manually.
    • :flow_threshold - Flow permits threshold for automatic refill (default: 50). Ignored when :flow_initial is 0.
    • :flow_refill - Flow permits refill amount (default: 50). Ignored when :flow_initial is 0.
    • :initial_position - Initial position for subscription (:latest or :earliest, defaults to :latest)
    • :read_compacted - If true, only reads non-compacted messages from compacted topics (default: false)
    • :name - Name for the consumer on the remote broker (default: nil, no name). If provided, will be visible in broker's consumer list.
    • :redelivery_interval - Interval in milliseconds for redelivering NACKed messages (default: nil, disabled)
  • :dead_letter_policy - Dead letter policy configuration (default: nil, disabled):
    • :max_redelivery - Maximum number of redeliveries before sending to dead letter topic (must be >= 1)
    • :topic - Dead letter topic (optional, defaults to <topic>-<subscription>-DLQ)
    • :max_pending_chunked_messages - Maximum number of concurrent chunked messages to buffer (default: 10)
    • :expire_incomplete_chunked_message_after - Timeout in milliseconds for incomplete chunked messages (default: 60_000)
    • :chunk_cleanup_interval - Interval in milliseconds for checking expired chunked messages (default: 30_000)
    • :startup_delay_ms - Fixed startup delay in milliseconds before consumer initialization (default: 1000, matches broker conn_timeout)
    • :startup_jitter_ms - Maximum random startup delay in milliseconds to avoid thundering herd (default: 1000)

The total startup delay is startup_delay_ms + random(0, startup_jitter_ms), applied on every consumer start/restart. The default startup_delay_ms matches the broker's conn_timeout to ensure the broker has time to reconnect before consumers start requesting topic lookups.

The consumer will automatically use any available broker for service discovery.

stop(consumer, reason \\ :normal, timeout \\ :infinity)

@spec stop(GenServer.server(), term(), timeout()) :: :ok

Gracefully stops a consumer process.