Plug.AMQP.ConsumerProducer (PlugAMQP v0.6.0) View Source

A consumer-producer for RPC over AMQP protocol.

This module provides a consumer-producer for managing remote procedure calls over AMQP. A consumer-producer creates at least one connection to an AMQP broker. Request are received by consuming from the given queue. Responses are routed based on the reply_to message header.

The response properties mirrors the properties of the request. For example, if a request is sent using persistence, the response will be sent using persistence too.

Usage

To use this consumer-producer we must provide a request_handler/0 function. The consumer-producer will call this function to manage requests. Handlers can use send_resp/3 to return a response of the request producer (client). Check options/0 to get more info about all the options available.

Example

Note that we avoid creating a response queue here using the RabbitMQ Direct Reply-To extension.

# Setting Up A Client

{:ok, conn} = AMQP.Connection.open()
{:ok, chan} = AMQP.Channel.open(conn)

{:ok, _info} = AMQP.Queue.declare(chan, "plug_amqp.consumer_producer.requests")

{:ok, _ctag} = AMQP.Basic.consume(chan, "amq.rabbitmq.reply-to", nil, no_ack: true)

# Setting Up Consumer-Producer Server

{:ok, _server} =
  Plug.AMQP.ConsumerProducer.start_link(
    consumer_queue: "plug_amqp.consumer_producer.requests",
    request_handler: fn server, request, _headers ->
      number = String.to_integer(request)
      Plug.AMQP.ConsumerProducer.send_resp(server, to_string(number + 1))

      :ok
    end
  )

# Sending A Request

IO.puts("Sending a request with payload \"1\"")

AMQP.Basic.publish(chan, "", "plug_amqp.consumer_producer.requests", "1",
  reply_to: "amq.rabbitmq.reply-to"
)

# Receiving A Response

receive do
  {:basic_deliver, response, _meta} ->
    IO.puts("Received a response with payload \"#{response}\"")
end

Link to this section Summary

Types

A header of a request or a response

A list of headers

A set of options available to configure the AMQP Consumer-Producer.

A list of option/0s.

The payload of a request or a response.

A handler for managing requests.

A response of a request.

Functions

Returns a specification to start this module under a supervisor.

Sends a response from a request handler

Starts a new AMQP consumer-producer.

Link to this section Types

Specs

header() :: {String.t(), binary() | number()}

A header of a request or a response

Specs

headers() :: [header()]

A list of headers

Specs

option() ::
  {:backoff, Backoff.options()}
  | {:connection_name, String.t()}
  | {:connection_options, keyword() | String.t()}
  | {:consumer_qos_options, keyword()}
  | {:consumer_queue, String.t()}
  | {:request_handler, request_handler()}
  | {:request_handler_supervisor, Supervisor.supervisor()}

A set of options available to configure the AMQP Consumer-Producer.

Note that consumer_queue option is required.

  • backoff: how the backoff delay should behave. More info at Backoff.options/0.

  • connection_name: the name of the connections created by the AMQP Consumer-Producer. Note that the Consumer-Producer can create several connections. In that case, the provided connection name will be used as a prefix.

  • connection_options: configures the broker connection. All available options are available at AMQP.Connection.open/2.

  • consumer_qos_options: configures the consumer QoS options. More info at AMQP.Basic.qos/2. global option does not apply in this context.

  • consumer_queue: the name of the queue for consuming requests. Required.

  • request_handler: the function of the type request_handler/0 that will handle request received by the Consumer-Producer.

  • request_handler_supervisor: an instance of a Task.Supervisor to supervise handlers. The Consumer-Producer will use this supervisor to run request handlers, managing any kind of error, raise or exit produced by the handler.

Specs

options() :: [option() | {atom(), any()}]

A list of option/0s.

Specs

payload() :: String.t()

The payload of a request or a response.

Specs

request_handler() ::
  (pid(), payload(), headers() -> :ok | any())
  | {:module, :atom}
  | {:module, :atom, any()}

A handler for managing requests.

Specs

response() :: {:ok, payload(), headers()} | {:error, any()}

A response of a request.

Link to this section Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

send_resp(server, payload, headers \\ [])

View Source

Specs

send_resp(GenServer.server(), payload(), headers()) :: :ok

Sends a response from a request handler

Specs

start_link(options()) :: GenServer.on_start()

Starts a new AMQP consumer-producer.