A high-level interface for reading messages from Pulsar topics using Elixir's Stream API. The reader uses non-durable subscriptions, meaning it doesn't persist its position and starts fresh on each connection.
Usage
Basic usage with automatic connection:
# Read 10 messages from earliest
Pulsar.Reader.stream("persistent://public/default/my-topic",
  host: "pulsar://localhost:6650",
  start_position: :earliest
)
|> Stream.take(10)
|> Enum.each(fn message ->
  IO.inspect(message.payload)
end)
Using an external client (recommended for production):
# In your application supervision tree
{:ok, _pid} = Pulsar.start(host: "pulsar://localhost:6650")
# Later, in your code
Pulsar.Reader.stream("persistent://public/default/my-topic",
  client: :default,
  start_position: :earliest
)
|> Stream.each(&process/1)
|> Stream.run()
With custom flow control:
Pulsar.Reader.stream("persistent://public/default/my-topic",
  host: "pulsar://localhost:6650",
  flow_permits: 50 # Request 50 messages at a time
)
|> Enum.take(100)
Reading from a specific message:
Pulsar.Reader.stream("persistent://public/default/my-topic",
  host: "pulsar://localhost:6650",
  start_message_id: {123, 456} # {ledger_id, entry_id}
)
|> Stream.each(&process/1)
|> Stream.run()
Options
- :host - Pulsar broker URL (e.g., "pulsar://localhost:6650"). If provided, creates a temporary client for this stream. Mutually exclusive with :client.
- :name - Name for the internal client (default: :default). Only used with :host. Use this to avoid conflicts when running multiple readers with :host.
- :auth - Authentication configuration (only used with :host).
- :socket_opts - Socket options (only used with :host).
- :client - Name of an existing Pulsar client to use (default: :default). Use this when Pulsar is already started in your supervision tree. Mutually exclusive with :host.
- :start_position - Where to start reading: :earliest or :latest (default: :earliest).
- :start_message_id - Start from a specific message ID, given as a {ledger_id, entry_id} tuple.
- :start_timestamp - Start from a specific timestamp (milliseconds since the Unix epoch).
- :flow_permits - Number of messages to request per flow command (default: 100).
- :read_compacted - Read from the topic's compacted view (only the latest message per key, up to the compaction point) instead of the full backlog (default: false).
- :timeout - Inactivity timeout in milliseconds (default: 60_000). The stream halts if no message is received within this time.
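None of the usage examples above exercises :start_timestamp. The sketch below shows one way to replay roughly the last few minutes of a topic. ReplayWindow, opts_since/2 and stream_recent/2 are hypothetical names, and the sketch assumes a client is already running under the name :default. How :start_timestamp interacts with the default :start_position is not spelled out in this page, so verify that before relying on it.

```elixir
defmodule ReplayWindow do
  # Hypothetical helper (not part of the Pulsar library): builds Reader
  # options whose :start_timestamp lies `ago_ms` milliseconds before
  # `now_ms`. `now_ms` defaults to the current wall-clock time.
  def opts_since(ago_ms, now_ms \\ System.os_time(:millisecond))
      when is_integer(ago_ms) and ago_ms >= 0 do
    [start_timestamp: now_ms - ago_ms]
  end

  # Hypothetical convenience wrapper: streams from approximately
  # `ago_ms` milliseconds ago using an existing client named :default
  # (assumption: that client was started elsewhere).
  def stream_recent(topic, ago_ms) do
    Pulsar.Reader.stream(topic, [client: :default] ++ opts_since(ago_ms))
  end
end
```

For example, ReplayWindow.stream_recent("persistent://public/default/my-topic", 5 * 60_000) |> Enum.take(10) would read up to ten messages starting about five minutes back. Keeping the timestamp arithmetic in a pure function with an injectable now_ms makes it easy to test without a broker.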
Connection Management
Internal Client Mode (:host)
When :host is provided, the stream creates a temporary client that lives
only for the duration of the stream. The connection is automatically closed when
the stream completes or is halted.
Pulsar.Reader.stream(topic, host: "pulsar://localhost:6650")
|> Enum.take(10)
# Connection automatically closed after consuming 10 messages
External Client Mode (:client)
When :client is provided (or defaulted), the stream uses an existing client
from your application's supervision tree. The client remains running after
the stream completes.
# In your application.ex
children = [
  {Pulsar, host: "pulsar://localhost:6650"}
]
# Use the existing client
Pulsar.Reader.stream(topic, client: :default)
|> Enum.take(10)
# Client remains running
Partitioned Topics
The Reader supports partitioned topics. When reading from a partitioned topic, messages from all partitions are merged into a single stream. Note that message ordering across partitions is not guaranteed: messages from different partitions may arrive interleaved.
If you need per-partition ordering, consider using separate Reader streams for
each partition (e.g., "persistent://tenant/ns/topic-partition-0").
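A minimal sketch of the per-partition approach follows. It assumes the caller already knows the partition count (the helper does not query the broker for it) and that opts names an external client, e.g. client: :default, because several :host readers would otherwise share the default internal client :name. PartitionStreams and its functions are hypothetical.

```elixir
defmodule PartitionStreams do
  # Hypothetical helper: builds per-partition topic names using the
  # "<topic>-partition-<index>" convention. The partition count is
  # supplied by the caller, not looked up.
  def partition_topics(topic, count) when is_integer(count) and count > 0 do
    for index <- 0..(count - 1), do: "#{topic}-partition-#{index}"
  end

  # Returns one Reader stream per partition. Each stream reads a single
  # partition, so its own messages are not interleaved with other
  # partitions. Consume the streams in the process that called this.
  def streams(topic, count, opts) do
    topic
    |> partition_topics(count)
    |> Enum.map(&Pulsar.Reader.stream(&1, opts))
  end
end
```

For a three-partition topic, PartitionStreams.streams("persistent://tenant/ns/topic", 3, client: :default) yields three independent streams that can each be enumerated in turn by the calling process.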
Process Ownership
The stream is bound to the process that creates it. Messages are delivered to the creating process's mailbox, so you cannot pass the stream to another process for consumption.
For multi-process consumption patterns, use the Pulsar.Consumer API directly
or consider off_broadway_pulsar
for Broadway-based pipelines.
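If you only need to read a few topics concurrently, one pattern that respects the ownership rule is to create and consume each stream inside its own Task, so only the final list of messages crosses process boundaries. This is a sketch, not a library feature. ReaderTasks and take_concurrently/3 are hypothetical, and the distinct :name values follow the guidance given for the :name option.

```elixir
defmodule ReaderTasks do
  # Hypothetical sketch: each Task both creates and enumerates its own
  # Reader stream, so messages land in the mailbox of the process that
  # consumes them. Only the resulting lists are sent back to the caller.
  def take_concurrently(topics, count, host) do
    topics
    |> Enum.with_index()
    |> Enum.map(fn {topic, index} ->
      Task.async(fn ->
        Pulsar.Reader.stream(topic,
          host: host,
          # Distinct internal client names avoid conflicts between
          # concurrent :host readers.
          name: :"reader_#{index}",
          start_position: :earliest
        )
        |> Enum.take(count)
      end)
    end)
    # Each stream already halts on its own inactivity timeout (60 s by
    # default), so the default 5 s Task timeout would be too short.
    |> Task.await_many(:infinity)
  end
end
```

Each element of the returned list is whatever Enum.take/2 produced for that topic, which may be [{:error, reason}] if that reader failed to initialize.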
Stream Termination
The stream terminates when any of these conditions is met:
- The caller has taken all the messages it needs (e.g., via Enum.take/2)
- The inactivity timeout is reached (default: 60 seconds, configurable with :timeout)
- The stream is halted by downstream processing
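To make these halting paths concrete, here is a self-contained illustration built on Stream.resource/3. It is an assumed analogue, not the Pulsar.Reader implementation: values are read from the enumerating process's mailbox, a receive ... after clause plays the role of the inactivity timeout, and the after-callback stands in for closing the connection. MailboxStream is a hypothetical name.

```elixir
defmodule MailboxStream do
  # Illustrative only: emits the value of each {:msg, value} message in
  # the enumerating process's mailbox and halts after `timeout` ms with
  # no such message. `on_close` runs in every case, whether the stream
  # times out or the consumer stops early (e.g. Enum.take/2).
  def stream(timeout, on_close) do
    Stream.resource(
      # Start: nothing to open in this sketch.
      fn -> :open end,
      # Next: wait for one message, or halt on inactivity.
      fn state ->
        receive do
          {:msg, value} -> {[value], state}
        after
          timeout -> {:halt, state}
        end
      end,
      # After: cleanup hook, where a reader would close its connection.
      fn _state -> on_close.() end
    )
  end
end
```

For example, after sending five {:msg, i} messages to self(), Enum.take(MailboxStream.stream(100, on_close), 2) returns [1, 2] and still runs on_close, leaving three messages in the mailbox. Enumerating a fresh stream with Enum.to_list/1 then returns [3, 4, 5] once the timeout fires. This also shows why the stream is tied to one process: receive only sees the mailbox of the process doing the enumeration.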
Functions
@spec stream(String.t(), keyword()) :: Enumerable.t()
Creates a stream of messages from a Pulsar topic.
Returns a Stream that yields Pulsar.Message structs. If initialization
fails, the stream emits {:error, reason} as the first (and only) element.
The stream handles connection lifecycle automatically based on whether
you provide :host or :client.
Examples
# Read with internal connection (closes automatically)
Pulsar.Reader.stream("persistent://public/default/topic",
  host: "pulsar://localhost:6650",
  start_position: :earliest
)
|> Enum.take(5)
# Read with external client (remains open)
# Enum.to_list/1 returns only once the stream halts (e.g. on the inactivity timeout)
Pulsar.Reader.stream("persistent://public/default/topic",
  client: :default,
  start_position: :latest
)
|> Stream.filter(&interesting?/1)
|> Enum.to_list()
# Handle errors (emitted as first element if initialization fails)
require Logger

Pulsar.Reader.stream("persistent://public/default/topic",
  host: "pulsar://invalid:6650"
)
|> Enum.take(1)
|> case do
  [{:error, reason}] -> Logger.error("Failed: #{inspect(reason)}")
  messages -> process(messages)
end
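Because the initialization error arrives as an ordinary stream element, it can be convenient to fold the check into a helper that returns a tagged tuple. This is a sketch with ReaderResult.take/2 as a hypothetical name. It enumerates the stream exactly once, which matters because re-enumerating a lazy stream would presumably start a new reader.

```elixir
defmodule ReaderResult do
  # Hypothetical helper: takes up to `count` elements from a Reader
  # stream in a single pass. Returns {:error, reason} when the stream's
  # only element is an initialization error, otherwise {:ok, messages}.
  def take(stream, count) when is_integer(count) and count > 0 do
    case Enum.take(stream, count) do
      [{:error, reason}] -> {:error, reason}
      messages -> {:ok, messages}
    end
  end
end
```

Usage might look like: case ReaderResult.take(Pulsar.Reader.stream(topic, client: :default), 10) do {:ok, messages} -> process(messages); {:error, reason} -> Logger.error("Failed: #{inspect(reason)}") end. The helper works on any enumerable, so it can be tested with plain lists.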