PhoenixKit.Emails.SQSWorker (phoenix_kit v1.5.1)
View SourceSQS Worker for processing email events from AWS SQS Queue.
This GenServer continuously polls the SQS queue, receives events from AWS SES through SNS, processes them and updates email statuses in the database.
Architecture
AWS SES → SNS Topic → SQS Queue → SQS Worker → DatabaseFeatures
- Long Polling: Efficient message retrieval with long polling
- Batch Processing: Process up to 10 messages at a time
- Error Handling: Retry logic with Dead Letter Queue
- Graceful Shutdown: Proper work completion on shutdown
- Metrics: Processing metrics collection for monitoring
- Backpressure: Load control through visibility timeout
- Dynamic Configuration: Automatically responds to settings changes without restart
Configuration
All settings are retrieved from PhoenixKit Settings and checked dynamically:
email_ses_events- master switch for AWS SES events processing (checked before each cycle)sqs_polling_enabled- enable/disable polling (checked before each cycle)sqs_polling_interval_ms- interval between polling cyclessqs_max_messages_per_poll- maximum messages per batchsqs_visibility_timeout- time for message processingaws_sqs_queue_url- SQS queue URLaws_region- AWS region
Note: When email_ses_events or sqs_polling_enabled is changed in settings,
the worker will automatically stop or resume polling without requiring a restart.
Status checks occur every 30 seconds when polling is disabled to detect re-enablement.
Security
- Uses AWS IAM for SQS access
- Automatic deletion of processed messages
- Retry mechanism for failed messages
- Dead Letter Queue for problematic messages
Usage
# In supervision tree
{PhoenixKit.Emails.SQSWorker, []}
# Worker management
PhoenixKit.Emails.SQSWorker.status()
PhoenixKit.Emails.SQSWorker.process_now()
PhoenixKit.Emails.SQSWorker.pause()
PhoenixKit.Emails.SQSWorker.resume()Monitoring
Worker provides metrics for monitoring:
- Number of processed messages
- Number of processing errors
- Last polling time
- Average processing speed
Summary
Functions
Returns a specification to start this module under a supervisor.
Deletes processed messages from DLQ.
Pauses polling (temporarily).
Processes all messages from DLQ (Dead Letter Queue).
Forces a polling cycle to start immediately.
Resumes polling after pause.
Starts the SQS Worker process.
Returns the current status of the worker process.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
Deletes processed messages from DLQ.
Parameters
receipt_handles- List of receipt handles to delete
Returns
{:ok, deleted_count}- Number of deleted messages{:error, reason}- Deletion error
Examples
iex> PhoenixKit.Emails.SQSWorker.delete_dlq_messages(["receipt1", "receipt2"])
{:ok, 2}
Pauses polling (temporarily).
Worker will continue to run but will not poll the SQS queue.
Examples
iex> PhoenixKit.Emails.SQSWorker.pause()
:ok
Processes all messages from DLQ (Dead Letter Queue).
This function retrieves all messages from DLQ, processes them through SQSProcessor, and optionally deletes successfully processed messages.
Parameters
opts- Processing options::batch_size- Batch size (default 10):delete_after- Delete successfully processed messages (default false):max_batches- Maximum number of batches (default 100)
Returns
{:ok, result}- Successful processing with results{:error, reason}- Processing error
Examples
iex> PhoenixKit.Emails.SQSWorker.process_dlq_messages()
{:ok, %{total_processed: 15, successful: 12, errors: 3}}
iex> PhoenixKit.Emails.SQSWorker.process_dlq_messages(delete_after: true)
{:ok, %{total_processed: 8, successful: 8, errors: 0, deleted: 8}}
Forces a polling cycle to start immediately.
Useful for testing or when you need to process messages without waiting.
Examples
iex> PhoenixKit.Emails.SQSWorker.process_now()
:ok
Resumes polling after pause.
Examples
iex> PhoenixKit.Emails.SQSWorker.resume()
:ok
Starts the SQS Worker process.
Options
:name- process name (defaults to__MODULE__)
Examples
{:ok, pid} = PhoenixKit.Emails.SQSWorker.start_link()
Returns the current status of the worker process.
Examples
iex> PhoenixKit.Emails.SQSWorker.status()
%{
polling_enabled: true,
messages_processed: 150,
errors_count: 2,
last_poll: ~U[2025-09-20 15:30:45.123456Z],
queue_url: "https://sqs.eu-north-1.amazonaws.com/123456789012/phoenixkit-email-queue",
average_processing_time_ms: 45.2
}