PhoenixMicro.Transport.BroadwayProducer (PhoenixMicro v1.0.0)

Copy Markdown View Source

A GenStage Producer that bridges any PhoenixMicro.Transport into a Broadway pipeline.

This module implements the Broadway.Producer behaviour and sits at the head of every PhoenixMicro.Pipeline. It:

  1. Subscribes to the configured transport topic on init.
  2. Buffers incoming PhoenixMicro.Message structs in an internal queue.
  3. Drains the queue on demand — Broadway's processors pull messages, so the producer never pushes faster than downstream can handle (true backpressure).
  4. Emits Telemetry demand/buffer events for operational visibility.
  5. Re-subscribes automatically if the transport subscription is lost.

How demand works

Broadway calls handle_demand/2 whenever a processor slot becomes free. BroadwayProducer dispatches up to demand buffered messages as Broadway.Message structs. If the buffer is empty it stores the pending demand and dispatches messages as they arrive via handle_info/2.

This creates the classic GenStage pull model:

Transport broker
        (push  unbounded)
      
BroadwayProducer buffer   demand  Broadway processors
        (pull  demand-driven)
      
Broadway processors

The buffer size is capped by :max_demand (default 1000) to prevent unbounded memory growth. When the buffer is full, incoming messages are nacked so the broker can redeliver them later.