quic_dist_controller (quic v1.3.1)

View Source

Per-connection distribution controller.

This module manages a single distribution connection over QUIC, handling:

- Control stream (stream 0) for handshake and tick messages - Data stream pool for distribution messages - Message framing with length prefixes - Tick handling for connection liveness - Stream prioritization

Stream Layout

Stream 0: Control (urgency 0) - Distribution handshake messages - Tick messages - Link/monitor signals

Streams 4,8,12...: Data (urgency 4-6) - Regular distribution messages - Round-robin scheduling

Summary

Functions

Register to accept incoming user streams. The controller auto-assigns ownership of each new incoming stream to the registered acceptor and delivers data directly as {quic_dist_stream, StreamRef, {data, Data, Fin}} messages. No prior {incoming, StreamId} handshake.

Transfer stream ownership to another process.

Get address information for the connection.

Get the other node name.

Get low-level controller (self).

Get connection statistics.

List all user streams. Returns a list of stream info maps.

Open a user stream for application use. Returns {ok, StreamId} on success.

Open a user stream with options. Options: {priority, 16..255} - Stream priority (default: 128, lower = higher priority)

Pre-nodeup callback - sends dist_ctrlr message to kernel. Called by f_setopts_pre_nodeup. SetupPid is the calling process.

Receive data from the control stream.

Reset/cancel a user stream (notifies peer immediately). Uses default error code 0.

Reset/cancel a user stream with a specific error code.

Send data on the control stream.

Send data on a user stream. Fin=true marks the end of data on this stream.

Set the other node name.

Set the supervisor process (kernel).

Start a controller for a QUIC distribution connection.

Stop accepting incoming user streams.

Send a tick message.

Functions

accept_user_streams(Controller, Acceptor)

-spec accept_user_streams(Controller :: pid(), Acceptor :: pid()) -> ok | {error, term()}.

Register to accept incoming user streams. The controller auto-assigns ownership of each new incoming stream to the registered acceptor and delivers data directly as {quic_dist_stream, StreamRef, {data, Data, Fin}} messages. No prior {incoming, StreamId} handshake.

callback_mode()

close_user_stream(Controller, StreamId)

-spec close_user_stream(Controller :: pid(), StreamId :: non_neg_integer()) -> ok | {error, term()}.

Close a user stream.

code_change(OldVsn, StateName, State, Extra)

connected(EventType, OldState, State)

controlling_process(Controller, StreamId, NewOwner)

-spec controlling_process(Controller :: pid(), StreamId :: non_neg_integer(), NewOwner :: pid()) ->
                             ok | {error, term()}.

Transfer stream ownership to another process.

get_address(Controller, Node)

-spec get_address(Controller :: pid(), Node :: node()) ->
                     {ok,
                      #net_address{address :: term(),
                                   host :: term(),
                                   protocol :: term(),
                                   family :: term()}}.

Get address information for the connection.

get_node(Controller)

-spec get_node(Controller :: pid()) -> {ok, node()} | undefined.

Get the other node name.

getll(Controller)

-spec getll(Controller :: pid()) -> {ok, pid()}.

Get low-level controller (self).

getstat(Controller)

-spec getstat(Controller :: pid()) ->
                 {ok,
                  RecvCnt :: non_neg_integer(),
                  SendCnt :: non_neg_integer(),
                  SendPend :: non_neg_integer()}.

Get connection statistics.

handshaking(EventType, OldState, State)

init(_)

init_state(EventType, OldState, State)

list_user_streams(Controller)

-spec list_user_streams(Controller :: pid()) -> [map()].

List all user streams. Returns a list of stream info maps.

open_user_stream(Controller, Owner)

-spec open_user_stream(Controller :: pid(), Owner :: pid()) -> {ok, non_neg_integer()} | {error, term()}.

Open a user stream for application use. Returns {ok, StreamId} on success.

open_user_stream(Controller, Owner, Opts)

-spec open_user_stream(Controller :: pid(), Owner :: pid(), Opts :: list()) ->
                          {ok, non_neg_integer()} | {error, term()}.

Open a user stream with options. Options: {priority, 16..255} - Stream priority (default: 128, lower = higher priority)

pre_nodeup(Controller)

-spec pre_nodeup(Controller :: pid()) -> ok.

Pre-nodeup callback - sends dist_ctrlr message to kernel. Called by f_setopts_pre_nodeup. SetupPid is the calling process.

recv(Controller, Length, Timeout)

-spec recv(Controller :: pid(), Length :: non_neg_integer(), Timeout :: timeout()) ->
              {ok, [byte()]} | {error, term()}.

Receive data from the control stream.

reset_user_stream(Controller, StreamId)

-spec reset_user_stream(Controller :: pid(), StreamId :: non_neg_integer()) -> ok | {error, term()}.

Reset/cancel a user stream (notifies peer immediately). Uses default error code 0.

reset_user_stream(Controller, StreamId, ErrorCode)

-spec reset_user_stream(Controller :: pid(),
                        StreamId :: non_neg_integer(),
                        ErrorCode :: non_neg_integer()) ->
                           ok | {error, term()}.

Reset/cancel a user stream with a specific error code.

send(Controller, Data)

-spec send(Controller :: pid(), Data :: iodata()) -> ok | {error, term()}.

Send data on the control stream.

send_user_data(Controller, StreamId, Data, Fin)

-spec send_user_data(Controller :: pid(),
                     StreamId :: non_neg_integer(),
                     Data :: iodata(),
                     Fin :: boolean()) ->
                        ok | {error, term()}.

Send data on a user stream. Fin=true marks the end of data on this stream.

set_node(Controller, Node)

-spec set_node(Controller :: pid(), Node :: node()) -> ok.

Set the other node name.

set_supervisor(Controller, Supervisor)

-spec set_supervisor(Controller :: pid(), Supervisor :: pid()) -> ok.

Set the supervisor process (kernel).

start_link(Conn, Role)

-spec start_link(Conn :: pid(), Role :: client | server) -> {ok, pid()} | {error, term()}.

Start a controller for a QUIC distribution connection.

stop_accepting_streams(Controller)

-spec stop_accepting_streams(Controller :: pid()) -> ok.

Stop accepting incoming user streams.

terminate(Reason, StateName, State)

tick(Controller)

-spec tick(Controller :: pid()) -> ok.

Send a tick message.