Context module for managing DB Sync transfers.
Provides CRUD operations and business logic for tracking data transfers between PhoenixKit instances, including approval workflow support.
Transfer Directions
- "send" - This site sent data to another site
- "receive" - This site received data from another site
Status Flow
pending → pending_approval → approved → in_progress → completed
                 ↘
                   denied
                 ↘
                   expired (approval timed out)

pending → in_progress → completed
               ↘
                 failed
               ↘
                 cancelled
Usage Examples
# Create a transfer record
{:ok, transfer} = Transfers.create_transfer(%{
  direction: "receive",
  connection_uuid: conn.uuid,
  table_name: "users",
  records_requested: 100,
  conflict_strategy: "skip"
})
# Start a transfer
{:ok, transfer} = Transfers.start_transfer(transfer)
# Update progress
{:ok, transfer} = Transfers.update_progress(transfer, %{
  records_transferred: 50,
  records_created: 45,
  records_skipped: 5
})
# Complete a transfer
{:ok, transfer} = Transfers.complete_transfer(transfer)
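The status flow described above can be read as a small transition table. The following is a minimal, in-memory sketch, not the module's implementation: `StatusFlowSketch` is a hypothetical name, and it assumes that "denied" and "expired" branch off "pending_approval" while "failed" and "cancelled" branch off "in_progress".

```elixir
defmodule StatusFlowSketch do
  @moduledoc "Hypothetical model of the documented status flow (illustration only)."

  # Allowed next statuses, transcribed from the Status Flow diagrams.
  # Assumption: the branch points are pending_approval and in_progress.
  @transitions %{
    "pending" => ["pending_approval", "in_progress"],
    "pending_approval" => ["approved", "denied", "expired"],
    "approved" => ["in_progress"],
    "in_progress" => ["completed", "failed", "cancelled"]
  }

  # Statuses that have no outgoing arrow in the diagrams.
  @terminal ["completed", "denied", "expired", "failed", "cancelled"]

  @doc "Returns true when the diagrams contain an arrow from `from` to `to`."
  def can_transition?(from, to), do: to in Map.get(@transitions, from, [])

  @doc "Returns true when `status` is an end state of the diagrams."
  def terminal?(status), do: status in @terminal
end
```

Under these assumptions, `StatusFlowSketch.can_transition?("pending", "in_progress")` holds, while a completed transfer cannot move anywhere else.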
Summary
Functions
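active_transfers/0 - Gets active (in-progress) transfers.
approve_transfer/2 - Approves a pending transfer.
cancel_transfer/1 - Cancels a transfer.
complete_transfer/2 - Completes a transfer successfully.
connection_stats/1 - Gets transfer statistics for a connection.
count_transfers/1 - Counts transfers with optional filters.
create_transfer/1 - Creates a new transfer record.
delete_transfer/1 - Deletes a transfer.
deny_transfer/3 - Denies a pending transfer.
expire_pending_approvals/0 - Expires pending approval requests that have timed out.
fail_transfer/2 - Marks a transfer as failed.
get_transfer/1 - Gets a transfer by UUID.
get_transfer!/1 - Gets a transfer by ID or UUID, raising if not found.
get_transfer_with_preloads/2 - Gets a transfer by ID with associations preloaded.
list_pending_approvals/1 - Lists transfers pending approval.
list_transfers/1 - Lists transfers with optional filters.
recent_transfers/1 - Gets recent transfers for display.
request_approval/2 - Requests approval for a transfer.
start_transfer/1 - Starts a transfer (changes status to "in_progress").
table_stats/1 - Gets transfer statistics grouped by table.
update_progress/2 - Updates transfer progress.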
Functions
@spec active_transfers() :: [PhoenixKit.Modules.Sync.Transfer.t()]
Gets active (in-progress) transfers.
Examples
active = Transfers.active_transfers()
@spec approve_transfer(PhoenixKit.Modules.Sync.Transfer.t(), String.t()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Approves a pending transfer.
Parameters
- transfer - The transfer to approve
- admin_user_uuid - The user ID approving the transfer
Examples
{:ok, transfer} = Transfers.approve_transfer(transfer, current_user.uuid)
@spec cancel_transfer(PhoenixKit.Modules.Sync.Transfer.t()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Cancels a transfer.
Examples
{:ok, transfer} = Transfers.cancel_transfer(transfer)
@spec complete_transfer(PhoenixKit.Modules.Sync.Transfer.t(), map()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Completes a transfer successfully.
Parameters
- transfer - The transfer to complete
- final_stats - Optional final statistics to record
Examples
{:ok, transfer} = Transfers.complete_transfer(transfer, %{
  records_transferred: 500,
  records_created: 480,
  records_updated: 15,
  records_skipped: 5
})
Gets transfer statistics for a connection.
Returns
Map with:
- :total_transfers - Total number of transfers
- :completed - Number of completed transfers
- :failed - Number of failed transfers
- :total_records - Total records transferred
- :total_bytes - Total bytes transferred
Examples
stats = Transfers.connection_stats("019...")
# => %{total_transfers: 50, completed: 48, failed: 2, ...}
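To illustrate the shape of the returned map, here is a minimal sketch that computes the same keys from an in-memory list. It is not how the module queries the database: `ConnectionStatsSketch` is a hypothetical name, the plain maps stand in for transfer records, and summing `records_transferred` and `bytes_transferred` to obtain the totals is an assumption.

```elixir
defmodule ConnectionStatsSketch do
  @moduledoc "Hypothetical in-memory version of the documented stats map."

  # Assumption: each transfer map carries :status, :records_transferred
  # and :bytes_transferred, and the totals are plain sums over them.
  def summarize(transfers) do
    %{
      total_transfers: length(transfers),
      completed: Enum.count(transfers, &(&1.status == "completed")),
      failed: Enum.count(transfers, &(&1.status == "failed")),
      total_records: transfers |> Enum.map(& &1.records_transferred) |> Enum.sum(),
      total_bytes: transfers |> Enum.map(& &1.bytes_transferred) |> Enum.sum()
    }
  end
end

# Example input: two completed transfers and one failed transfer.
sample_transfers = [
  %{status: "completed", records_transferred: 100, bytes_transferred: 2_048},
  %{status: "failed", records_transferred: 10, bytes_transferred: 512},
  %{status: "completed", records_transferred: 40, bytes_transferred: 1_024}
]

ConnectionStatsSketch.summarize(sample_transfers)
```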
@spec count_transfers(keyword()) :: non_neg_integer()
Counts transfers with optional filters.
Accepts the same filter options as list_transfers/1.
@spec create_transfer(map()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Creates a new transfer record.
Parameters
- attrs - Transfer attributes:
  - :direction (required) - "send" or "receive"
  - :table_name (required) - Name of the table being transferred
  - :connection_uuid - UUID of the permanent connection (if used)
  - :session_code - Ephemeral session code (if used)
  - :remote_site_url - URL of the other site
  - :records_requested - Number of records requested
  - :conflict_strategy - "skip", "overwrite", "merge", "append"
  - :requires_approval - Whether this transfer needs approval
  - :requester_ip - IP address of the requester
  - :requester_user_agent - User agent of the requester
  - :initiated_by_uuid - UUID of user who initiated the transfer
  - :metadata - Additional context as a map
Examples
{:ok, transfer} = Transfers.create_transfer(%{
  direction: "receive",
  table_name: "users",
  connection_uuid: conn.uuid,
  records_requested: 500,
  conflict_strategy: "skip",
  initiated_by_uuid: current_user.uuid
})
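The required fields and allowed values listed above could be checked before calling `create_transfer/1`. The sketch below is only an approximation under stated assumptions: `CreateAttrsSketch` is a hypothetical helper, it returns a list of offending field names rather than the `Ecto.Changeset.t()` the real function returns, and treating an absent `:conflict_strategy` as valid is an assumption.

```elixir
defmodule CreateAttrsSketch do
  @moduledoc "Hypothetical pre-check of create_transfer/1 attributes."

  @directions ["send", "receive"]
  @strategies ["skip", "overwrite", "merge", "append"]

  # Returns {:ok, attrs} or {:error, [field]} listing the invalid fields.
  def validate(attrs) do
    errors =
      []
      |> check(attrs[:direction] in @directions, :direction)
      |> check(is_binary(attrs[:table_name]) and attrs[:table_name] != "", :table_name)
      # Assumption: the strategy is optional, so nil is accepted here.
      |> check(attrs[:conflict_strategy] in [nil | @strategies], :conflict_strategy)

    if errors == [], do: {:ok, attrs}, else: {:error, Enum.reverse(errors)}
  end

  # Accumulates the field name whenever its condition is false.
  defp check(errors, true, _field), do: errors
  defp check(errors, false, field), do: [field | errors]
end
```

For instance, `CreateAttrsSketch.validate(%{direction: "sideways"})` reports both `:direction` and `:table_name`, because the direction is not an allowed value and the table name is missing.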
@spec delete_transfer(PhoenixKit.Modules.Sync.Transfer.t()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Deletes a transfer.
Examples
{:ok, transfer} = Transfers.delete_transfer(transfer)
@spec deny_transfer(PhoenixKit.Modules.Sync.Transfer.t(), String.t(), String.t() | nil) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Denies a pending transfer.
Parameters
- transfer - The transfer to deny
- admin_user_uuid - The user ID denying the transfer
- reason - Optional reason for denial
Examples
{:ok, transfer} = Transfers.deny_transfer(transfer, current_user.uuid, "Data too sensitive")
@spec expire_pending_approvals() :: {non_neg_integer(), nil | term()}
Expires pending approval requests that have timed out.
Returns a tuple whose first element is the number of transfers expired.
Examples
{count, nil} = Transfers.expire_pending_approvals()
IO.puts("Expired #{count} approval requests")
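The expiry rule can be sketched over an in-memory list: a transfer expires when it is still "pending_approval" and its deadline has passed. This is an illustration, not the module's query. `ExpirySketch` and the `approval_expires_at` field name are assumptions, and the sketch returns the updated list as the second tuple element where the example above shows `nil`.

```elixir
defmodule ExpirySketch do
  @moduledoc "Hypothetical in-memory version of approval expiry."

  # Returns {count, updated_transfers}, mirroring the {count, term} shape.
  def expire(transfers, now) do
    {updated, count} =
      Enum.map_reduce(transfers, 0, fn transfer, count ->
        if expired?(transfer, now) do
          {%{transfer | status: "expired"}, count + 1}
        else
          {transfer, count}
        end
      end)

    {count, updated}
  end

  # Only transfers still awaiting approval, with a deadline before `now`, expire.
  # Assumption: the deadline lives in an :approval_expires_at field.
  defp expired?(%{status: "pending_approval", approval_expires_at: %DateTime{} = at}, now),
    do: DateTime.compare(at, now) == :lt

  defp expired?(_transfer, _now), do: false
end
```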
@spec fail_transfer(PhoenixKit.Modules.Sync.Transfer.t(), String.t()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Marks a transfer as failed.
Parameters
- transfer - The transfer to fail
- error_message - Description of the failure
Examples
{:ok, transfer} = Transfers.fail_transfer(transfer, "Connection timeout")
@spec get_transfer(String.t()) :: PhoenixKit.Modules.Sync.Transfer.t() | nil
Gets a transfer by UUID.
Accepts:
- UUID string: get_transfer("01234567-89ab-cdef-0123-456789abcdef")
@spec get_transfer!(integer() | String.t()) :: PhoenixKit.Modules.Sync.Transfer.t()
Gets a transfer by ID or UUID, raising if not found.
Accepts the same inputs as get_transfer/1.
@spec get_transfer_with_preloads(integer() | String.t(), keyword()) :: PhoenixKit.Modules.Sync.Transfer.t() | nil
Gets a transfer by ID with associations preloaded.
Options
- :preload - List of associations to preload
Examples
transfer = Transfers.get_transfer_with_preloads(123, [:connection])
@spec list_pending_approvals(keyword()) :: [PhoenixKit.Modules.Sync.Transfer.t()]
Lists transfers pending approval.
Options
- :connection_uuid - Filter by connection UUID
- :table_name - Filter by table name
- :limit - Maximum results
- :preload - Associations to preload
Examples
pending = Transfers.list_pending_approvals(connection_uuid: "019...")
@spec list_transfers(keyword()) :: [PhoenixKit.Modules.Sync.Transfer.t()]
Lists transfers with optional filters.
Options
- :direction - Filter by direction ("send" or "receive")
- :status - Filter by status or list of statuses
- :connection_uuid - Filter by connection UUID
- :table_name - Filter by table name
- :requires_approval - Filter by approval requirement
- :from - Filter by inserted_at >= date
- :to - Filter by inserted_at <= date
- :limit - Maximum results
- :offset - Number of results to skip
- :preload - Associations to preload
- :order - Order direction (:asc or :desc, default :desc)
Examples
# List all pending approvals
transfers = Transfers.list_transfers(status: "pending_approval", requires_approval: true)
# List recent transfers for a connection
transfers = Transfers.list_transfers(connection_uuid: "019...", limit: 10)
# List transfers within date range
transfers = Transfers.list_transfers(from: ~U[2025-01-01 00:00:00Z], to: ~U[2025-12-31 23:59:59Z])
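To make the filter semantics concrete, the sketch below applies a subset of the options (`:direction`, `:status`, `:from`, `:to`, `:order`, `:offset`, `:limit`) to an in-memory list. It is a simplified illustration rather than the module's database query: `ListFilterSketch` is a hypothetical name, and ordering by `inserted_at` is an assumption.

```elixir
defmodule ListFilterSketch do
  @moduledoc "Hypothetical in-memory interpretation of the list_transfers/1 options."

  def list(transfers, opts \\ []) do
    order = Keyword.get(opts, :order, :desc)

    transfers
    |> Enum.filter(&matches?(&1, opts))
    # Assumption: results are ordered by inserted_at.
    |> Enum.sort_by(& &1.inserted_at, {order, DateTime})
    |> Enum.drop(Keyword.get(opts, :offset, 0))
    |> take(Keyword.get(opts, :limit))
  end

  # Every filter option present must match; non-filter options are ignored here.
  defp matches?(transfer, opts) do
    Enum.all?(opts, fn
      {:direction, direction} -> transfer.direction == direction
      # :status accepts a single status or a list of statuses.
      {:status, status} -> transfer.status in List.wrap(status)
      {:from, from} -> DateTime.compare(transfer.inserted_at, from) != :lt
      {:to, to} -> DateTime.compare(transfer.inserted_at, to) != :gt
      _other -> true
    end)
  end

  defp take(list, nil), do: list
  defp take(list, limit), do: Enum.take(list, limit)
end
```

`List.wrap/1` is what lets `:status` accept either one status string or a list, matching the option description above.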
@spec recent_transfers(non_neg_integer()) :: [PhoenixKit.Modules.Sync.Transfer.t()]
Gets recent transfers for display.
Parameters
- limit - Number of transfers to return (default: 10)
Examples
recent = Transfers.recent_transfers(5)
@spec request_approval(PhoenixKit.Modules.Sync.Transfer.t(), non_neg_integer()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Requests approval for a transfer.
Sets the transfer to "pending_approval" status with an expiration time.
Parameters
- transfer - The transfer requiring approval
- expires_in_hours - Hours until approval expires (default: 24)
Examples
{:ok, transfer} = Transfers.request_approval(transfer, 48)
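The expiration time implied by `expires_in_hours` is simple date arithmetic. A minimal sketch, assuming the deadline is counted from the moment approval is requested (`ApprovalWindowSketch` is a hypothetical helper, not part of the module):

```elixir
defmodule ApprovalWindowSketch do
  @moduledoc "Hypothetical calculation of an approval deadline."

  # Adds the approval window, in hours, to the request time.
  # The default of 24 hours follows the documented default.
  def expires_at(requested_at, expires_in_hours \\ 24) do
    DateTime.add(requested_at, expires_in_hours * 3600, :second)
  end
end

# A request made at noon on 1 January with a 48-hour window
# is due at noon on 3 January.
ApprovalWindowSketch.expires_at(~U[2025-01-01 12:00:00Z], 48)
```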
@spec start_transfer(PhoenixKit.Modules.Sync.Transfer.t()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, :cannot_start | Ecto.Changeset.t()}
Starts a transfer (changes status to "in_progress").
Only transfers that can be started (pending without approval, or approved) will be updated.
Examples
{:ok, transfer} = Transfers.start_transfer(transfer)
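The start rule lends itself to pattern matching on the transfer's status. The sketch below works on plain maps and is an assumption-laden illustration, not the module's code: `StartGuardSketch` is a hypothetical name, and it assumes `requires_approval` is a boolean on the transfer.

```elixir
defmodule StartGuardSketch do
  @moduledoc "Hypothetical in-memory version of the start rule."

  # A pending transfer may start only when no approval is required.
  def start(%{status: "pending", requires_approval: false} = transfer),
    do: {:ok, %{transfer | status: "in_progress"}}

  # An approved transfer may start.
  def start(%{status: "approved"} = transfer),
    do: {:ok, %{transfer | status: "in_progress"}}

  # Anything else is rejected, matching the :cannot_start error in the spec.
  def start(_transfer), do: {:error, :cannot_start}
end
```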
Gets transfer statistics grouped by table.
Options
- :direction - Filter by direction
- :connection_uuid - Filter by connection UUID
- :from - Start date
- :to - End date
Examples
stats = Transfers.table_stats(direction: "receive")
# => [%{table_name: "users", count: 10, records: 5000}, ...]
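The grouped result shown above can be reproduced over an in-memory list with `Enum.group_by/2`. This is a sketch under assumptions, not the module's query: `TableStatsSketch` is a hypothetical name, `records` is assumed to be the sum of `records_transferred`, and the sort by table name is added only to make the output deterministic.

```elixir
defmodule TableStatsSketch do
  @moduledoc "Hypothetical in-memory grouping of transfers by table."

  def by_table(transfers) do
    transfers
    |> Enum.group_by(& &1.table_name)
    |> Enum.map(fn {table_name, group} ->
      %{
        table_name: table_name,
        count: length(group),
        # Assumption: :records is the sum of records_transferred per table.
        records: group |> Enum.map(& &1.records_transferred) |> Enum.sum()
      }
    end)
    |> Enum.sort_by(& &1.table_name)
  end
end
```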
@spec update_progress(PhoenixKit.Modules.Sync.Transfer.t(), map()) :: {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}
Updates transfer progress.
Parameters
- transfer - The transfer to update
- attrs - Progress attributes:
  - :records_transferred - Total records transferred so far
  - :records_created - New records created
  - :records_updated - Existing records updated
  - :records_skipped - Records skipped (conflicts)
  - :records_failed - Records that failed
  - :bytes_transferred - Total bytes transferred
Examples
{:ok, transfer} = Transfers.update_progress(transfer, %{
  records_transferred: 100,
  records_created: 95,
  records_skipped: 5
})