PhiAccrualAmqp.Consumer
(phi_accrual_amqp v0.1.0)
AMQP consumer that feeds broker deliveries into the PhiAccrual
core detector.
Opens an AMQP connection, opens a channel, subscribes to one
configured queue, and on every delivery calls
PhiAccrual.observe(detector_key, receipt_ts) where receipt_ts
comes from :erlang.monotonic_time(:millisecond) at the instant
the delivery message lands in this GenServer's mailbox. The
detector key is extracted from the envelope by
PhiAccrualAmqp.Envelope.extract/2.
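The delivery path described above can be sketched as follows. This is a minimal illustration, not the library's source: DeliverySketch, observe, and resolve are hypothetical names, with observe standing in for PhiAccrual.observe/2 and resolve for the envelope key extraction. The {:basic_deliver, payload, meta} tuple is the message shape the amqp library sends to a consuming process.

```elixir
defmodule DeliverySketch do
  @moduledoc """
  Illustrative only: stamps each delivery with local monotonic time and
  hands it to an injected observe function.
  """
  use GenServer

  # `observe` is a hypothetical 2-arity stand-in for PhiAccrual.observe/2;
  # `resolve` is a hypothetical 1-arity stand-in for key extraction.
  def start_link(observe, resolve) do
    GenServer.start_link(__MODULE__, %{observe: observe, resolve: resolve})
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_info({:basic_deliver, _payload, meta}, state) do
    # Read the local monotonic clock before doing any other work.
    # Publisher or broker timestamps carried in `meta` are not used here.
    receipt_ts = :erlang.monotonic_time(:millisecond)
    state.observe.(state.resolve.(meta), receipt_ts)
    {:noreply, state}
  end

  # Ignore everything else in this sketch (consume_ok, cancel, and so on).
  def handle_info(_other, state), do: {:noreply, state}
end
```

Injecting observe and resolve as functions keeps the sketch runnable without a broker: sending the process a hand-built {:basic_deliver, payload, meta} tuple exercises the same code path.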
Clock discipline (read this)
phi_accrual's estimator works on local monotonic time only.
The publisher's BasicProperties.timestamp and any broker-stamped
header (e.g., from the rabbitmq_message_timestamp plugin) are
cross-process wall clocks; using them to feed the EWMA breaks the
detector. This module passes them through as diagnostic telemetry
metadata but never as the value handed to observe/2.
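To make the clock requirement concrete, here is a minimal sketch of an EWMA over inter-arrival intervals. IntervalSketch and its alpha smoothing factor are assumptions for illustration; phi_accrual's actual estimator may be structured differently.

```elixir
defmodule IntervalSketch do
  # Hypothetical helper, not part of phi_accrual: folds receipt timestamps
  # (milliseconds) into an EWMA of inter-arrival intervals.
  # `alpha` is an assumed smoothing factor.
  def new(alpha \\ 0.2), do: %{alpha: alpha, last_ts: nil, mean: nil}

  # First sample: there is no previous timestamp to difference against.
  def observe(%{last_ts: nil} = s, ts), do: %{s | last_ts: ts}

  # Second sample: seed the mean with the first interval.
  def observe(%{mean: nil, last_ts: last} = s, ts) do
    %{s | last_ts: ts, mean: (ts - last) * 1.0}
  end

  # Later samples: blend the new interval into the running mean.
  def observe(%{alpha: a, last_ts: last, mean: mean} = s, ts) do
    interval = ts - last
    %{s | last_ts: ts, mean: a * interval + (1 - a) * mean}
  end
end
```

Every update depends on the difference ts - last. Erlang monotonic time never goes backwards, so that difference is never negative. A wall clock that is stepped (for example by a time correction on the publisher or broker) can yield a negative or inflated interval, which then contaminates the running mean.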
Liveness caveat (read this too)
In AMQP, "delivery received" proves three things are alive in
combination: publisher, broker, and the network paths between
them and you. A high phi value does NOT pin the fault on the
publisher. If you need publisher-only liveness, choose a transport
with no intermediary (e.g., phi_accrual_udp).
Mapping deliveries to detector keys
See PhiAccrualAmqp.Envelope for the resolver contract. The
default extracts meta.routing_key. For static N-queues-per-node
topologies, pass a constant resolver per consumer:
    Consumer.start_link(
      queue: "heartbeats.node_a",
      key_resolver: fn _meta -> :node_a end
    )
Connection lifecycle
The consumer manages its own connection, channel, and subscription.
On startup it schedules an async :connect so the supervisor can
come up before the broker is reachable. On any failure — broker
unreachable, channel error, server-initiated basic.cancel,
connection or channel process death — the consumer tears down what
it has and reconnects with exponential backoff between
:reconnect_min_ms and :reconnect_max_ms.
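The backoff schedule could look like the sketch below. BackoffSketch is a hypothetical helper; the doubling factor and the absence of jitter are assumptions, since the text above only states that the delay grows exponentially between the two bounds.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper: the first delay is the configured minimum.
  def first(min_ms), do: min_ms

  # Each later delay doubles the previous one, capped at the maximum.
  # Doubling (rather than some other factor) is an assumption.
  def next(current_ms, max_ms), do: min(current_ms * 2, max_ms)
end
```

With an illustrative minimum of 500 ms and maximum of 30_000 ms, Stream.iterate(500, &BackoffSketch.next(&1, 30_000)) |> Enum.take(8) yields [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]. Resetting to the minimum after a successful connect is a common companion choice, though whether this module does so is not stated here.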
This deliberately differs from PhiAccrualUdp.Listener's fail-fast
socket open: an AMQP connection is a remote-broker contract that
can blip during normal operation, while a UDP socket open is a
local syscall that essentially never fails after success.
Telemetry
[:phi_accrual_amqp, :connection, :up]
measurements: %{}
metadata: %{queue}
[:phi_accrual_amqp, :connection, :down]
measurements: %{}
metadata: %{queue, reason}
[:phi_accrual_amqp, :consumer, :registered]
measurements: %{}
metadata: %{queue, consumer_tag}
[:phi_accrual_amqp, :consumer, :cancelled]
measurements: %{}
metadata: %{queue, consumer_tag, reason}
[:phi_accrual_amqp, :sample, :received]
measurements: %{}
metadata: %{detector_key, envelope_timestamp, routing_key, exchange, queue}
# envelope_timestamp may be nil if the publisher didn't set one;
# it is NEVER what gets passed to PhiAccrual.observe/2.
[:phi_accrual_amqp, :extract, :error]
measurements: %{}
metadata: %{reason, routing_key, exchange, queue}
# reason ∈ [:no_detector_key, :resolver_raised]
The :sample, :received event name is shared with phi_accrual_udp, but
the payload shape differs (identity key detector_key vs node; timestamp
in metadata vs measurements). A handler written for one transport is
not a drop-in replacement for the other.
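A handler for two of these events might be sketched as follows. SampleHandler is a hypothetical module; only the event names and metadata keys are taken from the list above.

```elixir
defmodule SampleHandler do
  # Hypothetical telemetry handler. The 4-arity shape (event, measurements,
  # metadata, config) is what :telemetry passes to attached handlers.
  require Logger

  def handle_event(event, _measurements, metadata, _config) do
    Logger.info(describe(event, metadata))
  end

  # Pure formatting is kept separate so it can be checked without a logger.
  def describe([:phi_accrual_amqp, :sample, :received], meta) do
    "sample key=#{inspect(meta.detector_key)} queue=#{meta.queue}"
  end

  def describe([:phi_accrual_amqp, :extract, :error], meta) do
    "extract error reason=#{inspect(meta.reason)} queue=#{meta.queue}"
  end

  def describe(event, _meta), do: "unhandled #{inspect(event)}"
end
```

Assuming the :telemetry package is available in the host application, the handler could be attached with :telemetry.attach_many("sample-handler", [[:phi_accrual_amqp, :sample, :received], [:phi_accrual_amqp, :extract, :error]], &SampleHandler.handle_event/4, nil). The handler id "sample-handler" is an arbitrary illustrative value.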
Types
@type opts() :: [
        queue: String.t(),
        url: String.t(),
        connection_opts: keyword() | String.t(),
        key_resolver: PhiAccrualAmqp.Envelope.resolver(),
        reconnect_min_ms: pos_integer(),
        reconnect_max_ms: pos_integer(),
        connect: boolean(),
        name: GenServer.name()
      ]
Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
start_link(opts)
@spec start_link(opts()) :: GenServer.on_start()
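As a sketch of how these options might be combined when placing the consumer under a supervisor: the option keys come from opts() above, while every value (queue name, URL, backoff bounds) is illustrative, and whether url and connection_opts may be combined is not stated here.

```elixir
# Illustrative values only; the keys are taken from the opts() type.
consumer_opts = [
  queue: "heartbeats.node_a",
  url: "amqp://guest:guest@localhost:5672",
  key_resolver: fn _meta -> :node_a end,
  reconnect_min_ms: 500,
  reconnect_max_ms: 30_000
]

# A {module, opts} tuple is the standard child-spec shorthand; the
# supervisor expands it by calling the module's child_spec/1.
children = [{PhiAccrualAmqp.Consumer, consumer_opts}]

# With the library loaded, the list would be handed to a supervisor:
# Supervisor.start_link(children, strategy: :one_for_one)
```

Because the consumer connects asynchronously and retries on its own, an unreachable broker at boot should not by itself prevent the supervisor from starting.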