PhoenixKit.Modules.Emails.SQSWorker (phoenix_kit v1.7.33)


SQS Worker for processing email events from AWS SQS Queue.

⚠️ DEPRECATION NOTICE

This GenServer-based worker is deprecated in favor of the Oban-based approach. For new installations, use PhoenixKit.Modules.Emails.SQSPollingManager instead.

The GenServer approach has been replaced with Oban jobs to allow:

  • Dynamic enabling/disabling without application restart
  • Better job monitoring and failure tracking
  • Automatic retries via Oban's built-in retry mechanism
  • Integration with existing Oban infrastructure

Migration Path

If you're currently using SQSWorker, you can migrate to the new approach. status() keeps its name, pause() becomes disable_polling(), and resume() becomes enable_polling():

# Old approach (GenServer)
PhoenixKit.Modules.Emails.SQSWorker.status()
PhoenixKit.Modules.Emails.SQSWorker.pause()
PhoenixKit.Modules.Emails.SQSWorker.resume()

# New approach (Oban-based)
PhoenixKit.Modules.Emails.SQSPollingManager.status()
PhoenixKit.Modules.Emails.SQSPollingManager.disable_polling()
PhoenixKit.Modules.Emails.SQSPollingManager.enable_polling()
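
If many call sites still use the old names, a thin shim can redirect them while you migrate. The following is only a sketch: SQSWorkerShimSketch and FakeManager are hypothetical names, and the return values of the stubbed functions are assumptions rather than documented behavior. In a real application the manager argument would be PhoenixKit.Modules.Emails.SQSPollingManager.

```elixir
defmodule SQSWorkerShimSketch do
  @moduledoc false
  # Hypothetical shim, not part of PhoenixKit. `manager` is any module that
  # exposes status/0, disable_polling/0 and enable_polling/0.

  # Old name and new name are the same.
  def status(manager), do: manager.status()

  # Old pause/0 maps to the new disable_polling/0.
  def pause(manager), do: manager.disable_polling()

  # Old resume/0 maps to the new enable_polling/0.
  def resume(manager), do: manager.enable_polling()
end

defmodule FakeManager do
  @moduledoc false
  # Stand-in used only to make the sketch runnable without PhoenixKit.
  # The return values here are assumptions.
  def status, do: %{polling_enabled: true}
  def disable_polling, do: :ok
  def enable_polling, do: :ok
end
```

Passing the manager as an argument keeps the shim testable with a stub and makes the old-to-new mapping explicit in one place.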

Backward Compatibility

This module maintains backward compatibility by delegating to SQSPollingManager where appropriate. The GenServer will still work but is not recommended for new installations.

Architecture

AWS SES → SNS Topic → SQS Queue → SQS Worker → Database

Configuration

All settings are retrieved from PhoenixKit Settings and checked dynamically:

  • email_ses_events - master switch for AWS SES events processing
  • sqs_polling_enabled - enable/disable polling
  • sqs_polling_interval_ms - interval between polling cycles
  • sqs_max_messages_per_poll - maximum messages per batch
  • aws_sqs_queue_url - SQS queue URL
  • aws_region - AWS region
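
To illustrate how these settings could combine into a single "should we poll?" decision, here is a minimal sketch. It assumes the settings have already been loaded into a map with atom keys and boolean values; PollingGateSketch is a hypothetical module, and the exact rule PhoenixKit applies is not stated in this document.

```elixir
defmodule PollingGateSketch do
  @moduledoc false
  # Hypothetical helper, not part of PhoenixKit.
  # Assumption: polling runs only when the master switch and the polling
  # switch are both on and a non-empty queue URL is configured.

  def should_poll?(settings) when is_map(settings) do
    queue_url = Map.get(settings, :aws_sqs_queue_url)

    Map.get(settings, :email_ses_events) == true and
      Map.get(settings, :sqs_polling_enabled) == true and
      is_binary(queue_url) and
      queue_url != ""
  end
end
```

Because the settings are checked dynamically, a gate like this would be evaluated on every polling cycle rather than once at startup.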

Usage (Legacy)

# In supervision tree (deprecated)
{PhoenixKit.Modules.Emails.SQSWorker, []}

# Worker management (delegates to new API)
PhoenixKit.Modules.Emails.SQSWorker.status()
PhoenixKit.Modules.Emails.SQSWorker.process_now()
PhoenixKit.Modules.Emails.SQSWorker.pause()
PhoenixKit.Modules.Emails.SQSWorker.resume()

Summary

Functions

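child_spec(init_arg)

Returns a specification to start this module under a supervisor.

delete_dlq_messages(receipt_handles)

Deletes processed messages from DLQ.

pause(worker \\ __MODULE__)

Pauses polling (temporarily).

process_dlq_messages(opts \\ [])

Processes all messages from DLQ (Dead Letter Queue).

process_now(worker \\ __MODULE__)

Forces a polling cycle to start immediately.

resume(worker \\ __MODULE__)

Resumes polling after pause.

start_link(opts \\ [])

Starts the SQS Worker process.

status(worker \\ __MODULE__)

Returns the current status of the worker process.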

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

delete_dlq_messages(receipt_handles)

Deletes processed messages from DLQ.

Parameters

  • receipt_handles - List of receipt handles to delete

Returns

  • {:ok, deleted_count} - Number of deleted messages
  • {:error, reason} - Deletion error

Examples

iex> PhoenixKit.Modules.Emails.SQSWorker.delete_dlq_messages(["receipt1", "receipt2"])
{:ok, 2}

pause(worker \\ __MODULE__)

Pauses polling (temporarily).

Note: This function now delegates to SQSPollingManager.

Examples

iex> PhoenixKit.Modules.Emails.SQSWorker.pause()
:ok

process_dlq_messages(opts \\ [])

Processes all messages from DLQ (Dead Letter Queue).

This function retrieves all messages from DLQ, processes them through SQSProcessor, and optionally deletes successfully processed messages.

Parameters

  • opts - Processing options:
    • :batch_size - Batch size (default 10)
    • :delete_after - Delete successfully processed messages (default false)
    • :max_batches - Maximum number of batches (default 100)

Returns

  • {:ok, result} - Successful processing with results
  • {:error, reason} - Processing error

Examples

iex> PhoenixKit.Modules.Emails.SQSWorker.process_dlq_messages()
{:ok, %{total_processed: 15, successful: 12, errors: 3}}

iex> PhoenixKit.Modules.Emails.SQSWorker.process_dlq_messages(delete_after: true)
{:ok, %{total_processed: 8, successful: 8, errors: 0, deleted: 8}}
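
The interplay of :batch_size, :max_batches and :delete_after can be sketched as a bounded drain loop. This is not PhoenixKit's implementation: DlqDrainSketch is a hypothetical module, and the fetch, handle and delete functions are injected stand-ins for the SQS receive call, SQSProcessor and the delete call. Only the option names, their defaults and the result keys come from the documentation above.

```elixir
defmodule DlqDrainSketch do
  @moduledoc false
  # Hypothetical illustration of a bounded DLQ drain loop.

  # Defaults as documented for process_dlq_messages/1.
  @defaults [batch_size: 10, delete_after: false, max_batches: 100]

  # fetch.(batch_size) returns a list of messages ([] when the queue is empty).
  # handle.(message) returns :ok or {:error, reason}.
  # delete.(messages) returns the number of messages it deleted.
  def drain(fetch, handle, delete, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)
    acc = %{total_processed: 0, successful: 0, errors: 0}
    # The :deleted counter is only reported when deletion is requested.
    acc = if opts[:delete_after], do: Map.put(acc, :deleted, 0), else: acc
    loop({fetch, handle, delete}, opts, opts[:max_batches], acc)
  end

  # Stop once the batch budget is used up.
  defp loop(_funs, _opts, 0, acc), do: {:ok, acc}

  defp loop({fetch, handle, delete} = funs, opts, batches_left, acc) do
    case fetch.(opts[:batch_size]) do
      # An empty batch means the queue has been drained.
      [] ->
        {:ok, acc}

      messages ->
        {ok, failed} = Enum.split_with(messages, &(handle.(&1) == :ok))

        acc = %{
          acc
          | total_processed: acc.total_processed + length(messages),
            successful: acc.successful + length(ok),
            errors: acc.errors + length(failed)
        }

        # Only successfully handled messages are deleted.
        acc =
          if opts[:delete_after],
            do: %{acc | deleted: acc.deleted + delete.(ok)},
            else: acc

        loop(funs, opts, batches_left - 1, acc)
    end
  end
end
```

Capping the loop with :max_batches keeps a single call from running indefinitely when failed messages stay in the queue and keep being received again.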

process_now(worker \\ __MODULE__)

Forces a polling cycle to start immediately.

Note: This function now delegates to SQSPollingManager.

Examples

iex> PhoenixKit.Modules.Emails.SQSWorker.process_now()
:ok

resume(worker \\ __MODULE__)

Resumes polling after pause.

Note: This function now delegates to SQSPollingManager.

Examples

iex> PhoenixKit.Modules.Emails.SQSWorker.resume()
:ok

start_link(opts \\ [])

Starts the SQS Worker process.

Options

  • :name - process name (defaults to __MODULE__)

Examples

{:ok, pid} = PhoenixKit.Modules.Emails.SQSWorker.start_link()

status(worker \\ __MODULE__)

Returns the current status of the worker process.

Note: This function now delegates to SQSPollingManager for consistency.

Examples

iex> PhoenixKit.Modules.Emails.SQSWorker.status()
%{
  polling_enabled: true,
  messages_processed: 150,
  errors_count: 2,
  last_poll: ~U[2025-09-20 15:30:45.123456Z],
  queue_url: "https://sqs.eu-north-1.amazonaws.com/123456789012/phoenixkit-email-queue",
  average_processing_time_ms: 45.2
}
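
A status map like the one above lends itself to a simple staleness check, for example in a health endpoint. The sketch below assumes the map carries the :polling_enabled and :last_poll keys shown in the example; StatusCheckSketch and the max_age_seconds threshold are hypothetical and not part of PhoenixKit.

```elixir
defmodule StatusCheckSketch do
  @moduledoc false
  # Hypothetical helper, not part of PhoenixKit.

  # A paused worker is not expected to poll, so it is never reported as stale.
  def stale?(%{polling_enabled: false}, _now, _max_age_seconds), do: false

  # Polling is enabled but no poll has been recorded yet.
  def stale?(%{last_poll: nil}, _now, _max_age_seconds), do: true

  # Stale when the last poll is older than the allowed age in seconds.
  def stale?(%{last_poll: %DateTime{} = last_poll}, %DateTime{} = now, max_age_seconds) do
    DateTime.diff(now, last_poll) > max_age_seconds
  end
end
```

Taking the current time as an argument instead of calling DateTime.utc_now/0 inside the function keeps the check deterministic and easy to test.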