Oban worker for background data import from DB Sync.
This worker handles the actual import of records received from a sender, processing them in the background so the user doesn't have to wait.
Job Arguments
table- The table name to import intorecords- List of records to import (JSON-serialized)strategy- Conflict resolution strategy ("skip", "overwrite", "merge")session_code- The sync session code (for tracking)batch_index- Optional batch index for large transfers
Usage
The Receiver LiveView enqueues jobs after receiving data:
ImportWorker.new(%{
table: "users",
records: records,
strategy: "skip",
session_code: "ABC12345"
})
|> Oban.insert()Queue Configuration
Add the sync queue to your Oban config:
config :my_app, Oban,
queues: [default: 10, sync: 5]
Summary
Functions
Creates a new import job for the specified table and records.
Enqueues import jobs for multiple tables.
Functions
@spec create_job(String.t(), [map()], atom() | String.t(), String.t(), keyword()) :: Oban.Job.changeset()
Creates a new import job for the specified table and records.
Parameters
table- Table name to import intorecords- List of record mapsstrategy- Conflict strategy (atom or string)session_code- Transfer session code for trackingopts- Additional options::batch_index- Index of the batch for multi-batch imports:schema- Table schema definition (for auto-creating missing tables)
Returns
An Oban.Job changeset ready for insertion.
@spec enqueue_imports(map(), String.t(), pos_integer()) :: {:ok, non_neg_integer()} | {:error, term()}
Enqueues import jobs for multiple tables.
Splits large record sets into batches to avoid memory issues.
Parameters
table_data- Map of table name to one of:{records, strategy}- Records and strategy without schema{records, strategy, schema}- Records, strategy, and schema for auto-creating tables
session_code- Transfer session codebatch_size- Maximum records per job (default: 500)
Returns
{:ok, job_count}- Number of jobs enqueued{:error, reason}- If any job failed to enqueue