OffBroadwayAmqp10.Producer (off_broadway_amqp10 v0.2.0)
View SourceAn AMQP 1.0 producer for Broadway.
Features
- Automatically acknowledges/rejects messages.
Options
:queue(String.t/0) - Required. The name of the queue or topic:durable- Receiver durability option based on:amqp10_client_session.terminus_durability/0The default value is:none.:connection(keyword/0) - Required. Connection options based on:amqp10_client_connection.connection_config/0:hostname(String.t/0) - Required. The hostname:port(pos_integer/0) - Required. The port number:sasl- Required. :none or [mechanism: :plain, username: "foo", password: "bar"]. In Azure Service Bus username isSharedAccessKeyNameand password isSharedAccessKey:tls_opts(term/0) - Must be{:secure_port, [:ssl.tls_option()]}.:transfer_limit_margin(pos_integer/0) - The max amount of credit that could be requested. The default value is100.
:session(keyword/0) - Required. Session options The default value is[snd_settle_mode: :mixed, rcv_settle_mode: :second].:name(String.t/0) - Required. When using topics in Azure Service Bus, it's calledSubscription Name. Otherwise just an arbitrary name.:snd_settle_mode-unsettled: The sender will send all deliveries initially unsettled to the receiver.settled: The sender will send all deliveries settled to the receiver.mixed: The sender may send a mixture of settled and unsettled deliveries to the receiver. The default value is:mixed.:rcv_settle_mode-first: The receiver will spontaneously settle all incoming transfers.second: The receiver will only settle after sending the disposition to the sender and receiving a disposition indicating settlement of the delivery from the sender. The default value is:second.
Example
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module:
{OffBroadwayAmqp10.Producer,
queue: "my_queue",
durable: :none,
connection: [
hostname: "my-service.servicebus.windows.net",
sasl: [mechanism: :plain, username: "foo", password: "bar"],
tls_opts: [],
transfer_limit_margin: 100
],
session: [
name: to_string(node()),
snd_settle_mode: :mixed,
rcv_settle_mode: :second
]},
concurrency: 1
],
processors: [
default: [
concurrency: 2,
]
]
)Message
The messages which you receive at the Broadway's processors is like:
message = %Broadway.Message{
data: "raw message body",
metadata: %{
headers: %{delivery_count: 0, durable: false, ... },
properties: %{creation_time: 1_656_945_422_583, ...},
application_properties: %{"foo" => "bar", ...},
annotations: %{"x-opt-sequence-number" => 6068, .. }
}
}Acking
Successful messages are acked and failed messages are rejected