Carotte 🥕
A type-safe RabbitMQ client for Gleam that provides a clean, idiomatic interface for message queue operations on the Erlang VM.
Features
- Type-safe API - Leverage Gleam’s type system for safe message handling
- High Performance - Built on top of the battle-tested amqp_client Erlang library
- Idiomatic Gleam - Clean, functional API with everything in a single module
- Complete Feature Set - Support for exchanges, queues, publishing, consuming, and more
- Supervised Consumers - OTP-based consumer supervision with automatic restarts
- Connection Helpers - Built-in reconnection support and connection monitoring
- Async Operations - Non-blocking operations with _async variants
- Flexible Message Acknowledgment - Manual acknowledgment support for reliable message processing
- Full Headers Support - Send and receive message headers with type-safe accessors
- Operation-Specific Error Types - Granular error types for precise error handling
Installation
gleam add carotte
Quick Start
import carotte
import gleam/bit_array
import gleam/erlang/process
import gleam/io
pub fn main() {
// Connect to RabbitMQ
let assert Ok(client) =
carotte.ClientConfig(
..carotte.default_client(),
host: "localhost",
port: 5672,
)
|> carotte.start()
// Open a channel
let assert Ok(ch) = carotte.open_channel(client)
// Declare an exchange
let assert Ok(_) =
carotte.Exchange(..carotte.exchange("my_exchange"), exchange_type: carotte.Direct)
|> carotte.declare_exchange(ch)
// Declare a durable queue
let assert Ok(_) =
carotte.QueueConfig(..carotte.queue("my_queue"), durable: True)
|> carotte.declare_queue(ch)
// Bind queue to exchange
let assert Ok(_) =
carotte.bind_queue(
channel: ch,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "my_routing_key",
)
// Publish a message (payload is BitArray)
let assert Ok(_) =
carotte.publish(
channel: ch,
exchange: "my_exchange",
routing_key: "my_routing_key",
payload: <<"Hello, RabbitMQ!">>,
options: [],
)
// Start a consumer supervisor
let consumers = process.new_name("consumers")
let assert Ok(consumer) = carotte.start_consumer(consumers)
// Subscribe to messages (supervised) - returns consumer_tag string
let assert Ok(consumer_tag) =
carotte.subscribe(
consumer,
channel: ch,
queue: "my_queue",
callback: fn(msg, _deliver) {
// msg.payload is BitArray - convert to string if needed
let assert Ok(text) = bit_array.to_string(msg.payload)
io.println("Received: " <> text)
// Messages are auto-acknowledged by default
},
)
// Clean up
let assert Ok(_) = carotte.unsubscribe(channel: ch, consumer_tag:)
let assert Ok(_) = carotte.close(client)
}
Core Concepts
Connection Management
Create and configure a RabbitMQ connection:
import gleam/time/duration
let assert Ok(client) =
carotte.ClientConfig(
..carotte.default_client(),
username: "admin",
password: "secret",
host: "rabbitmq.example.com",
virtual_host: "/production",
heartbeat: duration.seconds(30),
connection_timeout: duration.seconds(60),
)
|> carotte.start()
// Check connection status
assert carotte.is_connected(client) == True
// Reconnect if needed
case carotte.is_connected(client) {
True -> client
False -> {
let assert Ok(new_client) = carotte.reconnect(client)
new_client
}
}
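The reconnect check above makes a single attempt. A minimal sketch of retrying with exponential backoff follows; the `retry` helper and its parameters are hypothetical and not part of Carotte, and it only assumes that the wrapped operation returns a `Result`.

```gleam
import gleam/erlang/process

/// Hypothetical helper, not part of Carotte.
/// Calls `operation` up to `attempts` times. After each failure it sleeps
/// for `delay_ms` milliseconds and doubles the delay for the next attempt.
/// Returns the first Ok value, or the error from the final attempt.
pub fn retry(
  attempts: Int,
  delay_ms: Int,
  operation: fn() -> Result(a, e),
) -> Result(a, e) {
  case operation() {
    Ok(value) -> Ok(value)
    // No attempts left: give up and return the last error
    Error(error) if attempts <= 1 -> Error(error)
    Error(_) -> {
      process.sleep(delay_ms)
      retry(attempts - 1, delay_ms * 2, operation)
    }
  }
}
```

Under these assumptions, `retry(5, 200, fn() { carotte.reconnect(client) })` would try to reconnect up to five times, waiting 200 ms, 400 ms, 800 ms and 1600 ms between attempts.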
Exchanges
Carotte supports all RabbitMQ exchange types:
// Create a durable topic exchange
carotte.Exchange(
..carotte.exchange("logs"),
exchange_type: carotte.Topic,
durable: True,
)
|> carotte.declare_exchange(channel)
// Available exchange types:
// - Direct: Route based on exact routing key match
// - Topic: Route based on routing key patterns
// - Fanout: Route to all bound queues
// - Headers: Route based on message headers
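For topic exchanges, AMQP binding patterns split the routing key into dot-separated words, where `*` matches exactly one word and `#` matches zero or more words. The sketch below illustrates those matching rules in plain Gleam. `topic_matches` is a hypothetical helper for reasoning about bindings; it is not part of Carotte and is not how the broker implements routing.

```gleam
import gleam/string

/// Hypothetical helper: returns True when `routing_key` matches `pattern`
/// under AMQP topic rules ("*" = exactly one word, "#" = zero or more words).
pub fn topic_matches(pattern: String, routing_key: String) -> Bool {
  match_words(string.split(pattern, "."), string.split(routing_key, "."))
}

fn match_words(pattern: List(String), key: List(String)) -> Bool {
  case pattern, key {
    // Pattern and key are both exhausted: full match
    [], [] -> True
    // "#" may match zero words when the key is exhausted
    ["#", ..rest], [] -> match_words(rest, [])
    // "#" either matches zero words, or consumes one word and stays active
    ["#", ..rest], [_, ..key_rest] ->
      match_words(rest, key) || match_words(pattern, key_rest)
    // "*" consumes exactly one word
    ["*", ..rest], [_, ..key_rest] -> match_words(rest, key_rest)
    // Literal words must be equal
    [word, ..rest], [key_word, ..key_rest] if word == key_word ->
      match_words(rest, key_rest)
    _, _ -> False
  }
}
```

With this sketch, `topic_matches("*.error", "app.error")` holds while `topic_matches("*.error", "app.db.error")` does not, because `*` cannot span two words.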
Queues
Declare and configure queues using record update syntax:
carotte.QueueConfig(
..carotte.queue("task_queue"),
durable: True, // Survive broker restart
exclusive: True, // Usable only by the declaring connection; deleted when it closes
auto_delete: True, // Delete when last consumer disconnects
)
|> carotte.declare_queue(channel)
Publishing Messages
Publish messages with various options. The payload is a BitArray, which allows sending any binary data:
import gleam/bit_array
import gleam/json
import gleam/time/duration
// For text/JSON, convert string to BitArray
// (user_data is assumed to be a json.Json value built elsewhere)
let json_payload = bit_array.from_string(json.to_string(user_data))
carotte.publish(
channel: ch,
exchange: "notifications",
routing_key: "user.signup",
payload: json_payload,
options: [
carotte.Persistent(True),
carotte.ContentType("application/json"),
carotte.MessageHeaders(
carotte.headers_from_list([
#("user_id", carotte.StringHeader("123")),
#("retry_count", carotte.IntHeader(0)),
])
),
carotte.Expiration(duration.seconds(60)), // Message expires in 60 seconds
]
)
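The publishing example above does not show how `user_data` is built. One possible way, assuming the `gleam_json` package is a dependency, is sketched below. The `signup_payload` function and its field names are hypothetical and only illustrate turning structured data into the BitArray that `publish` expects.

```gleam
import gleam/bit_array
import gleam/json

/// Hypothetical helper: encodes a signup event as JSON and converts it
/// to a BitArray suitable for use as a message payload.
pub fn signup_payload(user_id: String, plan: String) -> BitArray {
  json.object([
    #("user_id", json.string(user_id)),
    #("plan", json.string(plan)),
  ])
  |> json.to_string
  |> bit_array.from_string
}
```

The result can be passed as the `payload:` argument, for example `payload: signup_payload("123", "free")`, together with `carotte.ContentType("application/json")` in the options.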
Supervised Consumers
Carotte integrates with gleam_otp for proper OTP supervision of consumers. The recommended approach is to add the consumer supervisor to your application’s supervision tree using consumer_supervised:
import carotte
import gleam/bit_array
import gleam/erlang/process
import gleam/io
import gleam/otp/static_supervisor
pub fn start_app() {
// Create a name for the consumer supervisor at program startup
let consumers_name = process.new_name("consumers")
// Create the child specification
let consumer_spec = carotte.consumer_supervised(consumers_name)
// Add to your application's supervision tree
let assert Ok(_) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(consumer_spec)
|> static_supervisor.start()
// Later, get the consumer reference to subscribe
let consumer = carotte.named_consumer(consumers_name)
// Subscribe to queues (consumers are supervised) - returns consumer_tag
// `ch` is a channel opened earlier with carotte.open_channel
let assert Ok(consumer_tag) =
carotte.subscribe(
consumer,
channel: ch,
queue: "work_queue",
callback: fn(payload, deliver) {
// payload.payload is BitArray - convert to string for text messages
let assert Ok(text) = bit_array.to_string(payload.payload)
io.println("Processing: " <> text)
io.println("Exchange: " <> deliver.exchange)
io.println("Routing key: " <> deliver.routing_key)
// If callback crashes, consumer will be restarted by supervisor
}
)
}
Standalone mode (for simpler use cases without a supervision tree):
// Start supervisor directly (linked to calling process)
let consumers = process.new_name("consumers")
let assert Ok(consumer) = carotte.start_consumer(consumers)
let assert Ok(consumer_tag) = carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: handler)
Manual Acknowledgment
For more control over message acknowledgment:
let assert Ok(consumer_tag) =
carotte.subscribe_with_options(
consumer,
channel: ch,
queue: "work_queue",
callback: fn(msg, deliver) {
// Process the message
case process_message(msg) {
Ok(_) -> {
// Acknowledge on success
let assert Ok(_) = carotte.ack_single(ch, deliver.delivery_tag)
Nil
}
// Don't ack - the message stays unacknowledged and is redelivered
// once the channel or connection closes
Error(_) -> Nil
}
},
options: [carotte.AutoAck(False)],
)
// Inside a callback (where `deliver` is in scope): acknowledge this
// delivery and all earlier unacknowledged ones at once
let assert Ok(_) = carotte.ack(ch, deliver.delivery_tag, True)
Message Headers
Carotte supports reading and writing message headers. Headers can contain various types of values:
// Available header types
carotte.BoolHeader(True)
carotte.IntHeader(42)
carotte.FloatHeader(3.14)
carotte.StringHeader("hello")
carotte.ListHeader([carotte.IntHeader(1), carotte.IntHeader(2)])
Sending headers:
carotte.publish(
channel: ch,
exchange: "my_exchange",
routing_key: "my_key",
payload: <<"Hello!">>,
options: [
carotte.MessageHeaders(
carotte.headers_from_list([
#("user_id", carotte.StringHeader("123")),
#("priority", carotte.IntHeader(1)),
])
),
],
)
Reading headers from received messages:
carotte.subscribe(
consumer,
channel: ch,
queue: "my_queue",
callback: fn(payload, _deliver) {
// Convert headers to a list of name-value pairs
let headers = carotte.headers_to_list(payload.headers)
// Find a specific header
let user_id = list.find(headers, fn(h) { h.0 == "user_id" })
case user_id {
Ok(#(_, carotte.StringHeader(id))) -> io.println("User: " <> id)
_ -> io.println("No user_id header found")
}
},
)
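The lookup above can be factored into a small helper. This is a sketch that assumes `carotte.headers_to_list` returns a list of name-value tuples, as the example suggests. `string_header` is a hypothetical name and not part of Carotte, and the `headers` parameter is left unannotated because the exact type name is not shown in this README.

```gleam
import carotte
import gleam/list

/// Hypothetical helper: looks up a header by name and returns its value
/// only if it is a StringHeader. `headers` is expected to be the `headers`
/// field of a received message.
pub fn string_header(headers, name: String) -> Result(String, Nil) {
  case list.key_find(carotte.headers_to_list(headers), name) {
    Ok(carotte.StringHeader(value)) -> Ok(value)
    // Missing header, or a header of another type
    _ -> Error(Nil)
  }
}
```

Inside a callback this would read `string_header(payload.headers, "user_id")`, returning `Error(Nil)` both when the header is absent and when it holds a non-string value.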
Error Handling
Carotte provides operation-specific error types for precise error handling. Each operation category has its own error type, making it easy to handle errors appropriately.
Error Types
| Error Type | Used By | Variants |
|---|---|---|
| ConnectionError | start, close, reconnect | ConnectionBlocked, ConnectionClosed, ConnectionAuthFailure, ConnectionRefused, ConnectionTimeout, NotConnected, AlreadyConnected, ReconnectionFailed, ConnectionUnknownError |
| ChannelError | open_channel | ChannelClosed, ChannelProcessNotFound, ChannelConnectionClosed, ChannelUnknownError |
| ExchangeError | declare_exchange, delete_exchange, bind_exchange, unbind_exchange | ExchangeNotFound, ExchangeAccessRefused, ExchangePreconditionFailed, ExchangeChannelClosed, ExchangeUnknownError |
| QueueError | declare_queue, delete_queue, bind_queue, unbind_queue, purge_queue, queue_status | QueueNotFound, QueueAccessRefused, QueuePreconditionFailed, QueueResourceLocked, QueueChannelClosed, QueueUnknownError |
| PublishError | publish | PublishNoRoute, PublishChannelClosed, PublishUnknownError |
| ConsumeError | subscribe, unsubscribe, ack | ConsumeInitTimeout, ConsumeInitFailed, ConsumeProcessNotFound, ConsumeChannelClosed, ConsumeUnknownError |
Handling Errors
// Connection errors
case carotte.start(client_config) {
Ok(client) -> process_messages(client)
Error(carotte.ConnectionAuthFailure(msg)) -> {
io.println("Authentication failed: " <> msg)
}
Error(carotte.ConnectionTimeout(msg)) -> {
io.println("Connection timeout: " <> msg)
}
Error(other) -> {
io.println("Connection error: " <> carotte.describe_connection_error(other))
}
}
// Queue errors
case carotte.declare_queue(my_queue, channel) {
Ok(queue) -> use_queue(queue)
Error(carotte.QueueAccessRefused(msg)) -> {
io.println("Access refused: " <> msg)
}
Error(carotte.QueuePreconditionFailed(msg)) -> {
io.println("Queue configuration mismatch: " <> msg)
}
Error(other) -> {
io.println("Queue error: " <> carotte.describe_queue_error(other))
}
}
// Publish errors
case carotte.publish(channel:, exchange:, routing_key:, payload:, options: [carotte.Mandatory(True)]) {
Ok(_) -> io.println("Message published")
Error(carotte.PublishNoRoute(msg)) -> {
io.println("No route for message: " <> msg)
}
Error(other) -> {
io.println("Publish error: " <> carotte.describe_publish_error(other))
}
}
Error Description Functions
Each error type has a corresponding describe_*_error function that converts the error to a human-readable string:
carotte.describe_connection_error(err) // ConnectionError -> String
carotte.describe_channel_error(err) // ChannelError -> String
carotte.describe_exchange_error(err) // ExchangeError -> String
carotte.describe_queue_error(err) // QueueError -> String
carotte.describe_publish_error(err) // PublishError -> String
carotte.describe_consume_error(err) // ConsumeError -> String
Advanced Features
Asynchronous Operations
Most operations have async variants for non-blocking execution:
// Async queue declaration
carotte.declare_queue_async(my_queue, channel)
// Async exchange deletion
carotte.delete_exchange_async(channel:, exchange: "old_exchange", if_unused: True)
// Async queue binding
carotte.bind_queue_async(
channel:,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "key"
)
Queue Management
Perform administrative operations on queues:
// Get queue status
let assert Ok(status) = carotte.queue_status(channel:, queue: "my_queue")
io.println("Messages: " <> int.to_string(status.message_count))
io.println("Consumers: " <> int.to_string(status.consumer_count))
// Purge all messages from a queue
let assert Ok(message_count) = carotte.purge_queue(channel:, queue: "my_queue")
// Delete a queue
let assert Ok(_) = carotte.delete_queue(
channel:,
queue: "my_queue",
if_unused: True, // Only delete if no consumers
if_empty: True // Only delete if empty
)
Exchange Bindings
Create complex routing topologies:
// Bind exchange to exchange
carotte.bind_exchange(
channel:,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
// Unbind when no longer needed
carotte.unbind_exchange(
channel:,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
Development
# Run tests (requires local RabbitMQ on localhost:5672)
gleam test
# Build documentation
gleam docs build
# Format code
gleam format
Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- Built on top of the robust amqp_client Erlang library
- Inspired by RabbitMQ clients in other languages
- Thanks to the Gleam community for their support and feedback