PhoenixKit.Modules.Sync.Transfers (phoenix_kit v1.7.71)

Copy Markdown View Source

Context module for managing DB Sync transfers.

Provides CRUD operations and business logic for tracking data transfers between PhoenixKit instances, including approval workflow support.

Transfer Directions

  • "send" - This site sent data to another site
  • "receive" - This site received data from another site

Status Flow

pending  pending_approval  approved  in_progress  completed
                          
                          denied
                          
                          expired (approval timed out)

pending  in_progress  completed
                     
                     failed
                     
                     cancelled

Usage Examples

# Create a transfer record
{:ok, transfer} = Transfers.create_transfer(%{
  direction: "receive",
  connection_uuid: conn.uuid,
  table_name: "users",
  records_requested: 100,
  conflict_strategy: "skip"
})

# Start a transfer
{:ok, transfer} = Transfers.start_transfer(transfer)

# Update progress
{:ok, transfer} = Transfers.update_progress(transfer, %{
  records_transferred: 50,
  records_created: 45,
  records_skipped: 5
})

# Complete a transfer
{:ok, transfer} = Transfers.complete_transfer(transfer)

Summary

Functions

Gets active (in-progress) transfers.

Approves a pending transfer.

Cancels a transfer.

Completes a transfer successfully.

Gets transfer statistics for a connection.

Counts transfers with optional filters.

Creates a new transfer record.

Deletes a transfer.

Expires pending approval requests that have timed out.

Marks a transfer as failed.

Gets a transfer by UUID.

Gets a transfer by ID or UUID, raising if not found.

Gets a transfer by ID with associations preloaded.

Lists transfers pending approval.

Lists transfers with optional filters.

Gets recent transfers for display.

Requests approval for a transfer.

Starts a transfer (changes status to "in_progress").

Gets transfer statistics grouped by table.

Updates transfer progress.

Functions

active_transfers()

@spec active_transfers() :: [PhoenixKit.Modules.Sync.Transfer.t()]

Gets active (in-progress) transfers.

Examples

active = Transfers.active_transfers()

approve_transfer(transfer, admin_user_uuid)

Approves a pending transfer.

Parameters

  • transfer - The transfer to approve
  • admin_user_uuid - The user ID approving the transfer

Examples

{:ok, transfer} = Transfers.approve_transfer(transfer, current_user.uuid)

cancel_transfer(transfer)

Cancels a transfer.

Examples

{:ok, transfer} = Transfers.cancel_transfer(transfer)

complete_transfer(transfer, final_stats \\ %{})

@spec complete_transfer(PhoenixKit.Modules.Sync.Transfer.t(), map()) ::
  {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}

Completes a transfer successfully.

Parameters

  • transfer - The transfer to complete
  • final_stats - Optional final statistics to record

Examples

{:ok, transfer} = Transfers.complete_transfer(transfer, %{
  records_transferred: 500,
  records_created: 480,
  records_updated: 15,
  records_skipped: 5
})

connection_stats(connection_uuid)

@spec connection_stats(String.t()) :: map()

Gets transfer statistics for a connection.

Returns

Map with:

  • :total_transfers - Total number of transfers
  • :completed - Number of completed transfers
  • :failed - Number of failed transfers
  • :total_records - Total records transferred
  • :total_bytes - Total bytes transferred

Examples

stats = Transfers.connection_stats("019...")
# => %{total_transfers: 50, completed: 48, failed: 2, ...}

count_transfers(opts \\ [])

@spec count_transfers(keyword()) :: non_neg_integer()

Counts transfers with optional filters.

Accepts same filter options as list_transfers/1.

create_transfer(attrs)

@spec create_transfer(map()) ::
  {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}

Creates a new transfer record.

Parameters

  • attrs - Transfer attributes:
    • :direction (required) - "send" or "receive"
    • :table_name (required) - Name of the table being transferred
    • :connection_uuid - UUID of the permanent connection (if used)
    • :session_code - Ephemeral session code (if used)
    • :remote_site_url - URL of the other site
    • :records_requested - Number of records requested
    • :conflict_strategy - "skip", "overwrite", "merge", "append"
    • :requires_approval - Whether this transfer needs approval
    • :requester_ip - IP address of the requester
    • :requester_user_agent - User agent of the requester
    • :initiated_by_uuid - UUID of user who initiated the transfer
    • :metadata - Additional context as a map

Examples

{:ok, transfer} = Transfers.create_transfer(%{
  direction: "receive",
  table_name: "users",
  connection_uuid: conn.uuid,
  records_requested: 500,
  conflict_strategy: "skip",
  initiated_by_uuid: current_user.uuid
})

delete_transfer(transfer)

Deletes a transfer.

Examples

{:ok, transfer} = Transfers.delete_transfer(transfer)

deny_transfer(transfer, admin_user_uuid, reason \\ nil)

@spec deny_transfer(
  PhoenixKit.Modules.Sync.Transfer.t(),
  String.t(),
  String.t() | nil
) ::
  {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}

Denies a pending transfer.

Parameters

  • transfer - The transfer to deny
  • admin_user_uuid - The user ID denying the transfer
  • reason - Optional reason for denial

Examples

{:ok, transfer} = Transfers.deny_transfer(transfer, current_user.uuid, "Data too sensitive")

expire_pending_approvals()

@spec expire_pending_approvals() :: {non_neg_integer(), nil | term()}

Expires pending approval requests that have timed out.

Returns the number of transfers expired.

Examples

{count, nil} = Transfers.expire_pending_approvals()
IO.puts("Expired #{count} approval requests")

fail_transfer(transfer, error_message)

Marks a transfer as failed.

Parameters

  • transfer - The transfer to fail
  • error_message - Description of the failure

Examples

{:ok, transfer} = Transfers.fail_transfer(transfer, "Connection timeout")

get_transfer(uuid)

@spec get_transfer(String.t()) :: PhoenixKit.Modules.Sync.Transfer.t() | nil

Gets a transfer by UUID.

Accepts:

  • UUID string: get_transfer("01234567-89ab-cdef-0123-456789abcdef")

get_transfer!(id)

@spec get_transfer!(integer() | String.t()) :: PhoenixKit.Modules.Sync.Transfer.t()

Gets a transfer by ID or UUID, raising if not found.

Accepts same inputs as get_transfer/1.

get_transfer_with_preloads(id, opts \\ [])

@spec get_transfer_with_preloads(
  integer() | String.t(),
  keyword()
) :: PhoenixKit.Modules.Sync.Transfer.t() | nil

Gets a transfer by ID with associations preloaded.

Options

  • :preload - List of associations to preload

Examples

transfer = Transfers.get_transfer_with_preloads(123, [:connection])

list_pending_approvals(opts \\ [])

@spec list_pending_approvals(keyword()) :: [PhoenixKit.Modules.Sync.Transfer.t()]

Lists transfers pending approval.

Options

  • :connection_uuid - Filter by connection UUID
  • :table_name - Filter by table name
  • :limit - Maximum results
  • :preload - Associations to preload

Examples

pending = Transfers.list_pending_approvals(connection_uuid: "019...")

list_transfers(opts \\ [])

@spec list_transfers(keyword()) :: [PhoenixKit.Modules.Sync.Transfer.t()]

Lists transfers with optional filters.

Options

  • :direction - Filter by direction ("send" or "receive")
  • :status - Filter by status or list of statuses
  • :connection_uuid - Filter by connection UUID
  • :table_name - Filter by table name
  • :requires_approval - Filter by approval requirement
  • :from - Filter by inserted_at >= date
  • :to - Filter by inserted_at <= date
  • :limit - Maximum results
  • :offset - Number of results to skip
  • :preload - Associations to preload
  • :order - Order direction (:asc or :desc, default :desc)

Examples

# List all pending approvals
transfers = Transfers.list_transfers(status: "pending_approval", requires_approval: true)

# List recent transfers for a connection
transfers = Transfers.list_transfers(connection_uuid: "019...", limit: 10)

# List transfers within date range
transfers = Transfers.list_transfers(from: ~U[2025-01-01 00:00:00Z], to: ~U[2025-12-31 23:59:59Z])

recent_transfers(limit \\ 10)

@spec recent_transfers(non_neg_integer()) :: [PhoenixKit.Modules.Sync.Transfer.t()]

Gets recent transfers for display.

Parameters

  • limit - Number of transfers to return (default: 10)

Examples

recent = Transfers.recent_transfers(5)

request_approval(transfer, expires_in_hours \\ 24)

Requests approval for a transfer.

Sets the transfer to "pending_approval" status with an expiration time.

Parameters

  • transfer - The transfer requiring approval
  • expires_in_hours - Hours until approval expires (default: 24)

Examples

{:ok, transfer} = Transfers.request_approval(transfer, 48)

start_transfer(transfer)

@spec start_transfer(PhoenixKit.Modules.Sync.Transfer.t()) ::
  {:ok, PhoenixKit.Modules.Sync.Transfer.t()}
  | {:error, :cannot_start | Ecto.Changeset.t()}

Starts a transfer (changes status to "in_progress").

Only transfers that can be started (pending without approval, or approved) will be updated.

Examples

{:ok, transfer} = Transfers.start_transfer(transfer)

table_stats(opts \\ [])

@spec table_stats(keyword()) :: [map()]

Gets transfer statistics grouped by table.

Options

  • :direction - Filter by direction
  • :connection_uuid - Filter by connection UUID
  • :from - Start date
  • :to - End date

Examples

stats = Transfers.table_stats(direction: "receive")
# => [%{table_name: "users", count: 10, records: 5000}, ...]

update_progress(transfer, attrs)

@spec update_progress(PhoenixKit.Modules.Sync.Transfer.t(), map()) ::
  {:ok, PhoenixKit.Modules.Sync.Transfer.t()} | {:error, Ecto.Changeset.t()}

Updates transfer progress.

Parameters

  • transfer - The transfer to update
  • attrs - Progress attributes:
    • :records_transferred - Total records transferred so far
    • :records_created - New records created
    • :records_updated - Existing records updated
    • :records_skipped - Records skipped (conflicts)
    • :records_failed - Records that failed
    • :bytes_transferred - Total bytes transferred

Examples

{:ok, transfer} = Transfers.update_progress(transfer, %{
  records_transferred: 100,
  records_created: 95,
  records_skipped: 5
})