# `Electric.Postgres.ReplicationConnection`
[🔗](https://github.com/electric-sql/electric/tree/%40core/sync-service%401.6.2/packages/sync-service/lib/electric/postgres/replication_connection.ex#L16)

Vendored and extended version of `Postgrex.ReplicationConnection`.

Adds socket pause/resume support so that the callback module can apply
backpressure (stop reading WAL data) while remaining responsive to
non-socket messages such as keepalive timers.

## Extensions over upstream

Two new return types are available from `handle_data/2` and `handle_info/2`:

  * `{:noreply_and_pause, ack, state}` — send any ack messages to PostgreSQL,
    then pause socket reads. The gen_statem will still process `handle_info`
    and `handle_call` messages (e.g. keepalive timers), but no further
    `handle_data` callbacks will fire until the socket is resumed.

  * `{:noreply_and_resume, ack, state}` — send ack messages, resume socket
    reads, and process any buffered data that arrived while paused.

When paused, TCP/SSL data that arrives on the socket is buffered (at most
one Erlang message, since the socket uses `{active, :once}`). This provides
natural TCP-level backpressure: once the kernel receive buffer fills,
PostgreSQL's walsender blocks on write.

The callback module is expected to send periodic `StandbyStatusUpdate`
messages via `{:noreply, [encoded_msg], state}` from a `handle_info`
timer callback to prevent PostgreSQL's `wal_sender_timeout` from firing.

# `ack`

```elixir
@type ack() :: iodata()
```

# `query`

```elixir
@type query() :: iodata()
```

# `query_opts`

```elixir
@type query_opts() :: [{:timeout, timeout()}]
```

# `reason`

```elixir
@type reason() :: String.t()
```

# `server`

```elixir
@type server() :: :gen_statem.server_ref()
```

# `state`

```elixir
@type state() :: term()
```

# `stream_opts`

```elixir
@type stream_opts() :: [{:max_messages, pos_integer()}]
```

# `handle_call`
*optional* 

```elixir
@callback handle_call(term(), :gen_statem.from(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}
```

# `handle_connect`
*optional* 

```elixir
@callback handle_connect(state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}
```

# `handle_data`
*optional* 

```elixir
@callback handle_data(binary() | :done, state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:noreply_and_pause, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}
```

# `handle_disconnect`
*optional* 

```elixir
@callback handle_disconnect(state()) :: {:noreply, state()}
```

# `handle_info`
*optional* 

```elixir
@callback handle_info(term(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:noreply_and_resume, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}
```

# `handle_result`
*optional* 

```elixir
@callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}
```

# `init`

```elixir
@callback init(term()) :: {:ok, state()}
```

# `call`

# `decode_lsn`

```elixir
@spec decode_lsn(String.t()) :: {:ok, integer()} | :error
```

# `encode_lsn`

```elixir
@spec encode_lsn(integer()) :: {:ok, String.t()} | :error
```

# `reply`

# `start_link`

```elixir
@spec start_link(module(), term(), Keyword.t()) ::
  {:ok, pid()} | {:error, Postgrex.Error.t() | term()}
```

---

*Consult [api-reference.md](api-reference.md) for complete listing*
