View Source OffBroadwayAmqp10.Producer (off_broadway_amqp10 v0.1.3)

An AMQP 1.0 producer for Broadway.

Features

  • Automatically acknowledges/rejects messages.

Options

  • :queue (String.t/0) - Required. The name of the queue or topic

  • :connection (keyword/0) - Required. Connection options based on :amqp10_client_connection.connection_config/0

    • :hostname (String.t/0) - Required. The hostname

    • :port (pos_integer/0) - Required. The port number

    • :sasl (keyword/0) - Required. [mechanism: :plain, username: "foo", password: "bar"]. In Azure Service Bus username is SharedAccessKeyName and password is SharedAccessKey

    • :tls_opts (term/0)

    • :transfer_limit_margin (pos_integer/0) - The max amount of credit that could be requested. The default value is 100.

  • :session (keyword/0) - Required. Session options The default value is [snd_settle_mode: :mixed, rcv_settle_mode: :second].

    • :name (String.t/0) - Required. When using topics in Azure Service Bus, it's called Subscription Name. Otherwise just an arbitrary name.

    • :snd_settle_mode - unsettled: The sender will send all deliveries initially unsettled to the receiver. settled: The sender will send all deliveries settled to the receiver. mixed: The sender may send a mixture of settled and unsettled deliveries to the receiver. The default value is :mixed.

    • :rcv_settle_mode - first: The receiver will spontaneously settle all incoming transfers. second: The receiver will only settle after sending the disposition to the sender and receiving a disposition indicating settlement of the delivery from the sender. The default value is :second.

Example

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module:
      {OffBroadwayAmqp10.Producer,
      queue: "my_queue",
      connection: [
        hostname: "my-service.servicebus.windows.net",
        sasl: [mechanism: :plain, username: "foo", password: "bar"],
        tls_opts: [],
        transfer_limit_margin: 100
      ],
      session: [
        name: to_string(node())
      ]},
    concurrency: 1
  ],
  processors: [
    default: [
      concurrency: 2,
    ]
  ]
)

Message

The messages which you receive at the Broadway's processors is like:

message = %Broadway.Message{
  data: "raw message body",
  metadata: %{
    headers: %{delivery_count: 0, durable: false, ... },
    properties: %{creation_time: 1_656_945_422_583, ...},
    application_properties: %{"foo" => "bar", ...},
    annotations: %{"x-opt-sequence-number" => 6068, .. }
  }
}

Acking

Successful messages are acked and failed messages are rejected