A Broadway producer for Apache Pulsar.
This producer receives messages from Pulsar topics and forwards them to the Broadway pipeline. It implements flow control using Pulsar's permit window mechanism, which proactively requests batches of messages rather than requesting per-message.
Supports two connection patterns:
- Producer-managed: Pass
:hostto have the producer start its own Pulsar connection - Application-managed: Omit
:hostto use a globally started Pulsar connection
See start_link/1 for detailed configuration options.
Summary
Functions
Starts an OffBroadway.Pulsar producer process linked to the current
process.
Functions
Starts an OffBroadway.Pulsar producer process linked to the current
process.
Configuration
:host- Broker URL (e.g., "pulsar://localhost:6650") (optional). If provided, the producer will start its own Pulsar connection. If not provided, the producer assumes Pulsar is already started globally (e.g., in your application's supervision tree).:client- Client name to use when:hostis not provided (optional, default::default). Only used when connecting to a globally started Pulsar instance.:topic- A single Pulsar topic to consume from (required if:topicsis not set):topics- A list of Pulsar topics to consume from (required if:topicis not set). One consumer is started per topic. Providing a single topic via:topicis equivalent totopics: [topic]and is kept for backwards compatibility.:subscription- The subscription name (required):conn_opts- Connection options passed toPulsar.start/1(optional, only used if:hostis provided)::socket_opts- Socket options (e.g.,[verify: :verify_none]):auth- Authentication configuration:conn_timeout- Connection timeout in milliseconds
:consumer_opts- Consumer-specific options passed toPulsar.start_consumer/4(optional). Applied to all topics when using:topics.:subscription_type- Subscription type (:Exclusive,:Shared,:Key_Shared, default::Shared):initial_position- Initial position (:latestor:earliest, default::latest):durable- Whether subscription is durable (default:true):force_create_topic- Force topic creation (default:true):start_message_id- Start from specific message ID:start_timestamp- Start from timestamp:redelivery_interval- Redelivery interval in milliseconds for NACKed messages:dead_letter_policy- Dead letter queue configuration:startup_delay_ms- Fixed startup delay in milliseconds before consumer initialization (default: 0):startup_jitter_ms- Random startup delay (0 to N ms) to avoid thundering herd on consumer restart (default: 0):max_pending_chunked_messages- Maximum number of concurrent chunked messages to buffer (default: 10):expire_incomplete_chunked_message_after- Timeout in milliseconds for incomplete chunked messages (default: 60_000):chunk_cleanup_interval- Interval in milliseconds for checking expired chunked messages (default: 30_000):read_compacted- If true, reads messages from the compacted topic ledger (default: false)
The total consumer startup delay is startup_delay_ms + random(0, startup_jitter_ms), applied on every consumer start/restart.
Flow Control Options
The producer uses Pulsar's native permit window flow control. These options
match the naming convention used in pulsar-elixir consumer:
:flow_initial- Initial permits requested at startup (optional, default: 100):flow_threshold- Trigger refill when permits drop to this level (optional, default: 50):flow_refill- Number of permits to request on each refill (optional, default: 50)
Note: These flow control options are for the Broadway producer level and override
the consumer's automatic flow control (which is disabled by setting flow_initial: 0
in the underlying consumer).
Broadway processor demands are satisfied from the already-requested permit window, eliminating per-demand flow requests.
When using producer: [concurrency: N] with N > 1, each producer maintains
its own independent permit window.
Usage Patterns
Pattern 1: Single topic
producer: [
module: {OffBroadway.Pulsar.Producer,
host: "pulsar://localhost:6650",
topic: "my-topic",
subscription: "my-subscription"
}
]Pattern 2: Multiple topics
producer: [
module: {OffBroadway.Pulsar.Producer,
host: "pulsar://localhost:6650",
topics: ["topic-a", "topic-b", "topic-c"],
subscription: "my-subscription"
}
]Pattern 3: Application-managed connection (no host)
# In your application.ex:
children = [
{Pulsar, host: "pulsar://localhost:6650"},
MyApp.PulsarPipeline
]
# In your producer config:
producer: [
module: {OffBroadway.Pulsar.Producer,
topics: ["topic-a", "topic-b"],
subscription: "my-subscription"
}
]