Context module for managing DB Sync connections.
Provides CRUD operations and business logic for persistent connections between PhoenixKit instances. Connections replace ephemeral session codes with permanent token-based authentication.
Connection Directions
- "sender" - This site sends data to other sites
- "receiver" - This site receives data from other sites
Approval Modes (Sender-side)
- "auto_approve" - All transfers are automatically approved
- "require_approval" - Each transfer needs manual approval
- "per_table" - Tables in auto_approve_tables don't need approval
Connection Status Flow
pending → active → suspended → revoked
            ↘
              expired (auto-set when limits exceeded or past expires_at)
Usage Examples
# Create a sender connection
{:ok, conn, token} = Connections.create_connection(%{
  name: "Production Receiver",
  direction: "sender",
  site_url: "https://receiver.example.com",
  approval_mode: "auto_approve"
})
# Validate an incoming connection
{:ok, conn} = Connections.validate_connection(token, "192.168.1.1")
# Update connection statistics after transfer
{:ok, conn} = Connections.record_transfer(conn, %{
  records_count: 100,
  bytes_count: 50000
})
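The status flow described in this overview can be modeled as a small transition table. This is only a sketch: the module name `StatusFlowSketch` is hypothetical, and the set of allowed transitions is inferred from the diagram and from the approve, suspend, reactivate, and revoke functions. The real module may permit other moves, such as revoking directly from "pending".

```elixir
defmodule StatusFlowSketch do
  # Hypothetical transition table inferred from the documented flow.
  # "active" -> "revoked" is an assumption; the diagram only shows
  # revocation after suspension.
  @transitions %{
    "pending" => ["active"],
    "active" => ["suspended", "revoked", "expired"],
    "suspended" => ["active", "revoked"],
    "revoked" => [],
    "expired" => []
  }

  # Returns true when moving from `from` to `to` is listed in the table.
  def can_transition?(from, to) do
    to in Map.get(@transitions, from, [])
  end
end

StatusFlowSketch.can_transition?("pending", "active")
StatusFlowSketch.can_transition?("revoked", "active")
```

Treating "revoked" and "expired" as terminal states matches the description of revocation as permanent.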
Summary
Functions
Approves a pending connection, making it active.
Counts connections with optional filters.
Creates a new connection with a generated auth token.
Deletes a connection.
Finds and marks expired connections.
Gets connections expiring soon (within given hours).
Finds a connection by auth token hash and direction.
Finds a connection by site URL and direction.
Finds a connection by site URL and auth token hash.
Finds a connection by auth token.
Gets a connection by UUID.
Gets a connection by UUID, raising if not found.
Lists all connections with optional filters.
Reactivates a suspended connection.
Records a transfer and updates connection statistics.
Regenerates the auth token for a connection.
Gets the remaining download allowance for a connection.
Gets the remaining record allowance for a connection.
Checks if a table requires approval for this connection.
Revokes a connection permanently.
Suspends an active connection.
Checks if a table is allowed for this connection.
Updates last connected timestamp.
Updates a connection's settings.
Validates an auth token and returns the connection if valid.
Validates a download password for a connection.
Functions
@spec approve_connection(PhoenixKit.Modules.Sync.Connection.t(), String.t()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Approves a pending connection, making it active.
Parameters
- connection - The connection to approve
- admin_user_uuid - The user ID approving the connection
Examples
{:ok, conn} = Connections.approve_connection(conn, current_user.uuid)
@spec count_connections(keyword()) :: non_neg_integer()
Counts connections with optional filters.
Options
- :direction - Filter by direction
- :status - Filter by status
@spec create_connection(map()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t(), String.t()} | {:error, Ecto.Changeset.t()}
Creates a new connection with a generated auth token.
Returns both the connection and the raw auth token (token is only shown once).
Parameters
- attrs - Connection attributes:
  - :name (required) - Human-readable name
  - :direction (required) - "sender" or "receiver"
  - :site_url (required) - URL of the remote site
  - :approval_mode - "auto_approve", "require_approval", "per_table"
  - :created_by - User ID who created the connection
  - Other optional settings
Examples
{:ok, conn, token} = Connections.create_connection(%{
  name: "Staging Server",
  direction: "sender",
  site_url: "https://staging.example.com",
  created_by_uuid: current_user.uuid
})
# Token is only returned once - store it securely!
IO.puts("Auth token: #{token}")
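A raw token that is shown only once usually means that only a hash of it is persisted. The sketch below illustrates that general pattern. The module name `TokenSketch`, the 32-byte token length, and the choice of SHA-256 are all assumptions; the source does not state how `create_connection/1` generates or hashes tokens.

```elixir
defmodule TokenSketch do
  # Generates a random URL-safe token plus the digest that would be stored.
  # Token length and hash algorithm are illustrative assumptions.
  def generate do
    raw = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false)
    {raw, hash(raw)}
  end

  # SHA-256 digest, hex-encoded in lowercase.
  def hash(raw) do
    :crypto.hash(:sha256, raw) |> Base.encode16(case: :lower)
  end

  # A presented token matches when its digest equals the stored digest.
  def matches?(raw, stored_hash), do: hash(raw) == stored_hash
end

{raw, stored} = TokenSketch.generate()
TokenSketch.matches?(raw, stored)
```

Because only the digest is kept, the raw value cannot be recovered later, which is why it has to be captured when the connection is created.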
@spec delete_connection(PhoenixKit.Modules.Sync.Connection.t()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Deletes a connection.
Examples
{:ok, conn} = Connections.delete_connection(conn)
@spec expire_connections() :: {non_neg_integer(), nil | term()}
Finds and marks expired connections.
A connection is expired if:
- expires_at is in the past, OR
- max_downloads is exceeded, OR
- max_records_total is exceeded
Returns a {count, nil} tuple, where count is the number of connections marked as expired.
Examples
{count, nil} = Connections.expire_connections()
IO.puts("Expired #{count} connections")
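The three expiry conditions above can be expressed as one predicate. This is a minimal sketch over plain maps rather than the real Connection struct. The module name `ExpirySketch` is hypothetical, the counter fields `downloads_used` and `records_downloaded` are taken from the examples for `remaining_downloads/1` and `remaining_records/1`, and treating "reached the limit" as exceeded is an assumption.

```elixir
defmodule ExpirySketch do
  # `conn` is a plain map standing in for the Connection struct.
  def expired?(conn, now \\ DateTime.utc_now()) do
    past_expiry?(conn[:expires_at], now) or
      limit_hit?(conn[:max_downloads], conn[:downloads_used]) or
      limit_hit?(conn[:max_records_total], conn[:records_downloaded])
  end

  # No expiry date means the connection never expires by time.
  defp past_expiry?(nil, _now), do: false
  defp past_expiry?(expires_at, now), do: DateTime.compare(expires_at, now) == :lt

  # No limit means the counter can never trigger expiry.
  # Assumption: reaching the limit counts as exceeding it.
  defp limit_hit?(nil, _used), do: false
  defp limit_hit?(limit, used), do: (used || 0) >= limit
end

ExpirySketch.expired?(%{max_downloads: 5, downloads_used: 5})
```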
@spec expiring_soon(non_neg_integer()) :: [PhoenixKit.Modules.Sync.Connection.t()]
Gets connections expiring soon (within given hours).
Useful for sending expiration warnings.
Examples
expiring = Connections.expiring_soon(24) # Expiring within 24 hours
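The "within given hours" window can be sketched as a filter over an in-memory list. The real function presumably queries the database, so the module name `ExpiringSoonSketch`, the list argument, and the explicit `now` parameter are all illustrative assumptions. Whether the real query also filters by status is not stated in the source.

```elixir
defmodule ExpiringSoonSketch do
  # Keeps connections whose expires_at lies after `now` and no later than
  # `hours` from `now`. Connections without an expiry are skipped.
  def expiring_soon(connections, hours, now \\ DateTime.utc_now()) do
    cutoff = DateTime.add(now, hours * 3600, :second)

    Enum.filter(connections, fn conn ->
      expires_at = conn[:expires_at]

      expires_at != nil and
        DateTime.compare(expires_at, now) == :gt and
        DateTime.compare(expires_at, cutoff) != :gt
    end)
  end
end

ExpiringSoonSketch.expiring_soon([%{name: "a", expires_at: nil}], 24)
```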
@spec find_by_hash_and_direction(String.t(), String.t()) :: PhoenixKit.Modules.Sync.Connection.t() | nil
Finds a connection by auth token hash and direction.
The auth_token_hash is unique per connection pair, so this can be used to find a specific connection without needing the site URL.
@spec find_by_site_url(String.t(), String.t()) :: PhoenixKit.Modules.Sync.Connection.t() | nil
Finds a connection by site URL and direction.
Examples
conn = Connections.find_by_site_url("https://example.com", "sender")
@spec find_by_site_url_and_hash(String.t(), String.t()) :: PhoenixKit.Modules.Sync.Connection.t() | nil
Finds a connection by site URL and auth token hash.
Used to identify a specific connection when the remote site requests deletion.
Examples
conn = Connections.find_by_site_url_and_hash("https://example.com", "abc123hash")
@spec find_by_token(String.t()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, :invalid_token}
Finds a connection by auth token.
Examples
{:ok, conn} = Connections.find_by_token(token)
@spec get_connection(String.t()) :: PhoenixKit.Modules.Sync.Connection.t() | nil
Gets a connection by UUID.
Accepts:
- UUID string:
get_connection("01234567-89ab-cdef-0123-456789abcdef")
@spec get_connection!(String.t()) :: PhoenixKit.Modules.Sync.Connection.t()
Gets a connection by UUID, raising if not found.
Accepts same inputs as get_connection/1.
@spec list_connections(keyword()) :: [PhoenixKit.Modules.Sync.Connection.t()]
Lists all connections with optional filters.
Options
- :direction - Filter by direction ("sender" or "receiver")
- :status - Filter by status
- :limit - Maximum results
- :offset - Number of results to skip
- :preload - Associations to preload
Examples
connections = Connections.list_connections(direction: "sender", status: "active")
@spec reactivate_connection(PhoenixKit.Modules.Sync.Connection.t()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Reactivates a suspended connection.
Examples
{:ok, conn} = Connections.reactivate_connection(conn)
@spec record_transfer(PhoenixKit.Modules.Sync.Connection.t(), map()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Records a transfer and updates connection statistics.
Should be called after each successful data transfer.
Parameters
- connection - The connection to update
- attrs - Transfer statistics:
  - :records_count - Number of records transferred
  - :bytes_count - Bytes transferred
Examples
{:ok, conn} = Connections.record_transfer(conn, %{
  records_count: 500,
  bytes_count: 250000
})
@spec regenerate_token(PhoenixKit.Modules.Sync.Connection.t()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t(), String.t()} | {:error, Ecto.Changeset.t()}
Regenerates the auth token for a connection.
Returns the new raw token (only shown once).
Examples
{:ok, conn, new_token} = Connections.regenerate_token(conn)
@spec remaining_downloads(PhoenixKit.Modules.Sync.Connection.t()) :: non_neg_integer() | :unlimited
Gets the remaining download allowance for a connection.
Returns :unlimited if no limit is set.
Examples
Connections.remaining_downloads(conn)
# => 45 (if max_downloads: 50, downloads_used: 5)
# => :unlimited (if max_downloads: nil)
@spec remaining_records(PhoenixKit.Modules.Sync.Connection.t()) :: non_neg_integer() | :unlimited
Gets the remaining record allowance for a connection.
Returns :unlimited if no limit is set.
Examples
Connections.remaining_records(conn)
# => 9500 (if max_records_total: 10000, records_downloaded: 500)
# => :unlimited (if max_records_total: nil)
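Both allowance functions follow the same arithmetic: no limit yields :unlimited, otherwise the limit minus what has been used. The sketch below works on plain maps. The module name `AllowanceSketch` is hypothetical, and clamping at zero is an assumption suggested by the non_neg_integer() return type.

```elixir
defmodule AllowanceSketch do
  # nil limit means no cap is configured.
  def remaining(nil, _used), do: :unlimited
  # Clamp at zero so an overshoot never yields a negative allowance.
  def remaining(limit, used), do: max(limit - (used || 0), 0)

  def remaining_downloads(conn),
    do: remaining(conn[:max_downloads], conn[:downloads_used])

  def remaining_records(conn),
    do: remaining(conn[:max_records_total], conn[:records_downloaded])
end

AllowanceSketch.remaining_downloads(%{max_downloads: 50, downloads_used: 5})
# => 45
AllowanceSketch.remaining_records(%{max_records_total: nil})
# => :unlimited
```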
@spec requires_approval?(PhoenixKit.Modules.Sync.Connection.t(), String.t()) :: boolean()
Checks if a table requires approval for this connection.
Examples
Connections.requires_approval?(conn, "users")
# => true (for require_approval mode)
# => false (for auto_approve mode)
# => depends on auto_approve_tables (for per_table mode)
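The three outcomes listed above map naturally onto pattern-matched function clauses. This is a sketch over plain maps, with a hypothetical module name `ApprovalSketch`. It assumes that in "per_table" mode a missing auto_approve_tables list means every table needs approval.

```elixir
defmodule ApprovalSketch do
  # "auto_approve": nothing needs manual approval.
  def requires_approval?(%{approval_mode: "auto_approve"}, _table), do: false

  # "require_approval": every transfer needs manual approval.
  def requires_approval?(%{approval_mode: "require_approval"}, _table), do: true

  # "per_table": only tables outside auto_approve_tables need approval.
  def requires_approval?(%{approval_mode: "per_table"} = conn, table) do
    table not in (conn[:auto_approve_tables] || [])
  end
end

conn = %{approval_mode: "per_table", auto_approve_tables: ["posts"]}
ApprovalSketch.requires_approval?(conn, "posts")
# => false
ApprovalSketch.requires_approval?(conn, "users")
# => true
```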
@spec revoke_connection(PhoenixKit.Modules.Sync.Connection.t(), String.t(), String.t() | nil) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Revokes a connection permanently.
Parameters
- connection - The connection to revoke
- admin_user_uuid - The user ID revoking the connection
- reason - Optional reason for revocation
Examples
{:ok, conn} = Connections.revoke_connection(conn, current_user.uuid, "Compromised")
@spec suspend_connection(PhoenixKit.Modules.Sync.Connection.t(), String.t(), String.t() | nil) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Suspends an active connection.
Parameters
- connection - The connection to suspend
- admin_user_uuid - The user ID suspending the connection
- reason - Optional reason for suspension
Examples
{:ok, conn} = Connections.suspend_connection(conn, current_user.uuid, "Security audit")
@spec table_allowed?(PhoenixKit.Modules.Sync.Connection.t(), String.t()) :: boolean()
Checks if a table is allowed for this connection.
Takes into account both allowed_tables and excluded_tables.
Examples
Connections.table_allowed?(conn, "users")
# => true
Connections.table_allowed?(conn, "secrets")
# => false
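One plausible way to combine allowed_tables and excluded_tables is sketched below. The exact semantics are not stated in the source, so two assumptions are made here: an empty or missing allowed_tables list permits every table, and excluded_tables always takes precedence. The module name `TableAccessSketch` is hypothetical.

```elixir
defmodule TableAccessSketch do
  # Assumptions: an empty allow-list means "all tables", and an
  # exclusion always wins over an allow-list entry.
  def table_allowed?(conn, table) do
    allowed = conn[:allowed_tables] || []
    excluded = conn[:excluded_tables] || []

    (allowed == [] or table in allowed) and table not in excluded
  end
end

conn = %{allowed_tables: ["users", "posts"], excluded_tables: ["secrets"]}
TableAccessSketch.table_allowed?(conn, "users")
# => true
TableAccessSketch.table_allowed?(conn, "secrets")
# => false
```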
@spec touch_connected(PhoenixKit.Modules.Sync.Connection.t()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Updates last connected timestamp.
Examples
{:ok, conn} = Connections.touch_connected(conn)
@spec update_connection(PhoenixKit.Modules.Sync.Connection.t(), map()) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, Ecto.Changeset.t()}
Updates a connection's settings.
Examples
{:ok, conn} = Connections.update_connection(conn, %{
  name: "New Name",
  max_downloads: 100
})
@spec validate_connection(String.t(), String.t() | nil) :: {:ok, PhoenixKit.Modules.Sync.Connection.t()} | {:error, atom()}
Validates an auth token and returns the connection if valid.
Performs comprehensive validation including:
- Token verification
- Status check (must be active)
- Expiration check
- Download limits check
- Record limits check
- IP whitelist check (if provided)
- Time-of-day restrictions check
Parameters
- token - The auth token to validate
- client_ip - The client's IP address (optional)
Returns
- {:ok, connection} - Connection is valid and ready to use
- {:error, reason} - Validation failed with reason:
  - :invalid_token - Token doesn't match any connection
  - :connection_not_active - Connection status is not "active"
  - :connection_expired - Connection has expired
  - :download_limit_reached - Max downloads exceeded
  - :record_limit_reached - Max records exceeded
  - :ip_not_allowed - Client IP not in whitelist
  - :outside_allowed_hours - Current time outside allowed hours
Examples
case Connections.validate_connection(token, client_ip) do
  {:ok, conn} -> proceed_with_transfer(conn)
  {:error, :connection_expired} -> send_error("Connection has expired")
  {:error, reason} -> send_error("Access denied: #{reason}")
end
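A chain of checks that stops at the first failure is commonly written with `with`. The sketch below covers only the status, expiration, and limit checks, using the error atoms documented above. It operates on a plain map, and the module name `ValidationSketch` and its helpers are hypothetical. Token lookup, IP whitelist, and time-of-day checks are omitted because the source does not describe the fields they rely on.

```elixir
defmodule ValidationSketch do
  # Runs the checks in the documented order and returns the first error.
  def validate(conn, now \\ DateTime.utc_now()) do
    with :ok <- check_status(conn),
         :ok <- check_expiry(conn, now),
         :ok <- check_limit(conn[:max_downloads], conn[:downloads_used], :download_limit_reached),
         :ok <- check_limit(conn[:max_records_total], conn[:records_downloaded], :record_limit_reached) do
      {:ok, conn}
    end
  end

  defp check_status(%{status: "active"}), do: :ok
  defp check_status(_conn), do: {:error, :connection_not_active}

  # A connection without expires_at never expires by time.
  defp check_expiry(%{expires_at: %DateTime{} = expires_at}, now) do
    if DateTime.compare(expires_at, now) == :gt, do: :ok, else: {:error, :connection_expired}
  end

  defp check_expiry(_conn, _now), do: :ok

  # Assumption: reaching the limit is treated the same as exceeding it.
  defp check_limit(nil, _used, _reason), do: :ok

  defp check_limit(limit, used, reason) do
    if (used || 0) < limit, do: :ok, else: {:error, reason}
  end
end

ValidationSketch.validate(%{status: "suspended"})
# => {:error, :connection_not_active}
```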
@spec validate_download_password(PhoenixKit.Modules.Sync.Connection.t(), String.t() | nil) :: :ok | {:error, :invalid_password}
Validates a download password for a connection.
Examples
case Connections.validate_download_password(conn, password) do
  :ok -> proceed()
  {:error, :invalid_password} -> deny_access()
end