Sequins.Pipeline.Action behaviour (Sequins v0.8.0)

Sequins.Pipeline.Action wraps a Broadway SQS processing pipeline to allow for the simple creation of multi-stage SQS -> Broadway -> SNS pipelines.

Getting Started

First, follow the Create a SQS queue and Configure the project sections of the Broadway Amazon SQS guide.

Also make sure config.exs contains a valid ExAws configuration.

Implement the processing callback

This is where we depart from Broadway's default implementation. Pipeline.Action makes several opinionated assumptions about the AWS environment as well as the shape of the incoming message data.

Processor

Sequins.Pipeline.Action does some pre- and post-processing of the Broadway.Message struct. Instead of implementing handle_message/3, we're just going to implement our own process/2, which recieves two parameters:

  • data: The Message field of the incoming SQS message
  • attrs: The MessageAttributes field of the incoming SQS message

attrs (and data, if it's a map) is converted from "CamelCaseStringKeys" to :underscore_atom_keys before being passed to process/2.

process/2's return value should be one of the following:

  • {status, data, attrs} (if both data and attrs have been updated)
  • {status, data} (if data has been updated)
  • {status} or status (if no data/attribute updates are needed)

The status will be added to the attrs of the next message in the pipeline.

Example:

defmodule MyApplication.MyPipeline do
  use Pipeline.Action

  def process(data, attrs) do
    {:ok,
      data
      |> Map.get_and_update!(:value, fn n -> n * n end)}
  end
end

By default, the queue and topic names are imputed based on the last segment of the using module name (e.g., sequins-my-pipeline for a module ending in MyPipeline). This can be overridden by passing a :queue_name option to use:

defmodule MyApplication.MyPipeline do
  use Pipeline.Action, queue_name: "my_pipeline"
  ...
end

The default resource prefix is sequins, but can be changed by configuring the :sequins application's :prefix attribute.

Batcher

Sequins.Pipeline.Action sends processed data to an AWS Simple Notification Service topic, allowing it to be dispatched to another queue (and into another Sequins.Pipeline.Action), an AWS Lambda, an arbitrary webhook, or even an email or SMS message.

Configuration Options

Sequins.Pipeline.Action attempts to use sane defaults, inheriting most of them from Broadway itself. However, several can be overriden in the application configuration.

Options

Sequins.Pipeline.Action is configured by passing options to start_link. Valid options are:

  • :producer_concurrency - Optional. The number of producer concurrency to be created by Broadway. Analogous to Broadway's producer :concurrency option. Default value is 1.

  • :receive_interval - Optional. The frequency with which the produer polls SQS for new messages. Default value is 5000.

  • :wait_time_seconds - Optional. The duration (in seconds) for which the producer's ReceiveMessages call waits for a message to arrive in the queue before returning. Default value is 0 (short polling).

  • :max_number_of_messages - Optional. The maximum number of messages the producer requests from SQS at once. Default value is 10. Maximum value is 10.

  • :visibility_timeout - Optional. The amount of time (in seconds) SQS will wait for a message to be acknowledged before putting it back in the queue. Defaults to the queue's configured VisibilityTimeout setting.

  • :processor_concurrency - Optional. The number of processor concurrency to be created by Broadway. Analogous to Broadway's producer :concurrency option. Default value is 1.

  • :max_demand - Optional. Set the maximum demand of all processors concurrency. Analogous to Broadway's processor :max_demand option. Default value is 10.

  • :min_demand - Optional. Set the minimum demand of all processors concurrency. Analogous to Broadway's processor :min_demand option. Default value is 5.

  • :batcher_concurrency - Optional. The number of batcher concurrency to be created by Broadway. Analogous to Broadway's batcher :concurrency option. Default value is 1.

  • :batch_size - Optional. The size of generated batches. Analogous to Broadway's batcher :batch_size option. Default value is 100.

  • :batch_timeout - Optional. The time, in milliseconds, that the batcher waits before flushing the list of messages. Analogous to Broadway's batcher :batch_timeout option. Default value is 1000.

Link to this section Summary

Link to this section Types

Link to this type

action_option()

Specs

action_option() ::
  {:batch_size, pos_integer()}
  | {:batch_timeout, non_neg_integer()}
  | {:batcher_concurrency, non_neg_integer()}
  | {:max_demand, non_neg_integer()}
  | {:min_demand, non_neg_integer()}
  | {:processor_concurrency, non_neg_integer()}
  | {:producer_concurrency, non_neg_integer()}
  | {:receive_interval, non_neg_integer()}
  | {:queue_name, String.t()}
Link to this type

action_options()

Specs

action_options() :: [action_option()]

Link to this section Callbacks

Link to this callback

process(data, attrs)

Specs

process(data :: any(), attrs :: map()) ::
  {atom(), any(), map()} | {atom(), any()} | {atom()} | atom()
Link to this callback

process_name(name, base_name)

Specs

process_name(Broadway.name(), base_name :: String.t()) :: Broadway.name()

Link to this section Functions

Link to this function

process_name(broadway_name, base_name)

Link to this function

start_link(module, opts)

Specs

start_link(module :: module(), opts :: action_options()) :: {:ok, pid()}