Pulsar.Producer (Pulsar v2.8.13)

Copy Markdown View Source

Pulsar producer process that communicates with broker processes.

This producer uses service discovery to find the appropriate broker for the topic and then communicates with that broker process.

Initialization follows a multi-phase pattern using {:continue, ...} to avoid blocking the caller during broker discovery and registration.

Summary

Functions

Returns a specification to start this module under a supervisor.

Sends a message through this producer. Waits for acknowledgment from the broker. Returns {:ok, message_id} on success or {:error, reason} on failure.

Starts a producer process.

Gracefully stops a producer process.

Types

t()

@type t() :: %Pulsar.Producer{
  access_mode: atom(),
  batch: [{map(), GenServer.from()}],
  batch_enabled: boolean(),
  batch_flush_timer: reference() | nil,
  batch_size: non_neg_integer(),
  batch_size_threshold: non_neg_integer(),
  broker_monitor: reference(),
  broker_pid: pid(),
  chunking_enabled: boolean(),
  client: term(),
  compression: :NONE | :LZ4 | :ZLIB | :SNAPPY | :ZSTD,
  flush_interval: non_neg_integer(),
  max_message_size: non_neg_integer(),
  pending_sends: %{required(integer()) => {GenServer.from(), map()}},
  producer_id: integer(),
  producer_name: String.t() | nil,
  ready: boolean() | nil,
  registration_request_id: integer() | nil,
  schema: Pulsar.Schema.t() | nil,
  schema_version: binary() | nil,
  sequence_id: integer(),
  topic: String.t(),
  topic_epoch: integer() | nil
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

send_message(producer_pid, message, opts \\ [])

@spec send_message(pid(), binary(), keyword()) :: {:ok, map()} | {:error, term()}

Sends a message through this producer. Waits for acknowledgment from the broker. Returns {:ok, message_id} on success or {:error, reason} on failure.

Parameters

  • producer_pid - The producer process PID
  • message - Binary message payload
  • opts - Optional parameters:
    • :timeout - Timeout in milliseconds (default: 5000)
    • :partition_key - Partition routing key (string)
    • :ordering_key - Key for ordering in Key_Shared subscriptions (binary)
    • :properties - Custom message metadata as a map (e.g., %{"trace_id" => "abc"})
    • :event_time - Application event timestamp (DateTime or milliseconds since epoch)
    • :deliver_at_time - Absolute delayed delivery time (DateTime or milliseconds since epoch)
    • :deliver_after - Relative delayed delivery in milliseconds from now

start_link(topic, opts \\ [])

Starts a producer process.

Parameters

  • topic - The topic to publish to
  • opts - Additional options:
    • :name - Producer name (optional, will be auto-generated if not provided)
    • :access_mode - Producer access mode (default: :Shared). Available modes:
      • :Shared - Multiple producers can publish on the topic (default)
      • :Exclusive - Only one producer can publish. If another producer tries to connect, it will receive an error immediately. The old producer is evicted if it experiences a network partition with the broker.
      • :WaitForExclusive - If there is already a producer, wait until exclusive access is granted
      • :ExclusiveWithFencing - If there is already a producer, it will be removed (fenced out)
    • :compression - Compression algorithm (default: :NONE)
    • :chunking_enabled - Enable message chunking for large messages (default: false)
    • :max_message_size - Maximum size of each chunk in bytes when chunking is enabled (default: 5_242_880, which is 5MB)
    • :schema - Schema configuration as keyword list (optional):
      • :type - (required) Schema type (e.g., :Json, :String, :Avro)
      • :definition - Schema definition (required for non-primitive types like Json, Avro)
      • :name - Optional schema name
      • :properties - Optional metadata properties as a map
    • :startup_delay_ms - Fixed startup delay in milliseconds before producer initialization (default: 1000, matches broker conn_timeout)
    • :startup_jitter_ms - Maximum random startup delay in milliseconds to avoid thundering herd (default: 1000)

The total startup delay is startup_delay_ms + random(0, startup_jitter_ms), applied on every producer start/restart. The default startup_delay_ms matches the broker's conn_timeout to ensure the broker has time to reconnect before producers start requesting topic lookups.

The producer will automatically use service discovery to find the broker. If no name is provided, the broker will assign a unique producer name.

Examples

# Default shared mode
{:ok, producer} = Producer.start_link("persistent://public/default/my-topic")

# With custom name and exclusive mode
{:ok, producer} = Producer.start_link(
  "persistent://public/default/my-topic",
  name: "my-producer",
  access_mode: :Exclusive
)

# With schema
{:ok, producer} = Producer.start_link(
  "persistent://public/default/my-topic",
  schema: [type: :Json, definition: json_schema_def]
)

stop(producer, reason \\ :normal, timeout \\ :infinity)

@spec stop(GenServer.server(), term(), timeout()) :: :ok

Gracefully stops a producer process.