Module epgsql_sock

GenServer holding all the connection state (including socket).

See also: epgsql_cmd_connect for network connection and authentication setup.

Description

GenServer holding all the connection state (including socket).

See https://www.postgresql.org/docs/current/static/protocol-flow.html

Commands in the PostgreSQL protocol are pipelined: you don't have to wait for a reply before sending the next command. Commands are processed (and responses to them are generated) in FIFO order. E.g., if you execute 2 SimpleQuery commands, #1 and #2, you first get all response packets for #1 and then all for #2:
   > SQuery #1
   > SQuery #2
   < RowDescription #1
   < DataRow #1.1
   < ...
   < DataRow #1.N
   < CommandComplete #1
   < RowDescription #2
   < DataRow #2.1
   < ...
   < DataRow #2.N
   < CommandComplete #2

epgsql_sock is capable of utilizing the pipelining feature: as soon as it receives a new command, it sends it to the server immediately and then puts the command's callbacks and state into an internal queue of all the commands which were sent to the server and are waiting for a response. So it knows in which order it should call each pipelined command's handle_message callback. But this can easily be broken if a high-level command is poorly implemented or some conflicting low-level commands (such as parse, bind, execute) are executed in the wrong order. In this case the server and epgsql states become out of sync and epgsql_cmd_sync has to be executed in order to recover.

epgsql_cmd_copy_from_stdin and epgsql_cmd_start_replication switch the "state machine" of the connection process to a special "COPY mode" subprotocol. See https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY.

Data Types

copy_state()

copy_state() = #copy{initiator = pid(), last_error = undefined | epgsql:query_error(), format = binary | text, binary_types = [epgsql:epgsql_type()] | undefined}

error()

error() = {error, sync_required | closed | sock_closed | sock_error}

pg_sock()

abstract datatype: pg_sock()

repl_state()

repl_state() = #repl{last_received_lsn = integer() | undefined, last_flushed_lsn = integer() | undefined, last_applied_lsn = integer() | undefined, feedback_required = boolean() | undefined, cbmodule = module() | undefined, cbstate = any() | undefined, receiver = pid() | undefined, align_lsn = boolean() | undefined}

tcp_socket()

tcp_socket() = gen_tcp:socket()

transport()

transport() = {call, any()} | {cast, pid(), reference()} | {incremental, pid(), reference()}

Function Index

activate/1
async_command/4
cancel/1
close/1
code_change/3
copy_send_rows/3
format_status/1
format_status/2
get_backend_pid/1
get_cmd_status/1
get_codec/1
get_parameter/2
get_parameter_internal/2
get_results/1
get_rows/1
get_subproto_state/1
handle_call/3
handle_cast/2
handle_info/2
init/1
init_replication_state/1
notify/2
on_copy_from_stdin/3 - Handle "copy subprotocol" for COPY .. FROM STDIN.
on_message/3
on_replication/3
send/2
send/3
send_multi/2
set_attr/3
set_net_socket/3
set_notice_receiver/2
set_packet_handler/2
standby_status_update/3
start_link/0
sync_command/3
terminate/2

Function Details

activate/1

activate(C::epgsql:connection()) -> ok | {error, inet:posix() | any()}

async_command/4

async_command(C::epgsql:connection(), Transport::cast | incremental, Command::epgsql_command:command(), Args::any()) -> reference()

cancel/1

cancel(S) -> any()

close/1

close(C) -> any()

code_change/3

code_change(OldVsn, State, Extra) -> any()

copy_send_rows/3

copy_send_rows(C, Rows, Timeout) -> any()

format_status/1

format_status(Status) -> any()

format_status/2

format_status(X1, X2) -> any()

get_backend_pid/1

get_backend_pid(C::epgsql:connection()) -> integer()

get_cmd_status/1

get_cmd_status(C) -> any()

get_codec/1

get_codec(State::pg_sock()) -> epgsql_binary:codec()

get_parameter/2

get_parameter(C, Name) -> any()

get_parameter_internal/2

get_parameter_internal(Name::binary(), State::pg_sock()) -> binary() | undefined

get_results/1

get_results(State::pg_sock()) -> [any()]

get_rows/1

get_rows(State::pg_sock()) -> [tuple()]

get_subproto_state/1

get_subproto_state(State::pg_sock()) -> repl_state() | copy_state() | undefined

handle_call/3

handle_call(X1, From, State) -> any()

handle_cast/2

handle_cast(X1, State) -> any()

handle_info/2

handle_info(X1, State) -> any()

init/1

init(X1) -> any()

init_replication_state/1

init_replication_state(State::pg_sock()) -> pg_sock()

notify/2

notify(State, Notice) -> any()

on_copy_from_stdin/3

on_copy_from_stdin(M, Err, State) -> any()

Handle "copy subprotocol" for COPY .. FROM STDIN

Activated by epgsql_cmd_copy_from_stdin, deactivated by epgsql_cmd_copy_done or error

on_message/3

on_message(Msg, Bin, State) -> any()

on_replication/3

on_replication(M, Err, State) -> any()

send/2

send(State::pg_sock(), Data::iodata()) -> ok | {error, any()}

send/3

send(State::pg_sock(), Type::epgsql_wire:packet_type(), Data::iodata()) -> ok | {error, any()}

send_multi/2

send_multi(State::pg_sock(), List::[{epgsql_wire:packet_type(), iodata()}]) -> ok | {error, any()}

set_attr/3

set_attr(X1::atom(), Backend::any(), State::pg_sock()) -> pg_sock()

set_net_socket/3

set_net_socket(Mod::gen_tcp | ssl, Socket::tcp_socket() | ssl:sslsocket(), State::pg_sock()) -> pg_sock()

set_notice_receiver/2

set_notice_receiver(C, PidOrName) -> any()

set_packet_handler/2

set_packet_handler(Handler::atom(), State0::pg_sock()) -> pg_sock()

standby_status_update/3

standby_status_update(C, FlushedLSN, AppliedLSN) -> any()

start_link/0

start_link() -> any()

sync_command/3

sync_command(C::epgsql:connection(), Command::epgsql_command:command(), Args::any()) -> any()

terminate/2

terminate(Reason, State) -> any()


Generated by EDoc