View Source RabbitMQStream.Connection (rabbitmq_stream v0.4.1)
Responsible for encoding and decoding messages, opening and maintaining a socket connection to a single node. It connects to the RabbitMQ, and authenticates, and mantains the connection open with heartbeats.
Adding a connectiong to the supervision tree
You can define a connection with:
defmodule MyApp.MyConnection
use RabbitMQStream.Connection
end
Then you can add it to your supervision tree:
def start(_, _) do
children = [
{MyApp.MyConnection, username: "guest", password: "guest", host: "localhost", vhost: "/"},
# ...
]
opts = # ...
Supervisor.start_link(children, opts)
end
Connection configuration
The connection accept the following options:
username
- The username to use for authentication. Defaults toguest
.password
- The password to use for authentication. Defaults toguest
.host
- The host to connect to. Defaults tolocalhost
.port
- The port to connect to. Defaults to5552
.vhost
- The virtual host to use. Defaults to/
.frame_max
- The maximum frame size in Bytes. Defaults to1_048_576
.heartbeat
- The heartbeat interval in seconds. Defaults to60
.lazy
- Iftrue
, the connection won't starting until explicitly callingconnect/1
. Defaults tofalse
.
Consuming messages
You can consume messages by calling subscribe/5
:
{:ok, _subscription_id} = MyApp.MyConnection.subscribe("stream-01", self(), :next, 999)
Configuration
The configuration for the connection can be set in your config.exs
file:
config :rabbitmq_stream, MyApp.MyConnection,
username: "guest",
password: "guest"
# ...
You can override each configuration option by manually passing each configuration on the use
macro:
defmodule MyApp.MyConnection
use RabbitMQStream.Connection, username: "guest", password: "guest"
end
or when adding to the supervision tree:
def start(_, _) do
children = [
{MyApp.MyConnection, username: "guest", password: "guest"},
# ...
]
opts = # ...
Supervisor.start_link(children, opts)
end
The precedence order is is the same order as the examples above, from top to bottom.
Startup
Any call or cast to the connection while it is not connected will be buffered and executed once the connection is open.
Summary
Functions
Sends a 'close' command to the RabbitMQ Stream server, and waits for the response, before calling the 'close/1' callback on the transport.
Starts the connection process with the RabbitMQ Stream server, and waits until the authentication is complete.
Creates a stream with the given stream_name
. Returns an error when
the stream already exists.
Creates a Super Stream, with the specified partitions. The partitions is a Keyword list, where each key is the partition name, and the value is the routing key.
Adds the specified amount of credits to the subscription under the given subscription_id
.
Declares a producer on the stream with the producer_reference
key.
Unregisters the producer, under the provided id, from the server.
Deletes the stream with the given stream_name
. Returns an error when
the stream doesn't exist.
Deletes a Super Stream.
Lists all the partitions of a super stream.
Sends a message to the stream referenced by the provided 'producer_id'.
Queries the metadata for the provided streams, along side with a listing of all the brokers available in the cluster.
Queries the offset for the given stream_name
under offset_reference
.
Queries the sequence number for the producer_reference
, on the specified
stream_name
.
The server will sometimes send a request to the client, which we must send a response to.
Lists the names of all the streams of super stream, under the given routing_key
.
Stores an offset
on the given stream_name
under offset_reference
.
Queries the metadata information about a stream.
Starts consuming messages from the server, starting at the provided 'offset'.
Checks if the connected server supports the given command.
Unregisters a consumer from the connection.
Types
@type connection_option() :: {:username, String.t()} | {:password, String.t()} | {:host, String.t()} | {:port, non_neg_integer()} | {:vhost, String.t()} | {:frame_max, non_neg_integer()} | {:heartbeat, non_neg_integer()} | {:lazy, boolean()}
@type connection_options() :: [connection_option()]
@type offset() :: :first | :last | :next | {:offset, non_neg_integer()} | {:timestamp, integer()}
@type t() :: %RabbitMQStream.Connection{ close_reason: String.t() | atom() | nil, commands: %{ required(RabbitMQStream.Message.Helpers.command()) => %{ min: non_neg_integer(), max: non_neg_integer() } }, connect_requests: [pid()], connection_properties: Keyword.t(), correlation_sequence: non_neg_integer(), frames_buffer: RabbitMQStream.Message.Buffer.t(), internal_buffer: :queue.queue({atom(), atom(), [{atom(), term()}]}), mechanisms: [String.t()], options: connection_options(), peer_properties: %{required(String.t()) => term()}, producer_sequence: non_neg_integer(), request_buffer: :queue.queue({term(), pid()}), request_tracker: %{required({atom(), integer()}) => {pid(), any()}}, socket: :gen_tcp.socket(), state: :closed | :connecting | :open | :closing, subscriber_sequence: non_neg_integer(), subscriptions: %{required(non_neg_integer()) => pid()}, transport: RabbitMQStream.Connection.Transport.t(), user_buffer: :queue.queue({atom(), atom(), [{atom(), term()}]}) }
Functions
Sends a 'close' command to the RabbitMQ Stream server, and waits for the response, before calling the 'close/1' callback on the transport.
Starts the connection process with the RabbitMQ Stream server, and waits until the authentication is complete.
If the authentication process has already been started by other process, this call waits for it to complete before return the result.
Creates a stream with the given stream_name
. Returns an error when
the stream already exists.
Creates a Super Stream, with the specified partitions. The partitions is a Keyword list, where each key is the partition name, and the value is the routing key.
When publishing a message through a RabbitMQStream.SuperProducer, you can implement the
the routing_key/2
callback to define the routing key for each message.
Requires RabbitMQ 3.13.0 or later.
Example:
RabbitMQStream.Connection.create_super_stream(conn, "transactions",
"route-A": ["stream-01", "stream-02"],
"route-B": ["stream-03", "stream-04"]
)
Adds the specified amount of credits to the subscription under the given subscription_id
.
This function always returns :ok
as the RabbitMQ Server only sends a response if the command fails,
which only happens if the subscription is not found. In that case the error is logged.
Declares a producer on the stream with the producer_reference
key.
RabbitMQ expects a producer to be declare to prevent message duplication,
by tracking the sequence
number, which must be sent with each message.c
You can use the query_producer_sequence/3
the query a producer's sequence
number tracked by the server.
It returns an id that identifies this producer. This id is only valid for this connection, as other connections might have the same id for different producers.
Unregisters the producer, under the provided id, from the server.
Deletes the stream with the given stream_name
. Returns an error when
the stream doesn't exist.
Deletes a Super Stream.
Requires RabbitMQ 3.13.0 or later.
Lists all the partitions of a super stream.
Requires RabbitMQ 3.13.0 or later.
publish(server, producer_id, publishing_id, message, filter_value \\ nil)
View SourceSends a message to the stream referenced by the provided 'producer_id'.
The 'publishing_id' must be unique for the given producer, or the message will be ignored/dropped by the server.
Starting at 3.13.x, you can optionally provide a 'filter_value' parameter, which is used by the server to filter the messages to be sent to a consumer that have provided a 'filter' parameter.
Queries the metadata for the provided streams, along side with a listing of all the brokers available in the cluster.
If a stream doesn't exist, it stills returns an entry for it, but
with a :stream_does_not_exist
code.
Queries the offset for the given stream_name
under offset_reference
.
Queries the sequence number for the producer_reference
, on the specified
stream_name
.
All messages sent to the server must have a distinct sequence number, which is tracked by the server.
The server will sometimes send a request to the client, which we must send a response to.
And example request is the 'ConsumerUpdate', where the server expects a response with the offset. So the connection sends the request to the subscription handler, which then calls this function to send the response back to the server.
Lists the names of all the streams of super stream, under the given routing_key
.
Requires RabbitMQ 3.13.0 or later.
Stores an offset
on the given stream_name
under offset_reference
.
This appends an offset
to the stream, which can be retrieved later using
with query_offset/3
, by providing the same offset_reference
.
Queries the metadata information about a stream.
subscribe(server, stream_name, pid, offset, credit, properties \\ [])
View SourceStarts consuming messages from the server, starting at the provided 'offset'.
The connection wills start send the messages to the provided 'pid' with the following format:
def handle_info({:deliver, %RabbitMQStream.Message.Types.DeliverData{}}, _) do
# ...
end
You can optionally provide properties when declaring the subscription. The avaiable options are the following:
:single_active_consumer
: set totrue
to enable single active consumer for this subscription.:super_stream
: set to the name of the super stream the subscribed is a partition of.:filter
: List of strings that define the value of the filter_key to match.:match_unfiltered
: whether to return messages without any filter value or not.
Be aware that a filter value is registered per message, and the server uses a Bloom Filter to check if a chunk has messages that match a filter. But this filter might give false positives, and not all the messages of a chunk might match the filter. So additional filtering on by the User might be necessary.
Checks if the connected server supports the given command.
Unregisters a consumer from the connection.