# `Pulsar.Producer`
[🔗](https://github.com/efcasado/pulsar-elixir/blob/main/lib/pulsar/producer.ex#L1)

Pulsar producer process that communicates with broker processes.

This producer uses service discovery to find the appropriate broker
for the topic and then communicates with that broker process.

Initialization follows a multi-phase pattern using `{:continue, ...}` to
avoid blocking the caller during broker discovery and registration.

# `t`

```elixir
@type t() :: %Pulsar.Producer{
  access_mode: atom(),
  batch: [{map(), GenServer.from()}],
  batch_enabled: boolean(),
  batch_flush_timer: reference() | nil,
  batch_size: non_neg_integer(),
  batch_size_threshold: non_neg_integer(),
  broker_monitor: reference(),
  broker_pid: pid(),
  chunking_enabled: boolean(),
  client: term(),
  compression: :NONE | :LZ4 | :ZLIB | :SNAPPY | :ZSTD,
  flush_interval: non_neg_integer(),
  max_message_size: non_neg_integer(),
  pending_sends: %{required(integer()) =&gt; {GenServer.from(), map()}},
  producer_id: integer(),
  producer_name: String.t() | nil,
  ready: boolean() | nil,
  registration_request_id: integer() | nil,
  schema: Pulsar.Schema.t() | nil,
  schema_version: binary() | nil,
  sequence_id: integer(),
  topic: String.t(),
  topic_epoch: integer() | nil
}
```

# `child_spec`

Returns a specification to start this module under a supervisor.

See `Supervisor`.

# `send_message`

```elixir
@spec send_message(pid(), binary(), keyword()) :: {:ok, map()} | {:error, term()}
```

Sends a message through this producer.
Waits for acknowledgment from the broker.
Returns `{:ok, message_id}` on success or `{:error, reason}` on failure.

## Parameters

- `producer_pid` - The producer process PID
- `message` - Binary message payload
- `opts` - Optional parameters:
  - `:timeout` - Timeout in milliseconds (default: 5000)
  - `:partition_key` - Partition routing key (string)
  - `:ordering_key` - Key for ordering in Key_Shared subscriptions (binary)
  - `:properties` - Custom message metadata as a map (e.g., `%{"trace_id" => "abc"}`)
  - `:event_time` - Application event timestamp (DateTime or milliseconds since epoch)
  - `:deliver_at_time` - Absolute delayed delivery time (DateTime or milliseconds since epoch)
  - `:deliver_after` - Relative delayed delivery in milliseconds from now

# `start_link`

Starts a producer process.

## Parameters

- `topic` - The topic to publish to
- `opts` - Additional options:
  - `:name` - Producer name (optional, will be auto-generated if not provided)
  - `:access_mode` - Producer access mode (default: `:Shared`). Available modes:
    - `:Shared` - Multiple producers can publish on the topic (default)
    - `:Exclusive` - Only one producer can publish. If another producer tries to connect,
      it will receive an error immediately. The old producer is evicted if it experiences
      a network partition with the broker.
    - `:WaitForExclusive` - If there is already a producer, wait until exclusive access is granted
    - `:ExclusiveWithFencing` - If there is already a producer, it will be removed (fenced out)
  - `:compression` - Compression algorithm (default: :NONE)
  - `:chunking_enabled` - Enable message chunking for large messages (default: false)
  - `:max_message_size` - Maximum size of each chunk in bytes when chunking is enabled (default: 5_242_880, which is 5MB)
  - `:schema` - Schema configuration as keyword list (optional):
    - `:type` - (required) Schema type (e.g., :Json, :String, :Avro)
    - `:definition` - Schema definition (required for non-primitive types like Json, Avro)
    - `:name` - Optional schema name
    - `:properties` - Optional metadata properties as a map
  - `:startup_delay_ms` - Fixed startup delay in milliseconds before producer initialization (default: 1000, matches broker conn_timeout)
  - `:startup_jitter_ms` - Maximum random startup delay in milliseconds to avoid thundering herd (default: 1000)

The total startup delay is `startup_delay_ms + random(0, startup_jitter_ms)`, applied on every producer start/restart.
The default `startup_delay_ms` matches the broker's `conn_timeout` to ensure the broker has time to reconnect
before producers start requesting topic lookups.

The producer will automatically use service discovery to find the broker.
If no name is provided, the broker will assign a unique producer name.

## Examples

    # Default shared mode
    {:ok, producer} = Producer.start_link("persistent://public/default/my-topic")

    # With custom name and exclusive mode
    {:ok, producer} = Producer.start_link(
      "persistent://public/default/my-topic",
      name: "my-producer",
      access_mode: :Exclusive
    )

    # With schema
    {:ok, producer} = Producer.start_link(
      "persistent://public/default/my-topic",
      schema: [type: :Json, definition: json_schema_def]
    )

# `stop`

```elixir
@spec stop(GenServer.server(), term(), timeout()) :: :ok
```

Gracefully stops a producer process.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
