Vendored and extended version of Postgrex.ReplicationConnection.
Adds socket pause/resume support so that the callback module can apply backpressure (stop reading WAL data) while remaining responsive to non-socket messages such as keepalive timers.
Extensions over upstream
Two new return types are available from handle_data/2 and handle_info/2:
{:noreply_and_pause, ack, state} - send any ack messages to PostgreSQL, then pause socket reads. The gen_statem will still process handle_info and handle_call messages (e.g. keepalive timers), but no further handle_data callbacks will fire until the socket is resumed.
{:noreply_and_resume, ack, state} - send ack messages, resume socket reads, and process any buffered data that arrived while paused.
When paused, TCP/SSL data that arrives on the socket is buffered (at most
one Erlang message, since the socket uses {:active, :once}). This provides
natural TCP-level backpressure: once the kernel receive buffer fills,
PostgreSQL's walsender blocks on write.
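A minimal sketch of how a callback module might use the two return values to apply backpressure. The module name BackpressureSketch, the :processed message, the state shape and the @max_pending limit are all hypothetical, and the callbacks are written as plain functions so the example runs on its own; a real module would `use` this replication connection module instead. It also assumes that an empty ack list means "nothing to send".

```elixir
defmodule BackpressureSketch do
  # Hypothetical limit on unprocessed WAL messages before pausing reads.
  @max_pending 2

  def init_state, do: %{pending: 0, paused?: false}

  # XLogData messages start with ?w followed by three 64-bit fields.
  def handle_data(<<?w, _wal_start::64, _wal_end::64, _clock::64, _payload::binary>>, state) do
    state = %{state | pending: state.pending + 1}

    if state.pending >= @max_pending do
      # Assumption: an empty ack list means there is nothing to send.
      {:noreply_and_pause, [], %{state | paused?: true}}
    else
      {:noreply, state}
    end
  end

  # Other messages (e.g. primary keepalives) are ignored in this sketch.
  def handle_data(_other, state), do: {:noreply, state}

  # :processed is a hypothetical message a downstream consumer sends
  # after it finishes one unit of work.
  def handle_info(:processed, %{paused?: true, pending: pending} = state)
      when pending - 1 < @max_pending do
    {:noreply_and_resume, [], %{state | pending: pending - 1, paused?: false}}
  end

  def handle_info(:processed, state) do
    {:noreply, %{state | pending: max(state.pending - 1, 0)}}
  end

  def handle_info(_msg, state), do: {:noreply, state}
end
```

Tracking paused? in the state keeps the sketch from returning :noreply_and_resume when the socket is not paused, since the behaviour in that case is not described here.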
The callback module is expected to send periodic StandbyStatusUpdate
messages via {:noreply, [encoded_msg], state} from a handle_info
timer callback to prevent PostgreSQL's wal_sender_timeout from firing.
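The keepalive described above could look roughly like the following. This is a sketch under stated assumptions: the module name KeepaliveSketch, the :keepalive message, the 10-second interval and the last_lsn state key are hypothetical. The byte layout follows the StandbyStatusUpdate message of the PostgreSQL streaming replication protocol, where timestamps are microseconds since 2000-01-01 and each position is the last WAL byte + 1.

```elixir
defmodule KeepaliveSketch do
  # Microseconds between the Unix epoch and 2000-01-01 00:00:00 UTC,
  # the epoch PostgreSQL uses for replication timestamps.
  @pg_epoch_us DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)

  # Hypothetical interval; it should stay well below wal_sender_timeout.
  @interval_ms 10_000

  def schedule, do: Process.send_after(self(), :keepalive, @interval_ms)

  # Encodes a StandbyStatusUpdate (?r) reporting `lsn` as the written,
  # flushed and applied position, without requesting a reply.
  def standby_status_update(lsn, now_us \\ System.os_time(:microsecond)) do
    <<?r, lsn::64, lsn::64, lsn::64, (now_us - @pg_epoch_us)::64, 0>>
  end

  # Shaped like a handle_info/2 callback: re-arm the timer and ack.
  def handle_info(:keepalive, %{last_lsn: lsn} = state) do
    schedule()
    {:noreply, [standby_status_update(lsn)], state}
  end
end
```

Because handle_info messages are still processed while the socket is paused, a timer like this keeps running during backpressure.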
Summary
Types
@type ack() :: iodata()
@type query() :: iodata()
@type query_opts() :: [{:timeout, timeout()}]
@type reason() :: String.t()
@type server() :: :gen_statem.server_ref()
@type state() :: term()
@type stream_opts() :: [{:max_messages, pos_integer()}]
Callbacks
@callback handle_call(term(), :gen_statem.from(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}
@callback handle_result([Postgrex.Result.t()] | Postgrex.Error.t(), state()) ::
  {:noreply, state()}
  | {:noreply, ack(), state()}
  | {:query, query(), state()}
  | {:query, query(), query_opts(), state()}
  | {:stream, query(), stream_opts(), state()}
  | {:disconnect, reason()}
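One way these two callbacks might fit together is a request/response round trip: handle_call starts a query and remembers the caller, and handle_result replies once the result arrives. The sketch below assumes a hypothetical module name (QuerySketch), request shape ({:run, query}) and state key (from), and replies with :gen_statem.reply/2 because the spec types the caller as :gen_statem.from(). The callbacks are plain functions here so the example runs without a database.

```elixir
defmodule QuerySketch do
  # Start the query and keep the caller so handle_result/2 can reply.
  def handle_call({:run, query}, from, state) do
    {:query, query, %{state | from: from}}
  end

  # `results` would be a list of Postgrex.Result structs or a
  # Postgrex.Error; this sketch passes it through untouched.
  def handle_result(results, %{from: from} = state) when from != nil do
    :gen_statem.reply(from, {:ok, results})
    {:noreply, %{state | from: nil}}
  end

  # No caller is waiting, so there is nobody to reply to.
  def handle_result(_results, state), do: {:noreply, state}
end
```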