OffBroadway.Jetstream.Producer (jetstream v0.0.8-alpha2)
A GenStage producer meant to work with Broadway. It continuously receives messages from a NATS JetStream stream and acknowledges them after they have been successfully processed.
Options
Connection options
The following options are mandatory.
connection_name - The name of the Gnat process or Gnat connection supervisor.
stream_name - The name of the stream to consume from.
consumer_name - The name of the consumer.
The following options are optional:
connection_retry_timeout - The time in milliseconds after which a failing connection is retried. Defaults to 1000.
connection_retries - The number of times the producer retries a failing connection before shutting down. Defaults to 10.
inbox_prefix - A custom prefix for the listening topic. Defaults to "_INBOX.".
Message pulling options
receive_interval - The duration in milliseconds for which the producer waits before making a request for more messages. Defaults to 5000.
receive_timeout - The maximum time to wait for NATS JetStream to respond with a requested message. Defaults to :infinity.
Acknowledger options
These options are passed to the acknowledger on its initialization.
:on_success - Configures the behaviour for successful messages. Defaults to :ack.
:on_failure - Configures the behaviour for failed messages. Defaults to :nack.
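As a rough sketch of how the options above fit together, the producer's keyword list might look like the following. It assumes that all three option groups are passed in the same keyword list next to the producer module. The optional values are simply the documented defaults written out explicitly, and :gnat, "TEST_STREAM" and "TEST_CONSUMER" are placeholder names, not values required by the library.

```elixir
# Sketch only: every optional value below is the documented default.
producer_opts = [
  module: {
    OffBroadway.Jetstream.Producer,
    # Mandatory connection options (placeholder names)
    connection_name: :gnat,
    stream_name: "TEST_STREAM",
    consumer_name: "TEST_CONSUMER",
    # Optional connection options
    connection_retry_timeout: 1000,
    connection_retries: 10,
    # Assumed to be given as a string, including the trailing dot
    inbox_prefix: "_INBOX.",
    # Message pulling options
    receive_interval: 5000,
    receive_timeout: :infinity,
    # Acknowledger options
    on_success: :ack,
    on_failure: :nack
  }
]
```

A list like this would be passed as the value of the producer key in Broadway.start_link/2, alongside any producer-level settings such as concurrency.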
Acknowledgements
By default, successful messages are acknowledged, and failed ones are nack'ed. You can
change this behaviour by setting the :on_success and/or :on_failure options.
You can also change the acknowledgement action for individual messages using the
Broadway.Message.configure_ack/2 function.
The supported options are:
:ack - Acknowledges that a message was completely handled.
:nack - Signals that the message will not be processed now and will be redelivered.
:term - Tells the server to stop redelivery of a message without acknowledging it.
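To illustrate the per-message override described above, here is a minimal sketch of a handle_message/3 callback that switches a single message to :term when its payload can never be processed. It assumes that Broadway.Message.configure_ack/2 accepts the same :on_failure key as the producer option. The AckExample module and its decode/1 helper are hypothetical and exist only for this illustration.

```elixir
defmodule AckExample do
  # Sketch only: in a real pipeline this callback would live in the module
  # that calls `use Broadway`.
  alias Broadway.Message

  def handle_message(_processor_name, message, _context) do
    case decode(message.data) do
      {:ok, value} ->
        # Success path: the producer-level :on_success action applies.
        Message.update_data(message, fn _raw -> value end)

      {:error, reason} ->
        # A malformed payload will never succeed, so request :term instead of
        # the default :nack before marking the message as failed.
        message
        |> Message.configure_ack(on_failure: :term)
        |> Message.failed(reason)
    end
  end

  # Hypothetical decoder, not part of the library: accepts only integer strings.
  def decode(data) when is_binary(data) do
    case Integer.parse(data) do
      {value, ""} -> {:ok, value}
      _other -> {:error, :not_an_integer}
    end
  end
end
```

Messages that fail for transient reasons keep the producer-level :on_failure action, so only the payloads that cannot be decoded stop being redelivered.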
Example
Example Broadway module definition:
defmodule MyBroadway do
  use Broadway

  # Needed so that Message.update_data/2 and Message.put_batcher/2 resolve
  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(
      __MODULE__,
      name: MyBroadway,
      producer: [
        module: {
          OffBroadway.Jetstream.Producer,
          connection_name: :gnat,
          stream_name: "TEST_STREAM",
          consumer_name: "TEST_CONSUMER"
        },
        concurrency: 10
      ],
      processors: [
        default: [concurrency: 10]
      ],
      batchers: [
        example: [
          concurrency: 5,
          batch_size: 10,
          batch_timeout: 2_000
        ]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
    |> Message.put_batcher(:example)
  end

  defp process_data(data) do
    # Some data processing
    data
  end

  def handle_batch(:example, messages, _batch_info, _context) do
    # Do something with batch messages, then return them
    messages
  end
end

Learn more about the available options in the Broadway documentation.
Once you have your Broadway pipeline defined, you can add it to your supervision tree:
children = [
  {MyBroadway, []}
]

Supervisor.start_link(children, strategy: :one_for_one)