Pulsar.Reader (Pulsar v2.8.10)


A high-level interface for reading messages from Pulsar topics using Elixir's Stream API. The reader uses non-durable subscriptions, so its read position is not persisted on the broker: each new stream starts from the position selected by :start_position, :start_message_id, or :start_timestamp.

Usage

Basic usage with automatic connection:

# Read 10 messages from earliest
Pulsar.Reader.stream("persistent://public/default/my-topic",
  host: "pulsar://localhost:6650",
  start_position: :earliest
)
|> Stream.take(10)
|> Enum.each(fn message ->
  IO.inspect(message.payload)
end)

Using an external client (recommended for production):

# Start the client once, e.g. during application startup
{:ok, _pid} = Pulsar.start(host: "pulsar://localhost:6650")

# Later, in your code
Pulsar.Reader.stream("persistent://public/default/my-topic",
  client: :default,
  start_position: :earliest
)
|> Stream.each(&process/1)
|> Stream.run()

With custom flow control:

Pulsar.Reader.stream("persistent://public/default/my-topic",
  host: "pulsar://localhost:6650",
  flow_permits: 50  # Request 50 messages at a time
)
|> Enum.take(100)

Reading from a specific message:

Pulsar.Reader.stream("persistent://public/default/my-topic",
  host: "pulsar://localhost:6650",
  start_message_id: {123, 456}  # {ledger_id, entry_id}
)
|> Stream.each(&process/1)
|> Stream.run()

Options

  • :host - Pulsar broker URL (e.g., "pulsar://localhost:6650"). If provided, creates a temporary client for this stream. Mutually exclusive with :client.
  • :name - Name for the internal client (default: :default). Only used with :host. Use this to avoid conflicts when running multiple readers with :host.
  • :auth - Authentication configuration (only used with :host)
  • :socket_opts - Socket options (only used with :host)
  • :client - Name of existing Pulsar client to use (default: :default). Use this when Pulsar is already started in your supervision tree. Mutually exclusive with :host.
  • :start_position - Where to start reading (:earliest or :latest, default: :earliest)
  • :start_message_id - Start from specific message ID as {ledger_id, entry_id} tuple
  • :start_timestamp - Start from specific timestamp (milliseconds since epoch)
  • :flow_permits - Number of messages to request per flow command (default: 100)
  • :read_compacted - Read the compacted view of the topic, which keeps only the latest message for each key (default: false)
  • :timeout - Inactivity timeout in milliseconds (default: 60_000). Stream halts if no message is received within this time.
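
Since the options are an ordinary keyword list, they can be assembled and checked before calling Pulsar.Reader.stream/2. The sketch below uses a hypothetical helper module, ReaderOpts (not part of the Pulsar library), that rejects option lists containing both :host and :client, mirroring the mutual exclusivity described above, and converts a DateTime into the milliseconds-since-epoch value that :start_timestamp expects.

```elixir
defmodule ReaderOpts do
  @moduledoc """
  Hypothetical helper (not part of the Pulsar library) for preparing
  options for Pulsar.Reader.stream/2.
  """

  @doc "Raises if both :host and :client are present, since they are mutually exclusive."
  @spec validate!(keyword()) :: keyword()
  def validate!(opts) when is_list(opts) do
    if Keyword.has_key?(opts, :host) and Keyword.has_key?(opts, :client) do
      raise ArgumentError, ":host and :client are mutually exclusive"
    end

    opts
  end

  @doc "Builds a :start_timestamp option (milliseconds since the Unix epoch) from a DateTime."
  @spec start_at(DateTime.t()) :: keyword()
  def start_at(%DateTime{} = dt) do
    [start_timestamp: DateTime.to_unix(dt, :millisecond)]
  end
end
```

For example, `[host: "pulsar://localhost:6650"] |> Keyword.merge(ReaderOpts.start_at(DateTime.add(DateTime.utc_now(), -3600, :second))) |> ReaderOpts.validate!()` yields options for reading roughly the last hour of messages.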

Connection Management

Internal Client Mode (host)

When :host is provided, the stream creates a temporary client that lives only for the duration of the stream. The connection is automatically closed when the stream completes or is halted.

Pulsar.Reader.stream(topic, host: "pulsar://localhost:6650")
|> Enum.take(10)
# Connection automatically closed after consuming 10 messages
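
Each :host stream starts its own internal client under the :name option (default: :default), so readers that run at the same time should each get a distinct name. A minimal sketch, assuming a hypothetical helper HostReaders.opts_for/2 that derives one name per topic:

```elixir
defmodule HostReaders do
  @moduledoc """
  Hypothetical helper (not part of the Pulsar library): builds one option
  list per topic so that concurrent :host readers use distinct :name values.
  """

  @spec opts_for([String.t()], String.t()) :: [{String.t(), keyword()}]
  def opts_for(topics, host) do
    topics
    |> Enum.with_index()
    |> Enum.map(fn {topic, index} ->
      # Atoms are never garbage-collected, so keep the number of topics bounded.
      {topic, [host: host, name: :"reader_#{index}"]}
    end)
  end
end
```

Each `{topic, opts}` pair can then be read in its own process, for example `Task.async(fn -> Pulsar.Reader.stream(topic, opts) |> Enum.take(10) end)`, which also keeps stream creation and consumption in the same process.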

External Client Mode (client)

When :client is provided, or when neither :host nor :client is given (in which case the client named :default is used), the stream uses an existing client from your application's supervision tree. The client keeps running after the stream completes.

# In your application.ex (inside start/2)
children = [
  {Pulsar, host: "pulsar://localhost:6650"}
]

Supervisor.start_link(children, strategy: :one_for_one)

# Use the existing client
Pulsar.Reader.stream(topic, client: :default)
|> Enum.take(10)
# Client remains running

Partitioned Topics

The Reader supports partitioned topics. When reading from a partitioned topic, messages from all partitions are merged into a single stream. Ordering is not guaranteed across partitions: messages from different partitions may arrive interleaved.

If you need per-partition ordering, consider using separate Reader streams for each partition (e.g., "persistent://tenant/ns/topic-partition-0").
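
The per-partition topic names use the "-partition-N" suffix shown above. A small sketch of generating them, assuming the caller already knows the partition count (the helper Partitions.topics/2 is hypothetical and does not query the broker):

```elixir
defmodule Partitions do
  @moduledoc """
  Hypothetical helper: expands a partitioned topic name into its
  per-partition topic names. The partition count must be supplied.
  """

  @spec topics(String.t(), pos_integer()) :: [String.t()]
  def topics(topic, count) when is_integer(count) and count > 0 do
    for n <- 0..(count - 1), do: "#{topic}-partition-#{n}"
  end
end
```

Each resulting name can be passed to its own Pulsar.Reader.stream/2 call, giving one stream per partition.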

Process Ownership

The stream is bound to the process that creates it. Messages are delivered to the creating process's mailbox, so you cannot pass the stream to another process for consumption.

For multi-process consumption patterns, use the Pulsar.Consumer API directly or consider off_broadway_pulsar for Broadway-based pipelines.
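
In practice, the call to Pulsar.Reader.stream/2 should run inside the process that enumerates the stream. A minimal sketch using Task, where the stream is supplied as a zero-arity builder function instead of an already-created stream (OwnedStream.consume_async/2 is hypothetical; in the test a plain list stands in for a reader stream):

```elixir
defmodule OwnedStream do
  @moduledoc """
  Hypothetical helper: runs both stream creation and consumption inside
  one Task, so the process that creates the stream is the one that reads it.
  """

  @spec consume_async((-> Enumerable.t()), (Enumerable.t() -> term())) :: Task.t()
  def consume_async(build, consume) when is_function(build, 0) and is_function(consume, 1) do
    Task.async(fn ->
      # The builder is called here, inside the task process, not in the caller.
      build.() |> consume.()
    end)
  end
end
```

Usage might look like `OwnedStream.consume_async(fn -> Pulsar.Reader.stream(topic, client: :default) end, &Enum.take(&1, 10))`. Because Task.await/2 defaults to a 5-second timeout, pass :infinity or a value longer than the reader's :timeout when awaiting.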

Stream Termination

The stream terminates when any of these conditions is met:

  • The caller has taken as many messages as it requested (e.g., via Enum.take/2)
  • The inactivity timeout is reached (default: 60 seconds)
  • The stream is halted by downstream processing
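
Downstream halting is not limited to a fixed count. For instance, Stream.take_while/2 halts the upstream stream at the first element that fails a predicate. The sketch below stops at a hypothetical end-of-batch marker payload (the "EOF" marker is an assumption for illustration, not a Pulsar convention) and assumes the stream did not emit an {:error, reason} element. In the test, plain maps stand in for Pulsar.Message structs.

```elixir
defmodule ReadUntil do
  @moduledoc """
  Hypothetical helper: collects payloads until a message carries the given
  marker payload. Works on any enumerable of maps or structs with a :payload field.
  """

  @spec payloads_before(Enumerable.t(), term()) :: [term()]
  def payloads_before(messages, marker) do
    messages
    # Halts the upstream stream at the first marker message.
    |> Stream.take_while(fn message -> message.payload != marker end)
    |> Enum.map(fn message -> message.payload end)
  end
end
```

If the marker never arrives, the reader's inactivity timeout still ends the stream.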

Functions

stream(topic, opts \\ [])

@spec stream(
  String.t(),
  keyword()
) :: Enumerable.t()

Creates a stream of messages from a Pulsar topic.

Returns a Stream that yields Pulsar.Message structs. If initialization fails, the stream emits {:error, reason} as the first (and only) element.
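
Because an initialization failure arrives as an ordinary element rather than an exception, a caller that wants a tagged result can inspect what it took. A minimal sketch (SafeRead.take/2 is hypothetical; lists stand in for real streams in the test):

```elixir
defmodule SafeRead do
  @moduledoc """
  Hypothetical helper around the documented contract: when initialization
  fails, the stream's first and only element is {:error, reason}.
  """

  @spec take(Enumerable.t(), pos_integer()) :: {:ok, list()} | {:error, term()}
  def take(stream, count) when is_integer(count) and count > 0 do
    case Enum.take(stream, count) do
      [{:error, reason}] -> {:error, reason}
      messages -> {:ok, messages}
    end
  end
end
```

Pulsar.Message structs never match `{:error, reason}`, so a successful read of a single message is not mistaken for a failure.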

The stream handles connection lifecycle automatically based on whether you provide :host or :client.

Examples

# Read with internal connection (closes automatically)
Pulsar.Reader.stream("persistent://public/default/topic",
  host: "pulsar://localhost:6650",
  start_position: :earliest
)
|> Enum.take(5)

# Read with external client (remains open)
Pulsar.Reader.stream("persistent://public/default/topic",
  client: :default,
  start_position: :latest
)
|> Stream.filter(&interesting?/1)
# Enum.to_list/1 returns only once the stream halts (e.g. on the inactivity timeout)
|> Enum.to_list()

# Handle errors (emitted as first element if initialization fails)
require Logger

Pulsar.Reader.stream("persistent://public/default/topic",
  host: "pulsar://invalid:6650"
)
|> Enum.take(1)
|> case do
  [{:error, reason}] -> Logger.error("Failed: #{inspect(reason)}")
  messages -> process(messages)
end