PhoenixKit.Modules.Sync.Workers.ImportWorker (phoenix_kit v1.7.42)

Copy Markdown View Source

Oban worker for background data import from DB Sync.

This worker handles the actual import of records received from a sender, processing them in the background so the user doesn't have to wait.

Job Arguments

  • table - The table name to import into
  • records - List of records to import (JSON-serialized)
  • strategy - Conflict resolution strategy ("skip", "overwrite", "merge")
  • session_code - The sync session code (for tracking)
  • batch_index - Optional batch index for large transfers

Usage

The Receiver LiveView enqueues jobs after receiving data:

ImportWorker.new(%{
  table: "users",
  records: records,
  strategy: "skip",
  session_code: "ABC12345"
})
|> Oban.insert()

Queue Configuration

Add the sync queue to your Oban config:

config :my_app, Oban,
  queues: [default: 10, sync: 5]

Summary

Functions

Creates a new import job for the specified table and records.

Enqueues import jobs for multiple tables.

Functions

create_job(table, records, strategy, session_code, opts \\ [])

@spec create_job(String.t(), [map()], atom() | String.t(), String.t(), keyword()) ::
  Oban.Job.changeset()

Creates a new import job for the specified table and records.

Parameters

  • table - Table name to import into
  • records - List of record maps
  • strategy - Conflict strategy (atom or string)
  • session_code - Transfer session code for tracking
  • opts - Additional options:
    • :batch_index - Index of the batch for multi-batch imports
    • :schema - Table schema definition (for auto-creating missing tables)

Returns

An Oban.Job changeset ready for insertion.

enqueue_imports(table_data, session_code, batch_size \\ 500)

@spec enqueue_imports(map(), String.t(), pos_integer()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Enqueues import jobs for multiple tables.

Splits large record sets into batches to avoid memory issues.

Parameters

  • table_data - Map of table name to one of:
    • {records, strategy} - Records and strategy without schema
    • {records, strategy, schema} - Records, strategy, and schema for auto-creating tables
  • session_code - Transfer session code
  • batch_size - Maximum records per job (default: 500)

Returns

  • {:ok, job_count} - Number of jobs enqueued
  • {:error, reason} - If any job failed to enqueue