TdsCdc.Client (tds_cdc v0.1.0)

Copy Markdown

GenServer that manages the connection to SQL Server and polls for CDC changes.

The client maintains a database connection (either via TDS directly or through an Ecto.Repo), tracks LSN positions for each capture instance, and publishes changes to subscribers via message passing.

Connection options

You can use either:

  • :conn - Direct TDS connection options (see Tds.start_link/1)
  • :repo - An existing Ecto.Repo module to use for queries

When using :repo, the Repo must already be started as part of your application's supervision tree. TdsCdc will not start or stop the Repo.

Summary

Functions

Returns the list of active capture instances being tracked.

Returns a specification to start this module under a supervisor.

Returns the current LSN position for a capture instance.

Starts a CDC client linked to the calling process.

Stops the CDC client.

Subscribes the calling process to changes for a capture instance.

Unsubscribes the calling process from changes for a capture instance.

Types

state()

@type state() :: %TdsCdc.Client{
  adapter: term(),
  capture_instances: term(),
  conn: term(),
  conn_opts: term(),
  lsn_positions: term(),
  owns_conn?: term(),
  persistence: term(),
  poll_interval: term(),
  repo: term(),
  subscribers: term(),
  timer_ref: term()
}

Functions

capture_instances()

@spec capture_instances() :: [String.t()]

Returns the list of active capture instances being tracked.

capture_instances(server)

@spec capture_instances(GenServer.server()) :: [String.t()]

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

current_lsn(capture_instance)

@spec current_lsn(String.t()) :: {:ok, String.t()} | {:error, term()}

Returns the current LSN position for a capture instance.

current_lsn(server, capture_instance)

@spec current_lsn(GenServer.server(), String.t()) ::
  {:ok, String.t()} | {:error, term()}

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a CDC client linked to the calling process.

Options

  • :conn - TDS connection options. Mutually exclusive with :repo.
  • :repo - An Ecto.Repo module. Mutually exclusive with :conn.
  • :capture_instances - List of CDC capture instance names to track (required).
  • :poll_interval - Interval in ms to poll for changes (default: 1000).
  • :name - GenServer name registration (default: TdsCdc.Client).
  • :persistence - How to persist LSN positions across restarts. Can be {module, opts} tuple or nil to disable. Default: {TdsCdc.Persistence.File, []}.

Examples

# With TDS connection
{:ok, pid} = TdsCdc.start_link(
  conn: [hostname: "localhost", username: "sa", password: "pass", database: "mydb"],
  capture_instances: ["dbo_users"]
)

# With Ecto.Repo
{:ok, pid} = TdsCdc.start_link(
  repo: MyApp.Repo,
  capture_instances: ["dbo_users"]
)

# With custom persistence path
{:ok, pid} = TdsCdc.start_link(
  conn: [...],
  capture_instances: ["dbo_users"],
  persistence: {TdsCdc.Persistence.File, path: "/var/lib/myapp/lsn"}
)

# Disable persistence (positions lost on restart)
{:ok, pid} = TdsCdc.start_link(
  conn: [...],
  capture_instances: ["dbo_users"],
  persistence: nil
)

start_link(opts, name)

@spec start_link(
  keyword(),
  GenServer.server()
) :: GenServer.on_start()

stop(server \\ __MODULE__)

@spec stop(GenServer.server()) :: :ok

Stops the CDC client.

subscribe(capture_instance)

@spec subscribe(String.t()) :: :ok | {:error, term()}

Subscribes the calling process to changes for a capture instance.

subscribe(server, capture_instance)

@spec subscribe(GenServer.server(), String.t()) :: :ok | {:error, term()}

unsubscribe(capture_instance)

@spec unsubscribe(String.t()) :: :ok | {:error, term()}

Unsubscribes the calling process from changes for a capture instance.

unsubscribe(server, capture_instance)

@spec unsubscribe(GenServer.server(), String.t()) :: :ok | {:error, term()}