RabbitMQStream.Producer (rabbitmq_stream v0.4.2)

RabbitMQStream.Producer allows you to define modules or processes that publish messages to a single stream.

Defining a producer Module

A standalone producer module can be defined with:

defmodule MyApp.MyProducer do
  use RabbitMQStream.Producer,
    stream_name: "my-stream",
    connection: MyApp.MyConnection
end

After adding it to your supervision tree, you can publish messages with:

MyApp.MyProducer.publish("Hello, world!")

You can add the producer to your supervision tree as follows:

def start(_, _) do
  children = [
    # ...
    MyApp.MyProducer
  ]

  opts = # ...
  Supervisor.start_link(children, opts)
end

The standalone producer starts its own RabbitMQStream.Connection, declares itself, fetches its most recent publishing_id, and declares the stream if it does not exist.

Configuration

The RabbitMQStream.Producer accepts the following options:

  • :stream_name - The name of the stream to publish to. Required.
  • :reference_name - The string used by the server to de-duplicate messages. Defaults to __MODULE__.Producer. (If you run a cluster in production, see the notes on clustering at the end.)
  • :connection - The identifier for a RabbitMQStream.Connection.
  • :serializer - The module used to encode the message. Defaults to nil, which means no encoding is done.

You can also declare the configuration in your config.exs:

config :rabbitmq_stream, MyApp.MyProducer,
  stream_name: "my-stream",
  connection: MyApp.MyConnection

Setting up

You can optionally define a before_start/2 callback to perform setup logic, such as creating the stream if it does not yet exist.

defmodule MyApp.MyProducer do
  use RabbitMQStream.Producer,
    stream_name: "my-stream",
    connection: MyApp.MyConnection

  @impl true
  def before_start(_opts, state) do
    # Create the stream
    RabbitMQStream.Connection.create_stream(state.connection, state.stream_name)

    state
  end
end

Configuration

You can configure each Producer with:

config :rabbitmq_stream, MyApp.MyProducer,
  stream_name: "my-stream",
  connection: MyApp.MyConnection

You can also override the defaults for all producers with:

config :rabbitmq_stream, :defaults,
  producer: [
    connection: MyApp.MyConnection,
    # ...
  ],
  serializer: Jason

Globally configuring all producers ignores the following options:

  • :stream_name
  • :reference_name

Encoding

You can define how each message is encoded by implementing an encode!/1 callback as follows:

defmodule MyApp.MyProducer do
  use RabbitMQStream.Producer,
    stream_name: "my-stream",
    connection: MyApp.MyConnection

  @impl true
  def encode!(message) do
    Jason.encode!(message)
  end
end

Or by passing a :serializer option to the use macro:

defmodule MyApp.MyProducer do
  use RabbitMQStream.Producer,
    stream_name: "my-stream",
    connection: MyApp.MyConnection,
    serializer: Jason
end

The default value for :serializer is the module itself, unless a default is defined at a higher level of the configuration. If an encode!/1 callback is defined, it is always used.
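
As a rough sketch of what a custom serializer could look like, the module below exposes encode!/1 in the same shape as Jason. It assumes that a :serializer module only needs an encode!/1 function returning a binary; the name MyApp.TermSerializer and the decode!/1 counterpart are hypothetical and are not part of the library.

```elixir
defmodule MyApp.TermSerializer do
  # Hypothetical serializer module (not part of rabbitmq_stream).
  # Assumption: the producer only calls encode!/1 on the configured serializer.

  # Encodes any Erlang/Elixir term into a binary using the external term format.
  def encode!(message), do: :erlang.term_to_binary(message)

  # Decodes a binary produced by encode!/1. The :safe option refuses to
  # create atoms that do not already exist in the running VM.
  def decode!(binary) when is_binary(binary) do
    :erlang.binary_to_term(binary, [:safe])
  end
end
```

Under that assumption, the module could be passed as serializer: MyApp.TermSerializer in the same place where Jason is used above.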

Notes on Clustering

Be aware that the sequence tracking for each :reference_name is global. If you run your Elixir application as a cluster of multiple nodes, each with a Producer process using the same :reference_name, you may encounter issues with message de-duplication: messages can be dropped because the sequence in each producer's state might not be up to date after another process with the same :reference_name has published a message.

There might be cases where you would want this behaviour. If not, be sure to declare a unique :reference_name for each process.
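
One possible way to obtain a unique :reference_name per process is to derive it from the Erlang node name. The helper below is only a sketch: MyApp.ReferenceName and for_node/2 are hypothetical names, and it assumes one producer of a given kind per node.

```elixir
defmodule MyApp.ReferenceName do
  # Hypothetical helper (not part of rabbitmq_stream).
  # Builds a reference name that differs per Erlang node by appending
  # the node name to a base string.
  @spec for_node(String.t(), node()) :: String.t()
  def for_node(base, node_name \\ node()) when is_binary(base) and is_atom(node_name) do
    "#{base}-#{node_name}"
  end
end
```

The resulting string could then be supplied as the :reference_name option of the producer. Note that nodes started without distribution all report the same node name (nonode@nohost), so this approach only helps when each node has a distinct name.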

Summary

Types

@type option() ::
  {:stream_name, String.t()}
  | {:reference_name, String.t()}
  | {:connection, GenServer.server()}
@type options() :: [option()]
@type t() :: %RabbitMQStream.Producer{
  connection: GenServer.server(),
  id: String.t() | nil,
  producer_module: module() | nil,
  publishing_id: String.t(),
  reference_name: String.t(),
  sequence: non_neg_integer() | nil,
  stream_name: String.t()
}

Functions

publish(server, message, filter_value \\ nil)

@spec publish(GenServer.server(), message :: binary(), filter_value :: binary() | nil) ::
  :ok

Publishes a binary message to the stream.

You can optionally provide a filter_value parameter, as of RabbitMQ 3.13.

This function always returns :ok, because the server only sends a response to a publish when an error occurs, in which case the error code is logged.
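
As an illustration of how a filter_value might be chosen, the sketch below derives it from message metadata. MyApp.RegionFilter and the :region key are hypothetical. The commented call follows the publish/3 signature documented above and needs a running producer and broker, so it is not executed here.

```elixir
defmodule MyApp.RegionFilter do
  # Hypothetical helper (not part of rabbitmq_stream).
  # Returns the region as the filter value when the metadata has one,
  # and nil otherwise, which matches the default of publish/3.
  def filter_value(%{region: region}) when is_binary(region), do: region
  def filter_value(_metadata), do: nil
end

# With a started producer (assumed to be registered as MyApp.MyProducer),
# the call would look roughly like this:
#
#   filter = MyApp.RegionFilter.filter_value(%{region: "eu-west"})
#   RabbitMQStream.Producer.publish(MyApp.MyProducer, "Hello, world!", filter)
```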