Plug.AMQP.ConsumerProducer (PlugAMQP v0.6.0)
A consumer-producer for RPC over the AMQP protocol.
This module provides a consumer-producer for managing remote procedure calls
over AMQP. A consumer-producer creates at least one connection to an AMQP
broker. Requests are received by consuming from the given queue. Responses are
routed based on the reply_to message header.
The response properties mirror the properties of the request. For example, if a request is sent using persistence, the response will be sent using persistence too.
Usage
To use this consumer-producer, we must provide a request_handler/0
function. The consumer-producer will call this function to manage requests.
Handlers can use send_resp/3 to return a response to the request producer
(the client). See options/0 for more information about all the available options.
Example
Note that this example avoids creating a response queue by using the RabbitMQ Direct Reply-To extension.
# Setting Up A Client
{:ok, conn} = AMQP.Connection.open()
{:ok, chan} = AMQP.Channel.open(conn)
{:ok, _info} = AMQP.Queue.declare(chan, "plug_amqp.consumer_producer.requests")
{:ok, _ctag} = AMQP.Basic.consume(chan, "amq.rabbitmq.reply-to", nil, no_ack: true)

# Setting Up Consumer-Producer Server
{:ok, _server} =
  Plug.AMQP.ConsumerProducer.start_link(
    consumer_queue: "plug_amqp.consumer_producer.requests",
    request_handler: fn server, request, _headers ->
      number = String.to_integer(request)
      Plug.AMQP.ConsumerProducer.send_resp(server, to_string(number + 1))
      :ok
    end
  )

# Sending A Request
IO.puts("Sending a request with payload \"1\"")

AMQP.Basic.publish(chan, "", "plug_amqp.consumer_producer.requests", "1",
  reply_to: "amq.rabbitmq.reply-to"
)

# Receiving A Response
receive do
  {:basic_deliver, response, _meta} ->
    IO.puts("Received a response with payload \"#{response}\"")
end
Summary
Types
A header of a request or a response.
A list of headers.
A set of options available to configure the AMQP Consumer-Producer.
The payload of a request or a response.
A handler for managing requests.
A response of a request.
Functions
Returns a specification to start this module under a supervisor.
Sends a response from a request handler.
Starts a new AMQP consumer-producer.
Types
Specs
A header of a request or a response.
Specs
headers() :: [header()]
A list of headers.
Specs
option() ::
  {:backoff, Backoff.options()}
  | {:connection_name, String.t()}
  | {:connection_options, keyword() | String.t()}
  | {:consumer_qos_options, keyword()}
  | {:consumer_queue, String.t()}
  | {:request_handler, request_handler()}
  | {:request_handler_supervisor, Supervisor.supervisor()}
A set of options available to configure the AMQP Consumer-Producer.
Note that the consumer_queue option is required.
backoff: how the backoff delay should behave. More info at Backoff.options/0.
connection_name: the name of the connections created by the AMQP Consumer-Producer. Note that the Consumer-Producer can create several connections. In that case, the provided connection name will be used as a prefix.
connection_options: configures the broker connection. All available options are listed at AMQP.Connection.open/2.
consumer_qos_options: configures the consumer QoS options. More info at AMQP.Basic.qos/2. The global option does not apply in this context.
consumer_queue: the name of the queue for consuming requests. Required.
request_handler: a function of the type request_handler/0 that will handle the requests received by the Consumer-Producer.
request_handler_supervisor: an instance of a Task.Supervisor to supervise handlers. The Consumer-Producer will use this supervisor to run request handlers, managing any kind of error, raise or exit produced by the handler.
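As a rough illustration of how these options fit together, the keyword list below sets several of them at once. Only the option names come from the option/0 type; the queue name, connection name, broker URI, prefetch count and handler body are placeholder assumptions. The backoff option is left out because its accepted values are described in Backoff.options/0 rather than on this page.

```elixir
# A sketch of an options list. Every value is a placeholder assumption;
# only the keys are taken from the option/0 type.
options = [
  # Required: the queue the Consumer-Producer consumes requests from.
  consumer_queue: "plug_amqp.consumer_producer.requests",
  # Used as a prefix if several connections are created.
  connection_name: "my_app.rpc",
  # Either an AMQP URI string or a keyword list; see AMQP.Connection.open/2.
  connection_options: "amqp://guest:guest@localhost:5672",
  # See AMQP.Basic.qos/2; the global option does not apply here.
  consumer_qos_options: [prefetch_count: 10],
  # Function form of request_handler/0: echoes the payload in upper case.
  request_handler: fn server, request, _headers ->
    Plug.AMQP.ConsumerProducer.send_resp(server, String.upcase(request))
    :ok
  end
]
```

A list like this would then be handed to start_link/1, which needs the plug_amqp dependency and a reachable broker.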
Specs
A list of option/0s.
Specs
payload() :: String.t()
The payload of a request or a response.
Specs
request_handler() :: (pid(), payload(), headers() -> :ok | any()) | {:module, :atom} | {:module, :atom, any()}
A handler for managing requests.
Specs
A response of a request.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
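A minimal sketch of what a supervision tree using this child specification might look like. It assumes that the child specification forwards the given options to start_link/1 (the usual GenServer convention) and that a registered name is accepted for request_handler_supervisor. MyApp.RpcHandler, MyApp.RpcTree and MyApp.RpcHandlerSupervisor are hypothetical names, not part of the library.

```elixir
defmodule MyApp.RpcHandler do
  # Hypothetical handler module; handle/3 matches the function form of
  # request_handler/0 (server, payload, headers).
  def handle(server, request, _headers) do
    Plug.AMQP.ConsumerProducer.send_resp(server, String.reverse(request))
    :ok
  end
end

defmodule MyApp.RpcTree do
  # Hypothetical helper that only builds the child list; nothing is started here.
  def children do
    [
      # Listed first so the handler supervisor exists before requests arrive.
      {Task.Supervisor, name: MyApp.RpcHandlerSupervisor},
      {Plug.AMQP.ConsumerProducer,
       consumer_queue: "plug_amqp.consumer_producer.requests",
       request_handler: &MyApp.RpcHandler.handle/3,
       request_handler_supervisor: MyApp.RpcHandlerSupervisor}
    ]
  end
end
```

Calling Supervisor.start_link(MyApp.RpcTree.children(), strategy: :rest_for_one) would then start both children, given the plug_amqp dependency and a reachable broker. The :rest_for_one strategy is a design choice here: if the Task.Supervisor restarts, the consumer-producer that depends on it restarts too.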
Specs
send_resp(GenServer.server(), payload(), headers()) :: :ok
Sends a response from a request handler.
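The following sketch shows one way a handler might call send_resp while still answering malformed requests. The module example uses String.to_integer/1, which raises on non-numeric payloads; here Integer.parse/1 is used instead so that the client always receives a reply. MyApp.Increment and the error text are hypothetical, and the headers argument is omitted, as in the module example.

```elixir
defmodule MyApp.Increment do
  # Hypothetical handler: replies with the request payload plus one, or with
  # an error text when the payload is not an integer.

  # Pure part: computes the response payload from the request payload.
  def reply(request) do
    case Integer.parse(request) do
      {number, ""} -> Integer.to_string(number + 1)
      _other -> "error: expected an integer"
    end
  end

  # Function form of request_handler/0 (server, payload, headers).
  def handle(server, request, _headers) do
    Plug.AMQP.ConsumerProducer.send_resp(server, reply(request))
    :ok
  end
end
```

Keeping the payload computation in reply/1 separates it from the broker interaction, so it can be exercised without a running consumer-producer; handle/3 would be passed as request_handler: &MyApp.Increment.handle/3.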
Specs
start_link(options()) :: GenServer.on_start()
Starts a new AMQP consumer-producer.