View Source RabbitMQStream.Consumer (rabbitmq_stream v0.4.2)
Used to declare a Persistent Consumer module. Then the handle_message/1 callback will be invoked
for each message received.
Usage
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first
@impl true
def handle_message(_message) do
# ...
:ok
end
endParameters
:connection- The connection module to use. This is required.:stream_name- The name of the stream to consume. This is required.:initial_offset- The initial offset. This is required.:initial_credit- The initial credit to request from the server. Defaults to50_000.:offset_tracking- Offset tracking strategies to use. Defaults to[count: [store_after: 50]].:flow_control- Flow control strategy to use. Defaults to[count: [credit_after: {:count, 1}]].:offset_reference-:private- Private data that can hold any value, and is passed to thehandle_message/2callback.:serializer- The module to use to decode the message. Defaults to__MODULE__, which means that the consumer will use thedecode!/1callback to decode the message, which is implemented by default to return the message as is.:properties- Define the properties of the subscription. Can only have one option at a time.:single_active_consumer: set totrueto enable single active consumer for this subscription.:super_stream: set to the name of the super stream the subscribed is a partition of.:filter: List of strings that define the value of the filter_key to match.:match_unfiltered: whether to return messages without any filter value or not.
It is also possible to process chunks by implementing the handle_chunk/1 or handle_chunk/2 callbacks.
Offset Tracking
The consumer is able to track its progress in the stream by storing its latests offset in the stream. Check [Offset Tracking with RabbitMQ Streams(https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/) for more information on how offset tracking works.
The consumer can be configured to use different offset tracking strategies, which decide when to
store the offset in the stream. You can implement your own strategy by implementing the
RabbitMQStream.Consumer.OffsetTracking behaviour, and passing it to the :offset_tracking option.
It defaults to RabbitMQStream.Consumer.OffsetTracking.CountStrategy, which stores the offset after,
by default, every 50_000 messages.
You can use the default strategies by passing a shorthand alias:
interval:RabbitMQStream.Consumer.OffsetTracking.IntervalStrategycount:RabbitMQStream.Consumer.OffsetTracking.CountStrategyuse RabbitMQStream.Consumer,
connection: MyApp.MyConnection, stream_name: "my_stream", initial_offset: :first, offset_tracking: [ count: [store_after: 50], interval: [store_after: 5_000] ]
Flow Control
The RabbitMQ Streams server requires that the consumer declares how many messages it is able to process at a time. This is done by informing an amount of 'credits' to the server. After every chunk is sent, one credit is consumed, and the server will send messages only if there are credits available.
We can configure the consumer to automatically request more credits based on a strategy.
By default it uses the RabbitMQStream.Consumer.FlowControl.MessageCount, which
requests 1 additional credit for every 1 processed chunk. Please check the
RabbitMQStream.Consumer.FlowControl.MessageCount module for more information.
You can also call RabbitMQStream.Consumer.credit/2 to manually add more credits to the subscription,
or implement your own strategy by implementing the RabbitMQStream.Consumer.FlowControl behaviour,
and passing it to the :flow_control option.
You can find more information on the RabbitMQ Streams documentation.
If you want an external process to be fully in control of the flow control of a consumer, you can
set the :flow_control option to false. Then you can call RabbitMQStream.Consumer.credit/2 to
manually add more credits to the subscription.
Configuration
You can configure each consumer with:
config :rabbitmq_stream, MyApp.MyConsumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first,
initial_credit: 50_000,
offset_tracking: [count: [store_after: 50]],
flow_control: [count: [credit_after: {:count, 1}]],
serializer: JasonThese options are overriden by the options passed to the use macro, which are overriden by the
options passed to start_link/1.
And also you can override the defaults of all consumers with:
config :rabbitmq_stream, :defaults,
consumer: [
connection: MyApp.MyConnection,
initial_credit: 50_000,
# ...
],Globally configuring all consumers ignores the following options:
:stream_name:offset_reference:private
Decoding
You can declare a function for decoding each message by declaring a decode!/1 callback as follows:
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first
@impl true
def decode!(message) do
Jason.decode!(message)
end
endOr by passing a :serializer option to the use macro:
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first,
serializer: Jason
endThe default value for the :serializer is the module itself, unless a default is defined at a higher
level of the configuration. If there is a decode!/1 callback defined, it is always used
Properties
You can provide additional properties to the consumer to change its behavior, by passing :properties.
Single active consumer
To use it, you must provide a "group_name". The server manages each consumer so the only one will of each group will be receiving chunks at a time.
Although there is only one Consumer active, we must provide the server the offset a consumer starts
on when being upgraded to the being the active one. To do so you must implement the handle_update/2
callback, which must return a {:ok, offset} tuple.
@impl true
def handle_update(_, :upgrade) do
{:ok, :last}
end
@impl true
def handle_update(_, :downgrade) do
# Must return something when being downgraded, but it is not used by the server.
# Could be useful to use some external logic to persist the offset somewhere,
# so that it can be queried by the other consumer that is being upgraded
{:ok, :last}
end
Summary
Types
@type option() :: {:offset_reference, String.t()} | {:connection, GenServer.server()} | {:stream_name, String.t()} | {:initial_offset, RabbitMQStream.Connection.offset()} | {:initial_credit, non_neg_integer()} | {:offset_tracking, [{RabbitMQStream.Consumer.OffsetTracking.t(), term()}]} | {:flow_control, {RabbitMQStream.Consumer.FlowControl.t(), term()}} | {:private, any()} | {:properties, [RabbitMQStream.Message.Types.ConsumerequestData.property()]}
@type opts() :: [option()]
@type t() :: %RabbitMQStream.Consumer{ connection: GenServer.server(), consumer_module: module(), credits: non_neg_integer(), flow_control: {RabbitMQStream.Consumer.FlowControl.t(), term()}, id: non_neg_integer() | nil, initial_credit: non_neg_integer(), initial_offset: RabbitMQStream.Connection.offset(), last_offset: non_neg_integer(), offset_reference: String.t(), offset_tracking: [{RabbitMQStream.Consumer.OffsetTracking.t(), term()}], private: any(), properties: [RabbitMQStream.Message.Types.ConsumerequestData.property()], stream_name: String.t() }
Functions
@spec credit(GenServer.server(), amount :: non_neg_integer()) :: :ok
@spec get_credits(GenServer.server()) :: non_neg_integer()
@spec store_offset(GenServer.server()) :: :ok