GenRMQ.Publisher behaviour (gen_rmq v4.0.0)

A behaviour module for implementing a RabbitMQ publisher.

Summary

Callbacks

init()

Invoked to provide publisher configuration

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

consumer_count(publisher, queue)

Get the number of active consumers on the provided queue. If a nonexistent queue is provided, an error will be raised.

empty?(publisher, queue)

Return whether the provided queue is empty or not. If a nonexistent queue is provided, an error will be raised.

message_count(publisher, queue)

Get the number of messages currently ready for delivery in the provided queue. If a nonexistent queue is provided, an error will be raised.

publish(publisher, message, routing_key \\ "", metadata \\ [])

Publishes the given message

purge(publisher, queue)

Drop all messages from the provided queue. If a nonexistent queue is provided, an error will be raised.

start_link(module, options \\ [])

Starts a GenRMQ.Publisher process with the given callback module, linked to the current process

status(publisher, queue)

Get the message count and consumer count for a particular queue. If a nonexistent queue is provided, an error will be raised.

Callbacks

Specs

init() :: [
  connection: keyword() | {String.t(), String.t()} | :undefined | keyword(),
  exchange: GenRMQ.Binding.exchange(),
  uri: String.t(),
  app_id: atom(),
  enable_confirmations: boolean(),
  max_confirmation_wait_time: integer()
]

Invoked to provide publisher configuration

Return values

Mandatory:

connection - RabbitMQ connection options. Accepts the same arguments as AMQP.Connection.open/2.

exchange - name, :default or {type, name} of the target exchange. If it does not exist, it will be created. Supported types: :direct, :fanout, :topic

Optional:

uri - RabbitMQ URI. Deprecated; use connection instead.

app_id - publishing application ID. By default it is :gen_rmq.

enable_confirmations - activates publishing confirmations on the channel. Confirmations are disabled by default.

max_confirmation_wait_time - maximum time in milliseconds to wait for a confirmation. By default it is 5_000 (5s).

Examples:

def init() do
  [
    exchange: "gen_rmq_exchange",
    connection: "amqp://guest:guest@localhost:5672",
    uri: "amqp://guest:guest@localhost:5672", # deprecated, prefer :connection
    app_id: :my_app_id,
    enable_confirmations: true,
    max_confirmation_wait_time: 5_000
  ]
end
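Since connection accepts the same arguments as AMQP.Connection.open/2, the configuration can also be given as a keyword list, and exchange can use the {type, name} form. The following is a minimal sketch of a callback module under those assumptions; the module name MyApp.Publisher, the credentials, and the exchange name are hypothetical placeholders.

```elixir
defmodule MyApp.Publisher do
  # Hypothetical callback module; the name is illustrative only.
  @behaviour GenRMQ.Publisher

  @impl true
  def init do
    [
      # Keyword form of the connection options (same keys as AMQP.Connection.open/2).
      connection: [
        host: "localhost",
        port: 5672,
        username: "guest",
        password: "guest",
        virtual_host: "/"
      ],
      # {type, name} form: a topic exchange that is created if it does not exist.
      exchange: {:topic, "gen_rmq_exchange"},
      app_id: :my_app_id,
      enable_confirmations: true,
      # Wait up to 5 seconds for a broker confirmation.
      max_confirmation_wait_time: 5_000
    ]
  end
end
```

The deprecated uri key is left out here on purpose, since connection already carries the same information.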

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

consumer_count(publisher, queue)

Specs

consumer_count(publisher :: atom() | pid(), queue :: String.t()) ::
  integer() | no_return()

Get the number of active consumers on the provided queue. If a nonexistent queue is provided, an error will be raised.

publisher - name or PID of the publisher

queue - name of the queue

empty?(publisher, queue)

Specs

empty?(publisher :: atom() | pid(), queue :: String.t()) ::
  boolean() | no_return()

Return whether the provided queue is empty or not. If a nonexistent queue is provided, an error will be raised.

publisher - name or PID of the publisher

queue - name of the queue

message_count(publisher, queue)

Specs

message_count(publisher :: atom() | pid(), queue :: String.t()) ::
  integer() | no_return()

Get the number of messages currently ready for delivery in the provided queue. If a nonexistent queue is provided, an error will be raised.

publisher - name or PID of the publisher

queue - name of the queue

publish(publisher, message, routing_key \\ "", metadata \\ [])

Specs

publish(
  publisher :: atom() | pid(),
  message :: String.t(),
  routing_key :: String.t(),
  metadata :: Keyword.t()
) ::
  :ok
  | {:ok, :confirmed}
  | {:error, reason :: :blocked | :closing | :confirmation_timeout}

Publishes the given message.

publisher - name or PID of the publisher

message - raw payload to deliver

routing_key - optional routing key for the message (defaults to "")

metadata - optional metadata for the message. Keys that are not allowed in metadata are moved under the :headers field. Do not include a :headers field yourself: it is created automatically from all the non-standard keys you provide. For the full list of options, see AMQP.Basic.publish/5.

Examples:

GenRMQ.Publisher.publish(TestPublisher, "{\"msg\": \"hello\"}")
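The return type above distinguishes an unconfirmed :ok, a confirmed {:ok, :confirmed}, and three error reasons. A caller might normalize them roughly as sketched below. MyApp.Events, publish_event/3, handle_result/1, and the tenant key are hypothetical names chosen for illustration; content_type and persistent are standard AMQP.Basic.publish/5 options, while tenant is a non-standard key that, per the description above, should end up under :headers.

```elixir
defmodule MyApp.Events do
  # Hypothetical helper module; not part of gen_rmq.

  # Publishes a JSON payload with a routing key and some metadata,
  # then normalizes the result.
  def publish_event(publisher, payload, routing_key) do
    publisher
    |> GenRMQ.Publisher.publish(payload, routing_key,
      content_type: "application/json",
      persistent: true,
      # Non-standard key: expected to be moved under :headers.
      tenant: "acme"
    )
    |> handle_result()
  end

  # Confirmations disabled: the message was handed to the channel.
  def handle_result(:ok), do: :sent

  # Confirmations enabled: the broker acknowledged the message.
  def handle_result({:ok, :confirmed}), do: :confirmed

  # The three error reasons listed in the spec are passed through unchanged.
  def handle_result({:error, reason})
      when reason in [:blocked, :closing, :confirmation_timeout] do
    {:error, reason}
  end
end
```

A call could then look like MyApp.Events.publish_event(TestPublisher, ~s({"msg": "hello"}), "events.created"), where the routing key is again just an example value.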

purge(publisher, queue)

Specs

purge(publisher :: atom() | pid(), queue :: String.t()) ::
  {:ok, map()} | AMQP.Basic.error()

Drop all messages from the provided queue. If a nonexistent queue is provided, an error will be raised.

publisher - name or PID of the publisher

queue - name of the queue

start_link(module, options \\ [])

Specs

start_link(module :: module(), options :: Keyword.t()) ::
  {:ok, pid()} | {:error, term()}

Starts a GenRMQ.Publisher process with the given callback module, linked to the current process.

module - callback module implementing the GenRMQ.Publisher behaviour

Options

  • :name - used for name registration

Return values

If the publisher is successfully created and initialized, this function returns {:ok, pid}, where pid is the PID of the publisher. If a process with the specified publisher name already exists, this function returns {:error, {:already_started, pid}} with the PID of that process.

Examples:

GenRMQ.Publisher.start_link(TestPublisher, name: :publisher)
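In an application the publisher is usually started under a supervisor rather than linked by hand. One way to do that, sketched here with an explicit child specification map so that the :name option is passed through to start_link/2, is shown below. MyApp.PublisherSupervisor and MyApp.Publisher are hypothetical module names; MyApp.Publisher stands for any module implementing the GenRMQ.Publisher behaviour.

```elixir
defmodule MyApp.PublisherSupervisor do
  # Hypothetical supervisor; not part of gen_rmq.
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    Supervisor.init(children(), strategy: :one_for_one)
  end

  # Explicit child spec: the supervisor calls
  # GenRMQ.Publisher.start_link(MyApp.Publisher, name: MyApp.Publisher).
  def children do
    [
      %{
        id: MyApp.Publisher,
        start: {GenRMQ.Publisher, :start_link, [MyApp.Publisher, [name: MyApp.Publisher]]}
      }
    ]
  end
end
```

With the publisher registered under a name, other processes can call GenRMQ.Publisher.publish(MyApp.Publisher, payload) without tracking its PID across restarts.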

status(publisher, queue)

Specs

status(publisher :: atom() | pid(), queue :: String.t()) ::
  {:ok, map()} | AMQP.Basic.error()

Get the message count and consumer count for a particular queue. If a nonexistent queue is provided, an error will be raised.

publisher - name or PID of the publisher

queue - name of the queue
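The queue inspection functions can be combined. The sketch below purges a queue only when it holds messages and has no active consumers. MyApp.QueueMaintenance and purge_if_abandoned/3 are hypothetical names, and the optional third argument exists only so the logic can be exercised against a stand-in module; by default it calls GenRMQ.Publisher. The checks and the purge are separate calls, so the queue state may change in between.

```elixir
defmodule MyApp.QueueMaintenance do
  # Hypothetical helper module; not part of gen_rmq.

  # Returns :empty, :has_consumers, {:purged, pending_count},
  # or whatever non-ok value purge/2 returned.
  def purge_if_abandoned(publisher, queue, api \\ GenRMQ.Publisher) do
    cond do
      api.empty?(publisher, queue) ->
        :empty

      api.consumer_count(publisher, queue) > 0 ->
        :has_consumers

      true ->
        # Number of ready messages just before purging (best effort).
        pending = api.message_count(publisher, queue)

        case api.purge(publisher, queue) do
          {:ok, _info} -> {:purged, pending}
          error -> error
        end
    end
  end
end
```

For example, MyApp.QueueMaintenance.purge_if_abandoned(TestPublisher, "dead_letters") would use the real publisher; the queue name here is only an example.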