View Source RabbitMQStream.Producer (rabbitmq_stream v0.4.2)
RabbitMQStream.Producer allows you to define modules or processes that publish messages to a single stream.
Defining a producer Module
A standalone producer module can be defined with:
defmodule MyApp.MyProducer do
use RabbitMQStream.Producer,
stream_name: "my-stream",
connection: MyApp.MyConnection
endAfter adding it to your supervision tree, you can publish messages with:
MyApp.MyProducer.publish("Hello, world!")You can add the producer to your supervision tree as follows this:
def start(_, _) do
children = [
# ...
MyApp.MyProducer
]
opts = # ...
Supervisor.start_link(children, opts)
endThe standalone producer starts its own RabbitMQStream.Connection, declaring itself and fetching its most recent publishing_id, and declaring the stream, if it does not exist.
Configuration
The RabbitMQStream.Producer accepts the following options:
:stream_name- The name of the stream to publish to. Required.:reference_name- The string which is used by the server to prevent Duplicate Message. Defaults to__MODULE__.Producer. (If clustering in production, check notes and the end.):connection- The identifier for aRabbitMQStream.Connection.:serializer- The module to use to decode the message. Defaults tonil, which means no encoding is done.
You can also declare the configuration in your config.exs:
config :rabbitmq_stream, MyApp.MyProducer,
stream_name: "my-stream",
connection: MyApp.MyConnectionSetting up
You can optionally define a before_start/2 callback to perform setup logic, such as creating the stream, if it doesn't yet exists.
defmodule MyApp.MyProducer do
use RabbitMQStream.Producer,
stream_name: "my-stream",
connection: MyApp.MyConnection
@impl true
def before_start(_opts, state) do
# Create the stream
RabbitMQStream.Connection.create_stream(state.connection, state.stream_name)
state
end
endConfiguration
You can configure each Producer with:
config :rabbitmq_stream, MyApp.MyProducer,
stream_name: "my-stream",
connection: MyApp.MyConnectionAnd also you can override the defaults of all producers with:
config :rabbitmq_stream, :defaults,
producer: [
connection: MyApp.MyConnection,
# ...
]
serializer: JasonGlobally configuring all producers ignores the following options:
:stream_name:reference_name
Encoding
You can declare a function for encoding each message by declaring a encode!/1 callback as follows:alarm_handler
defmodule MyApp.MyProducer do
use RabbitMQStream.Producer,
stream_name: "my-stream",
connection: MyApp.MyConnection
@impl true
def encode!(message) do
Jason.encode!(message)
end
endOr by passing a :serializer option to the use macro:
defmodule MyApp.MyProducer do
use RabbitMQStream.Producer,
stream_name: "my-stream",
connection: MyApp.MyConnection,
serializer: Jason
endThe default value for the :serializer is the module itself, unless a default is defined at a higher level of the
configuration. If there is a encode!/1 callback defined, it is always used
Notes on Clustering
Be aware that the sequence tracking for each :reference_name is global. Meaning the if you are running
your Elixir as a cluster of multiple nodes, and each having a process of a Producer with the same
:reference_name, you may encounter issues with message de-duplication, where messages are being
dropped because the sequence on each producer's state might not be up to date after another process
with the same :reference_name produced a message.
There might be cases where you would want this behaviour. If not, be sure to declare a unique
:reference_name for each process.
Summary
Functions
Publishes a binary message to the stream.
Types
@type option() :: {:stream_name, String.t()} | {:reference_name, String.t()} | {:connection, GenServer.server()}
@type options() :: [option()]
@type t() :: %RabbitMQStream.Producer{ connection: GenServer.server(), id: String.t() | nil, producer_module: module() | nil, publishing_id: String.t(), reference_name: String.t(), sequence: non_neg_integer() | nil, stream_name: String.t() }
Functions
@spec publish(GenServer.server(), message :: binary(), filter_value :: binary() | nil) :: :ok
Publishes a binary message to the stream.
You can optionally provide a filter_value parameter, as of RabbitMQ 3.13.
The callback always returns :ok, as the server only send a response
for a publish in case of an error, in which case the error code is logged.