A Broadway-backed message-processing pipeline for PhoenixMicro.Consumer.
Why Broadway instead of bare Tasks?
The previous Consumer.Worker dispatched messages with Task.start/1,
a fire-and-forget approach that offers no backpressure. Under load, the
broker can push messages faster than handlers can process them, causing:
- Unbounded process growth (OOM risk).
- No visibility into queue depth.
- All in-flight messages lost on node restart.
Broadway solves this with a demand-driven pull model:
    Transport broker
          │  push (unbounded)
          ▼
    BroadwayProducer  ←── demand ──  Processor pool (N workers)
          │                                  │
          │  pull (demand-gated)             ▼
          └───────────────────────  your handle/2

Processors only request more messages when they have capacity, creating true end-to-end backpressure from handler back to broker.
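The difference between fire-and-forget dispatch and capacity-gated dispatch can be illustrated with the standard library alone. The sketch below is not Broadway and not the pipeline's implementation: it uses Task.async_stream/3 as a stand-in, and the `handle` function and integer messages are hypothetical placeholders. It only shows the general idea that the next message is pulled when a worker is free.

```elixir
# Hypothetical stand-in for a slow message handler.
handle = fn msg ->
  Process.sleep(10)
  {:handled, msg}
end

# Placeholder messages; a real consumer would receive broker payloads.
messages = 1..20

# Unbounded variant, for contrast (commented out): one process per message,
# with nothing limiting how many run at the same time.
# Enum.each(messages, fn msg -> Task.start(fn -> handle.(msg) end) end)

# Bounded variant: at most 4 handlers run concurrently, and the next message
# is taken from the enumerable only when one of them finishes.
results =
  messages
  |> Task.async_stream(handle, max_concurrency: 4, timeout: 5_000)
  |> Enum.map(fn {:ok, result} -> result end)

IO.inspect(length(results))
# prints 20
```

Here `max_concurrency` plays roughly the role that the processor pool size plays in the diagram above: it caps the number of messages in flight instead of letting the source dictate the pace.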
Pipeline topology
    BroadwayProducer (1 process)
          │
          ▼  (up to :concurrency messages in flight)
    Processor pool  ←── runs middleware + consumer.handle/2
          │
          ▼  (optional, only when :batch_size > 1)
    Batcher pool    ←── runs consumer.handle_batch/3 if defined

Usage
PhoenixMicro.Pipeline is started automatically for every registered
consumer by ConsumerManager. You do not start it directly.
You configure it via the Consumer DSL:
    defmodule MyApp.PaymentsConsumer do
      use PhoenixMicro.Consumer

      topic "payments.created"
      concurrency 10         # processor pool size
      batch_size 50          # group messages before handle_batch/3
      batch_timeout 1_000    # flush batch after this many ms even if not full

      def handle(message, _ctx), do: :ok

      # Optional batch handler, only called when batch_size > 1
      def handle_batch(_batch_name, messages, _batch_info, _ctx) do
        Enum.map(messages, fn msg -> Broadway.Message.ack(msg) end)
      end
    end

Telemetry
In addition to the standard [:phoenix_micro, :message, *] events,
the pipeline emits:
- [:phoenix_micro, :pipeline, :demand]: processor demand updates
- [:phoenix_micro, :pipeline, :enqueued]: message entered buffer
- [:phoenix_micro, :pipeline, :buffer_full]: buffer at max_demand cap
- [:broadway, :processor, :message, :start/stop/exception]: Broadway's own spans
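
As a rough sketch of how these events might be observed, the module below attaches one handler to the three pipeline events with :telemetry.attach_many/4. The module name `MyApp.PipelineTelemetry` and the handler id are hypothetical. The shape of each event's measurements and metadata is not documented here, so the handler logs them as-is instead of pattern-matching on specific keys. It assumes the :telemetry package is available, which is the case in any application that depends on Broadway.

```elixir
defmodule MyApp.PipelineTelemetry do
  # Hypothetical helper module; it is not part of PhoenixMicro.
  require Logger

  # Event names taken from the list above.
  @events [
    [:phoenix_micro, :pipeline, :demand],
    [:phoenix_micro, :pipeline, :enqueued],
    [:phoenix_micro, :pipeline, :buffer_full]
  ]

  def events, do: @events

  # Call once, for example from Application.start/2.
  def attach do
    :telemetry.attach_many(
      "my-app-pipeline-logger",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Measurements and metadata are logged unmodified because their
  # exact keys are an unknown here.
  def handle_event(event, measurements, metadata, _config) do
    Logger.debug(fn ->
      "#{inspect(event)} measurements=#{inspect(measurements)} metadata=#{inspect(metadata)}"
    end)
  end
end
```

Passing the handler as a remote capture (`&__MODULE__.handle_event/4`) instead of an anonymous function follows the recommendation in the :telemetry documentation, since local funs carry a small performance penalty.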
Functions
@spec child_spec({module(), keyword()}) :: Supervisor.child_spec()
Returns a child spec suitable for DynamicSupervisor.
@spec start_link(module(), keyword()) :: GenServer.on_start()
Starts a supervised Broadway pipeline for the given consumer module.
Called automatically by PhoenixMicro.Supervisor.ConsumerManager.