# `Pulsar.Reader`
[🔗](https://github.com/efcasado/pulsar-elixir/blob/main/lib/pulsar/reader.ex#L1)

A high-level interface for reading messages from Pulsar topics using
Elixir's Stream API. The reader uses non-durable subscriptions, so its read
position is not persisted: each new stream begins at the position given by
its start options instead of resuming where a previous stream stopped.

## Usage

Basic usage with automatic connection:

    # Read 10 messages from earliest
    Pulsar.Reader.stream("persistent://public/default/my-topic",
      host: "pulsar://localhost:6650",
      start_position: :earliest
    )
    |> Stream.take(10)
    |> Enum.each(fn message ->
      IO.inspect(message.payload)
    end)

Using an external client (recommended for production):

    # Start the client once, e.g. during application startup
    {:ok, _pid} = Pulsar.start(host: "pulsar://localhost:6650")

    # Later, in your code
    Pulsar.Reader.stream("persistent://public/default/my-topic",
      client: :default,
      start_position: :earliest
    )
    |> Stream.each(&process/1)
    |> Stream.run()

With custom flow control:

    Pulsar.Reader.stream("persistent://public/default/my-topic",
      host: "pulsar://localhost:6650",
      flow_permits: 50  # Request 50 messages at a time
    )
    |> Enum.take(100)

Reading from a specific message:

    Pulsar.Reader.stream("persistent://public/default/my-topic",
      host: "pulsar://localhost:6650",
      start_message_id: {123, 456}  # {ledger_id, entry_id}
    )
    |> Stream.each(&process/1)
    |> Stream.run()

## Options

- `:host` - Pulsar broker URL (e.g., `"pulsar://localhost:6650"`).
  If provided, a temporary client is created for this stream. Mutually exclusive with `:client`.
- `:name` - Name for the internal client (default: `:default`). Only used with `:host`.
  Give each reader a distinct name to avoid conflicts when running multiple readers with `:host`.
- `:auth` - Authentication configuration (only used with `:host`).
- `:socket_opts` - Socket options (only used with `:host`).
- `:client` - Name of an existing Pulsar client to use (default: `:default`).
  Use this when Pulsar is already started in your supervision tree. Mutually exclusive with `:host`.
- `:start_position` - Where to start reading (`:earliest` or `:latest`, default: `:earliest`)
- `:start_message_id` - Start from a specific message ID, given as a `{ledger_id, entry_id}` tuple
- `:start_timestamp` - Start from a specific timestamp (milliseconds since the Unix epoch)
- `:flow_permits` - Number of messages to request per flow command (default: `100`)
- `:read_compacted` - Read from the compacted view of the topic, which retains only the latest message for each key (default: `false`)
- `:timeout` - Inactivity timeout in milliseconds (default: `60_000`). Stream halts if
  no message is received within this time.
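
Because `:host` and `:client` are mutually exclusive, it can be useful to check an option list before passing it to `Pulsar.Reader.stream/2`. The following is a minimal sketch under stated assumptions: `ReaderOptions` and `validate/1` are hypothetical names that are not part of the library, and rejecting `:name`, `:auth`, or `:socket_opts` without `:host` is this sketch's own choice (the option list above only says they are used with `:host`).

```elixir
defmodule ReaderOptions do
  @moduledoc """
  Hypothetical helper (not part of Pulsar.Reader): checks the documented
  rule that `:host` and `:client` cannot be combined.
  """

  # Options documented as only meaningful together with `:host`.
  @host_only [:name, :auth, :socket_opts]

  @spec validate(keyword()) :: {:ok, keyword()} | {:error, term()}
  def validate(opts) when is_list(opts) do
    has_host? = Keyword.has_key?(opts, :host)
    has_client? = Keyword.has_key?(opts, :client)
    # Host-only options that were passed, in the order listed in @host_only.
    stray = Enum.filter(@host_only, &Keyword.has_key?(opts, &1))

    cond do
      has_host? and has_client? ->
        {:error, :host_and_client_are_mutually_exclusive}

      not has_host? and stray != [] ->
        # Stricter than the docs require; an assumption of this sketch.
        {:error, {:requires_host, stray}}

      true ->
        {:ok, opts}
    end
  end
end
```

A caller could match on `{:ok, opts}` and only then build the stream, keeping option mistakes out of the stream pipeline itself.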

## Connection Management

### Internal Client Mode (host)
When `:host` is provided, the stream creates a temporary client that lives
only for the duration of the stream. The connection is automatically closed when
the stream completes or is halted.

    Pulsar.Reader.stream(topic, host: "pulsar://localhost:6650")
    |> Enum.take(10)
    # Connection automatically closed after consuming 10 messages

### External Client Mode (client)
When `:client` is provided, or when neither `:host` nor `:client` is given (so
`:client` defaults to `:default`), the stream uses an existing client from your
application's supervision tree. The client remains running after the stream
completes.

    # In your application.ex
    children = [
      {Pulsar, host: "pulsar://localhost:6650"}
    ]

    # Use the existing client
    Pulsar.Reader.stream(topic, client: :default)
    |> Enum.take(10)
    # Client remains running

## Partitioned Topics

The Reader supports partitioned topics. When reading from a partitioned topic,
messages from all partitions are merged into a single stream. **Note that message
ordering across partitions is not guaranteed**: messages from different partitions
may arrive interleaved.

If you need per-partition ordering, consider using separate Reader streams for
each partition (e.g., `"persistent://tenant/ns/topic-partition-0"`).
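
The per-partition topic names mentioned above can be generated with a small helper. This is a sketch only: `PartitionTopics` is a hypothetical module, and the partition count is assumed to be known by the caller, since this page does not describe a way to look it up.

```elixir
defmodule PartitionTopics do
  @moduledoc """
  Hypothetical helper: builds per-partition topic names using the
  "-partition-N" suffix shown in the example above.
  """

  @spec names(String.t(), pos_integer()) :: [String.t()]
  def names(topic, partition_count)
      when is_binary(topic) and is_integer(partition_count) and partition_count > 0 do
    # Partitions are numbered from 0, matching "topic-partition-0".
    for index <- 0..(partition_count - 1) do
      "#{topic}-partition-#{index}"
    end
  end
end
```

Each resulting name can then be passed to `Pulsar.Reader.stream/2` as its own stream, which keeps ordering within that partition.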

## Process Ownership

The stream is bound to the process that creates it. Messages are delivered to
the creating process's mailbox, so you cannot pass the stream to another process
for consumption.

For multi-process consumption patterns, use the `Pulsar.Consumer` API directly
or consider [off_broadway_pulsar](https://github.com/efcasado/off_broadway_pulsar)
for Broadway-based pipelines.
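
Since a stream cannot be handed to another process, one way to read in the background is to pass a function that builds the stream and call it inside the process that will consume it. The sketch below illustrates that idea with plain Elixir; `OwnedStream` and `consume_in_task/2` are hypothetical names, and the stand-in stream in the comment takes the place of a real `Pulsar.Reader.stream/2` call.

```elixir
defmodule OwnedStream do
  @moduledoc """
  Hypothetical helper: runs a stream-building function inside a Task so the
  stream is created and consumed by the same process.
  """

  @spec consume_in_task((-> Enumerable.t()), (term() -> term())) :: Task.t()
  def consume_in_task(build_stream, handler)
      when is_function(build_stream, 0) and is_function(handler, 1) do
    Task.async(fn ->
      # The stream is built here, inside the task, not in the caller,
      # so the task process is the one that owns it.
      build_stream.()
      |> Enum.map(handler)
    end)
  end
end

# Usage with a stand-in stream; with the reader, the first argument would be
# something like `fn -> Pulsar.Reader.stream(topic, opts) |> Stream.take(10) end`.
#
#     task = OwnedStream.consume_in_task(fn -> Stream.map(1..3, &(&1 * 10)) end, &(&1 + 1))
#     Task.await(task)
```

The caller passes a closure rather than a stream, so nothing is subscribed until the task itself starts running.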

## Stream Termination

The stream terminates when any of these conditions is met:
- Downstream has taken all the messages it asked for (e.g., via `Enum.take/2`)
- No message arrives within the inactivity timeout (`:timeout`, default: `60_000` ms)
- Downstream processing halts the stream (e.g., via `Stream.take_while/2`)
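
The termination conditions above can be illustrated with a mailbox-backed stream built on `Stream.resource/3`. This is an illustrative stand-in, not the library's implementation: `MailboxStream` and the `{:message, payload}` tuple shape are assumptions made for the example.

```elixir
defmodule MailboxStream do
  @moduledoc """
  Illustrative stand-in, not Pulsar.Reader's implementation: a stream fed
  from the calling process's mailbox that halts after a quiet period.
  """

  @spec stream(non_neg_integer()) :: Enumerable.t()
  def stream(timeout_ms) do
    Stream.resource(
      # Nothing to set up in this sketch; a real reader would subscribe here.
      fn -> :ok end,
      fn state ->
        receive do
          # Hypothetical message shape used only by this example.
          {:message, payload} -> {[payload], state}
        after
          # No message within the window: end the stream.
          timeout_ms -> {:halt, state}
        end
      end,
      # Runs when the stream finishes, times out, or is halted early
      # (e.g. by Enum.take/2); a real reader would release resources here.
      fn _state -> :ok end
    )
  end
end
```

With three messages in the mailbox, `MailboxStream.stream(50) |> Enum.take(2)` stops as soon as two are taken, while `Enum.to_list/1` on a fresh stream returns the remaining one and then ends once the 50 ms window passes with no new message.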

# `stream`

```elixir
@spec stream(
  String.t(),
  keyword()
) :: Enumerable.t()
```

Creates a stream of messages from a Pulsar topic.

Returns a `Stream` that yields `Pulsar.Message` structs. If initialization
fails, the stream emits `{:error, reason}` as the first (and only) element.

The stream handles connection lifecycle automatically based on whether
you provide `:host` or `:client`.

## Examples

    # Read with internal connection (closes automatically)
    Pulsar.Reader.stream("persistent://public/default/topic",
      host: "pulsar://localhost:6650",
      start_position: :earliest
    )
    |> Enum.take(5)

    # Read with external client (remains open)
    Pulsar.Reader.stream("persistent://public/default/topic",
      client: :default,
      start_position: :latest
    )
    |> Stream.filter(&interesting?/1)
    |> Enum.to_list()

    # Handle errors (emitted as first element if initialization fails)
    require Logger  # Logger.error/1 is a macro, so Logger must be required

    Pulsar.Reader.stream("persistent://public/default/topic",
      host: "pulsar://invalid:6650"
    )
    |> Enum.take(1)
    |> case do
      [{:error, reason}] -> Logger.error("Failed: #{inspect(reason)}")
      messages -> process(messages)
    end
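
When more than one message is taken, the same error check can be wrapped in a helper that returns a tagged result. This is a minimal sketch: `ReaderResult` and `collect/2` are hypothetical names, and it relies only on the behavior described above, namely that a failed initialization emits `{:error, reason}` as the first element.

```elixir
defmodule ReaderResult do
  @moduledoc """
  Hypothetical helper: collects up to `count` elements from a stream whose
  first element may be `{:error, reason}` and returns a tagged result.
  """

  @spec collect(Enumerable.t(), non_neg_integer()) :: {:ok, list()} | {:error, term()}
  def collect(stream, count) when is_integer(count) and count >= 0 do
    case Enum.take(stream, count) do
      # An initialization failure shows up as the first element.
      [{:error, reason} | _] -> {:error, reason}
      messages -> {:ok, messages}
    end
  end
end
```

Callers can then use `with` or `case` on `{:ok, messages}` and `{:error, reason}` instead of inspecting the list shape at every call site.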

