PhoenixKit.Emails.SQSWorker (phoenix_kit v1.5.1)

View Source

SQS Worker for processing email events from AWS SQS Queue.

This GenServer continuously polls the SQS queue, receives events from AWS SES through SNS, processes them and updates email statuses in the database.

Architecture

AWS SES  SNS Topic  SQS Queue  SQS Worker  Database

Features

  • Long Polling: Efficient message retrieval with long polling
  • Batch Processing: Process up to 10 messages at a time
  • Error Handling: Retry logic with Dead Letter Queue
  • Graceful Shutdown: Proper work completion on shutdown
  • Metrics: Processing metrics collection for monitoring
  • Backpressure: Load control through visibility timeout
  • Dynamic Configuration: Automatically responds to settings changes without restart

Configuration

All settings are retrieved from PhoenixKit Settings and checked dynamically:

  • email_ses_events - master switch for AWS SES events processing (checked before each cycle)
  • sqs_polling_enabled - enable/disable polling (checked before each cycle)
  • sqs_polling_interval_ms - interval between polling cycles
  • sqs_max_messages_per_poll - maximum messages per batch
  • sqs_visibility_timeout - time for message processing
  • aws_sqs_queue_url - SQS queue URL
  • aws_region - AWS region

Note: When email_ses_events or sqs_polling_enabled is changed in settings, the worker will automatically stop or resume polling without requiring a restart. Status checks occur every 30 seconds when polling is disabled to detect re-enablement.

Security

  • Uses AWS IAM for SQS access
  • Automatic deletion of processed messages
  • Retry mechanism for failed messages
  • Dead Letter Queue for problematic messages

Usage

# In supervision tree
{PhoenixKit.Emails.SQSWorker, []}

# Worker management
PhoenixKit.Emails.SQSWorker.status()
PhoenixKit.Emails.SQSWorker.process_now()
PhoenixKit.Emails.SQSWorker.pause()
PhoenixKit.Emails.SQSWorker.resume()

Monitoring

Worker provides metrics for monitoring:

  • Number of processed messages
  • Number of processing errors
  • Last polling time
  • Average processing speed

Summary

Functions

Returns a specification to start this module under a supervisor.

Deletes processed messages from DLQ.

Pauses polling (temporarily).

Processes all messages from DLQ (Dead Letter Queue).

Forces a polling cycle to start immediately.

Resumes polling after pause.

Starts the SQS Worker process.

Returns the current status of the worker process.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

delete_dlq_messages(receipt_handles)

Deletes processed messages from DLQ.

Parameters

  • receipt_handles - List of receipt handles to delete

Returns

  • {:ok, deleted_count} - Number of deleted messages
  • {:error, reason} - Deletion error

Examples

iex> PhoenixKit.Emails.SQSWorker.delete_dlq_messages(["receipt1", "receipt2"])
{:ok, 2}

pause(worker \\ __MODULE__)

Pauses polling (temporarily).

Worker will continue to run but will not poll the SQS queue.

Examples

iex> PhoenixKit.Emails.SQSWorker.pause()
:ok

process_dlq_messages(opts \\ [])

Processes all messages from DLQ (Dead Letter Queue).

This function retrieves all messages from DLQ, processes them through SQSProcessor, and optionally deletes successfully processed messages.

Parameters

  • opts - Processing options:
    • :batch_size - Batch size (default 10)
    • :delete_after - Delete successfully processed messages (default false)
    • :max_batches - Maximum number of batches (default 100)

Returns

  • {:ok, result} - Successful processing with results
  • {:error, reason} - Processing error

Examples

iex> PhoenixKit.Emails.SQSWorker.process_dlq_messages()
{:ok, %{total_processed: 15, successful: 12, errors: 3}}

iex> PhoenixKit.Emails.SQSWorker.process_dlq_messages(delete_after: true)
{:ok, %{total_processed: 8, successful: 8, errors: 0, deleted: 8}}

process_now(worker \\ __MODULE__)

Forces a polling cycle to start immediately.

Useful for testing or when you need to process messages without waiting.

Examples

iex> PhoenixKit.Emails.SQSWorker.process_now()
:ok

resume(worker \\ __MODULE__)

Resumes polling after pause.

Examples

iex> PhoenixKit.Emails.SQSWorker.resume()
:ok

start_link(opts \\ [])

Starts the SQS Worker process.

Options

  • :name - process name (defaults to __MODULE__)

Examples

{:ok, pid} = PhoenixKit.Emails.SQSWorker.start_link()

status(worker \\ __MODULE__)

Returns the current status of the worker process.

Examples

iex> PhoenixKit.Emails.SQSWorker.status()
%{
  polling_enabled: true,
  messages_processed: 150,
  errors_count: 2,
  last_poll: ~U[2025-09-20 15:30:45.123456Z],
  queue_url: "https://sqs.eu-north-1.amazonaws.com/123456789012/phoenixkit-email-queue",
  average_processing_time_ms: 45.2
}