Sequins.Pipeline.Action behaviour (Sequins v0.8.0)
Sequins.Pipeline.Action wraps a Broadway SQS processing pipeline to allow for the simple creation of multi-stage SQS -> Broadway -> SNS pipelines.
Getting Started
First, follow the Create a SQS queue and Configure the project sections of the Broadway Amazon SQS guide.
Also make sure config.exs contains a valid ExAws configuration.
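As a rough illustration of what a valid ExAws configuration might look like, the fragment below reads credentials from environment variables and falls back to an instance role. The region value is a placeholder assumption; substitute whatever your AWS environment uses.

```elixir
# config/config.exs
import Config

# Credentials are resolved in order: environment variable first,
# then the EC2/ECS instance role.
config :ex_aws,
  access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role],
  secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role],
  # Placeholder region (assumption); use the region that hosts your queues.
  region: "us-east-1"
```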
Implement the processing callback
This is where we depart from Broadway's default implementation. Pipeline.Action makes several opinionated assumptions about the AWS environment as well as the shape of the incoming message data.
Processor
Sequins.Pipeline.Action does some pre- and post-processing of the Broadway.Message struct. Instead of implementing handle_message/3, we're just going to implement our own process/2, which receives two parameters:
data: The Message field of the incoming SQS message
attrs: The MessageAttributes field of the incoming SQS message
attrs (and data, if it's a map) are converted from "CamelCaseStringKeys" to :underscore_atom_keys before being passed to process/2.
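The key conversion described above can be pictured with a small sketch. The module below is hypothetical and is not Sequins' own code; in particular, whether Sequins converts nested maps recursively is an assumption made here for illustration.

```elixir
defmodule KeyConversionSketch do
  # Hypothetical helper (not part of Sequins): converts "CamelCase" string
  # keys to :underscore atom keys. Recursing into nested maps and lists is
  # an assumption of this sketch.
  def underscore_keys(map) when is_map(map) do
    Map.new(map, fn {key, value} ->
      {to_underscore_atom(key), underscore_keys(value)}
    end)
  end

  def underscore_keys(list) when is_list(list), do: Enum.map(list, &underscore_keys/1)
  def underscore_keys(other), do: other

  # String.to_atom/1 creates atoms dynamically, so it should only be used
  # on keys from a trusted, bounded source.
  defp to_underscore_atom(key) when is_binary(key) do
    key |> Macro.underscore() |> String.to_atom()
  end

  defp to_underscore_atom(key), do: key
end
```

With this sketch, a map such as `%{"MessageId" => "abc"}` would reach process/2 as `%{message_id: "abc"}`.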
process/2's return value should be one of the following:
{status, data, attrs} (if both data and attrs have been updated)
{status, data} (if data has been updated)
{status} or status (if no data/attribute updates are needed)
The status will be added to the attrs of the next message in the pipeline.
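One way to think about the accepted return shapes is that each one is expanded to a full `{status, data, attrs}` triple, with missing parts filled in from the incoming values. The module and function names below are hypothetical and only sketch that idea; they do not reflect how Sequins implements it internally.

```elixir
defmodule ReturnShapeSketch do
  # Hypothetical helper (not part of Sequins): expands any of the accepted
  # process/2 return shapes into {status, data, attrs}, reusing the original
  # data and attrs where the callback did not supply new ones.
  def normalize({status, data, attrs}, _orig_data, _orig_attrs), do: {status, data, attrs}
  def normalize({status, data}, _orig_data, orig_attrs), do: {status, data, orig_attrs}
  def normalize({status}, orig_data, orig_attrs), do: {status, orig_data, orig_attrs}

  def normalize(status, orig_data, orig_attrs) when is_atom(status),
    do: {status, orig_data, orig_attrs}
end
```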
Example:
defmodule MyApplication.MyPipeline do
  use Sequins.Pipeline.Action

  # Square the :value field and pass the updated data downstream.
  def process(data, _attrs) do
    {:ok, Map.update!(data, :value, fn n -> n * n end)}
  end
end
By default, the queue and topic names are derived from the last segment of the using module name (e.g., sequins-my-pipeline for a module ending in MyPipeline). This can be overridden by passing a :queue_name option to use:
defmodule MyApplication.MyPipeline do
  use Sequins.Pipeline.Action, queue_name: "my_pipeline"
  ...
end
The default resource prefix is sequins, but it can be changed by configuring the :sequins application's :prefix attribute.
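The naming rule can be sketched as follows. This is an illustrative reconstruction based only on the sequins-my-pipeline example in the text; the module and function names are hypothetical, and the real implementation may differ in details.

```elixir
defmodule ResourceNameSketch do
  # Hypothetical helper (not part of Sequins): builds a default resource
  # name from the last segment of a module name plus a prefix.
  def default_name(module, prefix \\ "sequins") do
    base =
      module
      |> Module.split()          # e.g. ["MyApplication", "MyPipeline"]
      |> List.last()             # "MyPipeline"
      |> Macro.underscore()      # "my_pipeline"
      |> String.replace("_", "-")

    "#{prefix}-#{base}"
  end
end
```

Changing the prefix itself would be done in the application configuration, for instance with a line such as `config :sequins, prefix: "my-app"` in config.exs (the value "my-app" is a placeholder).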
Batcher
Sequins.Pipeline.Action sends processed data to an AWS Simple Notification Service topic, allowing it to be dispatched to another queue (and into another Sequins.Pipeline.Action), an AWS Lambda, an arbitrary webhook, or even an email or SMS message.
Configuration Options
Sequins.Pipeline.Action attempts to use sane defaults, inheriting most of them from Broadway itself. However, several can be overridden in the application configuration.
Options
Sequins.Pipeline.Action is configured by passing options to start_link.
Valid options are:
:producer_concurrency - Optional. The number of concurrent producers started by Broadway. Analogous to Broadway's producer:concurrency option. Default value is 1.
:receive_interval - Optional. The interval (in milliseconds) at which the producer polls SQS for new messages. Default value is 5000.
:wait_time_seconds - Optional. The duration (in seconds) for which the producer's ReceiveMessage call waits for a message to arrive in the queue before returning. Default value is 0 (short polling).
:max_number_of_messages - Optional. The maximum number of messages the producer requests from SQS at once. Default value is 10. Maximum value is 10.
:visibility_timeout - Optional. The amount of time (in seconds) SQS will wait for a message to be acknowledged before putting it back in the queue. Defaults to the queue's configured VisibilityTimeout setting.
:processor_concurrency - Optional. The number of concurrent processors started by Broadway. Analogous to Broadway's processor:concurrency option. Default value is 1.
:max_demand - Optional. The maximum demand of all processor stages. Analogous to Broadway's processor:max_demand option. Default value is 10.
:min_demand - Optional. The minimum demand of all processor stages. Analogous to Broadway's processor:min_demand option. Default value is 5.
:batcher_concurrency - Optional. The number of concurrent batch processors started by Broadway. Analogous to Broadway's batcher:concurrency option. Default value is 1.
:batch_size - Optional. The size of generated batches. Analogous to Broadway's batcher:batch_size option. Default value is 100.
:batch_timeout - Optional. The time, in milliseconds, that the batcher waits before flushing the list of messages. Analogous to Broadway's batcher:batch_timeout option. Default value is 1000.
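As a usage sketch, the options above would be passed as a keyword list to start_link/2, following the spec listed under Functions. The module name MyApplication.MyPipeline is taken from the earlier example, and the option values are arbitrary illustrations. Running this requires the Sequins dependency and working AWS credentials.

```elixir
# Start a pipeline with a few non-default options. Anything omitted
# falls back to the defaults listed above.
{:ok, _pid} =
  Sequins.Pipeline.Action.start_link(MyApplication.MyPipeline,
    producer_concurrency: 2,
    processor_concurrency: 4,
    # Flush batches of up to 50 messages, or after 2 seconds.
    batch_size: 50,
    batch_timeout: 2_000
  )
```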
Types
action_option()
Specs
action_option() ::
  {:batch_size, pos_integer()}
  | {:batch_timeout, non_neg_integer()}
  | {:batcher_concurrency, non_neg_integer()}
  | {:max_demand, non_neg_integer()}
  | {:min_demand, non_neg_integer()}
  | {:processor_concurrency, non_neg_integer()}
  | {:producer_concurrency, non_neg_integer()}
  | {:receive_interval, non_neg_integer()}
  | {:queue_name, String.t()}
action_options()
Specs
action_options() :: [action_option()]
Callbacks
process(data, attrs)
Specs
process_name(name, base_name)
Specs
process_name(Broadway.name(), base_name :: String.t()) :: Broadway.name()
Functions
process_name(broadway_name, base_name)
start_link(module, opts)
Specs
start_link(module :: module(), opts :: action_options()) :: {:ok, pid()}