A GenStage Producer that bridges any PhoenixMicro.Transport into a
Broadway pipeline.
This module implements the Broadway.Producer behaviour and sits at the
head of every PhoenixMicro.Pipeline. It:
- Subscribes to the configured transport topic on
init. - Buffers incoming
PhoenixMicro.Messagestructs in an internal queue. - Drains the queue on demand — Broadway's processors pull messages, so the producer never pushes faster than downstream can handle (true backpressure).
- Emits Telemetry demand/buffer events for operational visibility.
- Re-subscribes automatically if the transport subscription is lost.
How demand works
Broadway calls handle_demand/2 whenever a processor slot becomes free.
BroadwayProducer dispatches up to demand buffered messages as
Broadway.Message structs. If the buffer is empty it stores the pending
demand and dispatches messages as they arrive via handle_info/2.
This creates the classic GenStage pull model:
Transport broker
│ (push — unbounded)
▼
BroadwayProducer buffer ←── demand ── Broadway processors
│ (pull — demand-driven)
▼
Broadway processorsThe buffer size is capped by :max_demand (default 1000) to prevent
unbounded memory growth. When the buffer is full, incoming messages are
nacked so the broker can redeliver them later.