PhiAccrualAmqp.Consumer (phi_accrual_amqp v0.1.0)

AMQP consumer that feeds broker deliveries into the PhiAccrual core detector.

Opens an AMQP connection and a channel, then subscribes to one configured queue. On every delivery it calls PhiAccrual.observe(detector_key, receipt_ts), where receipt_ts is read from :erlang.monotonic_time(:millisecond) at the instant the delivery message lands in this GenServer's mailbox. The detector key is extracted from the envelope by PhiAccrualAmqp.Envelope.extract/2.

Clock discipline (read this)

phi_accrual's estimator works on local monotonic time only. The publisher's BasicProperties.timestamp and any broker-stamped header (e.g., from the rabbitmq_message_timestamp plugin) are wall clocks read in other processes, so their skew and step adjustments would show up as spurious inter-arrival jitter; feeding them to the EWMA breaks the detector. This module passes them through as diagnostic telemetry metadata but never as the value handed to observe/2.
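The rule above can be sketched as follows. This is a minimal illustration, not the module's actual code: ClockDisciplineSketch and on_delivery/3 are hypothetical names, the observe function is injected so the sketch runs without the PhiAccrual dependency, and the assumption that the publisher timestamp sits under the :timestamp key of the delivery metadata is mine.

```elixir
defmodule ClockDisciplineSketch do
  # Hypothetical helper, not part of phi_accrual_amqp.
  # `observe` stands in for PhiAccrual.observe/2 so the sketch is self-contained.
  def on_delivery(meta, detector_key, observe) when is_function(observe, 2) do
    # Local monotonic receipt time: the only value the estimator should see.
    receipt_ts = :erlang.monotonic_time(:millisecond)
    observe.(detector_key, receipt_ts)

    # The publisher's wall-clock timestamp (assumed to live under :timestamp)
    # is returned as diagnostic metadata only; it is never passed to `observe`.
    %{detector_key: detector_key, envelope_timestamp: Map.get(meta, :timestamp)}
  end
end
```

The point of the split is that the two timestamps never share a code path: one goes to the estimator, the other only into metadata.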

Liveness caveat (read this too)

In AMQP, "delivery received" proves three things are alive in combination: publisher, broker, and the network paths between them and you. A high phi value does NOT pin the fault on the publisher. If you need publisher-only liveness, choose a transport with no intermediary (e.g., phi_accrual_udp).

Mapping deliveries to detector keys

See PhiAccrualAmqp.Envelope for the resolver contract. The default resolver extracts meta.routing_key. For static N-queues-per-node topologies, pass a constant resolver to each consumer:

Consumer.start_link(
  queue: "heartbeats.node_a",
  key_resolver: fn _meta -> :node_a end
)

Connection lifecycle

The consumer manages its own connection, channel, and subscription. On startup it schedules an async :connect so the supervisor can come up before the broker is reachable. On any failure (broker unreachable, channel error, server-initiated basic.cancel, or death of the connection or channel process) the consumer tears down whatever it holds and reconnects with exponential backoff bounded by :reconnect_min_ms and :reconnect_max_ms.
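One plausible shape for such a backoff schedule is shown below. It is a sketch under stated assumptions: BackoffSketch and delay/3 are hypothetical names, and the doubling factor and absence of jitter are guesses; the source only says the delay grows exponentially between the two bounds.

```elixir
defmodule BackoffSketch do
  # Hypothetical: delay doubles per failed attempt, starting at min_ms and
  # capped at max_ms. The real consumer's growth factor and jitter are unknown.
  def delay(attempt, min_ms, max_ms)
      when is_integer(attempt) and attempt >= 0 and min_ms > 0 and max_ms >= min_ms do
    min(max_ms, min_ms * Integer.pow(2, attempt))
  end
end

# Delays for the first six attempts with bounds of 500 ms and 10 s:
Enum.map(0..5, &BackoffSketch.delay(&1, 500, 10_000))
# => [500, 1000, 2000, 4000, 8000, 10000]
```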

This deliberately differs from PhiAccrualUdp.Listener's fail-fast socket open: an AMQP connection is a contract with a remote broker that can blip during normal operation, while a UDP socket open is a local syscall that, once it has succeeded, essentially never fails later.

Telemetry

[:phi_accrual_amqp, :connection, :up]
  measurements: %{}
  metadata:     %{queue}

[:phi_accrual_amqp, :connection, :down]
  measurements: %{}
  metadata:     %{queue, reason}

[:phi_accrual_amqp, :consumer, :registered]
  measurements: %{}
  metadata:     %{queue, consumer_tag}

[:phi_accrual_amqp, :consumer, :cancelled]
  measurements: %{}
  metadata:     %{queue, consumer_tag, reason}

[:phi_accrual_amqp, :sample, :received]
  measurements: %{}
  metadata:     %{detector_key, envelope_timestamp, routing_key, exchange, queue}
  # envelope_timestamp may be nil if the publisher didn't set one;
  # it is NEVER what gets passed to PhiAccrual.observe/2.

[:phi_accrual_amqp, :extract, :error]
  measurements: %{}
  metadata:     %{reason, routing_key, exchange, queue}
  # reason ∈ [:no_detector_key, :resolver_raised]

The [:sample, :received] event suffix is shared with phi_accrual_udp, but the payload shape differs: the identity key is detector_key here versus node there, and the timestamp travels in metadata here versus measurements there. Handlers are not drop-in compatible across transports.
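A handler for a few of these events might look like the sketch below. It assumes the :telemetry package is available (it has to be for the events to be emitted at all); AmqpTelemetryLogger and the handler id string are hypothetical names, and only metadata keys listed above are matched.

```elixir
defmodule AmqpTelemetryLogger do
  # Hypothetical handler module, not shipped with phi_accrual_amqp.
  require Logger

  @events [
    [:phi_accrual_amqp, :connection, :up],
    [:phi_accrual_amqp, :connection, :down],
    [:phi_accrual_amqp, :extract, :error]
  ]

  # Attach one handler for all three events; call once at application start.
  def attach do
    :telemetry.attach_many("amqp-telemetry-logger", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event([:phi_accrual_amqp, :connection, :up], _measurements, %{queue: queue}, _config) do
    Logger.info("AMQP consumer connected, queue=#{queue}")
  end

  def handle_event([:phi_accrual_amqp, :connection, :down], _measurements, meta, _config) do
    Logger.warning("AMQP consumer down, queue=#{meta.queue} reason=#{inspect(meta.reason)}")
  end

  def handle_event([:phi_accrual_amqp, :extract, :error], _measurements, meta, _config) do
    # reason is :no_detector_key or :resolver_raised per the event list above.
    Logger.error("detector key extraction failed: #{inspect(meta.reason)} routing_key=#{inspect(meta.routing_key)}")
  end
end
```

The handler is passed as an external capture (&__MODULE__.handle_event/4) rather than an anonymous function, which is the form the :telemetry documentation recommends.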

Types

opts()

@type opts() :: [
  queue: String.t(),
  url: String.t(),
  connection_opts: keyword() | String.t(),
  key_resolver: PhiAccrualAmqp.Envelope.resolver(),
  reconnect_min_ms: pos_integer(),
  reconnect_max_ms: pos_integer(),
  connect: boolean(),
  name: GenServer.name()
]

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
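As an illustration, two consumers could be supervised side by side roughly as follows. This is a sketch, not a documented recipe: MyApp.HeartbeatSupervisor, the queue names, the URL, and the backoff values are all made up, and it assumes the generated child_spec/1 uses the module name as its default id, which is why each child gets an explicit id here. The distinct :name values are likewise a precaution in case the default registers a fixed name.

```elixir
defmodule MyApp.HeartbeatSupervisor do
  # Hypothetical supervisor; every option value below is illustrative only.
  use Supervisor

  def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(_arg) do
    children =
      for node <- [:node_a, :node_b] do
        opts = [
          queue: "heartbeats.#{node}",
          url: "amqp://guest:guest@localhost:5672",
          key_resolver: fn _meta -> node end,
          reconnect_min_ms: 500,
          reconnect_max_ms: 10_000,
          name: :"heartbeat_consumer_#{node}"
        ]

        # Override the id so both children can live under one supervisor.
        Supervisor.child_spec({PhiAccrualAmqp.Consumer, opts}, id: {:heartbeat_consumer, node})
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```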

start_link(opts)

@spec start_link(opts()) :: GenServer.on_start()