OffBroadway.Pulsar.Producer (off_broadway_pulsar v1.3.5)


A Broadway producer for Apache Pulsar.

This producer receives messages from Pulsar topics and forwards them to the Broadway pipeline. It implements flow control using Pulsar's permit window mechanism: it requests permits for batches of messages ahead of demand instead of issuing one flow request per message.

Supports two connection patterns:

  • Producer-managed: Pass :host to have the producer start its own Pulsar connection
  • Application-managed: Omit :host to use a globally started Pulsar connection

See start_link/1 for detailed configuration options.

Functions

start_link(opts)

Starts an OffBroadway.Pulsar producer process linked to the current process.

Configuration

  • :host - Broker URL, e.g. "pulsar://localhost:6650" (optional). If provided, the producer starts its own Pulsar connection. If omitted, the producer assumes Pulsar has already been started globally (e.g., in your application's supervision tree).
  • :client - Client name to use when :host is not provided (optional, default: :default). Only used when connecting to a globally started Pulsar instance.
  • :topic - A single Pulsar topic to consume from (required if :topics is not set)
  • :topics - A list of Pulsar topics to consume from (required if :topic is not set). One consumer is started per topic. Providing a single topic via :topic is equivalent to topics: [topic] and is kept for backwards compatibility.
  • :subscription - The subscription name (required)
  • :conn_opts - Connection options passed to Pulsar.start/1 (optional, only used if :host is provided):
    • :socket_opts - Socket options (e.g., [verify: :verify_none])
    • :auth - Authentication configuration
    • :conn_timeout - Connection timeout in milliseconds
  • :consumer_opts - Consumer-specific options passed to Pulsar.start_consumer/4 (optional). Applied to all topics when using :topics.
    • :subscription_type - Subscription type (:Exclusive, :Shared, :Key_Shared, default: :Shared)
    • :initial_position - Initial position (:latest or :earliest, default: :latest)
    • :durable - Whether subscription is durable (default: true)
    • :force_create_topic - Force topic creation (default: true)
    • :start_message_id - Start from specific message ID
    • :start_timestamp - Start from timestamp
    • :redelivery_interval - Redelivery interval in milliseconds for NACKed messages
    • :dead_letter_policy - Dead letter queue configuration
    • :startup_delay_ms - Fixed startup delay in milliseconds before consumer initialization (default: 0)
    • :startup_jitter_ms - Random startup delay (0 to N ms) to avoid thundering herd on consumer restart (default: 0)
    • :max_pending_chunked_messages - Maximum number of concurrent chunked messages to buffer (default: 10)
    • :expire_incomplete_chunked_message_after - Timeout in milliseconds for incomplete chunked messages (default: 60_000)
    • :chunk_cleanup_interval - Interval in milliseconds for checking expired chunked messages (default: 30_000)
    • :read_compacted - If true, reads messages from the compacted topic ledger (default: false)

The total consumer startup delay is startup_delay_ms + random(0, startup_jitter_ms), applied on every consumer start/restart.
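
As an illustration, the options above might be combined as follows. This is a sketch rather than a recommended configuration: the topic and subscription names and all numeric values are placeholders, and only option names listed above are used.

```elixir
# Keyword list intended for the :producer option of Broadway.start_link/2.
# All values are illustrative placeholders.
producer = [
  module:
    {OffBroadway.Pulsar.Producer,
     host: "pulsar://localhost:6650",
     topic: "my-topic",
     subscription: "my-subscription",
     # Only used because :host is provided.
     conn_opts: [
       conn_timeout: 5_000
     ],
     # Passed through to the underlying consumer.
     consumer_opts: [
       subscription_type: :Key_Shared,
       initial_position: :earliest,
       startup_delay_ms: 500,
       startup_jitter_ms: 1_000
     ]}
]
```

With these values, each consumer start or restart would wait the fixed 500 ms plus a random jitter of up to 1_000 ms.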

Flow Control Options

The producer uses Pulsar's native permit window flow control. These options follow the naming convention used by the pulsar-elixir consumer:

  • :flow_initial - Initial permits requested at startup (optional, default: 100)
  • :flow_threshold - Trigger refill when permits drop to this level (optional, default: 50)
  • :flow_refill - Number of permits to request on each refill (optional, default: 50)

Note: These flow control options apply at the Broadway producer level. They replace the underlying consumer's automatic flow control, which the producer disables by setting flow_initial: 0 on that consumer.

Broadway processor demands are satisfied from the already-requested permit window, eliminating per-demand flow requests.
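
The permit window idea can be modelled with a small, self-contained sketch. This is not the library's implementation: the module name PermitWindowSketch and its functions are hypothetical, and the refill rule (add flow_refill permits as soon as the remaining permits fall to flow_threshold or below) is one plausible reading of the option descriptions above.

```elixir
defmodule PermitWindowSketch do
  # Hypothetical model of a permit window; not code from off_broadway_pulsar.
  defstruct permits: 0, threshold: 50, refill: 50, flow_requests: 0

  # Builds a window using the documented defaults unless overridden.
  def new(opts \\ []) do
    %__MODULE__{
      permits: Keyword.get(opts, :flow_initial, 100),
      threshold: Keyword.get(opts, :flow_threshold, 50),
      refill: Keyword.get(opts, :flow_refill, 50),
      # The initial permit request counts as the first flow request.
      flow_requests: 1
    }
  end

  # One message arrives from the broker and consumes one permit.
  # When the remaining permits reach the threshold, request a refill.
  def message_received(%__MODULE__{} = window) do
    window = %{window | permits: window.permits - 1}

    if window.permits <= window.threshold do
      %{
        window
        | permits: window.permits + window.refill,
          flow_requests: window.flow_requests + 1
      }
    else
      window
    end
  end
end

# Simulate 200 incoming messages with the default settings.
window =
  Enum.reduce(1..200, PermitWindowSketch.new(), fn _msg, acc ->
    PermitWindowSketch.message_received(acc)
  end)

IO.inspect(window.flow_requests, label: "flow requests")
```

Under this model, 200 messages cost 5 flow requests (the initial one plus a refill after every 50 messages), compared with 200 if permits were requested per message.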

When using producer: [concurrency: N] with N > 1, each producer maintains its own independent permit window.
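
A configuration that tunes the window might look like the following sketch. It assumes the flow options sit alongside :topic and :subscription in the producer options, as the note above suggests; the numeric values are placeholders, not tuning advice.

```elixir
# Keyword list intended for the :producer option of Broadway.start_link/2.
producer = [
  module:
    {OffBroadway.Pulsar.Producer,
     host: "pulsar://localhost:6650",
     topic: "my-topic",
     subscription: "my-subscription",
     # Request 200 permits at startup, then top up by 100
     # whenever the remaining permits drop to 100.
     flow_initial: 200,
     flow_threshold: 100,
     flow_refill: 100},
  # Two producer processes, each with its own independent permit window.
  concurrency: 2
]
```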

Usage Patterns

Pattern 1: Single topic

producer: [
  module: {OffBroadway.Pulsar.Producer,
    host: "pulsar://localhost:6650",
    topic: "my-topic",
    subscription: "my-subscription"
  }
]

Pattern 2: Multiple topics

producer: [
  module: {OffBroadway.Pulsar.Producer,
    host: "pulsar://localhost:6650",
    topics: ["topic-a", "topic-b", "topic-c"],
    subscription: "my-subscription"
  }
]

Pattern 3: Application-managed connection (no host)

# In your application.ex:
children = [
  {Pulsar, host: "pulsar://localhost:6650"},
  MyApp.PulsarPipeline
]

# In your producer config:
producer: [
  module: {OffBroadway.Pulsar.Producer,
    topics: ["topic-a", "topic-b"],
    subscription: "my-subscription"
  }
]
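
For context, a complete pipeline module using the application-managed pattern could be sketched as below. The Broadway calls (use Broadway, Broadway.start_link/2, handle_message/3) are standard Broadway API; the processor concurrency is a placeholder, and the shape of message.data is not specified in this section, so the handler only inspects it.

```elixir
defmodule MyApp.PulsarPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # No :host, so a globally started Pulsar connection is assumed.
        module:
          {OffBroadway.Pulsar.Producer,
           topics: ["topic-a", "topic-b"],
           subscription: "my-subscription"},
        concurrency: 1
      ],
      processors: [
        # Placeholder value; tune for your workload.
        default: [concurrency: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # The payload format is an assumption left open here; just log it.
    IO.inspect(message.data, label: "pulsar message")
    message
  end
end
```

Because use Broadway defines child_spec/1, the module can be listed directly in a supervisor's children after the Pulsar connection, as in the application.ex snippet above.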