Pulsar.ProducerGroup (Pulsar v2.8.11)

Copy Markdown View Source

A supervisor that manages a group of producer processes for a single topic.

Examples

# Start a producer group with default settings (1 producer)
{:ok, group_pid} = ProducerGroup.start_link(
  "my-topic-producer",
  "persistent://public/default/my-topic"
)

# Start a producer group with 3 producers
{:ok, group_pid} = ProducerGroup.start_link(
  "my-topic-producer",
  "persistent://public/default/my-topic",
  producer_count: 3
)

# Start with batching enabled
{:ok, group_pid} = ProducerGroup.start_link(
  "my-topic-producer",
  "persistent://public/default/my-topic",
  batch_enabled: true,
  batch_size: 100,
  flush_interval: 10
)

# Get all producer PIDs from the group
producer_pids = ProducerGroup.get_producers(group_pid)

Summary

Functions

Returns a specification to start this module under a supervisor.

Gets all producer process PIDs managed by this producer group.

Sends a message through a producer in this group.

Starts a producer group supervisor.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_producers(supervisor_pid)

Gets all producer process PIDs managed by this producer group.

Returns a list of producer PIDs that are currently alive. Filters out producers that are restarting or undefined.

send_message(group_pid, message, opts \\ [])

@spec send_message(pid(), binary(), keyword()) :: {:ok, map()} | {:error, term()}

Sends a message through a producer in this group.

Parameters

  • group_pid - The producer group supervisor PID
  • message - Binary message payload
  • opts - Optional parameters:
    • :timeout - Timeout in milliseconds (default: 5000)
    • :partition_key - Partition routing key (string)
    • :ordering_key - Key for ordering in Key_Shared subscriptions (binary)
    • :properties - Custom message metadata as a map
    • :event_time - Application event timestamp (DateTime or milliseconds)
    • :deliver_at_time - Absolute delayed delivery time (DateTime or milliseconds)
    • :deliver_after - Relative delayed delivery in milliseconds from now

Returns {:error, :no_producers_available} if all producers in the group are dead or restarting. Returns {:error, :producer_died} if the producer crashes during the send operation.

start_link(name, topic, opts \\ [])

Starts a producer group supervisor.

Parameters

  • name - Unique name for this producer group
  • topic - The topic to publish to
  • opts - Additional options:
    • :producer_count - Number of producer processes in this group (default: 1)
    • :access_mode - Producer access mode (default: :Shared)
    • :batch_enabled - Enable batching (default: false)
    • :batch_size - Max messages per batch (default: 100)
    • :flush_interval - Flush interval in ms (default: 10)
    • Other options passed to individual producer processes

Returns

{:ok, pid} - The producer group supervisor PID {:error, reason} - Error if the supervisor failed to start

stop(supervisor_pid, reason \\ :normal, timeout \\ :infinity)