Carotte 🥕
A type-safe RabbitMQ client for Gleam that provides a clean, idiomatic interface for message queue operations on the Erlang VM.
Quick Start
import carotte
import gleam/bit_array
import gleam/erlang/process
import gleam/io
pub fn main() {
  // Connect to RabbitMQ
  let assert Ok(client) = carotte.start(carotte.default_client())
  let assert Ok(ch) = carotte.open_channel(client)
  // Declare exchange and queue
  let assert Ok(_) = carotte.declare_exchange(carotte.exchange("my_exchange"), ch)
  let assert Ok(_) = carotte.declare_queue(carotte.default_queue("my_queue"), ch)
  let assert Ok(_) =
    carotte.bind_queue(channel: ch, queue: "my_queue", exchange: "my_exchange", routing_key: "")
  // Start consumer supervisor and subscribe
  let consumers = process.new_name("consumers")
  let assert Ok(consumer) = carotte.start_consumer(consumers)
  let assert Ok(_) =
    carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: fn(msg, _) {
      let assert Ok(text) = bit_array.to_string(msg.payload)
      io.println("Received: " <> text)
    })
  // Publish a message (payload is BitArray)
  let assert Ok(_) =
    carotte.publish(channel: ch, exchange: "my_exchange", routing_key: "", payload: <<"Hello!">>, options: [])
}
Features
- Type-safe API: Leverage Gleam’s type system for safe message handling
- OTP Supervision: Integrate consumers into your application’s supervision tree via consumer_supervised, or use standalone mode with start_consumer
- Operation-Specific Errors: Granular error types (ConnectionError, ChannelError, ExchangeError, QueueError, PublishError, ConsumeError) for precise error handling
- Async Operations: Non-blocking variants with the _async suffix
- Full Headers Support: Type-safe message headers with HeaderValue types
- Connection Helpers: Built-in reconnection support and connection monitoring
OTP Supervision
For production use, integrate consumers into your supervision tree:
import carotte
import gleam/erlang/process
import gleam/otp/static_supervisor
let consumers_name = process.new_name("consumers")
let spec = carotte.consumer_supervised(consumers_name)
let assert Ok(_) =
  static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(spec)
  |> static_supervisor.start()
// ch and handler are assumed to be defined elsewhere
let consumer = carotte.named_consumer(consumers_name)
carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: handler)
Error Handling
Each operation category has its own error type:
| Error Type | Operations |
|---|---|
| ConnectionError | start, close, reconnect |
| ChannelError | open_channel |
| ExchangeError | declare_exchange, delete_exchange, bind_exchange, unbind_exchange |
| QueueError | declare_queue, delete_queue, bind_queue, unbind_queue, purge_queue, queue_status |
| PublishError | publish |
| ConsumeError | subscribe, unsubscribe, ack |
Use describe_*_error functions to convert errors to human-readable strings.
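Because each error type is an ordinary custom type, specific variants can be matched before falling back to the describe_* helpers. The sketch below is one possible shape for this, using only the ConnectionError constructors and functions documented on this page; connection_hint and its advice strings are hypothetical and not part of the library.

```gleam
import carotte
import gleam/io

// Hypothetical helper (not part of carotte): map a ConnectionError to advice
// for an operator, falling back to the library's own description.
pub fn connection_hint(err: carotte.ConnectionError) -> String {
  case err {
    carotte.ConnectionAuthFailure(_) -> "check the username and password"
    carotte.ConnectionRefused(_) ->
      "check that the broker is running and reachable"
    carotte.ConnectionTimeout(_) ->
      "check the network or raise connection_timeout"
    // Every other variant uses the generic description.
    other -> carotte.describe_connection_error(other)
  }
}

pub fn main() {
  case carotte.start(carotte.default_client()) {
    Ok(_client) -> io.println("Connected")
    Error(err) -> io.println("Could not connect: " <> connection_hint(err))
  }
}
```

Matching on variants keeps the handling exhaustive: if a new constructor is added to ConnectionError, the catch-all branch still produces a readable message.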
Types
Represents an AMQP channel within a connection. Channels are lightweight connections that share a single TCP connection. Most AMQP operations are performed on channels.
pub type Channel
Errors that can occur when opening or using channels.
Returned by open_channel.
pub type ChannelError {
ChannelClosed(String)
ChannelProcessNotFound
ChannelConnectionClosed
ChannelUnknownError(String)
}
Constructors
- ChannelClosed(String): The channel has been closed
- ChannelProcessNotFound: The channel process could not be found
- ChannelConnectionClosed: The connection is closed, cannot open channel
- ChannelUnknownError(String): An unknown channel error occurred
Represents an active connection to a RabbitMQ server.
This is an opaque type that encapsulates the underlying AMQP client process.
Use the builder pattern with default_client() and start() to create a client.
pub opaque type Client
Configuration builder for creating a RabbitMQ client.
Use default_client() to create a configuration with sensible defaults,
then customize individual fields using record update syntax.
pub type ClientConfig {
ClientConfig(
username: String,
password: String,
virtual_host: String,
host: String,
port: Int,
channel_max: Int,
frame_max: Int,
heartbeat: duration.Duration,
connection_timeout: duration.Duration,
)
}
Constructors
- ClientConfig(username: String, password: String, virtual_host: String, host: String, port: Int, channel_max: Int, frame_max: Int, heartbeat: duration.Duration, connection_timeout: duration.Duration)
Arguments
- heartbeat: Heartbeat interval for the connection. The minimum duration is one second.
- connection_timeout: Timeout for establishing a connection.
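Since ClientConfig is a public record, one way to override individual fields is record update syntax on the value returned by default_client(). The following is a minimal sketch under assumptions: the host, credentials, and virtual host are placeholders, staging_config is a hypothetical name, and the Duration type is assumed to come from the gleam/time/duration module of the gleam_time package.

```gleam
import carotte
import gleam/time/duration

// Hypothetical configuration: every value below is a placeholder.
pub fn staging_config() -> carotte.ClientConfig {
  carotte.ClientConfig(
    ..carotte.default_client(),
    host: "rabbitmq.staging.internal",
    username: "app",
    password: "secret",
    virtual_host: "/staging",
    // The documented minimum heartbeat is one second.
    heartbeat: duration.seconds(30),
    connection_timeout: duration.seconds(5),
  )
}

pub fn connect() -> Result(carotte.Client, carotte.ConnectionError) {
  carotte.start(staging_config())
}
```

Fields that are not listed (port, channel_max, frame_max) keep whatever defaults default_client() provides.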
Errors that can occur when establishing or managing RabbitMQ connections.
Returned by start, close, and reconnect.
pub type ConnectionError {
ConnectionBlocked
ConnectionClosed
ConnectionAuthFailure(String)
ConnectionRefused(String)
ConnectionTimeout(String)
NotConnected
ReconnectionFailed(ConnectionError)
AlreadyConnected
ConnectionUnknownError(String)
}
Constructors
- ConnectionBlocked: The connection is blocked by the server due to resource constraints
- ConnectionClosed: The connection has been closed
- ConnectionAuthFailure(String): Authentication failed with the provided credentials
- ConnectionRefused(String): Connection to the server was refused
- ConnectionTimeout(String): Connection attempt timed out
- NotConnected: Connection is not currently active
- ReconnectionFailed(ConnectionError): Reconnection failed with the underlying cause
- AlreadyConnected: Connection is already established
- ConnectionUnknownError(String): An unknown connection error occurred
Connection event for callbacks.
pub type ConnectionEvent {
ConnectionDisconnected(DisconnectReason)
ConnectionReconnected
}
Constructors
- ConnectionDisconnected(DisconnectReason)
- ConnectionReconnected
Connection state.
pub type ConnectionState {
Connected
Disconnected(reason: DisconnectReason)
}
Constructors
- Connected
- Disconnected(reason: DisconnectReason)
Errors that can occur during consumer operations.
Returned by subscribe, subscribe_with_options, unsubscribe, ack.
pub type ConsumeError {
ConsumeInitTimeout
ConsumeInitFailed(String)
ConsumeProcessNotFound
ConsumeChannelClosed(String)
ConsumeUnknownError(String)
}
Constructors
- ConsumeInitTimeout: Consumer initialization timed out
- ConsumeInitFailed(String): Consumer initialization failed
- ConsumeProcessNotFound: The consumer process could not be found
- ConsumeChannelClosed(String): The channel is closed
- ConsumeUnknownError(String): An unknown consume error occurred
Opaque reference to a consumer supervisor.
Similar to how pog’s Connection wraps a pool reference, this type wraps the consumer supervisor name. Use it to subscribe to queues.
- Use start_consumer or named_consumer to get a Consumer
- Use subscribe with a Consumer to start consuming (returns consumer_tag)
- Use unsubscribe with channel + consumer_tag to stop consuming
pub opaque type Consumer
Configuration for a consumer.
pub opaque type ConsumerConfig
The message type for the consumer supervisor. Used when registering with a name.
pub type ConsumerSupervisorMessage =
factory_supervisor.Message(ConsumerConfig, String)
Metadata about a message delivery from the broker. Contains information about how and from where the message was delivered.
pub type Deliver {
Deliver(
consumer_tag: String,
delivery_tag: Int,
redelivered: Bool,
exchange: String,
routing_key: String,
)
}
Constructors
- Deliver(consumer_tag: String, delivery_tag: Int, redelivered: Bool, exchange: String, routing_key: String)
Arguments
- consumer_tag: Identifier for the consumer that received this message.
- delivery_tag: Unique identifier for this delivery, used for acknowledgment.
- redelivered: True if this message was previously delivered but not acknowledged.
- exchange: The exchange the message was published to.
- routing_key: The routing key used when the message was published.
Reason for disconnection.
pub type DisconnectReason {
ServerClosed
NetworkError
UserClosed
Unknown(String)
ConnectionProcessNotAlive
}
Constructors
- ServerClosed: Server closed the connection.
- NetworkError: Network error.
- UserClosed: Explicitly closed by user.
- Unknown(String): Unknown reason.
- ConnectionProcessNotAlive: The connection process is no longer running.
Represents an AMQP exchange configuration. Exchanges receive messages from producers and route them to queues based on routing rules defined by the exchange type.
Use exchange() to create an exchange with defaults, then customize
using record update syntax.
pub type Exchange {
Exchange(
name: String,
exchange_type: ExchangeType,
durable: Bool,
auto_delete: Bool,
internal: Bool,
nowait: Bool,
)
}
Constructors
- Exchange(name: String, exchange_type: ExchangeType, durable: Bool, auto_delete: Bool, internal: Bool, nowait: Bool)
Errors that can occur during exchange operations.
Returned by declare_exchange, delete_exchange, bind_exchange, unbind_exchange.
pub type ExchangeError {
ExchangeNotFound(String)
ExchangeAccessRefused(String)
ExchangePreconditionFailed(String)
ExchangeChannelClosed(String)
ExchangeUnknownError(String)
}
Constructors
- ExchangeNotFound(String): The exchange was not found
- ExchangeAccessRefused(String): Access to the exchange was refused
- ExchangePreconditionFailed(String): A precondition for the exchange operation failed
- ExchangeChannelClosed(String): The channel is closed
- ExchangeUnknownError(String): An unknown exchange error occurred
The type of routing logic an exchange uses to deliver messages to queues.
pub type ExchangeType {
Fanout
Direct
Topic
Headers
}
Constructors
- Fanout: Messages are delivered to all bound queues regardless of routing key.
- Direct: Messages are delivered to queues with an exact routing key match.
- Topic: Messages are delivered to queues with pattern-matching on routing key. Supports wildcards: * matches one word, # matches zero or more words.
- Headers: Messages are routed based on header attributes rather than routing key.
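To make the Topic wildcards concrete, the sketch below declares a durable topic exchange and binds a queue with a one-word wildcard. It uses only functions documented on this page; the names events, order_events, events_exchange, and setup_order_events are placeholders chosen for illustration, and the function assumes an already open channel.

```gleam
import carotte

// A durable topic exchange, built from the defaults via record update.
pub fn events_exchange() -> carotte.Exchange {
  carotte.Exchange(
    ..carotte.exchange("events"),
    exchange_type: carotte.Topic,
    durable: True,
  )
}

// Declares the exchange and a queue, then binds them with a wildcard key.
pub fn setup_order_events(ch: carotte.Channel) -> Nil {
  let assert Ok(_) = carotte.declare_exchange(events_exchange(), ch)
  let assert Ok(_) =
    carotte.declare_queue(carotte.default_queue("order_events"), ch)
  // "order.*" matches "order.created" but not "order.item.added",
  // because * stands for exactly one word.
  let assert Ok(_) =
    carotte.bind_queue(
      channel: ch,
      queue: "order_events",
      exchange: "events",
      routing_key: "order.*",
    )
  Nil
}
```

Binding with "order.#" instead would also match keys with more words after the prefix, such as "order.item.added".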
An opaque container for message headers. Headers are key-value pairs that can be attached to messages for additional metadata or routing with header exchanges.
Create with headers_from_list() and read with headers_to_list().
pub opaque type HeaderList
Represents a typed header value. AMQP headers support several primitive types.
pub type HeaderValue {
BoolHeader(Bool)
FloatHeader(Float)
IntHeader(Int)
StringHeader(String)
ListHeader(List(HeaderValue))
}
Constructors
- BoolHeader(Bool): A boolean header value.
- FloatHeader(Float): A floating-point header value.
- IntHeader(Int): An integer header value.
- StringHeader(String): A string header value.
- ListHeader(List(HeaderValue)): A list of header values (nested).
A message payload received from the broker. Contains the message body, AMQP properties, and custom headers.
pub type Payload {
Payload(
payload: BitArray,
properties: List(PublishOption),
headers: HeaderList,
)
}
Constructors
- Payload(payload: BitArray, properties: List(PublishOption), headers: HeaderList)
Arguments
- payload: The message body as raw bytes
- properties: AMQP message properties (content type, correlation ID, etc.)
- headers: Custom headers attached to the message
Errors that can occur when publishing messages.
Returned by publish.
pub type PublishError {
PublishNoRoute(String)
PublishChannelClosed(String)
PublishUnknownError(String)
}
Constructors
- PublishNoRoute(String): No route exists to deliver the message (when mandatory flag is set)
- PublishChannelClosed(String): The channel is closed
- PublishUnknownError(String): An unknown publish error occurred
Options for publishing messages.
pub type PublishOption {
Mandatory(Bool)
ContentType(String)
ContentEncoding(String)
MessageHeaders(HeaderList)
Persistent(Bool)
CorrelationId(String)
Priority(Int)
ReplyTo(String)
Expiration(duration.Duration)
MessageId(String)
Timestamp(timestamp.Timestamp)
Type(String)
UserId(String)
AppId(String)
}
Constructors
- Mandatory(Bool): If set, returns an error if the broker can’t route the message to a queue.
- ContentType(String): MIME content type.
- ContentEncoding(String): MIME content encoding.
- MessageHeaders(HeaderList): Headers to attach to the message. Use headers_from_list to create headers for sending, and headers_to_list to read headers from received messages.
- Persistent(Bool): If set, uses persistent delivery mode. Messages marked as persistent that are delivered to durable queues will be logged to disk.
- CorrelationId(String): Arbitrary application-specific message identifier.
- Priority(Int): Message priority, ranging from 0 to 9.
- ReplyTo(String): Name of the reply queue.
- Expiration(duration.Duration): How long the message is valid before it expires.
- MessageId(String): Message identifier.
- Timestamp(timestamp.Timestamp): Timestamp associated with this message. Note: AMQP only supports second-level precision, so any nanoseconds in the timestamp will be truncated when sending.
- Type(String): Message type.
- UserId(String): Creating user ID. RabbitMQ will validate this against the active connection user.
- AppId(String): Application ID.
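Several of these options are typically combined in the list passed to publish. The sketch below shows one plausible combination for a persistent JSON message with custom headers; order_options, publish_order, the orders exchange, and the header names are hypothetical and used only for illustration.

```gleam
import carotte
import gleam/bit_array

// Hypothetical helper: options for a persistent JSON message with headers.
pub fn order_options(order_id: String) -> List(carotte.PublishOption) {
  [
    carotte.ContentType("application/json"),
    carotte.Persistent(True),
    carotte.MessageId(order_id),
    carotte.MessageHeaders(
      carotte.headers_from_list([
        #("source", carotte.StringHeader("checkout")),
        #("retry_count", carotte.IntHeader(0)),
      ]),
    ),
  ]
}

// Publishes a JSON body to a placeholder exchange using the options above.
pub fn publish_order(
  ch: carotte.Channel,
  order_id: String,
  body: String,
) -> Result(Nil, carotte.PublishError) {
  carotte.publish(
    channel: ch,
    exchange: "orders",
    routing_key: "order.created",
    payload: bit_array.from_string(body),
    options: order_options(order_id),
  )
}
```

Persistent(True) only protects the message across a broker restart when the target queue was declared as durable, as the Persistent description notes.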
Represents a declared queue on the broker.
Returned by declare_queue() and queue_status() with current statistics.
pub type Queue {
Queue(name: String, message_count: Int, consumer_count: Int)
}
Constructors
- Queue(name: String, message_count: Int, consumer_count: Int): The declared queue with its name, current message count, and consumer count.
Configuration for declaring a queue.
Use default_queue() to create a queue configuration with sensible defaults,
then customize using record update syntax.
pub type QueueConfig {
QueueConfig(
name: String,
passive: Bool,
durable: Bool,
exclusive: Bool,
auto_delete: Bool,
nowait: Bool,
)
}
Constructors
- QueueConfig(name: String, passive: Bool, durable: Bool, exclusive: Bool, auto_delete: Bool, nowait: Bool)
Errors that can occur during queue operations.
Returned by declare_queue, delete_queue, bind_queue, unbind_queue, purge_queue, queue_status.
pub type QueueError {
QueueNotFound(String)
QueueAccessRefused(String)
QueuePreconditionFailed(String)
QueueResourceLocked(String)
QueueChannelClosed(String)
QueueUnknownError(String)
}
Constructors
- QueueNotFound(String): The queue was not found
- QueueAccessRefused(String): Access to the queue was refused
- QueuePreconditionFailed(String): A precondition for the queue operation failed
- QueueResourceLocked(String): The queue is locked and cannot be accessed
- QueueChannelClosed(String): The channel is closed
- QueueUnknownError(String): An unknown queue error occurred
Options for subscribing to a queue.
pub type QueueOption {
AutoAck(Bool)
}
Constructors
- AutoAck(Bool): If True, messages are automatically acknowledged upon delivery. If False, you must call ack() to acknowledge messages manually.
Values
pub fn ack(
channel: Channel,
delivery_tag: Int,
multiple: Bool,
) -> Result(Nil, ConsumeError)
Acknowledge a message delivery. Used when manual acknowledgment is enabled (AutoAck(False)).
Parameters
- channel: The channel to acknowledge on
- delivery_tag: The delivery tag from the message metadata
- multiple: If True, acknowledges all messages up to and including this delivery tag
Example
carotte.subscribe_with_options(
  consumer,
  channel: ch,
  queue: "my_queue",
  options: [carotte.AutoAck(False)],
  callback: fn(_msg, meta) {
    // Process message, then acknowledge it
    let _ = carotte.ack(ch, meta.delivery_tag, False)
    // The callback must return Nil
    Nil
  },
)
pub fn ack_single(
channel: Channel,
delivery_tag: Int,
) -> Result(Nil, ConsumeError)
Acknowledge a message delivery (acknowledges only this message). Convenience function for ack with multiple=False.
pub fn bind_exchange(
channel channel: Channel,
source source: String,
destination destination: String,
routing_key routing_key: String,
) -> Result(Nil, ExchangeError)
Bind an exchange to another exchange. Routing keys are used to filter messages from the source exchange.
pub fn bind_exchange_async(
channel channel: Channel,
source source: String,
destination destination: String,
routing_key routing_key: String,
) -> Result(Nil, ExchangeError)
Bind an exchange to another exchange without waiting for a response.
Same semantics as bind_exchange.
pub fn bind_queue(
channel channel: Channel,
queue queue: String,
exchange exchange: String,
routing_key routing_key: String,
) -> Result(Nil, QueueError)
Bind a queue to an exchange.
The routing_key is used to filter messages from the exchange.
pub fn bind_queue_async(
channel channel: Channel,
queue queue: String,
exchange exchange: String,
routing_key routing_key: String,
) -> Result(Nil, QueueError)
Bind a queue to an exchange asynchronously.
Same semantics as bind_queue.
pub fn close(client: Client) -> Result(Nil, ConnectionError)
Close the RabbitMQ client connection. This will close all channels and the underlying AMQP connection.
pub fn close_channel(
channel: Channel,
) -> Result(Nil, ChannelError)
Close a channel. This releases the channel resources on the server. Once closed, the channel cannot be used for further operations.
pub fn commit_transaction(
channel: Channel,
) -> Result(Nil, ChannelError)
Commit the current transaction on a channel.
All messages published since start_transaction (or the last commit) are delivered.
pub fn connection_state(client: Client) -> ConnectionState
Get the current connection state.
pub fn consumer_supervised(
name: process.Name(
factory_supervisor.Message(ConsumerConfig, String),
),
) -> supervision.ChildSpecification(
factory_supervisor.Supervisor(ConsumerConfig, String),
)
Create a child specification for adding the consumer supervisor to your application’s supervision tree.
This is the recommended way to start the consumer supervisor, as it ensures proper lifecycle management within your OTP application.
You must provide a name so that other parts of your application can find the supervisor to subscribe consumers.
Example
import gleam/erlang/process
import gleam/otp/static_supervisor
pub fn start_app() {
  // Create a name at program startup
  let consumers_name = process.new_name("consumers")
  // Create the child specification (max 5 restarts in 10 seconds)
  let consumer_spec = carotte.consumer_supervised(consumers_name)
  // Add to your supervision tree
  let assert Ok(_) =
    static_supervisor.new(static_supervisor.OneForOne)
    |> static_supervisor.add(consumer_spec)
    |> static_supervisor.start()
  // Later, get the supervisor to subscribe
  // (ch and handler are assumed to be defined elsewhere)
  let sup = carotte.named_consumer(consumers_name)
  carotte.subscribe(sup, channel: ch, queue: "my_queue", callback: handler)
}
pub fn declare_exchange(
exchange: Exchange,
channel: Channel,
) -> Result(Nil, ExchangeError)
Declare an exchange on the broker.
pub fn declare_exchange_async(
exchange: Exchange,
channel: Channel,
) -> Result(Nil, ExchangeError)
Declare an exchange on the broker without waiting for a response.
pub fn declare_queue(
queue: QueueConfig,
channel: Channel,
) -> Result(Queue, QueueError)
Declare a queue on the broker.
pub fn declare_queue_async(
queue: QueueConfig,
channel: Channel,
) -> Result(Nil, QueueError)
Declare a queue on the broker asynchronously.
pub fn default_client() -> ClientConfig
Create a new client builder with default settings. Uses guest/guest credentials on localhost:5672.
Example
let assert Ok(client) =
  carotte.default_client()
  |> carotte.start()
pub fn default_queue(name: String) -> QueueConfig
Create a queue configuration with the given name and sensible defaults. All boolean options default to False.
To customize, use record update syntax:
QueueConfig(..default_queue("my_queue"), durable: True, exclusive: True)
For an auto-generated queue name, pass an empty string:
default_queue("")
|> declare_queue(channel)
// Returns Queue with broker-generated name like "amq.gen-..."
pub fn delete_exchange(
channel channel: Channel,
exchange exchange: String,
if_unused unused: Bool,
) -> Result(Nil, ExchangeError)
Delete an exchange from the broker.
If if_unused is set to True, the exchange will only be deleted if it has no queues bound to it.
pub fn delete_exchange_async(
channel channel: Channel,
exchange exchange: String,
if_unused unused: Bool,
) -> Result(Nil, ExchangeError)
Delete an exchange from the broker without waiting for a response.
pub fn delete_queue(
channel channel: Channel,
queue queue: String,
if_unused if_unused: Bool,
if_empty if_empty: Bool,
) -> Result(Int, QueueError)
Delete a queue from the broker.
If if_unused is set, the queue will only be deleted if it has no subscribers.
If if_empty is set, the queue will only be deleted if it has no messages.
Returns the number of messages that were in the queue when it was deleted.
pub fn delete_queue_async(
channel channel: Channel,
queue queue: String,
if_unused if_unused: Bool,
if_empty if_empty: Bool,
) -> Result(Nil, QueueError)
Delete a queue from the broker asynchronously.
Same semantics as delete_queue.
pub fn describe_channel_error(err: ChannelError) -> String
Convert a ChannelError to a human-readable string description.
pub fn describe_connection_error(err: ConnectionError) -> String
Convert a ConnectionError to a human-readable string description. Useful for logging or displaying error messages to users.
pub fn describe_consume_error(err: ConsumeError) -> String
Convert a ConsumeError to a human-readable string description.
pub fn describe_exchange_error(err: ExchangeError) -> String
Convert an ExchangeError to a human-readable string description.
pub fn describe_publish_error(err: PublishError) -> String
Convert a PublishError to a human-readable string description.
pub fn describe_queue_error(err: QueueError) -> String
Convert a QueueError to a human-readable string description.
pub fn empty_headers() -> HeaderList
Create an empty HeaderList. Useful for pattern matching or when no headers are needed.
pub fn exchange(name: String) -> Exchange
Create an exchange with the given name and sensible defaults. Returns a Direct exchange with all options set to False.
To customize, use record update syntax:
Exchange(..exchange("events"), exchange_type: Topic, durable: True)
pub fn get_message(
channel: Channel,
queue queue: String,
auto_ack auto_ack: Bool,
) -> Result(option.Option(#(Payload, Deliver)), ConsumeError)
Get a single message from a queue without subscribing. This is a synchronous, polling-based approach to consuming messages.
Parameters
- channel: The channel to use
- queue: The queue name to get a message from
- auto_ack: If True, the message is automatically acknowledged. If False, you must call ack().
Returns
- Ok(Some(#(payload, deliver))) if a message was available
- Ok(None) if the queue is empty
- Error(consume_error) if there was an error
Example
case carotte.get_message(ch, queue: "my_queue", auto_ack: True) {
  Ok(Some(#(payload, deliver))) -> {
    let assert Ok(text) = bit_array.to_string(payload.payload)
    io.println("Got message: " <> text)
  }
  Ok(None) -> io.println("Queue is empty")
  Error(e) -> io.println("Error: " <> carotte.describe_consume_error(e))
}
Note: For continuous message consumption, use subscribe() instead.
This function is best for one-off message retrieval or polling scenarios.
pub fn headers_from_list(
list: List(#(String, HeaderValue)),
) -> HeaderList
Create a HeaderList from a list of name-value pairs. Use this to construct headers for messages.
Example
let headers = headers_from_list([
#("user_id", StringHeader("123")),
#("retry_count", IntHeader(3)),
#("is_test", BoolHeader(True)),
])
pub fn headers_to_list(
headers: HeaderList,
) -> List(#(String, HeaderValue))
Convert a HeaderList back to a list of name-value pairs. Use this to read headers from received messages.
Example
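let assert Ok(_consumer_tag) =
  carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: fn(payload, _deliver) {
    // headers: List(#(String, HeaderValue))
    let headers = carotte.headers_to_list(payload.headers)
    // Print each header name (uses gleam/io and gleam/list); list.each returns Nil
    list.each(headers, fn(header) { io.println(header.0) })
  })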
pub fn is_connected(client: Client) -> Bool
Check if the client connection is currently active.
pub fn nack(
channel: Channel,
delivery_tag: Int,
multiple: Bool,
requeue: Bool,
) -> Result(Nil, ConsumeError)
Negatively acknowledge a message delivery. Used when manual acknowledgment is enabled (AutoAck(False)) and you want to indicate that the message could not be processed.
Parameters
- channel: The channel to nack on
- delivery_tag: The delivery tag from the message metadata
- multiple: If True, nacks all messages up to and including this delivery tag
- requeue: If True, the message(s) will be requeued; if False, they will be discarded or dead-lettered (if a dead letter exchange is configured)
Example
carotte.subscribe_with_options(
  consumer,
  channel: ch,
  queue: "my_queue",
  options: [carotte.AutoAck(False)],
  callback: fn(msg, meta) {
    // process_message stands for your own message handler
    let _ = case process_message(msg) {
      Ok(_) -> carotte.ack_single(ch, meta.delivery_tag)
      // Requeue for retry
      Error(_) -> carotte.nack(ch, meta.delivery_tag, False, True)
    }
    // The callback must return Nil
    Nil
  },
)
pub fn nack_single(
channel: Channel,
delivery_tag: Int,
requeue: Bool,
) -> Result(Nil, ConsumeError)
Negatively acknowledge a single message. Convenience function for nack with multiple=False.
Parameters
- channel: The channel to nack on
- delivery_tag: The delivery tag from the message metadata
- requeue: If True, the message will be requeued; if False, it will be discarded or dead-lettered
pub fn named_consumer(
name: process.Name(
factory_supervisor.Message(ConsumerConfig, String),
),
) -> Consumer
Get a reference to a running consumer supervisor by its registered name.
Use this to get a supervisor reference after it has been started as part
of your supervision tree via consumer_supervised.
Example
let consumer = carotte.named_consumer(consumers_name)
pub fn open_channel(
client: Client,
) -> Result(Channel, ChannelError)
Open a channel to a RabbitMQ server.
pub fn publish(
channel channel: Channel,
exchange exchange: String,
routing_key routing_key: String,
payload payload: BitArray,
options options: List(PublishOption),
) -> Result(Nil, PublishError)
Publish a message to an exchange.
The routing_key is used to route messages to queues.
The options are used to set message properties.
pub fn purge_queue(
channel channel: Channel,
queue queue: String,
) -> Result(Int, QueueError)
Purge a queue of all messages.
pub fn purge_queue_async(
channel channel: Channel,
queue queue: String,
) -> Result(Nil, QueueError)
Purge a queue of all messages asynchronously.
pub fn queue_status(
channel channel: Channel,
queue queue: String,
) -> Result(Queue, QueueError)
Get the status of a queue.
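The returned Queue record carries the name, message count, and consumer count, so a status check can be turned into a log line with plain field access. A minimal sketch follows; queue_summary and report are hypothetical helper names, and the channel is assumed to be open.

```gleam
import carotte
import gleam/int

// Formats the statistics carried by a Queue record.
pub fn queue_summary(queue: carotte.Queue) -> String {
  queue.name
  <> ": "
  <> int.to_string(queue.message_count)
  <> " messages, "
  <> int.to_string(queue.consumer_count)
  <> " consumers"
}

// Looks up a queue on the broker and describes either its state or the error.
pub fn report(ch: carotte.Channel, name: String) -> String {
  case carotte.queue_status(channel: ch, queue: name) {
    Ok(queue) -> queue_summary(queue)
    Error(err) -> "status unavailable: " <> carotte.describe_queue_error(err)
  }
}
```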
pub fn reconnect(
client: Client,
) -> Result(Client, ConnectionError)
Attempt to reconnect a disconnected client. Uses the original connection parameters. Returns error if already connected or reconnection fails.
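The connection helpers can be combined into a small guard that reconnects only when needed. This is a sketch under assumptions rather than a recommended pattern: state_label and ensure_connected are hypothetical names, and the code relies only on is_connected, reconnect, and the ConnectionState and DisconnectReason constructors documented on this page.

```gleam
import carotte

// Hypothetical helper: a short label for logging the connection state.
pub fn state_label(state: carotte.ConnectionState) -> String {
  case state {
    carotte.Connected -> "connected"
    carotte.Disconnected(reason: carotte.UserClosed) -> "closed by user"
    carotte.Disconnected(reason: carotte.Unknown(detail)) ->
      "disconnected: " <> detail
    carotte.Disconnected(_) -> "disconnected"
  }
}

// Returns a usable client, reconnecting first if the connection was lost.
pub fn ensure_connected(
  client: carotte.Client,
) -> Result(carotte.Client, carotte.ConnectionError) {
  case carotte.is_connected(client) {
    True -> Ok(client)
    // reconnect reuses the original connection parameters.
    False -> carotte.reconnect(client)
  }
}
```

Checking is_connected first avoids the error that reconnect returns when the client is already connected. A log line could be produced with state_label(carotte.connection_state(client)).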
pub fn reject(
channel: Channel,
delivery_tag: Int,
requeue: Bool,
) -> Result(Nil, ConsumeError)
Reject a message delivery. Similar to nack but only works with a single message (no multiple option). This is the original AMQP 0-9-1 method for rejecting messages.
Parameters
- channel: The channel to reject on
- delivery_tag: The delivery tag from the message metadata
- requeue: If True, the message will be requeued; if False, it will be discarded or dead-lettered (if a dead letter exchange is configured)
Example
carotte.subscribe_with_options(
  consumer,
  channel: ch,
  queue: "my_queue",
  options: [carotte.AutoAck(False)],
  callback: fn(msg, meta) {
    // validate_message stands for your own validation function
    let _ = case validate_message(msg) {
      Ok(_) -> carotte.ack_single(ch, meta.delivery_tag)
      // Discard invalid message
      Error(_) -> carotte.reject(ch, meta.delivery_tag, False)
    }
    // The callback must return Nil
    Nil
  },
)
pub fn rollback_transaction(
channel: Channel,
) -> Result(Nil, ChannelError)
Rollback the current transaction on a channel.
All messages published since start_transaction (or the last commit) are discarded.
pub fn set_qos(
channel: Channel,
prefetch_count: Int,
global: Bool,
) -> Result(Nil, ChannelError)
Set Quality of Service (QoS) for a channel. Controls the prefetch count for message delivery.
Parameters
- channel: The channel to configure
- prefetch_count: Maximum number of unacknowledged messages. Set to 0 for unlimited.
- global: If True, applies to the entire connection. If False, applies only to this channel.
Example
// Limit to 10 unacknowledged messages (prefetch_count: 10, global: False)
let assert Ok(_) = carotte.set_qos(ch, 10, False)
A prefetch limit helps balance load across multiple consumers, because no consumer can hold more than prefetch_count unacknowledged messages at a time.
pub fn start(
builder: ClientConfig,
) -> Result(Client, ConnectionError)
Start a RabbitMQ client connection. Returns Ok(client) on success, or a ConnectionError describing why the connection could not be established.
Example
case carotte.start(builder) {
  // Use the client
  Ok(_client) -> io.println("Connected")
  // Handle connection error
  Error(connection_error) ->
    io.println(carotte.describe_connection_error(connection_error))
}
pub fn start_consumer(
name: process.Name(
factory_supervisor.Message(ConsumerConfig, String),
),
) -> Result(Consumer, actor.StartError)
Start the consumer supervisor directly without adding it to a supervision tree.
Most of the time you want to use consumer_supervised and add the
supervisor to your application’s supervision tree instead of using this
function directly.
The supervisor will be linked to the calling process and registered with the given name.
Example
let name = process.new_name("my_consumers")
let assert Ok(consumer) = carotte.start_consumer(name)
pub fn start_transaction(
channel: Channel,
) -> Result(Nil, ChannelError)
Enable transaction mode on a channel.
Once enabled, messages published on this channel will not be delivered
until commit_transaction is called, or discarded if rollback_transaction is called.
Example
let assert Ok(_) = carotte.start_transaction(ch)
let assert Ok(_) = carotte.publish(channel: ch, exchange: "ex", routing_key: "key", payload: <<"msg1">>, options: [])
let assert Ok(_) = carotte.publish(channel: ch, exchange: "ex", routing_key: "key", payload: <<"msg2">>, options: [])
let assert Ok(_) = carotte.commit_transaction(ch) // Both messages delivered atomically
pub fn subscribe(
consumer: Consumer,
channel channel: Channel,
queue queue: String,
callback callback: fn(Payload, Deliver) -> Nil,
) -> Result(String, ConsumeError)
Start a consumer under supervision. Returns the consumer_tag string which can be used to unsubscribe later.
pub fn subscribe_with_options(
consumer: Consumer,
channel channel: Channel,
queue queue: String,
options options: List(QueueOption),
callback callback: fn(Payload, Deliver) -> Nil,
) -> Result(String, ConsumeError)
Start a consumer with options under supervision. Returns the consumer_tag string which can be used to unsubscribe later.
pub fn unbind_exchange(
channel channel: Channel,
source source: String,
destination destination: String,
routing_key routing_key: String,
) -> Result(Nil, ExchangeError)
Unbind an exchange from another exchange.
pub fn unbind_exchange_async(
channel channel: Channel,
source source: String,
destination destination: String,
routing_key routing_key: String,
) -> Result(Nil, ExchangeError)
Unbind an exchange from another exchange asynchronously.
Same semantics as unbind_exchange.
pub fn unbind_queue(
channel channel: Channel,
queue queue: String,
exchange exchange: String,
routing_key routing_key: String,
) -> Result(Nil, QueueError)
Unbind a queue from an exchange.
The routing_key must match the one used when the queue was bound to the exchange.
pub fn unsubscribe(
channel channel: Channel,
consumer_tag consumer_tag: String,
) -> Result(Nil, ConsumeError)
Unsubscribe and stop a consumer gracefully.
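The consumer_tag returned by subscribe is what unsubscribe needs later, so it has to be kept. The sketch below subscribes, waits a few seconds, and then stops the consumer; log_message and consume_briefly are hypothetical names, the five-second pause is arbitrary, and the consumer and channel are assumed to exist already.

```gleam
import carotte
import gleam/bit_array
import gleam/erlang/process
import gleam/io
import gleam/result

// Hypothetical callback: prints text payloads and flags non-UTF-8 ones.
pub fn log_message(msg: carotte.Payload, meta: carotte.Deliver) -> Nil {
  case bit_array.to_string(msg.payload) {
    Ok(text) -> io.println(meta.routing_key <> ": " <> text)
    Error(Nil) -> io.println(meta.routing_key <> ": <binary payload>")
  }
}

// Consumes from "my_queue" for about five seconds, then unsubscribes.
pub fn consume_briefly(
  consumer: carotte.Consumer,
  ch: carotte.Channel,
) -> Result(Nil, carotte.ConsumeError) {
  use consumer_tag <- result.try(carotte.subscribe(
    consumer,
    channel: ch,
    queue: "my_queue",
    callback: log_message,
  ))
  // Let the callback handle deliveries for a while.
  process.sleep(5000)
  // The tag identifies which consumer to stop.
  carotte.unsubscribe(channel: ch, consumer_tag: consumer_tag)
}
```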
pub fn unsubscribe_async(
channel channel: Channel,
consumer_tag consumer_tag: String,
) -> Result(Nil, ConsumeError)
Unsubscribe a consumer asynchronously.