Libp2p.Yamux.Session (libp2p_elixir v0.9.0)
Manages a Yamux multiplexing session.
Yamux allows multiple concurrent streams to be multiplexed over a single reliable connection. This module implements the session management, frame parsing, and flow control.
Framing
Every Yamux frame begins with a 12-byte header. The fields are big-endian and appear in this order:
- Version (8 bits): Always 0.
- Type (8 bits): The message type (Data, WindowUpdate, Ping, GoAway).
- Flags (16 bits): Modifiers such as SYN (new stream), ACK (accept stream), FIN (half-close stream), RST (reset).
- StreamID (32 bits): The identifier for the logical stream (0 is reserved for the session).
- Length (32 bits): The payload length for Data frames; a control value for the other types (window delta, ping value, or GoAway code).
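The header layout above maps directly onto Elixir binary pattern matching. The following is a minimal sketch, not this module's internal codec: the module name `YamuxHeaderSketch` and its `encode/4` and `decode/1` functions are hypothetical, and the numeric type and flag codes are taken from the Yamux specification rather than from this library.

```elixir
defmodule YamuxHeaderSketch do
  # Hypothetical helper for illustration; not part of Libp2p.Yamux.Session.
  import Bitwise

  # Numeric codes as defined by the Yamux specification.
  @type_codes %{data: 0, window_update: 1, ping: 2, go_away: 3}
  @flag_bits [syn: 0x1, ack: 0x2, fin: 0x4, rst: 0x8]

  # Build the 12-byte header: version, type, flags, stream ID, length.
  def encode(type, flags, stream_id, length) do
    type_code = Map.fetch!(@type_codes, type)
    bits = Enum.reduce(flags, 0, fn flag, acc -> acc ||| Keyword.fetch!(@flag_bits, flag) end)
    <<0::8, type_code::8, bits::16, stream_id::32, length::32>>
  end

  # Parse one header from the front of a binary and return the remaining bytes.
  def decode(<<0::8, type_code::8, bits::16, stream_id::32, length::32, rest::binary>>)
      when type_code in 0..3 do
    {type, _code} = Enum.find(@type_codes, fn {_name, code} -> code == type_code end)
    flags = for {name, bit} <- @flag_bits, (bits &&& bit) != 0, do: name
    {:ok, %{type: type, flags: flags, stream_id: stream_id, length: length}, rest}
  end

  # Fewer than 12 bytes: the caller should wait for more input.
  def decode(bin) when byte_size(bin) < 12, do: :incomplete

  # 12 or more bytes, but a non-zero version or an unknown type.
  def decode(_other), do: {:error, :invalid_header}
end
```

For example, `YamuxHeaderSketch.encode(:data, [:syn], 1, 5)` produces the header for a Data frame that opens stream 1 and carries a 5-byte payload.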
Flow Control
Yamux uses credit-based flow control so that a fast sender cannot overwhelm a slow receiver.
- Each stream starts with a receive window of 256 KiB (262,144 bytes).
- As data is consumed, WindowUpdate frames are sent to the peer to grant more sending credit.
- If the window is exhausted, the sender must pause until an update is received.
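The credit accounting described above can be sketched as a small pure module. This is an illustration under stated assumptions, not this library's implementation: the module `YamuxWindowSketch` and all of its function names are hypothetical, and the policy of granting credit back as soon as data is consumed is a simplification (real implementations often batch updates).

```elixir
defmodule YamuxWindowSketch do
  # Hypothetical helper for illustration; not part of Libp2p.Yamux.Session.

  # Initial per-stream window: 256 KiB.
  @initial_window 256 * 1024

  def new, do: %{send_window: @initial_window, recv_window: @initial_window}

  # Sender side: take only as much of `data` as the remaining credit allows.
  # Returns {sendable, held_back, state}; `held_back` waits for a WindowUpdate.
  def take_sendable(%{send_window: win} = st, data) when is_binary(data) do
    n = min(win, byte_size(data))
    <<sendable::binary-size(n), held_back::binary>> = data
    {sendable, held_back, %{st | send_window: win - n}}
  end

  # Sender side: a WindowUpdate from the peer grants `delta` more bytes of credit.
  def on_window_update(%{send_window: win} = st, delta) when delta >= 0 do
    %{st | send_window: win + delta}
  end

  # Receiver side: inbound data uses up the window we advertised.
  def on_data(%{recv_window: win} = st, payload) when byte_size(payload) <= win do
    {:ok, %{st | recv_window: win - byte_size(payload)}}
  end

  def on_data(_st, _payload), do: {:error, :window_exceeded}

  # Receiver side: once the application has consumed `n` bytes, return the
  # delta to put in a WindowUpdate frame (assumed policy: grant immediately).
  def on_consumed(%{recv_window: win} = st, n) when n >= 0 do
    {n, %{st | recv_window: win + n}}
  end
end
```

With a fresh state, a 300,000-byte write is split into 262,144 sendable bytes and 37,856 held-back bytes, and the held-back portion stays queued until `on_window_update/2` adds credit.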
Stream IDs
To avoid collisions, clients (initiators) use odd stream IDs, and servers (listeners) use even stream IDs.
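The odd/even rule can be sketched as follows. The module `YamuxStreamIdSketch` and its functions are hypothetical and only illustrate the parity scheme; how this library tracks `next_stream_id` internally is not shown in this documentation.

```elixir
defmodule YamuxStreamIdSketch do
  # Hypothetical helper for illustration; not part of Libp2p.Yamux.Session.

  # First usable ID per role. ID 0 is reserved for the session itself,
  # so the server side starts at 2.
  def first_id(:client), do: 1
  def first_id(:server), do: 2

  # Allocate the next ID and return {id, next_id}. Stepping by 2 keeps parity.
  def allocate(next_id) when is_integer(next_id) and next_id > 0 do
    {next_id, next_id + 2}
  end

  # True when `stream_id` has the parity of the remote role, which means
  # the stream was opened by the peer rather than by us.
  def remote_initiated?(:client, stream_id), do: stream_id > 0 and rem(stream_id, 2) == 0
  def remote_initiated?(:server, stream_id), do: rem(stream_id, 2) == 1
end
```

Because each side only ever allocates IDs of its own parity, both peers can open streams at the same time without coordinating.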
The session itself is a pure(ish) state machine that:
- consumes inbound bytes -> frames -> events
- produces outbound frames for stream open/ack/data/close
It intentionally implements only what we need to bootstrap higher layers.
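The "inbound bytes -> frames" step can be pictured as a buffered splitter that peels complete frames off the front of the accumulated input and keeps any partial tail for the next call. The sketch below is an assumption about the general technique, not this module's code: `YamuxFrameSplitSketch` and its tuple shapes are hypothetical, and it stops at frames instead of producing the `event()` values listed under Types.

```elixir
defmodule YamuxFrameSplitSketch do
  # Hypothetical helper for illustration; not part of Libp2p.Yamux.Session.

  # Type code for Data frames, per the Yamux specification.
  @data_type 0

  # Append `bytes` to `buffer` and extract every complete frame.
  # Returns {frames, rest}, where `rest` is the leftover partial input.
  def split(buffer, bytes) when is_binary(buffer) and is_binary(bytes) do
    do_split(buffer <> bytes, [])
  end

  # Data frame: the length field counts the payload bytes after the header.
  defp do_split(
         <<0::8, @data_type::8, flags::16, id::32, len::32, payload::binary-size(len),
           rest::binary>>,
         acc
       ) do
    do_split(rest, [{:data, flags, id, payload} | acc])
  end

  # WindowUpdate, Ping, GoAway: no payload; the length field is a control value.
  defp do_split(<<0::8, type::8, flags::16, id::32, value::32, rest::binary>>, acc)
       when type in 1..3 do
    do_split(rest, [{:control, type, flags, id, value} | acc])
  end

  # Incomplete (or unrecognized) input: stop and hand back what is left.
  # A real implementation would reject a bad version or type here.
  defp do_split(rest, acc), do: {Enum.reverse(acc), rest}
end
```

Keeping the leftover bytes in the returned value, instead of in a process, is what lets a parser like this stay a pure function of its inputs.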
Types
@type event() ::
  {:stream_open, non_neg_integer()}
  | {:stream_data, non_neg_integer(), binary()}
  | {:stream_close, non_neg_integer()}
  | {:stream_reset, non_neg_integer()}
  | {:go_away, non_neg_integer()}
@type t() :: %Libp2p.Yamux.Session{
  buffer: binary(),
  next_stream_id: non_neg_integer(),
  role: :client | :server,
  streams: %{required(non_neg_integer()) => map()}
}
Functions
@spec close_stream(t(), non_neg_integer()) :: {binary(), t()}
Half-close (FIN) a stream. Returns {out_bytes, st2}.
Feed inbound bytes. Returns {events, out_bytes, st2}.
@spec new(:client | :server) :: t()
@spec open_stream(t()) :: {non_neg_integer(), binary(), t()}
Open an outbound stream. Returns {stream_id, out_bytes, session2}.
@spec open_stream_with_data(t(), binary()) :: {non_neg_integer(), binary(), t()}
Open an outbound stream and send initial data in the SYN frame.
Some peers appear to be strict about receiving the first payload alongside SYN.
Returns {stream_id, out_bytes, session2}.
@spec reset_stream(t(), non_neg_integer()) :: {binary(), t()}
Hard reset a stream. Returns {out_bytes, st2}.
@spec send_data(t(), non_neg_integer(), binary()) :: {binary(), t()}
Send data on an open stream. Returns {out_bytes, st2}.