# `OffBroadway.Pulsar.Producer`
[🔗](https://github.com/efcasado/off_broadway_pulsar/blob/main/lib/off_broadway/pulsar/producer.ex#L1)

A Broadway producer for Apache Pulsar.

This producer receives messages from Pulsar topics and forwards them to the
Broadway pipeline. It implements flow control using Pulsar's permit window
mechanism, which proactively requests batches of messages rather than
requesting per-message.

Supports two connection patterns:
- **Producer-managed**: Pass `:host` to have the producer start its own Pulsar connection
- **Application-managed**: Omit `:host` to use a globally started Pulsar connection

See `start_link/1` for detailed configuration options.

# `start_link`

Starts an `OffBroadway.Pulsar` producer process linked to the current
process.

## Configuration

- `:host` - Broker URL (e.g., "pulsar://localhost:6650") (optional).
  If provided, the producer will start its own Pulsar connection.
  If not provided, the producer assumes Pulsar is already started globally
  (e.g., in your application's supervision tree).
- `:client` - Client name to use when `:host` is not provided (optional, default: `:default`).
  Only used when connecting to a globally started Pulsar instance.
- `:topic` - A single Pulsar topic to consume from (required if `:topics` is not set)
- `:topics` - A list of Pulsar topics to consume from (required if `:topic` is not set).
  One consumer is started per topic. Providing a single topic via `:topic` is equivalent
  to `topics: [topic]` and is kept for backwards compatibility.
- `:subscription` - The subscription name (required)
- `:conn_opts` - Connection options passed to `Pulsar.start/1` (optional, only used if `:host` is provided):
  - `:socket_opts` - Socket options (e.g., `[verify: :verify_none]`)
  - `:auth` - Authentication configuration
  - `:conn_timeout` - Connection timeout in milliseconds
- `:consumer_opts` - Consumer-specific options passed to `Pulsar.start_consumer/4` (optional).
  Applied to all topics when using `:topics`.
  - `:subscription_type` - Subscription type (`:Exclusive`, `:Shared`, `:Key_Shared`, default: `:Shared`)
  - `:initial_position` - Initial position (`:latest` or `:earliest`, default: `:latest`)
  - `:durable` - Whether subscription is durable (default: `true`)
  - `:force_create_topic` - Force topic creation (default: `true`)
  - `:start_message_id` - Start from specific message ID
  - `:start_timestamp` - Start from timestamp
  - `:redelivery_interval` - Redelivery interval in milliseconds for NACKed messages
  - `:dead_letter_policy` - Dead letter queue configuration
  - `:startup_delay_ms` - Fixed startup delay in milliseconds before consumer initialization (default: 0)
  - `:startup_jitter_ms` - Random startup delay (0 to N ms) to avoid thundering herd on consumer restart (default: 0)
  - `:max_pending_chunked_messages` - Maximum number of concurrent chunked messages to buffer (default: 10)
  - `:expire_incomplete_chunked_message_after` - Timeout in milliseconds for incomplete chunked messages (default: 60_000)
  - `:chunk_cleanup_interval` - Interval in milliseconds for checking expired chunked messages (default: 30_000)
  - `:read_compacted` - If true, reads messages from the compacted topic ledger (default: false)

The total consumer startup delay is `startup_delay_ms + random(0, startup_jitter_ms)`, applied on every consumer start/restart.

## Flow Control Options

The producer uses Pulsar's native permit window flow control. These options
match the naming convention used in `pulsar-elixir` consumer:

- `:flow_initial` - Initial permits requested at startup (optional, default: 100)
- `:flow_threshold` - Trigger refill when permits drop to this level (optional, default: 50)
- `:flow_refill` - Number of permits to request on each refill (optional, default: 50)

Note: These flow control options are for the Broadway producer level and override
the consumer's automatic flow control (which is disabled by setting `flow_initial: 0`
in the underlying consumer).

Broadway processor demands are satisfied from the already-requested permit window,
eliminating per-demand flow requests.

When using `producer: [concurrency: N]` with N > 1, each producer maintains
its own independent permit window.

## Usage Patterns

### Pattern 1: Single topic

    producer: [
      module: {OffBroadway.Pulsar.Producer,
        host: "pulsar://localhost:6650",
        topic: "my-topic",
        subscription: "my-subscription"
      }
    ]

### Pattern 2: Multiple topics

    producer: [
      module: {OffBroadway.Pulsar.Producer,
        host: "pulsar://localhost:6650",
        topics: ["topic-a", "topic-b", "topic-c"],
        subscription: "my-subscription"
      }
    ]

### Pattern 3: Application-managed connection (no host)

    # In your application.ex:
    children = [
      {Pulsar, host: "pulsar://localhost:6650"},
      MyApp.PulsarPipeline
    ]

    # In your producer config:
    producer: [
      module: {OffBroadway.Pulsar.Producer,
        topics: ["topic-a", "topic-b"],
        subscription: "my-subscription"
      }
    ]

---

*Consult [api-reference.md](api-reference.md) for complete listing*
