Pulsar producer process that communicates with broker processes.
This producer uses service discovery to find the appropriate broker for the topic and then communicates with that broker process.
Initialization follows a multi-phase pattern using {:continue, ...} to
avoid blocking the caller during broker discovery and registration.
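The multi-phase pattern described above can be sketched with a plain GenServer. This is a minimal illustration, not the producer's actual implementation: the module name `PhasedInitSketch`, the phase names `:discover` and `:register`, and the placeholder broker value are all assumptions made for the example.

```elixir
defmodule PhasedInitSketch do
  # Hypothetical module, not part of the library: it only demonstrates
  # chaining {:continue, ...} so that init/1 returns quickly.
  use GenServer

  def start_link(topic), do: GenServer.start_link(__MODULE__, topic)

  def ready?(pid), do: GenServer.call(pid, :ready?)

  @impl true
  def init(topic) do
    # Return immediately so the caller of start_link/1 is not blocked;
    # the slow work is deferred to handle_continue/2.
    {:ok, %{topic: topic, broker: nil, ready: false}, {:continue, :discover}}
  end

  @impl true
  def handle_continue(:discover, state) do
    # Phase 1: stand-in for broker lookup via service discovery.
    {:noreply, %{state | broker: :fake_broker}, {:continue, :register}}
  end

  def handle_continue(:register, state) do
    # Phase 2: stand-in for registering the producer with the broker.
    {:noreply, %{state | ready: true}}
  end

  @impl true
  def handle_call(:ready?, _from, state), do: {:reply, state.ready, state}
end
```

Because continue callbacks run before any other message in the mailbox is handled, a call made right after `start_link/1` observes the state left by the last phase.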
Types
@type t() :: %Pulsar.Producer{
  access_mode: atom(),
  batch: [{map(), GenServer.from()}],
  batch_enabled: boolean(),
  batch_flush_timer: reference() | nil,
  batch_size: non_neg_integer(),
  batch_size_threshold: non_neg_integer(),
  broker_monitor: reference(),
  broker_pid: pid(),
  chunking_enabled: boolean(),
  client: term(),
  compression: :NONE | :LZ4 | :ZLIB | :SNAPPY | :ZSTD,
  flush_interval: non_neg_integer(),
  max_message_size: non_neg_integer(),
  pending_sends: %{required(integer()) => {GenServer.from(), map()}},
  producer_id: integer(),
  producer_name: String.t() | nil,
  ready: boolean() | nil,
  registration_request_id: integer() | nil,
  schema: Pulsar.Schema.t() | nil,
  schema_version: binary() | nil,
  sequence_id: integer(),
  topic: String.t(),
  topic_epoch: integer() | nil
}
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
Sends a message through this producer.
Waits for acknowledgment from the broker.
Returns {:ok, message_id} on success or {:error, reason} on failure.
Parameters
producer_pid - The producer process PID
message - Binary message payload
opts - Optional parameters:
    :timeout - Timeout in milliseconds (default: 5000)
    :partition_key - Partition routing key (string)
    :ordering_key - Key for ordering in Key_Shared subscriptions (binary)
    :properties - Custom message metadata as a map (e.g., %{"trace_id" => "abc"})
    :event_time - Application event timestamp (DateTime or milliseconds since epoch)
    :deliver_at_time - Absolute delayed delivery time (DateTime or milliseconds since epoch)
    :deliver_after - Relative delayed delivery in milliseconds from now
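As a rough sketch of how the options above fit together, the snippet below builds an options list for a delayed message and pattern-matches on the documented return shapes. The helper module `SendOptsSketch` and its functions are hypothetical, and the commented-out call uses an assumed function name, since the name of the send function is not shown in this section.

```elixir
defmodule SendOptsSketch do
  # Hypothetical helper, not part of the library.

  # Builds an options keyword list using only the keys documented above.
  def delayed_opts(trace_id, delay_ms) do
    [
      timeout: 10_000,
      partition_key: "customer-42",
      properties: %{"trace_id" => trace_id},
      event_time: DateTime.utc_now(),
      deliver_after: delay_ms
    ]
  end

  # Handles the two documented return shapes of a send.
  def describe({:ok, message_id}), do: {:sent, message_id}
  def describe({:error, reason}), do: {:failed, reason}
end

opts = SendOptsSketch.delayed_opts("abc", 30_000)

# Assumed call shape (the function name is an assumption):
#   producer |> Pulsar.Producer.send_message("payload", opts) |> SendOptsSketch.describe()
```

Using `:deliver_after` keeps the caller free of clock arithmetic; `:deliver_at_time` is the alternative when the delivery time is already known as an absolute timestamp.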
Starts a producer process.
Parameters
topic - The topic to publish to
opts - Additional options:
    :name - Producer name (optional, will be auto-generated if not provided)
    :access_mode - Producer access mode (default: :Shared). Available modes:
        :Shared - Multiple producers can publish on the topic (default)
        :Exclusive - Only one producer can publish. If another producer tries to connect, it will receive an error immediately. The old producer is evicted if it experiences a network partition with the broker.
        :WaitForExclusive - If there is already a producer, wait until exclusive access is granted
        :ExclusiveWithFencing - If there is already a producer, it will be removed (fenced out)
    :compression - Compression algorithm (default: :NONE)
    :chunking_enabled - Enable message chunking for large messages (default: false)
    :max_message_size - Maximum size of each chunk in bytes when chunking is enabled (default: 5_242_880, which is 5 MB)
    :schema - Schema configuration as keyword list (optional):
        :type - (required) Schema type (e.g., :Json, :String, :Avro)
        :definition - Schema definition (required for non-primitive types like Json, Avro)
        :name - Optional schema name
        :properties - Optional metadata properties as a map
    :startup_delay_ms - Fixed startup delay in milliseconds before producer initialization (default: 1000, matches broker conn_timeout)
    :startup_jitter_ms - Maximum random startup delay in milliseconds to avoid thundering herd (default: 1000)
The total startup delay is startup_delay_ms + random(0, startup_jitter_ms), applied on every producer start/restart.
The default startup_delay_ms matches the broker's conn_timeout to ensure the broker has time to reconnect
before producers start requesting topic lookups.
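The delay formula above can be sketched as follows. This is an illustration only: the module `StartupDelaySketch` is hypothetical, and treating the jitter range as inclusive of both 0 and startup_jitter_ms is an assumption.

```elixir
defmodule StartupDelaySketch do
  # Hypothetical helper, not part of the library.
  # Computes startup_delay_ms + random(0, startup_jitter_ms), assuming an inclusive range.
  def total_delay(delay_ms \\ 1000, jitter_ms \\ 1000) do
    # :rand.uniform(n) returns an integer in 1..n, so subtract 1 to get 0..jitter_ms.
    delay_ms + :rand.uniform(jitter_ms + 1) - 1
  end
end
```

With the defaults, each start or restart would wait between 1000 and 2000 ms, so producers restarted together spread their topic lookups over roughly one second.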
The producer will automatically use service discovery to find the broker. If no name is provided, the broker will assign a unique producer name.
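To make the :chunking_enabled and :max_message_size options more concrete, the sketch below splits a binary payload into pieces of at most a given size. It is a simplified illustration under stated assumptions: the module `ChunkSketch` is hypothetical, and the real producer may account for metadata overhead or differ in other ways.

```elixir
defmodule ChunkSketch do
  # Hypothetical helper, not part of the library.
  # Splits a binary into chunks of at most max_size bytes, preserving order.
  def chunk(payload, max_size) when is_binary(payload) and max_size > 0 do
    do_chunk(payload, max_size, [])
  end

  # Last (or only) piece: it already fits within max_size.
  defp do_chunk(payload, max_size, acc) when byte_size(payload) <= max_size do
    Enum.reverse([payload | acc])
  end

  # Take exactly max_size bytes off the front and recurse on the remainder.
  defp do_chunk(payload, max_size, acc) do
    <<chunk::binary-size(max_size), rest::binary>> = payload
    do_chunk(rest, max_size, [chunk | acc])
  end
end
```

Under this sketch, a payload larger than the default of 5_242_880 bytes would be split into several chunks, while a smaller payload is returned as a single chunk.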
Examples
# Default shared mode
{:ok, producer} = Producer.start_link("persistent://public/default/my-topic")

# With custom name and exclusive mode
{:ok, producer} = Producer.start_link(
  "persistent://public/default/my-topic",
  name: "my-producer",
  access_mode: :Exclusive
)

# With schema
{:ok, producer} = Producer.start_link(
  "persistent://public/default/my-topic",
  schema: [type: :Json, definition: json_schema_def]
)
@spec stop(GenServer.server(), term(), timeout()) :: :ok
Gracefully stops a producer process.