carotte

Carotte 🥕

A type-safe RabbitMQ client for Gleam that provides a clean, idiomatic interface for message queue operations on the Erlang VM.

Quick Start

import carotte
import gleam/bit_array
import gleam/erlang/process
import gleam/io

pub fn main() {
  // Connect to RabbitMQ
  let assert Ok(client) = carotte.start(carotte.default_client())
  let assert Ok(ch) = carotte.open_channel(client)

  // Declare exchange and queue
  let assert Ok(_) = carotte.declare_exchange(carotte.exchange("my_exchange"), ch)
  let assert Ok(_) = carotte.declare_queue(carotte.default_queue("my_queue"), ch)
  let assert Ok(_) = carotte.bind_queue(channel: ch, queue: "my_queue", exchange: "my_exchange", routing_key: "")

  // Start consumer supervisor and subscribe
  let consumers = process.new_name("consumers")
  let assert Ok(consumer) = carotte.start_consumer(consumers)
  let assert Ok(_) = carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: fn(msg, _) {
    let assert Ok(text) = bit_array.to_string(msg.payload)
    io.println("Received: " <> text)
  })

  // Publish a message (payload is BitArray)
  let assert Ok(_) = carotte.publish(channel: ch, exchange: "my_exchange", routing_key: "", payload: <<"Hello!">>, options: [])
}

Features

OTP Supervision

For production use, integrate consumers into your supervision tree:

import gleam/erlang/process
import gleam/otp/static_supervisor

let consumers_name = process.new_name("consumers")
let spec = carotte.consumer_supervised(consumers_name)

static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(spec)
|> static_supervisor.start()

let consumer = carotte.named_consumer(consumers_name)
carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: handler)

Error Handling

Each operation category has its own error type:

Error Type       Operations
ConnectionError  start, close, reconnect
ChannelError     open_channel
ExchangeError    declare_exchange, delete_exchange, bind_exchange, unbind_exchange
QueueError       declare_queue, delete_queue, bind_queue, unbind_queue, purge_queue, queue_status
PublishError     publish
ConsumeError     subscribe, unsubscribe, ack

Use describe_*_error functions to convert errors to human-readable strings.
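
Because each category has its own error type, code that spans several operations usually needs a common error type. The following is a minimal sketch of one way to do that; SetupError, describe_setup_error, and connect are hypothetical names introduced for illustration and are not part of carotte.

```gleam
import carotte
import gleam/result

/// Hypothetical wrapper type (not part of carotte): one variant per
/// operation category used during setup.
pub type SetupError {
  ConnectFailed(carotte.ConnectionError)
  ChannelFailed(carotte.ChannelError)
  QueueFailed(carotte.QueueError)
}

/// Turn any setup failure into a single log-friendly line by delegating
/// to the matching describe_*_error function.
pub fn describe_setup_error(error: SetupError) -> String {
  case error {
    ConnectFailed(e) -> "connect: " <> carotte.describe_connection_error(e)
    ChannelFailed(e) -> "channel: " <> carotte.describe_channel_error(e)
    QueueFailed(e) -> "queue: " <> carotte.describe_queue_error(e)
  }
}

/// Connect with the default configuration and open a channel, tagging
/// each failure with the category it came from.
pub fn connect() -> Result(carotte.Channel, SetupError) {
  use client <- result.try(
    carotte.start(carotte.default_client())
    |> result.map_error(ConnectFailed),
  )
  carotte.open_channel(client)
  |> result.map_error(ChannelFailed)
}
```

A caller can then match on Error(e) once and log describe_setup_error(e), whichever step failed.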

Types

Represents an AMQP channel within a connection. Channels are lightweight connections that share a single TCP connection. Most AMQP operations are performed on channels.

pub type Channel

Errors that can occur when opening or using channels. Returned by open_channel.

pub type ChannelError {
  ChannelClosed(String)
  ChannelProcessNotFound
  ChannelConnectionClosed
  ChannelUnknownError(String)
}

Constructors

  • ChannelClosed(String)

    The channel has been closed

  • ChannelProcessNotFound

    The channel process could not be found

  • ChannelConnectionClosed

    The connection is closed, so a channel cannot be opened

  • ChannelUnknownError(String)

    An unknown channel error occurred

Represents an active connection to a RabbitMQ server. This is an opaque type that encapsulates the underlying AMQP client process. Use the builder pattern with default_client() and start() to create a client.

pub opaque type Client

Configuration for creating a RabbitMQ client. Use default_client() to create a configuration with sensible defaults, then customize individual fields using record update syntax.

pub type ClientConfig {
  ClientConfig(
    username: String,
    password: String,
    virtual_host: String,
    host: String,
    port: Int,
    channel_max: Int,
    frame_max: Int,
    heartbeat: duration.Duration,
    connection_timeout: duration.Duration,
  )
}

Constructors

  • ClientConfig(
      username: String,
      password: String,
      virtual_host: String,
      host: String,
      port: Int,
      channel_max: Int,
      frame_max: Int,
      heartbeat: duration.Duration,
      connection_timeout: duration.Duration,
    )

    Arguments

    heartbeat

    Heartbeat interval for the connection. The minimum duration is one second.

    connection_timeout

    Timeout for establishing a connection.
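
Since ClientConfig is a public record, its fields can be overridden with record update syntax. The sketch below assumes that duration refers to gleam/time/duration from the gleam_time package; the host name, virtual host, and staging_config function are placeholder values chosen for illustration.

```gleam
import carotte
import gleam/time/duration

/// Hypothetical configuration for a staging broker. Every field that is
/// not listed keeps the value returned by default_client().
pub fn staging_config() -> carotte.ClientConfig {
  carotte.ClientConfig(
    ..carotte.default_client(),
    host: "rabbitmq.staging.example",
    virtual_host: "/staging",
    // The documented minimum heartbeat is one second.
    heartbeat: duration.seconds(30),
    connection_timeout: duration.seconds(5),
  )
}
```

The result can be passed straight to start, for example carotte.start(staging_config()).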

Errors that can occur when establishing or managing RabbitMQ connections. Returned by start, close, and reconnect.

pub type ConnectionError {
  ConnectionBlocked
  ConnectionClosed
  ConnectionAuthFailure(String)
  ConnectionRefused(String)
  ConnectionTimeout(String)
  NotConnected
  ReconnectionFailed(ConnectionError)
  AlreadyConnected
  ConnectionUnknownError(String)
}

Constructors

  • ConnectionBlocked

    The connection is blocked by the server due to resource constraints

  • ConnectionClosed

    The connection has been closed

  • ConnectionAuthFailure(String)

    Authentication failed with the provided credentials

  • ConnectionRefused(String)

    Connection to the server was refused

  • ConnectionTimeout(String)

    Connection attempt timed out

  • NotConnected

    Connection is not currently active

  • ReconnectionFailed(ConnectionError)

    Reconnection failed with the underlying cause

  • AlreadyConnected

    Connection is already established

  • ConnectionUnknownError(String)

    An unknown connection error occurred
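
Matching on these constructors lets an application decide how to react to a failure. The policy below is only an illustrative assumption (is_retryable is a hypothetical helper, and which errors deserve a retry depends on the application); it mainly shows that ReconnectionFailed carries its underlying cause, which can be inspected recursively.

```gleam
import carotte.{type ConnectionError}

/// Hypothetical retry policy: transient failures are retried,
/// credential and state errors are not.
pub fn is_retryable(error: ConnectionError) -> Bool {
  case error {
    carotte.ConnectionRefused(_) | carotte.ConnectionTimeout(_) -> True
    carotte.ConnectionClosed | carotte.NotConnected -> True
    carotte.ConnectionBlocked -> True
    // Look through the wrapper at the underlying cause.
    carotte.ReconnectionFailed(cause) -> is_retryable(cause)
    carotte.ConnectionAuthFailure(_) -> False
    carotte.AlreadyConnected -> False
    carotte.ConnectionUnknownError(_) -> False
  }
}
```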

Connection event for callbacks.

pub type ConnectionEvent {
  ConnectionDisconnected(DisconnectReason)
  ConnectionReconnected
}

Constructors

  • ConnectionDisconnected(DisconnectReason)

  • ConnectionReconnected

Connection state.

pub type ConnectionState {
  Connected
  Disconnected(reason: DisconnectReason)
}

Constructors

  • Connected

  • Disconnected(reason: DisconnectReason)

Errors that can occur during consumer operations. Returned by subscribe, subscribe_with_options, unsubscribe, ack.

pub type ConsumeError {
  ConsumeInitTimeout
  ConsumeInitFailed(String)
  ConsumeProcessNotFound
  ConsumeChannelClosed(String)
  ConsumeUnknownError(String)
}

Constructors

  • ConsumeInitTimeout

    Consumer initialization timed out

  • ConsumeInitFailed(String)

    Consumer initialization failed

  • ConsumeProcessNotFound

    The consumer process could not be found

  • ConsumeChannelClosed(String)

    The channel is closed

  • ConsumeUnknownError(String)

    An unknown consume error occurred

Opaque reference to a consumer supervisor.

Similar to how pog’s Connection wraps a pool reference, this type wraps the consumer supervisor name. Use it to subscribe to queues.

  • Use start_consumer or named_consumer to get a Consumer
  • Use subscribe with a Consumer to start consuming (returns consumer_tag)
  • Use unsubscribe with channel + consumer_tag to stop consuming

pub opaque type Consumer

Configuration for a consumer.

pub opaque type ConsumerConfig

The message type for the consumer supervisor. Used when registering with a name.

pub type ConsumerSupervisorMessage =
  factory_supervisor.Message(ConsumerConfig, String)

Metadata about a message delivery from the broker. Contains information about how and from where the message was delivered.

pub type Deliver {
  Deliver(
    consumer_tag: String,
    delivery_tag: Int,
    redelivered: Bool,
    exchange: String,
    routing_key: String,
  )
}

Constructors

  • Deliver(
      consumer_tag: String,
      delivery_tag: Int,
      redelivered: Bool,
      exchange: String,
      routing_key: String,
    )

    Arguments

    consumer_tag

    Identifier for the consumer that received this message.

    delivery_tag

    Unique identifier for this delivery, used for acknowledgment.

    redelivered

    True if this message was previously delivered but not acknowledged.

    exchange

    The exchange the message was published to.

    routing_key

    The routing key used when the message was published.
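
The fields of Deliver are plain record fields, so a consumer callback can read them directly. As a small sketch, the hypothetical helper below (describe_delivery is not part of carotte) formats the metadata for logging.

```gleam
import carotte
import gleam/int

/// Hypothetical logging helper: summarise where a delivery came from.
pub fn describe_delivery(meta: carotte.Deliver) -> String {
  let suffix = case meta.redelivered {
    True -> " (redelivered)"
    False -> ""
  }
  "delivery #"
  <> int.to_string(meta.delivery_tag)
  <> " from exchange '"
  <> meta.exchange
  <> "' with key '"
  <> meta.routing_key
  <> "'"
  <> suffix
}
```

Inside a subscribe callback this could be used as io.println(describe_delivery(meta)), where meta is the second callback argument.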

Reason for disconnection.

pub type DisconnectReason {
  ServerClosed
  NetworkError
  UserClosed
  Unknown(String)
  ConnectionProcessNotAlive
}

Constructors

  • ServerClosed

    Server closed the connection.

  • NetworkError

    Network error.

  • UserClosed

    Explicitly closed by user.

  • Unknown(String)

    Unknown reason.

  • ConnectionProcessNotAlive

    The connection process is no longer running.
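
ConnectionState and DisconnectReason can be combined to report why a client is offline. The following sketch uses two hypothetical helpers, reason_label and state_label, whose wording is an arbitrary choice made for illustration.

```gleam
import carotte

/// Hypothetical helper: a short label for each disconnect reason.
pub fn reason_label(reason: carotte.DisconnectReason) -> String {
  case reason {
    carotte.ServerClosed -> "server closed the connection"
    carotte.NetworkError -> "network error"
    carotte.UserClosed -> "closed by user"
    carotte.ConnectionProcessNotAlive -> "connection process not alive"
    carotte.Unknown(detail) -> "unknown (" <> detail <> ")"
  }
}

/// Hypothetical helper: a one-line summary of the connection state.
pub fn state_label(state: carotte.ConnectionState) -> String {
  case state {
    carotte.Connected -> "connected"
    carotte.Disconnected(reason) -> "disconnected: " <> reason_label(reason)
  }
}
```

With a running client, state_label(carotte.connection_state(client)) yields the summary for the current state.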

Represents an AMQP exchange configuration. Exchanges receive messages from producers and route them to queues based on routing rules defined by the exchange type.

Use exchange() to create an exchange with defaults, then customize using record update syntax.

pub type Exchange {
  Exchange(
    name: String,
    exchange_type: ExchangeType,
    durable: Bool,
    auto_delete: Bool,
    internal: Bool,
    nowait: Bool,
  )
}

Constructors

  • Exchange(
      name: String,
      exchange_type: ExchangeType,
      durable: Bool,
      auto_delete: Bool,
      internal: Bool,
      nowait: Bool,
    )

Errors that can occur during exchange operations. Returned by declare_exchange, delete_exchange, bind_exchange, unbind_exchange.

pub type ExchangeError {
  ExchangeNotFound(String)
  ExchangeAccessRefused(String)
  ExchangePreconditionFailed(String)
  ExchangeChannelClosed(String)
  ExchangeUnknownError(String)
}

Constructors

  • ExchangeNotFound(String)

    The exchange was not found

  • ExchangeAccessRefused(String)

    Access to the exchange was refused

  • ExchangePreconditionFailed(String)

    A precondition for the exchange operation failed

  • ExchangeChannelClosed(String)

    The channel is closed

  • ExchangeUnknownError(String)

    An unknown exchange error occurred

The type of routing logic an exchange uses to deliver messages to queues.

pub type ExchangeType {
  Fanout
  Direct
  Topic
  Headers
}

Constructors

  • Fanout

    Messages are delivered to all bound queues regardless of routing key.

  • Direct

    Messages are delivered to queues with an exact routing key match.

  • Topic

    Messages are delivered to queues with pattern-matching on routing key. Supports wildcards: * matches one word, # matches zero or more words.

  • Headers

    Messages are routed based on header attributes rather than routing key.
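
To illustrate the Topic wildcards, the sketch below declares a durable topic exchange and binds a queue with a pattern. The exchange name, queue name, and the helpers logs_exchange and bind_error_logs are assumptions made for this example; the carotte calls follow the signatures documented on this page.

```gleam
import carotte
import gleam/result

/// Hypothetical exchange definition: a durable topic exchange.
pub fn logs_exchange() -> carotte.Exchange {
  carotte.Exchange(
    ..carotte.exchange("logs"),
    exchange_type: carotte.Topic,
    durable: True,
  )
}

/// Declare the exchange and a queue, then bind them with a pattern.
pub fn bind_error_logs(ch: carotte.Channel) -> Result(Nil, String) {
  use _ <- result.try(
    carotte.declare_exchange(logs_exchange(), ch)
    |> result.map_error(carotte.describe_exchange_error),
  )
  use _ <- result.try(
    carotte.declare_queue(carotte.default_queue("error_logs"), ch)
    |> result.map_error(carotte.describe_queue_error),
  )
  // "*.error" matches "auth.error" but not "auth.db.error";
  // "#.error" would match both.
  carotte.bind_queue(
    channel: ch,
    queue: "error_logs",
    exchange: "logs",
    routing_key: "*.error",
  )
  |> result.map_error(carotte.describe_queue_error)
}
```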

An opaque container for message headers. Headers are key-value pairs that can be attached to messages for additional metadata or routing with header exchanges.

Create with headers_from_list() and read with headers_to_list().

pub opaque type HeaderList

Represents a typed header value. AMQP headers support several primitive types.

pub type HeaderValue {
  BoolHeader(Bool)
  FloatHeader(Float)
  IntHeader(Int)
  StringHeader(String)
  ListHeader(List(HeaderValue))
}

Constructors

  • BoolHeader(Bool)

    A boolean header value.

  • FloatHeader(Float)

    A floating-point header value.

  • IntHeader(Int)

    An integer header value.

  • StringHeader(String)

    A string header value.

  • ListHeader(List(HeaderValue))

    A list of header values (nested).
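
Because header values are typed, reading one means matching on the expected constructor. The helpers below are a hypothetical sketch (string_header and retry_count are not part of carotte); they work on the list of pairs that headers_to_list returns.

```gleam
import carotte.{type HeaderValue, IntHeader, StringHeader}
import gleam/list

/// Hypothetical helper: look up a header and require it to be a string.
pub fn string_header(
  pairs: List(#(String, HeaderValue)),
  key: String,
) -> Result(String, Nil) {
  case list.key_find(pairs, key) {
    Ok(StringHeader(value)) -> Ok(value)
    // Missing header, or present with a different type.
    _ -> Error(Nil)
  }
}

/// Hypothetical helper: read an integer "retry_count" header,
/// falling back to 0 when it is absent or not an integer.
pub fn retry_count(pairs: List(#(String, HeaderValue))) -> Int {
  case list.key_find(pairs, "retry_count") {
    Ok(IntHeader(count)) -> count
    _ -> 0
  }
}
```

In a consumer callback these could be applied as carotte.headers_to_list(payload.headers) |> string_header("user_id").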

A message payload received from the broker. Contains the message body, AMQP properties, and custom headers.

pub type Payload {
  Payload(
    payload: BitArray,
    properties: List(PublishOption),
    headers: HeaderList,
  )
}

Constructors

  • Payload(
      payload: BitArray,
      properties: List(PublishOption),
      headers: HeaderList,
    )

    Arguments

    payload

    The message body as raw bytes

    properties

    AMQP message properties (content type, correlation ID, etc.)

    headers

    Custom headers attached to the message

Errors that can occur when publishing messages. Returned by publish.

pub type PublishError {
  PublishNoRoute(String)
  PublishChannelClosed(String)
  PublishUnknownError(String)
}

Constructors

  • PublishNoRoute(String)

    No route exists to deliver the message (when mandatory flag is set)

  • PublishChannelClosed(String)

    The channel is closed

  • PublishUnknownError(String)

    An unknown publish error occurred

Options for publishing messages.

pub type PublishOption {
  Mandatory(Bool)
  ContentType(String)
  ContentEncoding(String)
  MessageHeaders(HeaderList)
  Persistent(Bool)
  CorrelationId(String)
  Priority(Int)
  ReplyTo(String)
  Expiration(duration.Duration)
  MessageId(String)
  Timestamp(timestamp.Timestamp)
  Type(String)
  UserId(String)
  AppId(String)
}

Constructors

  • Mandatory(Bool)

    If set, returns an error if the broker can’t route the message to a queue.

  • ContentType(String)

    MIME content type.

  • ContentEncoding(String)

    MIME content encoding.

  • MessageHeaders(HeaderList)

    Headers to attach to the message. Use headers_from_list to create headers for sending, and headers_to_list to read headers from received messages.

  • Persistent(Bool)

    If set, uses persistent delivery mode. Messages marked as persistent that are delivered to durable queues will be logged to disk.

  • CorrelationId(String)

    Arbitrary application-specific message identifier.

  • Priority(Int)

    Message priority, ranging from 0 to 9.

  • ReplyTo(String)

    Name of the reply queue.

  • Expiration(duration.Duration)

    How long the message is valid before it expires.

  • MessageId(String)

    Message identifier.

  • Timestamp(timestamp.Timestamp)

    Timestamp associated with this message. Note: AMQP only supports second-level precision, so any nanoseconds in the timestamp will be truncated when sending.

  • Type(String)

    Message type.

  • UserId(String)

    Creating user ID. RabbitMQ will validate this against the active connection user.

  • AppId(String)

    Application ID.
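
Several options are typically combined in one publish call. The sketch below assumes that duration is gleam/time/duration from the gleam_time package; the exchange name, routing key, header, and the helpers order_options and publish_order are hypothetical values chosen for illustration.

```gleam
import carotte
import gleam/bit_array
import gleam/time/duration

/// Hypothetical option set for a persistent JSON message.
pub fn order_options(order_id: String) -> List(carotte.PublishOption) {
  [
    carotte.ContentType("application/json"),
    carotte.Persistent(True),
    carotte.CorrelationId(order_id),
    // The message expires if it is not consumed within a minute.
    carotte.Expiration(duration.seconds(60)),
    carotte.MessageHeaders(
      carotte.headers_from_list([
        #("source", carotte.StringHeader("checkout")),
      ]),
    ),
  ]
}

/// Publish a JSON string; the payload must be converted to a BitArray.
pub fn publish_order(
  ch: carotte.Channel,
  order_id: String,
  json: String,
) -> Result(Nil, carotte.PublishError) {
  carotte.publish(
    channel: ch,
    exchange: "orders",
    routing_key: "order.created",
    payload: bit_array.from_string(json),
    options: order_options(order_id),
  )
}
```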

Represents a declared queue on the broker. Returned by declare_queue() and queue_status() with current statistics.

pub type Queue {
  Queue(name: String, message_count: Int, consumer_count: Int)
}

Constructors

  • Queue(name: String, message_count: Int, consumer_count: Int)

    The declared queue with its name, current message count, and consumer count.

Configuration for declaring a queue. Use default_queue() to create a queue configuration with sensible defaults, then customize using record update syntax.

pub type QueueConfig {
  QueueConfig(
    name: String,
    passive: Bool,
    durable: Bool,
    exclusive: Bool,
    auto_delete: Bool,
    nowait: Bool,
  )
}

Constructors

  • QueueConfig(
      name: String,
      passive: Bool,
      durable: Bool,
      exclusive: Bool,
      auto_delete: Bool,
      nowait: Bool,
    )

Errors that can occur during queue operations. Returned by declare_queue, delete_queue, bind_queue, unbind_queue, purge_queue, queue_status.

pub type QueueError {
  QueueNotFound(String)
  QueueAccessRefused(String)
  QueuePreconditionFailed(String)
  QueueResourceLocked(String)
  QueueChannelClosed(String)
  QueueUnknownError(String)
}

Constructors

  • QueueNotFound(String)

    The queue was not found

  • QueueAccessRefused(String)

    Access to the queue was refused

  • QueuePreconditionFailed(String)

    A precondition for the queue operation failed

  • QueueResourceLocked(String)

    The queue is locked and cannot be accessed

  • QueueChannelClosed(String)

    The channel is closed

  • QueueUnknownError(String)

    An unknown queue error occurred

Options for subscribing to a queue.

pub type QueueOption {
  AutoAck(Bool)
}

Constructors

  • AutoAck(Bool)

    If True, messages are automatically acknowledged upon delivery. If False, you must call ack() to acknowledge messages manually.

Values

pub fn ack(
  channel: Channel,
  delivery_tag: Int,
  multiple: Bool,
) -> Result(Nil, ConsumeError)

Acknowledge a message delivery. Used when manual acknowledgment is enabled (AutoAck(False)).

Parameters

  • channel: The channel to acknowledge on
  • delivery_tag: The delivery tag from the message metadata
  • multiple: If True, acknowledges all messages up to and including this delivery tag

Example

carotte.subscribe_with_options(
  consumer,
  channel: ch,
  queue: "my_queue",
  options: [carotte.AutoAck(False)],
  callback: fn(_msg, meta) {
    // Process the message, then acknowledge it
    let _ = carotte.ack(ch, meta.delivery_tag, False)
    // The callback must return Nil
    Nil
  },
)

pub fn ack_single(
  channel: Channel,
  delivery_tag: Int,
) -> Result(Nil, ConsumeError)

Acknowledge a message delivery (acknowledges only this message). Convenience function for ack with multiple=False.

pub fn bind_exchange(
  channel channel: Channel,
  source source: String,
  destination destination: String,
  routing_key routing_key: String,
) -> Result(Nil, ExchangeError)

Bind an exchange to another exchange. Routing keys are used to filter messages from the source exchange.

pub fn bind_exchange_async(
  channel channel: Channel,
  source source: String,
  destination destination: String,
  routing_key routing_key: String,
) -> Result(Nil, ExchangeError)

Bind an exchange to another exchange without waiting for a response. Same semantics as bind_exchange.

pub fn bind_queue(
  channel channel: Channel,
  queue queue: String,
  exchange exchange: String,
  routing_key routing_key: String,
) -> Result(Nil, QueueError)

Bind a queue to an exchange. The routing_key is used to filter messages from the exchange.

pub fn bind_queue_async(
  channel channel: Channel,
  queue queue: String,
  exchange exchange: String,
  routing_key routing_key: String,
) -> Result(Nil, QueueError)

Bind a queue to an exchange asynchronously. Same semantics as bind_queue.

pub fn close(client: Client) -> Result(Nil, ConnectionError)

Close the RabbitMQ client connection. This will close all channels and the underlying AMQP connection.

pub fn connection_state(client: Client) -> ConnectionState

Get the current connection state.

pub fn consumer_supervised(
  name: process.Name(
    factory_supervisor.Message(ConsumerConfig, String),
  ),
) -> supervision.ChildSpecification(
  factory_supervisor.Supervisor(ConsumerConfig, String),
)

Create a child specification for adding the consumer supervisor to your application’s supervision tree.

This is the recommended way to start the consumer supervisor, as it ensures proper lifecycle management within your OTP application.

You must provide a name so that other parts of your application can find the supervisor to subscribe consumers.

Example

import gleam/erlang/process
import gleam/otp/static_supervisor

pub fn start_app() {
  // Create a name at program startup
  let consumers_name = process.new_name("consumers")

  // Create the child specification (max 5 restarts in 10 seconds)
  let consumer_spec = carotte.consumer_supervised(consumers_name)

  // Add to your supervision tree
  static_supervisor.new(static_supervisor.OneForOne)
  |> static_supervisor.add(consumer_spec)
  |> static_supervisor.start()

  // Later, get the supervisor to subscribe
  let sup = carotte.named_consumer(consumers_name)
  carotte.subscribe(sup, channel: ch, queue: "my_queue", callback: handler)
}

pub fn declare_exchange(
  exchange: Exchange,
  channel: Channel,
) -> Result(Nil, ExchangeError)

Declare an exchange on the broker.

pub fn declare_exchange_async(
  exchange: Exchange,
  channel: Channel,
) -> Result(Nil, ExchangeError)

Declare an exchange on the broker without waiting for a response.

pub fn declare_queue(
  queue: QueueConfig,
  channel: Channel,
) -> Result(Queue, QueueError)

Declare a queue on the broker.

pub fn declare_queue_async(
  queue: QueueConfig,
  channel: Channel,
) -> Result(Nil, QueueError)

Declare a queue on the broker asynchronously.

pub fn default_client() -> ClientConfig

Create a new client builder with default settings. Uses guest/guest credentials on localhost:5672.

Example

let assert Ok(client) =
  carotte.default_client()
  |> carotte.start()

pub fn default_queue(name: String) -> QueueConfig

Create a queue configuration with the given name and sensible defaults. All boolean options default to False.

To customize, use record update syntax:

QueueConfig(..default_queue("my_queue"), durable: True, exclusive: True)

For an auto-generated queue name, pass an empty string:

default_queue("")
|> declare_queue(channel)
// Returns Queue with broker-generated name like "amq.gen-..."

pub fn delete_exchange(
  channel channel: Channel,
  exchange exchange: String,
  if_unused unused: Bool,
) -> Result(Nil, ExchangeError)

Delete an exchange from the broker. If if_unused is True, the exchange is deleted only if it has no queues bound to it.

pub fn delete_exchange_async(
  channel channel: Channel,
  exchange exchange: String,
  if_unused unused: Bool,
) -> Result(Nil, ExchangeError)

Delete an exchange from the broker without waiting for a response.

pub fn delete_queue(
  channel channel: Channel,
  queue queue: String,
  if_unused if_unused: Bool,
  if_empty if_empty: Bool,
) -> Result(Int, QueueError)

Delete a queue from the broker. If if_unused is set, the queue will only be deleted if it has no subscribers. If if_empty is set, the queue will only be deleted if it has no messages. Returns the number of messages that were in the queue when it was deleted.

pub fn delete_queue_async(
  channel channel: Channel,
  queue queue: String,
  if_unused if_unused: Bool,
  if_empty if_empty: Bool,
) -> Result(Nil, QueueError)

Delete a queue from the broker asynchronously. Same semantics as delete_queue.

pub fn describe_channel_error(err: ChannelError) -> String

Convert a ChannelError to a human-readable string description.

pub fn describe_connection_error(err: ConnectionError) -> String

Convert a ConnectionError to a human-readable string description. Useful for logging or displaying error messages to users.

pub fn describe_consume_error(err: ConsumeError) -> String

Convert a ConsumeError to a human-readable string description.

pub fn describe_exchange_error(err: ExchangeError) -> String

Convert an ExchangeError to a human-readable string description.

pub fn describe_publish_error(err: PublishError) -> String

Convert a PublishError to a human-readable string description.

pub fn describe_queue_error(err: QueueError) -> String

Convert a QueueError to a human-readable string description.

pub fn empty_headers() -> HeaderList

Create an empty HeaderList. Useful for pattern matching or when no headers are needed.

pub fn exchange(name: String) -> Exchange

Create an exchange with the given name and sensible defaults. Returns a Direct exchange with all options set to False.

To customize, use record update syntax:

Exchange(..exchange("events"), exchange_type: Topic, durable: True)

pub fn headers_from_list(
  list: List(#(String, HeaderValue)),
) -> HeaderList

Create a HeaderList from a list of name-value pairs. Use this to construct headers for messages.

Example

let headers = headers_from_list([
  #("user_id", StringHeader("123")),
  #("retry_count", IntHeader(3)),
  #("is_test", BoolHeader(True)),
])

pub fn headers_to_list(
  headers: HeaderList,
) -> List(#(String, HeaderValue))

Convert a HeaderList back to a list of name-value pairs. Use this to read headers from received messages.

Example

let assert Ok(_consumer_tag) =
  carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: fn(payload, _deliver) {
    let headers = carotte.headers_to_list(payload.headers)
    // headers: List(#(String, HeaderValue))
    Nil
  })

pub fn is_connected(client: Client) -> Bool

Check if the client connection is currently active.

pub fn nack(
  channel: Channel,
  delivery_tag: Int,
  multiple: Bool,
  requeue: Bool,
) -> Result(Nil, ConsumeError)

Negatively acknowledge a message delivery. Used when manual acknowledgment is enabled (AutoAck(False)) and you want to indicate that the message could not be processed.

Parameters

  • channel: The channel to nack on
  • delivery_tag: The delivery tag from the message metadata
  • multiple: If True, nacks all messages up to and including this delivery tag
  • requeue: If True, the message(s) will be requeued; if False, they will be discarded or dead-lettered (if a dead letter exchange is configured)

Example

carotte.subscribe_with_options(
  consumer,
  channel: ch,
  queue: "my_queue",
  options: [carotte.AutoAck(False)],
  callback: fn(msg, meta) {
    let _ = case process_message(msg) {
      Ok(_) -> carotte.ack_single(ch, meta.delivery_tag)
      // Requeue for retry
      Error(_) -> carotte.nack(ch, meta.delivery_tag, False, True)
    }
    // The callback must return Nil
    Nil
  },
)

pub fn nack_single(
  channel: Channel,
  delivery_tag: Int,
  requeue: Bool,
) -> Result(Nil, ConsumeError)

Negatively acknowledge a single message. Convenience function for nack with multiple=False.

Parameters

  • channel: The channel to nack on
  • delivery_tag: The delivery tag from the message metadata
  • requeue: If True, the message will be requeued; if False, it will be discarded or dead-lettered

pub fn named_consumer(
  name: process.Name(
    factory_supervisor.Message(ConsumerConfig, String),
  ),
) -> Consumer

Get a reference to a running consumer supervisor by its registered name.

Use this to get a supervisor reference after it has been started as part of your supervision tree via consumer_supervised.

Example

let consumer = carotte.named_consumer(consumers_name)

pub fn open_channel(
  client: Client,
) -> Result(Channel, ChannelError)

Open a channel to a RabbitMQ server.

pub fn publish(
  channel channel: Channel,
  exchange exchange: String,
  routing_key routing_key: String,
  payload payload: BitArray,
  options options: List(PublishOption),
) -> Result(Nil, PublishError)

Publish a message to an exchange. The routing_key is used to route messages to queues. The options are used to set message properties.

pub fn purge_queue(
  channel channel: Channel,
  queue queue: String,
) -> Result(Int, QueueError)

Purge a queue of all messages.

pub fn purge_queue_async(
  channel channel: Channel,
  queue queue: String,
) -> Result(Nil, QueueError)

Purge a queue of all messages asynchronously.

pub fn queue_status(
  channel channel: Channel,
  queue queue: String,
) -> Result(Queue, QueueError)

Get the status of a queue.
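
The returned Queue record exposes message_count and consumer_count, which makes simple health checks possible. The following is a minimal sketch; is_backlogged and check_backlog are hypothetical helpers, and the threshold is whatever the application considers too deep.

```gleam
import carotte
import gleam/result

/// Hypothetical check: does the queue hold more messages than allowed?
pub fn is_backlogged(queue: carotte.Queue, threshold: Int) -> Bool {
  queue.message_count > threshold
}

/// Ask the broker for the queue's status and apply the check.
pub fn check_backlog(
  ch: carotte.Channel,
  name: String,
  threshold: Int,
) -> Result(Bool, carotte.QueueError) {
  carotte.queue_status(channel: ch, queue: name)
  |> result.map(is_backlogged(_, threshold))
}
```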

pub fn reconnect(
  client: Client,
) -> Result(Client, ConnectionError)

Attempt to reconnect a disconnected client using the original connection parameters. Returns an error if the client is already connected or if reconnection fails.

pub fn reject(
  channel: Channel,
  delivery_tag: Int,
  requeue: Bool,
) -> Result(Nil, ConsumeError)

Reject a message delivery. Similar to nack but only works with a single message (no multiple option). This is the original AMQP 0-9-1 method for rejecting messages.

Parameters

  • channel: The channel to reject on
  • delivery_tag: The delivery tag from the message metadata
  • requeue: If True, the message will be requeued; if False, it will be discarded or dead-lettered (if a dead letter exchange is configured)

Example

carotte.subscribe_with_options(
  consumer,
  channel: ch,
  queue: "my_queue",
  options: [carotte.AutoAck(False)],
  callback: fn(msg, meta) {
    let _ = case validate_message(msg) {
      Ok(_) -> carotte.ack_single(ch, meta.delivery_tag)
      // Discard invalid message
      Error(_) -> carotte.reject(ch, meta.delivery_tag, False)
    }
    // The callback must return Nil
    Nil
  },
)

pub fn start(
  builder: ClientConfig,
) -> Result(Client, ConnectionError)

Start a RabbitMQ client connection. Returns Ok with the client on success, or a ConnectionError on failure.

Example

case carotte.start(builder) {
  Ok(client) -> todo as "use the client"
  Error(connection_error) -> todo as "handle the connection error"
}

pub fn start_consumer(
  name: process.Name(
    factory_supervisor.Message(ConsumerConfig, String),
  ),
) -> Result(Consumer, actor.StartError)

Start the consumer supervisor directly without adding it to a supervision tree.

Most of the time you want to use consumer_supervised and add the supervisor to your application’s supervision tree instead of using this function directly.

The supervisor will be linked to the calling process and registered with the given name.

Example

let name = process.new_name("my_consumers")
let assert Ok(consumer) = carotte.start_consumer(name)

pub fn subscribe(
  consumer: Consumer,
  channel channel: Channel,
  queue queue: String,
  callback callback: fn(Payload, Deliver) -> Nil,
) -> Result(String, ConsumeError)

Start a consumer under supervision. Returns the consumer_tag string which can be used to unsubscribe later.

pub fn subscribe_with_options(
  consumer: Consumer,
  channel channel: Channel,
  queue queue: String,
  options options: List(QueueOption),
  callback callback: fn(Payload, Deliver) -> Nil,
) -> Result(String, ConsumeError)

Start a consumer with options under supervision. Returns the consumer_tag string which can be used to unsubscribe later.

pub fn unbind_exchange(
  channel channel: Channel,
  source source: String,
  destination destination: String,
  routing_key routing_key: String,
) -> Result(Nil, ExchangeError)

Unbind an exchange from another exchange.

pub fn unbind_exchange_async(
  channel channel: Channel,
  source source: String,
  destination destination: String,
  routing_key routing_key: String,
) -> Result(Nil, ExchangeError)

Unbind an exchange from another exchange asynchronously. Same semantics as unbind_exchange.

pub fn unbind_queue(
  channel channel: Channel,
  queue queue: String,
  exchange exchange: String,
  routing_key routing_key: String,
) -> Result(Nil, QueueError)

Unbind a queue from an exchange. The routing_key must match the one used when the binding was created.

pub fn unsubscribe(
  channel channel: Channel,
  consumer_tag consumer_tag: String,
) -> Result(Nil, ConsumeError)

Unsubscribe and stop a consumer gracefully.

pub fn unsubscribe_async(
  channel channel: Channel,
  consumer_tag consumer_tag: String,
) -> Result(Nil, ConsumeError)

Unsubscribe a consumer asynchronously.
