View Source OffBroadway.Jetstream.Producer (jetstream v0.0.9)

A GenStage producer meant to work with Broadway. It continuously receives messages from a NATS JetStream and acknowledges them after being successfully processed.

Options

Connection options

The following options are mandatory.

  • connection_name - The name of Gnat process or Gnat connection supervisor.

  • stream_name - The name of stream to consume from.

  • consumer_name - The name of consumer.

Optional options:

  • connection_retry_timeout - time in milliseconds, after which the failing connection will retry. Defaults to 1000.

  • connection_retries - number of failing connection retries the producer will make before shutting down. Defaults to 10.

  • inbox_prefix - custom prefix for listening topic. Defaults to _INBOX..

  • domain - Jetstream domain.

Message pulling options

  • receive_interval - The duration in milliseconds for which the producer waits before making a request for more messages. Defaults to 5000.

  • receive_timeout - The maximum time to wait for NATS Jetstream to respond with a requested message. Defaults to :infinity.

Acknowledger options

These options are passed to the acknowledger on its initialization.

  • :on_success - Configures the behaviour for successful messages. Defaults to :ack.

  • :on_failure - Configures the behaviour for failed messages. Defaults to :nack.

Acknowledgements

By default, successful messages are acknowledged, and failed ones are nack'ed. You can change this behaviour by setting the :on_success or/and :on_failure options.

You can also change the acknowledgement action for individual messages using the Broadway.Message.configure_ack/2 function.

The supported options are:

  • :ack - Acknowledges a message was completely handled.

  • :nack - Signals that the message will not be processed now and will be redelivered.

  • :term - Tells the server to stop redelivery of a message without acknowledging it.

Example

Example Broadway module definition:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(
      __MODULE__,
      name: MyBroadway,
      producer: [
        module: {
          OffBroadway.Jetstream.Producer,
          connection_name: :gnat,
          stream_name: "TEST_STREAM",
          consumer_name: "TEST_CONSUMER"
        },
        concurrency: 10
      ],
      processors: [
        default: [concurrency: 10]
      ],
      batchers: [
        example: [
          concurrency: 5,
          batch_size: 10,
          batch_timeout: 2_000
        ]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:example)
  end

  defp process_data(data) do
    # Some data processing
  end

  def handle_batch(:example, messages, _batch_info, _context) do
    # Do something with batch messages
  end

end

Learn more about available options in Broadway documentation.

Once you have your Broadway pipeline defined, you can add it to your supervision tree:

children = [
  {MyBroadway, []}
]

Supervisor.start_link(children, strategy: :one_for_one)