TdsCdc (tds_cdc v0.1.0)
Copy MarkdownChange Data Capture for SQL Server databases using the TDS protocol.
TdsCdc allows you to capture row-level changes (INSERT, UPDATE, DELETE) from SQL Server tables that have CDC enabled. It polls the CDC change tables on a configurable interval and publishes changes to subscribers.
Usage with direct TDS connection
{:ok, pid} = TdsCdc.start_link(
conn: [hostname: "localhost", username: "sa", password: "pass", database: "mydb"],
capture_instances: ["dbo_Users"],
poll_interval: 1_000
)
TdsCdc.subscribe("dbo_Users")
# Changes will be sent as messages:
# {:tds_cdc_change, "dbo_Users", %TdsCdc.Change{...}}Usage with Ecto.Repo
{:ok, pid} = TdsCdc.start_link(
repo: MyApp.Repo,
capture_instances: ["dbo_Users"],
poll_interval: 1_000
)
TdsCdc.subscribe("dbo_Users")Gap detection
If a subscriber falls behind and SQL Server's CDC retention has purged
old change data, TdsCdc will detect the gap and send a
{:tds_cdc_gap_detected, capture_instance, old_lsn, min_lsn} message
to all subscribers before resetting the LSN position to the current
minimum. This allows applications to react to potential data loss.
Summary
Functions
Returns the list of active capture instances being tracked.
Checks if CDC is enabled on the database for the given connection.
Returns the current LSN position for the given capture instance.
Lists all CDC capture instances available in the database.
Starts a CDC client process linked to the calling process.
Starts a CDC client as part of a supervision tree.
Stops the CDC client.
Subscribes the calling process to change events for the given capture instance.
Unsubscribes the calling process from change events for the given capture instance.
Waits until the CDC client has initialized its LSN positions and is ready to receive subscriptions, or until the timeout expires.
Types
@type conn_opts() :: keyword()
@type start_opts() :: [ conn: conn_opts(), repo: module(), capture_instances: [String.t()], poll_interval: non_neg_integer() ]
Functions
@spec capture_instances() :: [String.t()]
Returns the list of active capture instances being tracked.
@spec cdc_enabled?(GenServer.server() | module()) :: {:ok, boolean()} | {:error, term()}
Checks if CDC is enabled on the database for the given connection.
Accepts either a TDS connection pid or an Ecto.Repo module.
Examples
# With TDS connection
{:ok, conn} = Tds.start_link(conn_opts)
{:ok, true} = TdsCdc.cdc_enabled?(conn)
# With Ecto.Repo
{:ok, true} = TdsCdc.cdc_enabled?(MyApp.Repo)
Returns the current LSN position for the given capture instance.
@spec list_capture_instances(GenServer.server() | module()) :: {:ok, [String.t()]} | {:error, term()}
Lists all CDC capture instances available in the database.
Accepts either a TDS connection pid or an Ecto.Repo module.
Examples
{:ok, instances} = TdsCdc.list_capture_instances(conn)
{:ok, instances} = TdsCdc.list_capture_instances(MyApp.Repo)
@spec start_link(start_opts()) :: GenServer.on_start()
Starts a CDC client process linked to the calling process.
Options
:conn- TDS connection options (required if not using:repo). SeeTdsmodule for details.:repo- An Ecto.Repo module (required if not using:conn). Must use TDS adapter.:capture_instances- List of CDC capture instance names to track (required).:poll_interval- Interval in ms to poll for changes (default: 1000).:name- GenServer name registration (default:TdsCdc.Client).
Examples
# With TDS connection
{:ok, pid} = TdsCdc.start_link(
conn: [hostname: "localhost", username: "sa", password: "pass", database: "mydb"],
capture_instances: ["dbo_Users"]
)
# With Ecto.Repo
{:ok, pid} = TdsCdc.start_link(
repo: MyApp.Repo,
capture_instances: ["dbo_Users"]
)
@spec start_link(start_opts(), GenServer.server()) :: GenServer.on_start()
Starts a CDC client as part of a supervision tree.
@spec stop(GenServer.server()) :: :ok
Stops the CDC client.
Subscribes the calling process to change events for the given capture instance.
Unsubscribes the calling process from change events for the given capture instance.
@spec wait_for_ready(keyword()) :: :ok | {:error, :timeout}
Waits until the CDC client has initialized its LSN positions and is ready to receive subscriptions, or until the timeout expires.
Returns :ok if the client is ready, or {:error, :timeout} if the
timeout expires before the client initializes.
Options
:timeout- Maximum time to wait in ms (default: 5000).:capture_instance- Which capture instance to check (default: checks the first one).
Examples
{:ok, pid} = TdsCdc.start_link(conn: conn_opts, capture_instances: ["dbo_users"])
:ok = TdsCdc.wait_for_ready(timeout: 10_000)
TdsCdc.subscribe("dbo_users")