View Source OffBroadway.Jetstream.Producer (jetstream v0.0.9)
A GenStage producer meant to work with Broadway. It continuously receives messages from a NATS JetStream and acknowledges them after being successfully processed.
Options
Connection options
The following options are mandatory.
connection_name
- The name of Gnat process or Gnat connection supervisor.stream_name
- The name of stream to consume from.consumer_name
- The name of consumer.
Optional options:
connection_retry_timeout
- time in milliseconds, after which the failing connection will retry. Defaults to1000
.connection_retries
- number of failing connection retries the producer will make before shutting down. Defaults to10
.inbox_prefix
- custom prefix for listening topic. Defaults to_INBOX.
.domain
- Jetstream domain.
Message pulling options
receive_interval
- The duration in milliseconds for which the producer waits before making a request for more messages. Defaults to5000
.receive_timeout
- The maximum time to wait for NATS Jetstream to respond with a requested message. Defaults to:infinity
.
Acknowledger options
These options are passed to the acknowledger on its initialization.
:on_success
- Configures the behaviour for successful messages. Defaults to:ack
.:on_failure
- Configures the behaviour for failed messages. Defaults to:nack
.
Acknowledgements
By default, successful messages are acknowledged, and failed ones are nack'ed. You can
change this behaviour by setting the :on_success
or/and :on_failure
options.
You can also change the acknowledgement action for individual messages using the
Broadway.Message.configure_ack/2
function.
The supported options are:
:ack
- Acknowledges a message was completely handled.:nack
- Signals that the message will not be processed now and will be redelivered.:term
- Tells the server to stop redelivery of a message without acknowledging it.
Example
Example Broadway module definition:
defmodule MyBroadway do
use Broadway
def start_link(_opts) do
Broadway.start_link(
__MODULE__,
name: MyBroadway,
producer: [
module: {
OffBroadway.Jetstream.Producer,
connection_name: :gnat,
stream_name: "TEST_STREAM",
consumer_name: "TEST_CONSUMER"
},
concurrency: 10
],
processors: [
default: [concurrency: 10]
],
batchers: [
example: [
concurrency: 5,
batch_size: 10,
batch_timeout: 2_000
]
]
)
end
def handle_message(_processor_name, message, _context) do
message
|> Message.update_data(&process_data/1)
|> Message.put_batcher(:example)
end
defp process_data(data) do
# Some data processing
end
def handle_batch(:example, messages, _batch_info, _context) do
# Do something with batch messages
end
end
Learn more about available options in Broadway documentation.
Once you have your Broadway pipeline defined, you can add it to your supervision tree:
children = [
{MyBroadway, []}
]
Supervisor.start_link(children, strategy: :one_for_one)