View Source RabbitMQStream.Producer.Behaviour behaviour (rabbitmq_stream v0.4.2)

Summary

Callbacks

Optional callback that is called after the process has started, but before the producer has declared itself and fetched its most recent publishing_id.

Callback responsible for encoding a message into binary format.

Callback responsible for generating the 'filter_value' for a message. The value is used by the Server for filtering the chunks sent to consumer that have defined a 'filter' parameter.

Publishes a single message to the stream.

Callbacks

Link to this callback

before_start(opts, state)

View Source (optional)
@callback before_start(
  opts :: RabbitMQStream.Producer.options(),
  state :: RabbitMQStream.Producer.t()
) ::
  RabbitMQStream.Producer.t()

Optional callback that is called after the process has started, but before the producer has declared itself and fetched its most recent publishing_id.

This is usefull for setup logic, such as creating the Stream if it doesn't yet exists.

@callback encode!(message :: term()) :: binary()

Callback responsible for encoding a message into binary format.

It can be used in many ways, such as to 'Jason.encode!/1' a Map, or to Gzip Compact a message before it is appended to the Stream.

Link to this callback

filter_value(message)

View Source (optional)
@callback filter_value(message :: term()) :: String.t() | nil

Callback responsible for generating the 'filter_value' for a message. The value is used by the Server for filtering the chunks sent to consumer that have defined a 'filter' parameter.

The callback is invoked before the message is encoded, by the serializer options.

Example defintion:

  @impl true
  def filter_value(message) do
    message["key"]
  end

The default implementation defines nil as the filter_value for all messages.

@callback publish(message :: term()) :: :ok

Publishes a single message to the stream.

The message first passes through the 'encode!/1' callback, before being sent to the RabbitMQStream.Producer process responsible for sending the binary message to the Connection.

As of RabbitMQ 3.13.x, the mesagge's 'filter_value' is generated by passing it through the 'filter_value/1' callback.

The callback always returns ':ok', as the server only send a response for a publish in case of an error, in which case the error code is logged.