rabbit_mq v0.0.2 RabbitMQ.Producer
This module can be used to start and maintain a pool of Producer workers.
Example usage
rabbit_mq allows you to design consistent, SDK-like Producers.
ℹ️ The following example assumes that the "customer" exchange already exists.
First, define your (ideally domain-specific) Producer:
defmodule RabbitSample.CustomerProducer do
@moduledoc """
Publishes pre-configured events onto the "customer" exchange.
"""
use RabbitMQ.Producer, exchange: "customer", worker_count: 3
@doc """
Publishes an event routed via "customer.created".
"""
def customer_created(customer_id) when is_binary(customer_id) do
opts = [
content_type: "application/json",
correlation_id: UUID.uuid4(),
mandatory: true
]
payload = Jason.encode!(%{v: "1.0.0", customer_id: customer_id})
publish(payload, "customer.created", opts)
end
@doc """
Publishes an event routed via "customer.updated".
"""
def customer_updated(updated_customer) when is_map(updated_customer) do
opts = [
content_type: "application/json",
correlation_id: UUID.uuid4(),
mandatory: true
]
payload = Jason.encode!(%{v: "1.0.0", customer_data: updated_customer})
publish(payload, "customer.updated", opts)
end
end
Then, start as normal under your existing supervision tree:
children = [
RabbitSample.Topology,
RabbitSample.CustomerProducer,
RabbitSample.CustomerCreatedConsumer,
RabbitSample.CustomerUpdatedConsumer
]
opts = [strategy: :one_for_one, name: RabbitSample.Supervisor]
Supervisor.start_link(children, opts)
Finally, call the exposed methods from your application:
RabbitSample.CustomerProducer.customer_created(customer_id)
RabbitSample.CustomerProducer.customer_updated(updated_customer)
⚠️ Please note that all Producer workers implement "reliable publishing". Each Producer worker handles its publisher confirms asynchronously, striking a delicate balance between performance and reliability.
To understand why this is important, please refer to the reliable publishing implementation guide.
ℹ️ In the unlikely event of an unexpected Publisher nack,
your server will be notified via the on_unexpected_nack/2 callback,
letting you handle such exceptions in any way you see fit.
Configuration
The following options can be used with RabbitMQ.Producer;
:confirm_type; publisher acknowledgement mode. Only:asyncis supported for now. Please consult the Publisher Confirms section for more details. Defaults to:async.:exchange; the name of the exchange onto which the producer workers will publish. Required.:worker_count; number of workers to be spawned. Cannot be greater than:max_channels_per_connectionset in config. Defaults to3.
When you use RabbitMQ.Consumer, a few things happen;
- The module turns into a
GenServer. - The server starts and supervises the desired number of workers.
publish/3becomes available in your module.
publish/3 is a private function with the following signature;
@type payload :: String.t()
@type routing_key :: String.t()
@type opts :: keyword()
@type result :: :ok | {:error, :correlation_id_missing}
publish(payload(), routing_key(), opts()) :: result()
⚠️ Please note that correlation_id is always required and failing to provide it will result in an exception.
ℹ️ To see which options can be passed as opts to publish/3, visit https://hexdocs.pm/amqp/AMQP.Basic.html#publish/5.
Link to this section Summary
Functions
Returns a specification to start this module under a supervisor.
Starts this module as a process via GenServer.start_link/3.
Invoked when the server is about to exit. It should do any cleanup required. See https://hexdocs.pm/elixir/GenServer.html#c:terminate/2 for more details.
Link to this section Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
Starts this module as a process via GenServer.start_link/3.
Only used by the module's child_spec.
terminate(reason, state)
Invoked when the server is about to exit. It should do any cleanup required. See https://hexdocs.pm/elixir/GenServer.html#c:terminate/2 for more details.