View Source BroadwaySQS.Producer (BroadwaySQS v0.7.4)
A GenStage producer that continuously polls messages from a SQS queue and acknowledge them after being successfully processed.
By default this producer uses BroadwaySQS.ExAwsClient
to talk to SQS but
you can provide your client by implementing the BroadwaySQS.SQSClient
behaviour.
For a quick getting started on using Broadway with Amazon SQS, please see the Amazon SQS Guide.
Options
Aside from :receive_interval
and :sqs_client
which are generic and apply to all
producers (regardless of the client implementation), all other options are specific to
the BroadwaySQS.ExAwsClient
, which is the default client.
:queue_url
- Required. The url for the SQS queue. Note this does not have to be a regional endpoint. For example,https://sqs.amazonaws.com/0000000000/my_queue
.:sqs_client
- A module that implements theBroadwaySQS.SQSClient
behaviour. This module is responsible for fetching and acknowledging the messages. Pay attention that all options passed to the producer will be forwarded to the client. The default value isBroadwaySQS.ExAwsClient
.:receive_interval
(non_neg_integer/0
) - The duration (in milliseconds) for which the producer waits before making a request for more messages. The default value is5000
.:on_success
(atom/0
) - configures the acking behaviour for successful messages. See the "Acknowledgments" section below for all the possible values. The default value is:ack
.:on_failure
(atom/0
) - configures the acking behaviour for failed messages. See the "Acknowledgments" section below for all the possible values. The default value is:noop
.:config
(keyword/0
) - A set of options that overrides the default ExAws configuration options. The most commonly used options are::access_key_id
,:secret_access_key
,:scheme
,:region
and:port
. For a complete list of configuration options and their default values, please see theExAws
documentation. The default value is[]
.:max_number_of_messages
- The maximum number of messages to be fetched per request. This value must be between1
and10
, which is the maximum number allowed by AWS. The default value is10
.:wait_time_seconds
- The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. This value must be between0
and20
, which is the maximum number allowed by AWS. For more information see "WaitTimeSeconds" on the Amazon SQS documentation.:visibility_timeout
- The time period (in seconds) that a message will remain invisible to other consumers whilst still on the queue and not acknowledged. This is passed to SQS when the message (or messages) are read. This value must be between 0 and 43200 (12 hours).:attribute_names
- A list containing the names of attributes that should be attached to the response and appended to themetadata
field of the message. You can also use:all
instead of a list if you want to retrieve all attributes. Supported values are:* `:sender_id` * `:sent_timestamp` * `:approximate_receive_count` * `:approximate_first_receive_timestamp` * `:sequence_number` * `:message_deduplication_id` * `:message_group_id` * `:aws_trace_header`
:message_attribute_names
- A list containing the names of custom message attributes that should be attached to the response and appended to themetadata
field of the message. Wildcards[".*"]
and prefixes["bar.*"]
will retrieve multiple fields. You can also use:all
instead of the list if you want to retrieve all attributes.
Acknowledgments
You can use the :on_success
and :on_failure
options to control how messages are
acked on SQS. You can set these options when starting the SQS producer or change them
for each message through Broadway.Message.configure_ack/2
. By default, successful
messages are acked (:ack
) and failed messages are not (:noop
).
The possible values for :on_success
and :on_failure
are:
:ack
- acknowledge the message. SQS will delete the message from the queue and will not redeliver it to any other consumer.:noop
- do not acknowledge the message. SQS will eventually redeliver the message or remove it based on the "Visibility Timeout" and "Max Receive Count" configurations. For more information, see:
Batching
Even if you are not interested in working with Broadway batches via the
handle_batch/3
callback, we recommend all Broadway pipelines with SQS
producers to define a default batcher with batch_size
set to 10, so
messages can be acknowledged in batches, which improves the performance
and reduce the costs of integrating with SQS.
Example
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwaySQS.Producer,
queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
config: [
access_key_id: "YOUR_AWS_ACCESS_KEY_ID",
secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY",
region: "us-east-2"
]
}
],
processors: [
default: []
],
batchers: [
default: [
batch_size: 10,
batch_timeout: 2000
]
]
)
The above configuration will set up a producer that continuously receives
messages from "my_queue"
and sends them downstream.
Retrieving Metadata
By default the following information is added to the metadata
field in the
%Message{}
struct:
message_id
- The message id received when the message was sent to the queuereceipt_handle
- The receipt handlemd5_of_body
- An MD5 digest of the message body
You can access any of that information directly while processing the message:
def handle_message(_, message, _) do
receipt = %{
id: message.metadata.message_id,
receipt_handle: message.metadata.receipt_handle
}
# Do something with the receipt
end
If you want to retrieve attributes
or message_attributes
, you need to
configure the :attributes_names
and :message_attributes_names
options
accordingly, otherwise, attributes will not be attached to the response and
will not be available in the metadata
field
producer: [
module: {BroadwaySQS.Producer,
queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
# Define which attributes/message_attributes you want to be attached
attribute_names: [:approximate_receive_count],
message_attribute_names: ["SomeAttribute"]
}
]
and then in handle_message
:
def handle_message(_, message, _) do
approximate_receive_count = message.metadata.attributes["approximate_receive_count"]
some_attribute = message.metadata.message_attributes["SomeAttribute"]
# Do something with the attributes
end
For more information on the :attributes_names
and :message_attributes_names
options, see "AttributeName.N" and "MessageAttributeName.N" on the ReceiveMessage documentation
Telemetry
This library exposes the following Telemetry events:
[:broadway_sqs, :receive_messages, :start]
- Dispatched before receiving messages from SQS (BroadwaySQS.SQSClient.receive_messages/2
)- measurement:
%{time: System.monotonic_time}
- metadata:
%{name: atom, demand: integer}
- measurement:
[:broadway_sqs, :receive_messages, :stop]
- Dispatched after messages have been received from SQS and "wrapped".measurement:
%{duration: native_time}
metadata:
%{ name: atom, messages: [Broadway.Message.t], demand: integer }
[:broadway_sqs, :receive_messages, :exception]
- Dispatched after a failure while receiving messages from SQS.measurement:
%{duration: native_time}
metadata:
%{ name: atom, demand: integer, kind: kind, reason: reason, stacktrace: stacktrace }