TdsCdc.Listener behaviour (tds_cdc v0.1.0)

Behaviour for structured CDC event listeners.

Use this module to create a listener that follows a defined workflow, with a dedicated callback for each change operation type.

Usage

defmodule MyApp.CdcListener do
  use TdsCdc.Listener

  # Logger.warning/1 is a macro, so Logger must be required before use.
  require Logger

  @impl true
  def on_init(_opts) do
    {:ok, %{inserts: 0, updates: 0, deletes: 0}}
  end

  @impl true
  def on_insert(change, state) do
    IO.puts("New record: #{inspect(change.data)}")
    {:ok, %{state | inserts: state.inserts + 1}}
  end

  @impl true
  def on_update(change, state) do
    IO.puts("Updated: #{inspect(change.data)}")
    {:ok, %{state | updates: state.updates + 1}}
  end

  @impl true
  def on_delete(change, state) do
    IO.puts("Deleted: #{inspect(change.data)}")
    {:ok, %{state | deletes: state.deletes + 1}}
  end

  @impl true
  def on_gap(ci, old_lsn, min_lsn, state) do
    Logger.warning("Gap detected in #{ci}: #{inspect(old_lsn)} -> #{inspect(min_lsn)}")
    {:ok, state}
  end
end

Then add to your supervision tree:

children = [
  {MyApp.CdcListener, conn: [hostname: "localhost", ...], capture_instances: ["dbo_users"]}
]

Or start manually:

{:ok, pid} = MyApp.CdcListener.start_link(
  conn: [hostname: "localhost", username: "sa", password: "pass", database: "mydb"],
  capture_instances: ["dbo_users"]
)

Connection options

Same as TdsCdc.Client.start_link/1:

  • :conn - Direct TDS connection options
  • :repo - An existing Ecto.Repo module
  • :capture_instances - List of CDC capture instance names (required)
  • :poll_interval - Polling interval in ms (default: 1000)
  • :name - GenServer name registration (default: module name)
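
As a rough sketch of the :repo variant, the child spec below passes an existing repo in place of :conn. MyApp.Repo is a hypothetical Ecto.Repo module, and the source does not say whether :conn and :repo may be combined, so this assumes only one of them is given.

```elixir
# MyApp.Repo is a hypothetical Ecto.Repo (an assumption, not part of tds_cdc).
# MyApp.CdcListener is the listener module from the Usage section.
children = [
  {MyApp.CdcListener,
   repo: MyApp.Repo,
   capture_instances: ["dbo_users"],
   # Poll every 5 seconds instead of the default 1000 ms.
   poll_interval: 5_000}
]
```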

Callbacks

All callbacks are optional and have default implementations. For every callback except on_terminate/2 (whose return value is ignored), returning {:ok, state} continues the listener and returning {:stop, reason} stops the listener process.
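
The return contract can be illustrated with a minimal sketch that stops once a threshold is reached. The module name and the @max_deletes limit are hypothetical, and the module leaves out use TdsCdc.Listener so that it runs standalone; in a real listener the same clauses would sit under @impl true.

```elixir
defmodule StopAfterLimit do
  # Hypothetical threshold, chosen only for illustration.
  @max_deletes 3

  # Same shape as on_delete/2: stop when this delete reaches the limit.
  def on_delete(_change, %{deletes: n}) when n + 1 >= @max_deletes do
    {:stop, :too_many_deletes}
  end

  # Otherwise count the delete and keep the listener running.
  def on_delete(_change, state) do
    {:ok, %{state | deletes: state.deletes + 1}}
  end
end
```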

Summary

Callbacks

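on_delete(change, state)

Called when a DELETE change is received.

on_gap(capture_instance, old_lsn, min_lsn, state)

Called when a CDC gap is detected.

on_init(opts)

Called when the listener starts, after CDC subscription is established.

on_insert(change, state)

Called when an INSERT change is received.

on_terminate(reason, state)

Called when the listener process is about to terminate.

on_update(change, state)

Called when an UPDATE change is received.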

Types

reason()

@type reason() :: term()

state()

@type state() :: term()

Callbacks

on_delete(change, state)

(optional)
@callback on_delete(change :: TdsCdc.Change.t(), state :: state()) ::
  {:ok, state()} | {:stop, reason()}

Called when a DELETE change is received.

The change argument is a %TdsCdc.Change{} struct with operation: :delete.

on_gap(capture_instance, old_lsn, min_lsn, state)

(optional)
@callback on_gap(
  capture_instance :: String.t(),
  old_lsn :: binary(),
  min_lsn :: binary(),
  state :: state()
) :: {:ok, state()} | {:stop, reason()}

Called when a CDC gap is detected.

This happens when the stored LSN position falls behind the minimum available LSN in CDC tables, meaning some changes were lost due to retention cleanup.

  • capture_instance - The capture instance where the gap was detected.
  • old_lsn - The LSN that was stored (now too old).
  • min_lsn - The new minimum LSN (position will be reset to this).
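
One possible way to react to a gap is to record it in the listener state so that a later resync can be triggered. The sketch below assumes the state is a map and uses a hypothetical :gaps key; the module omits use TdsCdc.Listener so that it runs standalone.

```elixir
defmodule GapTracking do
  # Same shape as on_gap/4. LSNs arrive as binaries, so they are
  # hex-encoded here to make them readable in logs or reports.
  def on_gap(capture_instance, old_lsn, min_lsn, state) do
    gap = %{
      capture_instance: capture_instance,
      old_lsn: Base.encode16(old_lsn),
      min_lsn: Base.encode16(min_lsn)
    }

    # :gaps is a hypothetical key; prepend so the newest gap comes first.
    {:ok, Map.update(state, :gaps, [gap], &[gap | &1])}
  end
end
```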

on_init(opts)

(optional)
@callback on_init(opts :: keyword()) :: {:ok, state()} | {:stop, reason()}

Called when the listener starts, after CDC subscription is established.

Use this to initialize your listener state. Receives the full options keyword list passed to start_link/1.

Return {:ok, state} to continue, or {:stop, reason} to stop.
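
Since on_init/1 receives the start_link/1 options, the initial state can be derived from them. The following is a sketch only: the module name, the per-instance counters, and the :missing_capture_instances reason are illustrative assumptions, and use TdsCdc.Listener is omitted so that the module runs standalone.

```elixir
defmodule InitFromOpts do
  # Same shape as on_init/1: build one counter per capture instance.
  def on_init(opts) do
    case Keyword.fetch(opts, :capture_instances) do
      {:ok, [_ | _] = instances} ->
        state = %{
          counters: Map.new(instances, &{&1, 0}),
          # 1000 ms mirrors the documented default for :poll_interval.
          poll_interval: Keyword.get(opts, :poll_interval, 1000)
        }

        {:ok, state}

      _missing_or_empty ->
        # Hypothetical stop reason, used only in this sketch.
        {:stop, :missing_capture_instances}
    end
  end
end
```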

on_insert(change, state)

(optional)
@callback on_insert(change :: TdsCdc.Change.t(), state :: state()) ::
  {:ok, state()} | {:stop, reason()}

Called when an INSERT change is received.

The change argument is a %TdsCdc.Change{} struct with operation: :insert.

on_terminate(reason, state)

(optional)
@callback on_terminate(reason :: term(), state :: term()) :: term()

Called when the listener process is about to terminate.

Use this for cleanup. The return value is ignored.
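
A minimal cleanup sketch might just log a final summary before the process exits. The module name is hypothetical and use TdsCdc.Listener is omitted so that the module runs standalone.

```elixir
defmodule TerminateSummary do
  require Logger

  # Same shape as on_terminate/2. The return value is ignored by the
  # listener, so :ok is returned purely by convention.
  def on_terminate(reason, state) do
    Logger.info("CDC listener stopping (#{inspect(reason)}), final state: #{inspect(state)}")
    :ok
  end
end
```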

on_update(change, state)

(optional)
@callback on_update(change :: TdsCdc.Change.t(), state :: state()) ::
  {:ok, state()} | {:stop, reason()}

Called when an UPDATE change is received.

The change argument is a %TdsCdc.Change{} struct with operation: :update.

Note: CDC produces two rows per update: a before-image (operation=3) and an after-image (operation=4). Both are mapped to :update and delivered separately, so on_update/2 is invoked once for each image.
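
Because the two images arrive as separate calls, a listener that needs them together has to pair them itself. The sketch below assumes the before-image is always delivered immediately before its after-image, which the source does not confirm. The :pending and :pairs state keys are hypothetical, and the module omits use TdsCdc.Listener so that it runs standalone with any map that has a :data key.

```elixir
defmodule UpdatePairing do
  # Assumption: images arrive in order (before, then after) with no
  # other update interleaved. State: %{pending: nil | term, pairs: list}.

  # No image buffered yet: treat this call as the before-image.
  def on_update(%{data: before_image}, %{pending: nil} = state) do
    {:ok, %{state | pending: before_image}}
  end

  # A before-image is buffered: treat this call as its after-image.
  def on_update(%{data: after_image}, %{pending: before_image} = state) do
    pair = {before_image, after_image}
    {:ok, %{state | pending: nil, pairs: [pair | state.pairs]}}
  end
end
```

Pairs are prepended, so state.pairs holds the most recent update first.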