Conduit
A message queue framework, with support for middleware and multiple adapters.
Installation
The package can be installed as:
Add
conduitto your list of dependencies inmix.exs:def deps do [{:conduit, "~> 0.7.0"}] endEnsure
conduitis started before your application:def application do [applications: [:conduit]] end
Getting Started
Somewhere in your application you should define a broker. Typically under lib/my_app/broker.ex or
web/broker.ex if you’re using Phoenix.
defmodule MyApp.Broker do
use Conduit.Broker, otp_app: :my_app
end
Next, you’ll want to supervise MyApp.Broker in your applications supervisor.
# lib/my_app.ex
defmodule MyApp do
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
# ...
supervisor(MyApp.Broker, [])
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
end
Setting Up an Adapter
Next you’ll want to find the adapter that matches the message queue (MQ) you’re using. Currently, there is only an adapter for MQ’s with support for AMQP 0-9-1. More protocols and MQ’s will be supported in the future.
- AMQP 0-9-1 - ConduitAMQP
- STOMP - TODO
- SQS - TODO
- Beanstalkd - TODO
- Kafka - TODO
- ZeroMQ - TODO
Configuring the Broker Topology
MQ’s have queues which need to be setup and may involve other concepts as well, including exchanges and bindings. Conduit attemps to stay out of the way when you need to define these things because each MQ has a different opinion on what you need.
Because of that, you’ll need to looks at the specific adapter for what options are available.
Testing
Example Broker
The Broker is responsible for describing how to setup your message queue routing, defining subscribers, publishers, and pipelines for subscribers and publishers. Here is an example Broker that sets up it’s queue, exchange, and defines a subscribers and publishers.
defmodule MyApp.Broker do
use Conduit.Broker, otp_app: :my_app
configure do
exchange "amq.topic"
queue "my_app.created.user", from: ["#.created.user"]
end
pipeline :in_tracking do
plug Conduit.Plug.CorrelationId
plug Conduit.Plug.LogIncoming
end
pipeline :error_handling do
plug Conduit.Plug.DeadLetter, broker: MyApp.Broker, publish_to: :error
plug Conduit.Plug.Retry, attempts: 5
end
pipeline :deserialize do
plug Conduit.Plug.Decode, content_encoding: "gzip"
plug Conduit.Plug.Parse, content_type: "application/json"
end
pipeline :out_tracking do
plug Conduit.Plug.CorrelationId
plug Conduit.Plug.CreatedBy, app: "my_app"
plug Conduit.Plug.CreatedAt
plug Conduit.Plug.LogOutgoing
end
pipeline :serialize do
plug Conduit.Plug.Format, content_type: "application/json"
plug Conduit.Plug.Encode, content_encoding: "gzip"
end
pipeline :error_destination do
plug :put_destination, &(&1.source <> ".error")
end
incoming MyApp do
pipe_through [:in_tracking, :error_handling, :deserialize]
subscribe :welcome_email, WelcomeEmailSubscriber, from: "my_app.created.user"
subscribe :setup_billing, BillingSubscriber, from: "my_app.created.user"
end
outgoing do
pipe_through [:out_tracking, :serialize]
publish :user_created, to: "my_app.created.user", exchange: "amq.topic"
end
outgoing do
pipe_through [:error_destination, :out_tracking, :serialize]
publish :error, exchange: "amq.topic"
end
end