Electric.Postgres.ReplicationConnection behaviour (electric v1.6.2)

Copy Markdown View Source

Vendored and extended version of Postgrex.ReplicationConnection.

Adds socket pause/resume support so that the callback module can apply backpressure (stop reading WAL data) while remaining responsive to non-socket messages such as keepalive timers.

Extensions over upstream

Two new return types are available from handle_data/2 and handle_info/2:

  • {:noreply_and_pause, ack, state} — send any ack messages to PostgreSQL, then pause socket reads. The gen_statem will still process handle_info and handle_call messages (e.g. keepalive timers), but no further handle_data callbacks will fire until the socket is resumed.

  • {:noreply_and_resume, ack, state} — send ack messages, resume socket reads, and process any buffered data that arrived while paused.

When paused, TCP/SSL data that arrives on the socket is buffered (at most one Erlang message, since the socket uses {active, :once}). This provides natural TCP-level backpressure: once the kernel receive buffer fills, PostgreSQL's walsender blocks on write.

The callback module is expected to send periodic StandbyStatusUpdate messages via {:noreply, [encoded_msg], state} from a handle_info timer callback to prevent PostgreSQL's wal_sender_timeout from firing.

Summary

Types

ack()

@type ack() :: iodata()

query()

@type query() :: iodata()

query_opts()

@type query_opts() :: [{:timeout, timeout()}]

reason()

@type reason() :: String.t()

server()

@type server() :: :gen_statem.server_ref()

state()

@type state() :: term()

stream_opts()

@type stream_opts() :: [{:max_messages, pos_integer()}]

Callbacks

handle_call(term, from, state)

(optional)
@callback handle_call(term(), :gen_statem.from(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}

handle_connect(state)

(optional)
@callback handle_connect(state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}

handle_data(arg1, state)

(optional)
@callback handle_data(binary() | :done, state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:noreply_and_pause, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}

handle_disconnect(state)

(optional)
@callback handle_disconnect(state()) :: {:noreply, state()}

handle_info(term, state)

(optional)
@callback handle_info(term(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:noreply_and_resume, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}

handle_result(arg1, state)

(optional)
@callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}

init(term)

@callback init(term()) :: {:ok, state()}

Functions

call(server, message, timeout \\ 5000)

decode_lsn(lsn)

@spec decode_lsn(String.t()) :: {:ok, integer()} | :error

encode_lsn(lsn)

@spec encode_lsn(integer()) :: {:ok, String.t()} | :error

reply(client, reply)

See :gen_statem.reply/2.

start_link(module, arg, opts)

@spec start_link(module(), term(), Keyword.t()) ::
  {:ok, pid()} | {:error, Postgrex.Error.t() | term()}