BroadwaySQS.Producer (BroadwaySQS v0.7.4)

A GenStage producer that continuously polls messages from an SQS queue and acknowledges them after they have been successfully processed.

By default, this producer uses BroadwaySQS.ExAwsClient to talk to SQS, but you can provide your own client by implementing the BroadwaySQS.SQSClient behaviour.
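As a rough illustration of a custom client, the sketch below returns canned messages instead of calling SQS, which can be handy in tests. It assumes the behaviour defines init/1 and receive_messages/2 callbacks with the shapes shown; only receive_messages/2 is named on this page, so check the BroadwaySQS.SQSClient documentation before relying on it. The module name MyApp.FakeSQSClient and the message contents are hypothetical, and Broadway.NoopAcknowledger is used so that acknowledging these messages does nothing.

```elixir
defmodule MyApp.FakeSQSClient do
  # Assumed callback set: init/1 and receive_messages/2.
  @behaviour BroadwaySQS.SQSClient

  alias Broadway.Message

  # All producer options are forwarded to the client; this sketch keeps them as-is.
  @impl true
  def init(opts), do: {:ok, opts}

  # Builds at most 10 fake messages per call, mirroring the SQS per-request limit.
  @impl true
  def receive_messages(demand, _opts) do
    for n <- 1..min(demand, 10) do
      %Message{
        data: "fake message #{n}",
        metadata: %{message_id: "fake-#{n}"},
        # No-op acknowledger: nothing is deleted anywhere when the message is acked.
        acknowledger: Broadway.NoopAcknowledger.init()
      }
    end
  end
end
```

The client would then be selected with `sqs_client: MyApp.FakeSQSClient` in the producer options, next to the other options described below.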

For a quick introduction to using Broadway with Amazon SQS, see the Amazon SQS Guide.

Options

Aside from :receive_interval and :sqs_client, which are generic and apply to all producers (regardless of the client implementation), all other options are specific to BroadwaySQS.ExAwsClient, which is the default client.

  • :queue_url - Required. The URL of the SQS queue. Note that this does not have to be a regional endpoint. For example, https://sqs.amazonaws.com/0000000000/my_queue.

  • :sqs_client - A module that implements the BroadwaySQS.SQSClient behaviour. This module is responsible for fetching and acknowledging the messages. Note that all options passed to the producer will be forwarded to the client. The default value is BroadwaySQS.ExAwsClient.

  • :receive_interval (non_neg_integer/0) - The duration (in milliseconds) for which the producer waits before making a request for more messages. The default value is 5000.

  • :on_success (atom/0) - configures the acking behaviour for successful messages. See the "Acknowledgments" section below for all the possible values. The default value is :ack.

  • :on_failure (atom/0) - configures the acking behaviour for failed messages. See the "Acknowledgments" section below for all the possible values. The default value is :noop.

  • :config (keyword/0) - A set of options that overrides the default ExAws configuration options. The most commonly used options are: :access_key_id, :secret_access_key, :scheme, :region and :port. For a complete list of configuration options and their default values, please see the ExAws documentation. The default value is [].

  • :max_number_of_messages - The maximum number of messages to be fetched per request. This value must be between 1 and 10, which is the maximum number allowed by AWS. The default value is 10.

  • :wait_time_seconds - The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. This value must be between 0 and 20, which is the maximum number allowed by AWS. For more information see "WaitTimeSeconds" on the Amazon SQS documentation.

  • :visibility_timeout - The time period (in seconds) that a message will remain invisible to other consumers whilst still on the queue and not acknowledged. This is passed to SQS when the message (or messages) are read. This value must be between 0 and 43200 (12 hours).

  • :attribute_names - A list containing the names of attributes that should be attached to the response and appended to the metadata field of the message. You can also use :all instead of a list if you want to retrieve all attributes. Supported values are:

    • :sender_id
    • :sent_timestamp
    • :approximate_receive_count
    • :approximate_first_receive_timestamp
    • :sequence_number
    • :message_deduplication_id
    • :message_group_id
    • :aws_trace_header

  • :message_attribute_names - A list containing the names of custom message attributes that should be attached to the response and appended to the metadata field of the message. Wildcards [".*"] and prefixes ["bar.*"] will retrieve multiple fields. You can also use :all instead of the list if you want to retrieve all attributes.
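
To show how these options fit together, the fragment below sketches a producer configured for long polling. The values are assumptions chosen to sit inside the documented ranges, not recommendations, and the custom attribute names "TraceId" and "Source" are hypothetical.

```elixir
producer: [
  module: {BroadwaySQS.Producer,
    queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
    # Wait 1 second before asking for more messages (default is 5000 ms)
    receive_interval: 1_000,
    # Fetch up to 10 messages per request, the maximum allowed by AWS
    max_number_of_messages: 10,
    # Long polling: each request waits up to 20 seconds for messages to arrive
    wait_time_seconds: 20,
    # Keep received messages invisible to other consumers for 5 minutes
    visibility_timeout: 300,
    # Attach every system attribute and two custom message attributes
    attribute_names: :all,
    message_attribute_names: ["TraceId", "Source"]
  }
]
```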

Acknowledgments

You can use the :on_success and :on_failure options to control how messages are acked on SQS. You can set these options when starting the SQS producer or change them for each message through Broadway.Message.configure_ack/2. By default, successful messages are acked (:ack) and failed messages are not (:noop).

The possible values for :on_success and :on_failure are:

  • :ack - acknowledge the message. SQS will delete the message from the queue and will not redeliver it to any other consumer.

  • :noop - do not acknowledge the message. SQS will eventually redeliver the message or remove it based on the "Visibility Timeout" and "Max Receive Count" configurations. For more information, see the Amazon SQS documentation.
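
A per-message override through Broadway.Message.configure_ack/2 might look like the sketch below. It assumes a pipeline where an empty body can never be processed, so retrying is pointless and the failed message is acked to have SQS delete it. The module name and the validate/1 helper are hypothetical.

```elixir
defmodule MyApp.AckExample do
  use Broadway

  alias Broadway.Message

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    case validate(data) do
      :ok ->
        # Successful messages follow the producer-level :on_success setting
        message

      {:error, reason} ->
        # Override :on_failure for this message only, then mark it as failed
        message
        |> Message.configure_ack(on_failure: :ack)
        |> Message.failed(reason)
    end
  end

  # Hypothetical validation: an empty body is treated as a permanent error
  defp validate(""), do: {:error, :empty_body}
  defp validate(_data), do: :ok
end
```

Messages that fail for other reasons, such as a raised exception, keep the producer-level :on_failure behaviour.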

Batching

Even if you are not interested in working with Broadway batches via the handle_batch/4 callback, we recommend that all Broadway pipelines with SQS producers define a default batcher with batch_size set to 10. Messages can then be acknowledged in batches, which improves performance and reduces the cost of integrating with SQS.

Example

Broadway.start_link(MyBroadway,
  name: MyBroadway,
  producer: [
    module: {BroadwaySQS.Producer,
      queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
      config: [
        access_key_id: "YOUR_AWS_ACCESS_KEY_ID",
        secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY",
        region: "us-east-2"
      ]
    }
  ],
  processors: [
    default: []
  ],
  batchers: [
    default: [
      batch_size: 10,
      batch_timeout: 2000
    ]
  ]
)

The above configuration will set up a producer that continuously receives messages from "my_queue" and sends them downstream.
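
The configuration above refers to a MyBroadway module without showing it. A minimal sketch of such a module follows, assuming Broadway 1.x callback arities (handle_message/3 and handle_batch/4). The String.trim/1 transformation is only a placeholder, and :config is omitted so ExAws falls back to its own default configuration.

```elixir
defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwaySQS.Producer,
           queue_url: "https://sqs.amazonaws.com/0000000000/my_queue"}
      ],
      processors: [default: []],
      batchers: [default: [batch_size: 10, batch_timeout: 2000]]
    )
  end

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # Placeholder processing step: strip surrounding whitespace from the body
    Message.update_data(message, &String.trim/1)
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    # Nothing to do per batch; returning the messages lets them be acked together
    messages
  end
end
```

Because start_link/1 is defined, the module can be placed directly in a supervision tree as `MyBroadway`.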

Retrieving Metadata

By default the following information is added to the metadata field in the %Message{} struct:

  • message_id - The message id received when the message was sent to the queue
  • receipt_handle - The receipt handle
  • md5_of_body - An MD5 digest of the message body

You can access any of that information directly while processing the message:

def handle_message(_, message, _) do
  receipt = %{
    id: message.metadata.message_id,
    receipt_handle: message.metadata.receipt_handle
  }

  # Do something with the receipt
end

If you want to retrieve attributes or message_attributes, you need to configure the :attribute_names and :message_attribute_names options accordingly. Otherwise, attributes will not be attached to the response and will not be available in the metadata field:

producer: [
  module: {BroadwaySQS.Producer,
    queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
    # Define which attributes/message_attributes you want to be attached
    attribute_names: [:approximate_receive_count],
    message_attribute_names: ["SomeAttribute"]
  }
]

and then in handle_message:

def handle_message(_, message, _) do
  approximate_receive_count = message.metadata.attributes["approximate_receive_count"]
  some_attribute = message.metadata.message_attributes["SomeAttribute"]

  # Do something with the attributes
end

For more information on the :attribute_names and :message_attribute_names options, see "AttributeName.N" and "MessageAttributeName.N" in the ReceiveMessage documentation.
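
When an attribute was not requested, it is simply absent from the metadata, so reading it defensively can avoid crashes in handle_message. The helper below is a sketch that works on plain maps. The module name and the default limit of 5 are hypothetical, and because this page does not state whether the count arrives as an integer or a string, the helper accepts both.

```elixir
defmodule MyApp.ReceiveCount do
  # Returns the approximate receive count from a message's metadata,
  # or 0 when the attribute is missing (for example, not requested).
  def from_metadata(metadata) when is_map(metadata) do
    case get_in(metadata, [:attributes, "approximate_receive_count"]) do
      nil -> 0
      count when is_integer(count) -> count
      count when is_binary(count) -> String.to_integer(count)
    end
  end

  # True once the message has been received at least max_receives times.
  def exhausted?(metadata, max_receives \\ 5) do
    from_metadata(metadata) >= max_receives
  end
end
```

Inside handle_message, `MyApp.ReceiveCount.exhausted?(message.metadata)` could then be used to decide whether a message is worth another attempt.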

Telemetry

This library exposes the following Telemetry events:

  • [:broadway_sqs, :receive_messages, :start] - Dispatched before receiving messages from SQS (BroadwaySQS.SQSClient.receive_messages/2)

    • measurement: %{time: System.monotonic_time}
    • metadata: %{name: atom, demand: integer}
  • [:broadway_sqs, :receive_messages, :stop] - Dispatched after messages have been received from SQS and "wrapped".

    • measurement: %{duration: native_time}

    • metadata:

      %{
        name: atom,
        messages: [Broadway.Message.t],
        demand: integer
      }
  • [:broadway_sqs, :receive_messages, :exception] - Dispatched after a failure while receiving messages from SQS.

    • measurement: %{duration: native_time}

    • metadata:

      %{
        name: atom,
        demand: integer,
        kind: kind,
        reason: reason,
        stacktrace: stacktrace
      }
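
One way to consume these events is to attach a handler with :telemetry.attach_many/4 when the application starts. The sketch below logs a line for the :stop and :exception events. The module name, the handler id "my-app-broadway-sqs" and the log format are hypothetical; the measurement and metadata keys are the ones listed above.

```elixir
defmodule MyApp.SQSTelemetry do
  require Logger

  @events [
    [:broadway_sqs, :receive_messages, :stop],
    [:broadway_sqs, :receive_messages, :exception]
  ]

  # Call once, for example from Application.start/2.
  def attach do
    :telemetry.attach_many("my-app-broadway-sqs", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Builds the log line; kept separate from logging so it is easy to test.
  def describe([:broadway_sqs, :receive_messages, :stop], %{duration: duration}, metadata) do
    "#{inspect(metadata.name)} received #{length(metadata.messages)} message(s) " <>
      "for demand #{metadata.demand} in #{to_ms(duration)} ms"
  end

  def describe([:broadway_sqs, :receive_messages, :exception], %{duration: duration}, metadata) do
    "#{inspect(metadata.name)} failed to receive messages after #{to_ms(duration)} ms: " <>
      inspect(metadata.reason)
  end

  # Durations are reported in native time units; convert them for display.
  defp to_ms(duration), do: System.convert_time_unit(duration, :native, :millisecond)
end
```

The handler function is passed as a remote capture (&__MODULE__.handle_event/4) rather than an anonymous function, which is the form the telemetry library recommends for performance.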