PhoenixMicro.Pipeline (PhoenixMicro v1.0.0)


A Broadway-backed message-processing pipeline for PhoenixMicro.Consumer.

Why Broadway instead of bare Tasks?

The previous Consumer.Worker dispatched messages using Task.start/1: a fire-and-forget approach that offers no backpressure. Under load, the broker can push messages faster than handlers can process them, causing:

  • Unbounded process growth (OOM risk).
  • No visibility into queue depth.
  • All in-flight messages lost on node restart.
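
To make the first failure mode concrete, here is a minimal sketch of fire-and-forget dispatch. It is not the actual Consumer.Worker code; `UnboundedDispatch` and `slow_handler/1` are hypothetical names used only for illustration. It shows that Task.start/1 creates one process per message regardless of how slowly the handler runs.

```elixir
defmodule UnboundedDispatch do
  # Hypothetical stand-in for a slow message handler.
  def slow_handler(_message), do: Process.sleep(200)

  # Fire-and-forget dispatch: one new process per message, with no upper limit.
  def dispatch(messages) do
    Enum.each(messages, fn msg ->
      {:ok, _pid} = Task.start(fn -> slow_handler(msg) end)
    end)
  end

  # Returns roughly how many processes a burst of messages added to the VM.
  def process_growth(message_count) do
    before = length(Process.list())
    dispatch(1..message_count)
    length(Process.list()) - before
  end
end

IO.inspect(UnboundedDispatch.process_growth(1_000), label: "processes added")
```

The process count tracks the size of the burst, not the capacity of the handlers, which is the growth pattern the list above describes.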

Broadway solves this with a demand-driven pull model:

Transport broker
      │  push (unbounded)
      ▼
BroadwayProducer ◄── demand ── Processor pool (N workers)
      │                               │
      │  pull (demand-gated)          │
      └──────────► your handle/2 ◄────┘

Processors only request more messages when they have capacity, creating true end-to-end backpressure from handler back to broker.
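
The pull model can be illustrated with a small buffer that releases messages only when a worker asks for them. This is a deliberately simplified sketch in plain Elixir, not how GenStage or Broadway are implemented; `DemandGatedBuffer` and its functions are hypothetical names.

```elixir
defmodule DemandGatedBuffer do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Broker side: push a message into the buffer.
  def push(buffer, message), do: GenServer.cast(buffer, {:push, message})

  # Worker side: ask for up to `demand` messages and get what is available now.
  def pull(buffer, demand), do: GenServer.call(buffer, {:pull, demand})

  @impl true
  def init(:ok), do: {:ok, :queue.new()}

  @impl true
  def handle_cast({:push, message}, queue), do: {:noreply, :queue.in(message, queue)}

  @impl true
  def handle_call({:pull, demand}, _from, queue) do
    {taken, rest} = take(queue, demand, [])
    {:reply, taken, rest}
  end

  # Dequeue at most `n` messages, preserving arrival order.
  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue}

  defp take(queue, n, acc) do
    case :queue.out(queue) do
      {{:value, msg}, rest} -> take(rest, n - 1, [msg | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest}
    end
  end
end

{:ok, buffer} = DemandGatedBuffer.start_link()
Enum.each(1..5, &DemandGatedBuffer.push(buffer, &1))
IO.inspect(DemandGatedBuffer.pull(buffer, 2))
```

A worker that pulls two messages receives exactly two, however many the broker has pushed; the remainder waits in the buffer until more demand arrives.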

Pipeline topology

BroadwayProducer (1 process)
      │
      ▼  (up to :concurrency messages in flight)
Processor pool ── runs middleware + consumer.handle/2
      │
      ▼  (optional, only when :batch_size > 1)
Batcher pool ── runs consumer.handle_batch/3 if defined
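
As a rough illustration of this topology, the sketch below builds a keyword list in the shape Broadway.start_link/2 accepts (`:name`, `:producer`, `:processors`, `:batchers`). How PhoenixMicro.Pipeline actually translates consumer settings into Broadway options is an assumption here; `TopologySketch`, `MyApp.PaymentsPipeline` and `MyApp.Producer` are hypothetical names.

```elixir
defmodule TopologySketch do
  # Builds Broadway-style options from consumer settings (assumed mapping).
  def broadway_opts(name, producer_module, settings) do
    concurrency = Keyword.get(settings, :concurrency, 1)
    batch_size = Keyword.get(settings, :batch_size, 1)
    batch_timeout = Keyword.get(settings, :batch_timeout, 1_000)

    base = [
      name: name,
      # A single producer process, as in the diagram above.
      producer: [module: {producer_module, []}, concurrency: 1],
      # The processor pool is sized by :concurrency.
      processors: [default: [concurrency: concurrency]]
    ]

    # The batcher stage is only added when batching is requested.
    if batch_size > 1 do
      base ++ [batchers: [default: [batch_size: batch_size, batch_timeout: batch_timeout]]]
    else
      base
    end
  end
end

opts =
  TopologySketch.broadway_opts(MyApp.PaymentsPipeline, MyApp.Producer,
    concurrency: 10,
    batch_size: 50,
    batch_timeout: 1_000
  )

IO.inspect(opts)
```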

Usage

PhoenixMicro.Pipeline is started automatically for every registered consumer by ConsumerManager. You do not start it directly.

You configure it via the Consumer DSL:

defmodule MyApp.PaymentsConsumer do
  use PhoenixMicro.Consumer

  topic       "payments.created"
  concurrency 10        # processor pool size
  batch_size  50        # group messages before handle_batch/3
  batch_timeout 1_000   # flush batch after this many ms even if not full

  def handle(_message, _ctx), do: :ok

  # Optional batch handler, only called when batch_size > 1
  def handle_batch(_batch_name, messages, _batch_info, _ctx) do
    # Return the messages unchanged; Broadway treats messages that are
    # not marked as failed as successfully processed.
    messages
  end
end

Telemetry

In addition to the standard [:phoenix_micro, :message, *] events, the pipeline emits:

  • [:phoenix_micro, :pipeline, :demand] — processor demand updates
  • [:phoenix_micro, :pipeline, :enqueued] — message entered buffer
  • [:phoenix_micro, :pipeline, :buffer_full] — buffer at max_demand cap
  • [:broadway, :processor, :message, :start/stop/exception] — Broadway's own spans
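
A handler for these events could be attached with :telemetry.attach_many/4, as in the sketch below. It assumes the :telemetry package is available in the application (Broadway depends on it). The measurement and metadata shapes of the PhoenixMicro events are not documented on this page, so the handler only inspects what it receives; the module name and handler id are hypothetical.

```elixir
defmodule PipelineTelemetrySketch do
  require Logger

  @events [
    [:phoenix_micro, :pipeline, :demand],
    [:phoenix_micro, :pipeline, :enqueued],
    [:phoenix_micro, :pipeline, :buffer_full],
    [:broadway, :processor, :message, :stop]
  ]

  # Attaches one handler to all listed events; returns :ok on success.
  def attach do
    :telemetry.attach_many(
      "pipeline-telemetry-sketch",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Logs the event name, its measurements, and the metadata keys.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(
      "#{inspect(event)} measurements=#{inspect(measurements)} " <>
        "metadata_keys=#{inspect(Map.keys(metadata))}"
    )
  end
end
```

Calling `PipelineTelemetrySketch.attach()` once during application start would be enough; a second call with the same handler id returns `{:error, :already_exists}`.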

Functions

child_spec(arg)

@spec child_spec({module(), keyword()}) :: Supervisor.child_spec()

Returns a child spec suitable for DynamicSupervisor.

start_link(consumer_module, opts \\ [])

@spec start_link(
  module(),
  keyword()
) :: GenServer.on_start()

Starts a supervised Broadway pipeline for the given consumer module.

Called automatically by PhoenixMicro.Supervisor.ConsumerManager.
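
Although the pipeline is normally started for you, the sketch below shows how a child spec with the documented `{module(), keyword()}` argument fits under a DynamicSupervisor. `PipelineStandIn` is a hypothetical module that mirrors only the documented signatures; it starts an Agent instead of Broadway so that the example runs on its own, and `MyApp.PaymentsConsumer` is used purely as an atom.

```elixir
defmodule PipelineStandIn do
  # Mirrors child_spec({module(), keyword()}) from the docs above.
  def child_spec({consumer_module, opts}) do
    %{
      id: {__MODULE__, consumer_module},
      start: {__MODULE__, :start_link, [consumer_module, opts]}
    }
  end

  # Mirrors start_link(consumer_module, opts \\ []).
  # A real pipeline would start Broadway here; an Agent keeps the sketch runnable.
  def start_link(consumer_module, _opts \\ []) do
    Agent.start_link(fn -> consumer_module end)
  end
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# The {module, arg} tuple is expanded by calling module.child_spec(arg).
{:ok, pid} =
  DynamicSupervisor.start_child(sup, {PipelineStandIn, {MyApp.PaymentsConsumer, []}})

IO.inspect(Agent.get(pid, & &1), label: "pipeline started for")
```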