HareMq.Worker.Consumer (hare_mq v1.4.0)


Internal GenServer that manages a single AMQP channel and processes messages.

Not intended to be used directly; it is instantiated by the HareMq.Consumer and HareMq.DynamicConsumer macros.

Lifecycle

  1. In init/1, the process sends itself {:connect, opts}.
  2. Opens a channel on the active HareMq.Connection.
  3. Calls declare_queues/1. For stream queues, only the main queue is declared, with x-queue-type: stream; standard queues get the full exchange + delay + dead-letter topology.
  4. Calls Basic.consume/3. Stream consumers pass x-stream-offset in the arguments; standard consumers pass no extra arguments.
  5. On a graceful cancel or a channel/connection drop, the GenServer stops and is restarted by its supervisor.
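
The deferred-connect pattern in steps 1 and 5 can be sketched with a plain GenServer. This is an illustration only, not the hare_mq source: the module name DeferredConnectSketch, the state/1 helper, the :queue_name option, the {:fake_channel, name} stand-in, and the {:channel_down, reason} message are all hypothetical, and no real AMQP call is made.

```elixir
defmodule DeferredConnectSketch do
  # Hypothetical module illustrating the lifecycle; not part of hare_mq.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Hypothetical helper that exposes the state for inspection.
  def state(pid), do: GenServer.call(pid, :state)

  @impl true
  def init(opts) do
    # Step 1: defer the connection work so init/1 returns immediately
    # and does not block the supervisor that is starting this process.
    send(self(), {:connect, opts})
    {:ok, %{channel: nil, opts: opts}}
  end

  @impl true
  def handle_info({:connect, opts}, state) do
    # Steps 2-4 would open a channel, declare queues, and subscribe here.
    # A tuple stands in for the real channel (assumption).
    channel = {:fake_channel, Keyword.get(opts, :queue_name)}
    {:noreply, %{state | channel: channel}}
  end

  def handle_info({:channel_down, reason}, state) do
    # Step 5: stop on a drop; a supervisor would then restart the process.
    {:stop, {:shutdown, reason}, state}
  end

  @impl true
  def handle_call(:state, _from, state), do: {:reply, state, state}
end
```

Because the {:connect, opts} message is placed in the mailbox before init/1 returns, it is handled ahead of any call made after start_link/1 succeeds, so DeferredConnectSketch.state(pid) already shows the stand-in channel.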

Stream behaviour

When config[:stream] is true:

  • Only the main queue is declared (x-queue-type: stream).
  • Basic.consume/3 includes the x-stream-offset argument derived from config[:stream_offset] (string, integer, or %DateTime{}).
  • process_result/5 always acks, regardless of the consume_fn return value, because stream logs are immutable and nack/retry is therefore meaningless.
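
As a rough sketch of how config[:stream_offset] could be turned into a consume argument, the helper below maps each supported value type to an AMQP table entry of the form {name, type, value}. The module StreamOffsetSketch and its function names are hypothetical, and the choice of :longstr, :long, and :timestamp (in Unix seconds) is an assumption based on how AMQP tables are commonly typed, not something taken from the hare_mq source.

```elixir
defmodule StreamOffsetSketch do
  # Hypothetical helper; not the hare_mq implementation.

  # Returns the extra consume arguments for a config (keyword list or map).
  # Standard (non-stream) consumers get no extra arguments.
  def consume_arguments(config) do
    if config[:stream] do
      offset_arguments(config[:stream_offset])
    else
      []
    end
  end

  # String offsets such as "first", "last", or "next".
  defp offset_arguments(offset) when is_binary(offset),
    do: [{"x-stream-offset", :longstr, offset}]

  # Absolute numeric offset into the stream.
  defp offset_arguments(offset) when is_integer(offset),
    do: [{"x-stream-offset", :long, offset}]

  # Timestamp offset, encoded as Unix seconds (assumption).
  defp offset_arguments(%DateTime{} = offset),
    do: [{"x-stream-offset", :timestamp, DateTime.to_unix(offset)}]

  # No offset configured: send nothing and let the broker default apply.
  defp offset_arguments(nil), do: []
end
```

For example, StreamOffsetSketch.consume_arguments(stream: true, stream_offset: "first") yields a single-element list holding the x-stream-offset entry, while any config with stream: false yields an empty list.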

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

declare_queues(config)

get_channel(consumer_name)

init(opts)

Callback implementation for GenServer.init/1.

republish_dead_messages(consumer_name, count \\ 1)

start_link(opts)