What is a Reader?

A Reader is a high-level interface for reading messages from Pulsar topics using Elixir's Stream abstraction. Unlike Consumers, which are callback-based and designed for continuous message processing with persistent subscriptions, readers are designed for:

  • Batch processing: Reading a sequence of messages and stopping
  • Stream pipelines: Transforming and filtering data using Elixir's functional Enum and Stream modules
  • Replay: Reading messages from a specific position (e.g., from the beginning or a specific message ID)
  • One-off tasks: Scripts or jobs that need to consume data without setting up a full Consumer supervision tree

Readers use non-durable subscriptions, meaning they don't persist their position on the broker. Each time you start a Reader, you specify where to start reading from.

Basic Usage

The simplest way to use a Reader is to stream messages from a topic using an internal client:

"persistent://public/default/my-topic"
|> Pulsar.Reader.stream(host: "pulsar://localhost:6650")
|> Stream.map(fn msg -> msg.payload end)
|> Enum.take(10)

This creates a stream that:

  1. Connects to the Pulsar broker
  2. Reads 10 messages from the topic (starting from :earliest by default)
  3. Extracts the payload
  4. Automatically closes the connection when done
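Step 4 works because Elixir streams are lazy. When a consumer such as Enum.take/2 stops early, cleanup attached to the stream still runs. The sketch below shows that general mechanism with Stream.resource/3. It assumes nothing about how Pulsar.Reader is implemented internally. `FakeReader` and its `:opened`/`:closed` notifications are hypothetical stand-ins for a real connection.

```elixir
defmodule FakeReader do
  # Hypothetical stand-in for a Reader stream (not the library's code).
  # It "opens a connection" when enumeration starts, emits an endless
  # sequence of payloads, and "closes the connection" when enumeration
  # stops, whether the consumer halts early or the stream is exhausted.
  def stream(notify_pid) do
    Stream.resource(
      # Runs lazily, when the consumer asks for the first element
      fn ->
        send(notify_pid, :opened)
        1
      end,
      # Emits one fake payload per call and never halts on its own
      fn n -> {["payload-#{n}"], n + 1} end,
      # Runs when the consumer stops, as Enum.take/2 does here
      fn _n -> send(notify_pid, :closed) end
    )
  end
end

FakeReader.stream(self()) |> Enum.take(3)
# => ["payload-1", "payload-2", "payload-3"]; :opened and :closed were both
#    sent to the calling process
```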

Note

The Reader stream is bound to the process that creates it.

Messages are delivered to the creating process's mailbox. You cannot create a stream in one process and pass it to another for consumption. If you need concurrent consumption:

  1. Create multiple streams in separate processes (e.g., inside Task.async)
  2. Use partitioned topics (the Reader handles them automatically, merging partitions into a single stream)
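The first option can be sketched with Task.async/1. Each task creates and consumes its own stream, so each stream's messages land in the mailbox of the process that reads them. The example needs to run without a broker, so `MailboxStream` is a hypothetical stand-in: a spawned "fake broker" pushes payloads to the reading process, which drains its own mailbox until a short receive timeout expires. In real code, each task would call Pulsar.Reader.stream/2 instead.

```elixir
defmodule MailboxStream do
  # Hypothetical stand-in for Pulsar.Reader.stream/2 (not the library's code).
  def stream(payloads, timeout \\ 100) do
    Stream.resource(
      fn ->
        # self() is the process enumerating the stream, i.e. the task
        reader = self()
        # Fake broker: pushes every payload to the reading process
        spawn(fn -> Enum.each(payloads, &send(reader, {:fake_msg, &1})) end)
        :ok
      end,
      fn state ->
        receive do
          {:fake_msg, payload} -> {[payload], state}
        after
          # Stop once no message has arrived for `timeout` ms
          timeout -> {:halt, state}
        end
      end,
      fn _state -> :ok end
    )
  end
end

# One task per topic; each task owns its stream and its mailbox
tasks =
  for topic <- ["orders", "payments"] do
    Task.async(fn ->
      MailboxStream.stream(for i <- 1..3, do: "#{topic}-#{i}")
      |> Enum.to_list()
    end)
  end

Task.await_many(tasks)
# => [["orders-1", "orders-2", "orders-3"], ["payments-1", "payments-2", "payments-3"]]
```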

Connection Management

You can choose between two connection modes:

Internal Client (Host Mode)

Provide a :host URL, and the stream will manage its own temporary Pulsar client. The client is started when the stream begins and stopped when it terminates.

Pulsar.Reader.stream(topic, host: "pulsar://localhost:6650")

External Client (Client Mode)

Use an existing Pulsar client from your application's supervision tree. This is more efficient if you're running multiple streams or already have a client connection.

# In application.ex
children = [
  {Pulsar, host: "pulsar://localhost:6650", name: :my_app_client}
]

Supervisor.start_link(children, strategy: :one_for_one)

# In your code
Pulsar.Reader.stream(topic, client: :my_app_client)

Start Positions

You can control where the Reader starts consuming messages:

From Earliest/Latest

# Start from the oldest available message (default)
Pulsar.Reader.stream(topic, start_position: :earliest)

# Start only with new messages published after the reader starts
Pulsar.Reader.stream(topic, start_position: :latest)

From Specific Message ID

Resume reading from a specific message (inclusive):

message_id = {123, 456} # {ledger_id, entry_id}

Pulsar.Reader.stream(topic, start_message_id: message_id)
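The start position is inclusive, and Readers keep no position on the broker. Resuming from the last ID you processed therefore delivers that message again. Below is a minimal sketch for skipping it. It assumes each message exposes its ID as `{ledger_id, entry_id}` under a `:message_id` key. That field name is hypothetical, so check the message struct in your version of the library. `Resume` is also a hypothetical module name.

```elixir
defmodule Resume do
  # Drops leading messages whose ID equals the last one already processed.
  # Assumes a hypothetical :message_id field on each message.
  def skip_already_seen(messages, last_seen_id) do
    Stream.drop_while(messages, fn msg -> msg.message_id == last_seen_id end)
  end
end

# In-memory stand-ins for messages delivered after resuming from {123, 456}
messages = [
  %{message_id: {123, 456}, payload: "already processed"},
  %{message_id: {123, 457}, payload: "new"}
]

messages |> Resume.skip_already_seen({123, 456}) |> Enum.map(& &1.payload)
# => ["new"]
```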

From Timestamp

Read messages published at or after a specific timestamp (Unix timestamp in milliseconds):

timestamp = System.system_time(:millisecond) - :timer.hours(1) # 1 hour ago

Pulsar.Reader.stream(topic, start_timestamp: timestamp)
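To start from a fixed wall-clock time rather than a relative offset, convert a DateTime to Unix milliseconds with DateTime.to_unix/2 from the standard library:

```elixir
# Unix timestamp in milliseconds for midnight UTC on 1 January 2024
start = DateTime.to_unix(~U[2024-01-01 00:00:00Z], :millisecond)
# => 1704067200000

# Then pass it as the start position, e.g.
# Pulsar.Reader.stream(topic, start_timestamp: start)
```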

Stream Processing Examples

Filter and Map

Read messages, filter for interesting ones, and transform them:

topic
|> Pulsar.Reader.stream(client: :default)
|> Stream.map(fn msg -> Jason.decode!(msg.payload) end)
|> Stream.filter(fn event -> event["type"] == "user_signup" end)
|> Stream.map(fn event -> event["user_id"] end)
|> Enum.each(&IO.inspect/1)
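Stream functions accept any Enumerable, so a pipeline like this can be factored into a function and tested against an in-memory list instead of a live topic. In the sketch below, `SignupIds` is a hypothetical module name. The decoder is passed in as a parameter so tests can skip JSON. In production you might pass `&Jason.decode!/1`.

```elixir
defmodule SignupIds do
  # Same steps as the pipeline above, over any Enumerable of maps with a
  # :payload field (a Reader stream or a plain list).
  def user_ids(messages, decode) do
    messages
    |> Stream.map(fn msg -> decode.(msg.payload) end)
    |> Stream.filter(fn event -> event["type"] == "user_signup" end)
    |> Stream.map(fn event -> event["user_id"] end)
  end
end

# Payloads here are already-decoded maps, so the decoder is the identity
fake_messages = [
  %{payload: %{"type" => "user_signup", "user_id" => 1}},
  %{payload: %{"type" => "login", "user_id" => 2}},
  %{payload: %{"type" => "user_signup", "user_id" => 3}}
]

fake_messages |> SignupIds.user_ids(fn payload -> payload end) |> Enum.to_list()
# => [1, 3]
```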

Batch Processing

Process messages in chunks using Stream.chunk_every/2:

topic
|> Pulsar.Reader.stream(client: :default)
|> Stream.chunk_every(100)
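|> Enum.each(fn batch ->
  # Repo.insert_all/3 expects maps or keyword lists of column values, not raw
  # messages. to_user_row/1 is a placeholder for your own payload decoding.
  rows = Enum.map(batch, fn msg -> to_user_row(msg.payload) end)
  Repo.insert_all(User, rows)
end)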
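Stream.chunk_every/2 emits a final, shorter chunk for any leftover elements, so the last insert may contain fewer than 100 rows. With a Reader, that final chunk can only be emitted once the stream ends, which normally happens after the :timeout period of inactivity. The effect on a finite range standing in for messages:

```elixir
# 250 stand-in items split into batches of 100
batch_sizes =
  1..250
  |> Stream.chunk_every(100)
  |> Enum.map(&length/1)
# => [100, 100, 50]
```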

Timeout Handling

By default, the stream waits up to 60 seconds for new messages before terminating. You can adjust this with :timeout:

topic
|> Pulsar.Reader.stream(client: :default, timeout: 5000) # 5s timeout
|> Enum.to_list()
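An inactivity timeout means a function that reads to the end, such as Enum.to_list/1, returns only after :timeout milliseconds pass with no new message. The sketch below models that idea with a receive timeout over the caller's mailbox. `InactivityStream` and the `{:fake_msg, payload}` messages are hypothetical stand-ins, not the library's implementation.

```elixir
defmodule InactivityStream do
  # Hypothetical stand-in: yields payloads from {:fake_msg, payload}
  # messages in the caller's mailbox and halts once `timeout` ms pass
  # without a new one.
  def stream(timeout) do
    Stream.repeatedly(fn ->
      receive do
        {:fake_msg, payload} -> {:ok, payload}
      after
        timeout -> :timeout
      end
    end)
    |> Stream.take_while(&match?({:ok, _}, &1))
    |> Stream.map(fn {:ok, payload} -> payload end)
  end
end

for p <- ["a", "b", "c"], do: send(self(), {:fake_msg, p})

{micros, payloads} =
  :timer.tc(fn -> InactivityStream.stream(200) |> Enum.to_list() end)
# payloads == ["a", "b", "c"]; micros is about 200_000 or more, because
# Enum.to_list/1 only returns after 200 ms pass with no new message
```

Consumers that halt early, such as Enum.take/2, return as soon as they have enough elements. The timeout therefore mostly matters when you read a stream to its end.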

Error Handling

If initialization fails (e.g., invalid topic, connection error), the stream emits {:error, reason} as its first and only element:

require Logger

topic
|> Pulsar.Reader.stream(host: "pulsar://invalid:6650")
|> Enum.each(fn
  {:error, reason} -> Logger.error("Failed: #{inspect(reason)}")
  msg -> process(msg) # process/1 is your own handler
end)
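The error arrives as an ordinary stream element. If you would rather get a single tagged result, a small wrapper can fold the stream into `{:ok, payloads}` or `{:error, reason}`. `ReaderResult` is a hypothetical helper, not part of the library, and the in-memory lists below stand in for Reader streams.

```elixir
defmodule ReaderResult do
  # Hypothetical helper: collects payloads, or stops at an error tuple.
  def collect(stream) do
    result =
      Enum.reduce_while(stream, {:ok, []}, fn
        # An initialization error is the first and only element
        {:error, reason}, _acc -> {:halt, {:error, reason}}
        msg, {:ok, payloads} -> {:cont, {:ok, [msg.payload | payloads]}}
      end)

    case result do
      {:ok, payloads} -> {:ok, Enum.reverse(payloads)}
      {:error, _reason} = error -> error
    end
  end
end

ReaderResult.collect([{:error, :econnrefused}])
# => {:error, :econnrefused}

ReaderResult.collect([%{payload: "a"}, %{payload: "b"}])
# => {:ok, ["a", "b"]}
```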

Flow Control

The Reader manages flow control internally. You can configure the number of permits (messages requested from the broker) using :flow_permits:

# Request 50 messages at a time (default: 100)
Pulsar.Reader.stream(topic, flow_permits: 50)

For most use cases, the default is fine. Lowering it reduces how many messages are requested from the broker at a time, which helps bound memory usage when individual messages are very large.
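As a rough sizing aid, assume the broker never pushes more messages than the outstanding permits, and ignore producer-side batching (a batch can push delivery past the permit count). Under those assumptions, worst-case buffered data is about `flow_permits` times the largest message size. `FlowSizing` is a hypothetical helper for this arithmetic.

```elixir
defmodule FlowSizing do
  # Rough worst case under the assumptions above; not a library guarantee.
  def max_buffered_bytes(flow_permits, max_message_bytes) do
    flow_permits * max_message_bytes
  end
end

# With 256 KiB messages:
FlowSizing.max_buffered_bytes(100, 256 * 1024) # default :flow_permits
# => 26214400 (25 MiB)

FlowSizing.max_buffered_bytes(10, 256 * 1024)
# => 2621440 (2.5 MiB)
```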

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| :host | string | - | Pulsar broker URL (mutually exclusive with :client) |
| :name | atom | :default | Name for the internal client (only used with :host) |
| :auth | tuple | - | Authentication configuration (only used with :host) |
| :socket_opts | list | - | Socket options (only used with :host) |
| :client | atom | :default | Name of existing client (mutually exclusive with :host) |
| :start_position | atom | :earliest | :earliest or :latest |
| :start_message_id | tuple | - | {ledger_id, entry_id} tuple to start from |
| :start_timestamp | integer | - | Unix timestamp (ms) to start from |
| :flow_permits | integer | 100 | Number of messages to request per flow batch |
| :timeout | integer | 60_000 | Inactivity timeout in milliseconds |
| :read_compacted | boolean | false | Read only latest value for each key (compacted topics) |
| :startup_delay_ms | integer | 0 | Delay before consumer starts (ms) |
| :startup_jitter_ms | integer | 0 | Random jitter added to startup delay (ms) |
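If you wrap the Reader in your own function, you can catch misspelled option names and the :host/:client conflict before the options reach the library. Below is a sketch using Keyword.validate!/2, which requires Elixir 1.13 or later. `ReaderOpts` is a hypothetical wrapper, not part of the Pulsar library. The allowed keys simply mirror the options table above.

```elixir
defmodule ReaderOpts do
  # Hypothetical wrapper-side validation; keys copied from the options table.
  @allowed [
    :host, :name, :auth, :socket_opts, :client, :start_position,
    :start_message_id, :start_timestamp, :flow_permits, :timeout,
    :read_compacted, :startup_delay_ms, :startup_jitter_ms
  ]

  def validate!(opts) do
    # Raises ArgumentError for unknown keys such as :timout
    opts = Keyword.validate!(opts, @allowed)

    if Keyword.has_key?(opts, :host) and Keyword.has_key?(opts, :client) do
      raise ArgumentError, ":host and :client are mutually exclusive"
    end

    opts
  end
end

# Returns the validated keyword list (key order is not guaranteed)
ReaderOpts.validate!(host: "pulsar://localhost:6650", timeout: 5_000)
```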