RabbitMQStream.Connection (rabbitmq_stream v0.4.1)

Responsible for encoding and decoding messages, and for opening and maintaining a socket connection to a single node. It connects to the RabbitMQ server, authenticates, and keeps the connection open with heartbeats.

Adding a connection to the supervision tree

You can define a connection with:

defmodule MyApp.MyConnection do
  use RabbitMQStream.Connection
end

Then you can add it to your supervision tree:

def start(_, _) do
  children = [
    {MyApp.MyConnection, username: "guest", password: "guest", host: "localhost", vhost: "/"},
    # ...
  ]

  opts = # ...
  Supervisor.start_link(children, opts)
end

Connection configuration

The connection accepts the following options:

  • username - The username to use for authentication. Defaults to guest.
  • password - The password to use for authentication. Defaults to guest.
  • host - The host to connect to. Defaults to localhost.
  • port - The port to connect to. Defaults to 5552.
  • vhost - The virtual host to use. Defaults to /.
  • frame_max - The maximum frame size in Bytes. Defaults to 1_048_576.
  • heartbeat - The heartbeat interval in seconds. Defaults to 60.
  • lazy - If true, the connection won't start until connect/1 is called explicitly. Defaults to false.

Consuming messages

You can consume messages by calling subscribe/5:

{:ok, _subscription_id} = MyApp.MyConnection.subscribe("stream-01", self(), :next, 999)

Configuration

The configuration for the connection can be set in your config.exs file:

config :rabbitmq_stream, MyApp.MyConnection,
  username: "guest",
  password: "guest"
  # ...

You can override each configuration option by passing it to the use macro:

defmodule MyApp.MyConnection do
  use RabbitMQStream.Connection, username: "guest", password: "guest"
end

or when adding to the supervision tree:

def start(_, _) do
  children = [
    {MyApp.MyConnection, username: "guest", password: "guest"},
    # ...
  ]

  opts = # ...
  Supervisor.start_link(children, opts)
end

Options are applied in the same order as the examples above, from top to bottom, so each later source overrides the earlier ones.
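
One way to picture this layering is as successive Keyword.merge/2 calls, where later sources win. This is a minimal sketch of the idea and not the library's actual code; the module name, function name, and sample values are hypothetical.

```elixir
defmodule ConfigPrecedenceSketch do
  # Hypothetical helper: merges the three option sources described above.
  # Later arguments override earlier ones.
  def resolve(from_config_exs, from_use_macro, from_supervisor) do
    from_config_exs
    |> Keyword.merge(from_use_macro)
    |> Keyword.merge(from_supervisor)
  end
end

opts =
  ConfigPrecedenceSketch.resolve(
    [username: "guest", password: "guest", host: "localhost"],
    [host: "rabbit.internal"],
    [password: "s3cret"]
  )

# :host now comes from the use macro and :password from the supervision
# tree entry, while :username is still the value from config.exs.
IO.inspect(opts)
```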

Startup

Any call or cast to the connection while it is not connected will be buffered and executed once the connection is open.
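
The buffering behaviour can be illustrated with a small queue-based model. This is only a sketch of the general technique, assuming a FIFO buffer that is flushed once the connection opens. It is not the library's implementation, and every name in it is hypothetical.

```elixir
defmodule BufferingSketch do
  # Hypothetical model: a status, a FIFO queue of pending requests, and a
  # list standing in for what has been written to the socket.
  def new, do: %{state: :closed, buffer: :queue.new(), sent: []}

  # While the connection is open, requests are sent right away.
  def request(%{state: :open} = conn, req), do: send_now(conn, req)
  # Otherwise they are queued in arrival order.
  def request(conn, req), do: %{conn | buffer: :queue.in(req, conn.buffer)}

  # When the connection opens, buffered requests are flushed in order.
  def opened(conn) do
    pending = :queue.to_list(conn.buffer)
    conn = %{conn | state: :open, buffer: :queue.new()}
    Enum.reduce(pending, conn, fn req, acc -> send_now(acc, req) end)
  end

  # Stand-in for a socket write: records the request.
  defp send_now(conn, req), do: %{conn | sent: conn.sent ++ [req]}
end

conn =
  BufferingSketch.new()
  |> BufferingSketch.request({:create_stream, "stream-01"})
  |> BufferingSketch.request({:declare_producer, "stream-01", "producer-01"})
  |> BufferingSketch.opened()
```

In this model both requests are issued before the connection is open and are only "sent" after opened/1 runs, in the order they were made.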

Summary

Functions

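close(server, reason \\ "", code \\ 0)

Sends a 'close' command to the RabbitMQ Stream server, and waits for the response, before calling the 'close/1' callback on the transport.

connect(server)

Starts the connection process with the RabbitMQ Stream server, and waits until the authentication is complete.

create_stream(server, name, arguments \\ [])

Creates a stream with the given stream_name. Returns an error when the stream already exists.

create_super_stream(server, name, partitions, arguments \\ [])

Creates a Super Stream, with the specified partitions. The partitions is a Keyword list, where each key is the partition name, and the value is the routing key.

credit(server, subscription_id, credit)

Adds the specified amount of credits to the subscription under the given subscription_id.

declare_producer(server, stream_name, producer_reference)

Declares a producer on the stream with the producer_reference key.

delete_producer(server, producer_id)

Unregisters the producer, under the provided id, from the server.

delete_stream(server, name)

Deletes the stream with the given stream_name. Returns an error when the stream doesn't exist.

delete_super_stream(server, name)

Deletes a Super Stream.

partitions(server, super_stream)

Lists all the partitions of a super stream.

publish(server, producer_id, publishing_id, message, filter_value \\ nil)

Sends a message to the stream referenced by the provided 'producer_id'.

query_metadata(server, streams)

Queries the metadata for the provided streams, alongside a listing of all the brokers available in the cluster.

query_offset(server, stream_name, offset_reference)

Queries the offset for the given stream_name under offset_reference.

query_producer_sequence(server, stream_name, producer_reference)

Queries the sequence number for the producer_reference, on the specified stream_name.

respond(server, request, opts)

The server will sometimes send a request to the client, which we must send a response to.

route(server, routing_key, super_stream)

Lists the names of all the streams of a super stream under the given routing_key.

store_offset(server, stream_name, offset_reference, offset)

Stores an offset on the given stream_name under offset_reference.

stream_stats(server, stream_name)

Queries the metadata information about a stream.

subscribe(server, stream_name, pid, offset, credit, properties \\ [])

Starts consuming messages from the server, starting at the provided 'offset'.

supports?(server, command, version \\ 1)

Checks if the connected server supports the given command.

unsubscribe(server, subscription_id)

Unregisters a consumer from the connection.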

Types

@type connection_option() ::
  {:username, String.t()}
  | {:password, String.t()}
  | {:host, String.t()}
  | {:port, non_neg_integer()}
  | {:vhost, String.t()}
  | {:frame_max, non_neg_integer()}
  | {:heartbeat, non_neg_integer()}
  | {:lazy, boolean()}
@type connection_options() :: [connection_option()]
@type offset() ::
  :first
  | :last
  | :next
  | {:offset, non_neg_integer()}
  | {:timestamp, integer()}
@type t() :: %RabbitMQStream.Connection{
  close_reason: String.t() | atom() | nil,
  commands: %{
    required(RabbitMQStream.Message.Helpers.command()) => %{
      min: non_neg_integer(),
      max: non_neg_integer()
    }
  },
  connect_requests: [pid()],
  connection_properties: Keyword.t(),
  correlation_sequence: non_neg_integer(),
  frames_buffer: RabbitMQStream.Message.Buffer.t(),
  internal_buffer: :queue.queue({atom(), atom(), [{atom(), term()}]}),
  mechanisms: [String.t()],
  options: connection_options(),
  peer_properties: %{required(String.t()) => term()},
  producer_sequence: non_neg_integer(),
  request_buffer: :queue.queue({term(), pid()}),
  request_tracker: %{required({atom(), integer()}) => {pid(), any()}},
  socket: :gen_tcp.socket(),
  state: :closed | :connecting | :open | :closing,
  subscriber_sequence: non_neg_integer(),
  subscriptions: %{required(non_neg_integer()) => pid()},
  transport: RabbitMQStream.Connection.Transport.t(),
  user_buffer: :queue.queue({atom(), atom(), [{atom(), term()}]})
}

Functions

close(server, reason \\ "", code \\ 0)

Sends a 'close' command to the RabbitMQ Stream server, and waits for the response, before calling the 'close/1' callback on the transport.

connect(server)

Starts the connection process with the RabbitMQ Stream server, and waits until the authentication is complete.

If the authentication process has already been started by another process, this call waits for it to complete before returning the result.

create_stream(server, name, arguments \\ [])

Creates a stream with the given stream_name. Returns an error when the stream already exists.

create_super_stream(server, name, partitions, arguments \\ [])

Creates a Super Stream, with the specified partitions. The partitions is a Keyword list, where each key is the partition name, and the value is the routing key.

When publishing a message through a RabbitMQStream.SuperProducer, you can implement the routing_key/2 callback to define the routing key for each message.

Requires RabbitMQ 3.13.0 or later.

Example:

RabbitMQStream.Connection.create_super_stream(conn, "transactions",
  "route-A": ["stream-01", "stream-02"],
  "route-B": ["stream-03", "stream-04"]
)

credit(server, subscription_id, credit)

Adds the specified amount of credits to the subscription under the given subscription_id.

This function always returns :ok as the RabbitMQ Server only sends a response if the command fails, which only happens if the subscription is not found. In that case the error is logged.

declare_producer(server, stream_name, producer_reference)

Declares a producer on the stream with the producer_reference key.

RabbitMQ expects a producer to be declared in order to prevent message duplication, which it does by tracking the sequence number that must be sent with each message.

You can use query_producer_sequence/3 to query a producer's sequence number tracked by the server.

It returns an id that identifies this producer. This id is only valid for this connection, as other connections might have the same id for different producers.

delete_producer(server, producer_id)

Unregisters the producer, under the provided id, from the server.

delete_stream(server, name)

Deletes the stream with the given stream_name. Returns an error when the stream doesn't exist.

delete_super_stream(server, name)

Deletes a Super Stream.

Requires RabbitMQ 3.13.0 or later.

partitions(server, super_stream)

Lists all the partitions of a super stream.

Requires RabbitMQ 3.13.0 or later.

publish(server, producer_id, publishing_id, message, filter_value \\ nil)

Sends a message to the stream referenced by the provided 'producer_id'.

The 'publishing_id' must be unique for the given producer, or the message will be ignored/dropped by the server.

Starting at RabbitMQ 3.13.x, you can optionally provide a 'filter_value' parameter, which the server uses to filter the messages sent to consumers that have provided a 'filter' parameter.
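
Since every 'publishing_id' must be unique per producer, a common approach is to continue counting from the last sequence number known for that producer. The sketch below assumes that number was obtained beforehand (for example through query_producer_sequence/3), that `conn` refers to an open connection, and that `producer_id` came from declare_producer/3. The module and its functions are hypothetical helpers, not part of the library.

```elixir
defmodule PublishSketch do
  # Pairs each message with a publishing id, continuing after the last
  # sequence number known for this producer.
  def assign_publishing_ids(messages, last_sequence) do
    Enum.with_index(messages, last_sequence + 1)
  end

  # Publishes each message with its own publishing id, using the
  # publish/4 form documented above. `conn` and `producer_id` are assumed
  # to come from an open connection and a declare_producer/3 call.
  def publish_all(conn, producer_id, messages, last_sequence) do
    messages
    |> assign_publishing_ids(last_sequence)
    |> Enum.each(fn {message, publishing_id} ->
      RabbitMQStream.Connection.publish(conn, producer_id, publishing_id, message)
    end)
  end
end

# Pure part of the sketch: ids continue after 41.
batch = PublishSketch.assign_publishing_ids(["a", "b", "c"], 41)
```

Keeping the id assignment in a pure function makes it easy to check on its own, without a running server.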

query_metadata(server, streams)

Queries the metadata for the provided streams, alongside a listing of all the brokers available in the cluster.

If a stream doesn't exist, it still returns an entry for it, but with a :stream_does_not_exist code.

query_offset(server, stream_name, offset_reference)

Queries the offset for the given stream_name under offset_reference.

query_producer_sequence(server, stream_name, producer_reference)

Queries the sequence number for the producer_reference, on the specified stream_name.

All messages sent to the server must have a distinct sequence number, which is tracked by the server.

respond(server, request, opts)

The server will sometimes send a request to the client, which we must send a response to.

An example request is the 'ConsumerUpdate', where the server expects a response with the offset. The connection sends the request to the subscription handler, which then calls this function to send the response back to the server.

route(server, routing_key, super_stream)

Lists the names of all the streams of a super stream under the given routing_key.

Requires RabbitMQ 3.13.0 or later.

store_offset(server, stream_name, offset_reference, offset)

Stores an offset on the given stream_name under offset_reference.

This appends an offset to the stream, which can be retrieved later with query_offset/3 by providing the same offset_reference.
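
A typical use of a stored offset is to decide where a consumer should resume. The helper below is a hypothetical sketch: it assumes that query_offset/3 returns `{:ok, offset}` on success and some other value otherwise, and that the stored offset is that of the last processed message. Neither assumption is confirmed by this page. The result is a value of the offset() type accepted by subscribe.

```elixir
defmodule OffsetResumeSketch do
  # Turns the result of an offset query into an offset() value.
  # The {:ok, integer} shape is an assumption about query_offset/3.
  def resume_from({:ok, stored}, _default) when is_integer(stored) and stored >= 0 do
    # Continue with the message after the last one recorded as processed.
    {:offset, stored + 1}
  end

  # Anything else (for example an error tuple) falls back to the default.
  def resume_from(_other, default), do: default
end

resumed = OffsetResumeSketch.resume_from({:ok, 120}, :first)
fallback = OffsetResumeSketch.resume_from({:error, :no_offset}, :first)
```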

stream_stats(server, stream_name)

Queries the metadata information about a stream.

subscribe(server, stream_name, pid, offset, credit, properties \\ [])

Starts consuming messages from the server, starting at the provided 'offset'.

The connection will start sending messages to the provided 'pid' in the following format:

def handle_info({:deliver, %RabbitMQStream.Message.Types.DeliverData{}}, _) do
  # ...
end

You can optionally provide properties when declaring the subscription. The available options are the following:

  • :single_active_consumer: set to true to enable single active consumer for this subscription.
  • :super_stream: set to the name of the super stream that the subscribed stream is a partition of.
  • :filter: List of strings that define the value of the filter_key to match.
  • :match_unfiltered: whether to return messages without any filter value or not.

Be aware that a filter value is registered per message, and the server uses a Bloom filter to check whether a chunk has messages that match a filter. This filter might give false positives, and not all the messages of a chunk might match the filter, so additional filtering by the user might be necessary.
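
Because a subscription only receives messages while it has credit, a consumer process usually hands credit back as it works through deliveries. The GenServer below is a minimal sketch of that pattern using credit/3. It assumes that each `{:deliver, ...}` message uses up one credit, that `conn` refers to an open connection, and that `subscription_id` came from a subscribe call that named this process as the 'pid'. The module name and the `:batch` option are hypothetical.

```elixir
defmodule CreditingConsumerSketch do
  use GenServer

  # Expects :conn and :subscription_id, plus an optional :batch size.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    {:ok,
     %{
       conn: Keyword.fetch!(opts, :conn),
       subscription_id: Keyword.fetch!(opts, :subscription_id),
       batch: Keyword.get(opts, :batch, 10),
       seen: 0
     }}
  end

  @impl true
  def handle_info({:deliver, _data}, state) do
    # Process `_data` here, including any client-side filtering.
    seen = state.seen + 1

    if seen >= state.batch do
      # Hand back the credits used so far so the server keeps delivering.
      RabbitMQStream.Connection.credit(state.conn, state.subscription_id, seen)
      {:noreply, %{state | seen: 0}}
    else
      {:noreply, %{state | seen: seen}}
    end
  end
end
```

Returning credit in batches rather than after every delivery keeps the number of credit commands low; the batch size is a tuning choice.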

supports?(server, command, version \\ 1)

Checks if the connected server supports the given command.

unsubscribe(server, subscription_id)

Unregisters a consumer from the connection.