Libp2p.Yamux.Session (libp2p_elixir v0.9.0)

Manages a Yamux multiplexing session.

Yamux allows multiple concurrent streams to be multiplexed over a single reliable connection. This module implements the session management, frame parsing, and flow control.

Framing

Every message in Yamux is prefixed with a 12-byte header containing:

  • Version: (Always 0).
  • Type: The message type (Data, WindowUpdate, Ping, GoAway).
  • Flags: Modifiers like SYN (new stream), ACK (accept stream), FIN (close stream), RST (reset).
  • StreamID: The identifier for the logical stream (0 is reserved for the session).
  • Length: Payload length or control value.

Flow Control

Yamux uses a reliable credit-based flow control system to prevent fast senders from overwhelming receivers.

  • Each stream starts with a receiving window of 256KB.
  • As data is consumed, WindowUpdate frames are sent to the peer to grant more sending credit.
  • If the window is exhausted, the sender must pause until an update is received.

Stream IDs

To avoid collisions, clients (initiators) use odd stream IDs, and servers (listeners) use even stream IDs.

This is a pure(ish) state machine that:

  • consumes inbound bytes -> frames -> events
  • produces outbound frames for stream open/ack/data/close

It intentionally implements only what we need to bootstrap higher layers.

Summary

Functions

Half-close (FIN) a stream. Returns {out_bytes, st2}.

Feed inbound bytes. Returns {events, out_bytes, st2}.

Open an outbound stream. Returns {stream_id, out_bytes, session2}.

Open an outbound stream and send initial data in the SYN frame.

Hard reset a stream. Returns {out_bytes, st2}.

Send data on an open stream. Returns {out_bytes, st2}.

Types

event()

@type event() ::
  {:stream_open, non_neg_integer()}
  | {:stream_data, non_neg_integer(), binary()}
  | {:stream_close, non_neg_integer()}
  | {:stream_reset, non_neg_integer()}
  | {:go_away, non_neg_integer()}

t()

@type t() :: %Libp2p.Yamux.Session{
  buffer: binary(),
  next_stream_id: non_neg_integer(),
  role: :client | :server,
  streams: %{required(non_neg_integer()) => map()}
}

Functions

close_stream(st, stream_id)

@spec close_stream(t(), non_neg_integer()) :: {binary(), t()}

Half-close (FIN) a stream. Returns {out_bytes, st2}.

feed(st, bytes)

@spec feed(t(), binary()) :: {[event()], binary(), t()}

Feed inbound bytes. Returns {events, out_bytes, st2}.

new(role)

@spec new(:client | :server) :: t()

open_stream(st)

@spec open_stream(t()) :: {non_neg_integer(), binary(), t()}

Open an outbound stream. Returns {stream_id, out_bytes, session2}.

open_stream_with_data(st, data)

@spec open_stream_with_data(t(), binary()) :: {non_neg_integer(), binary(), t()}

Open an outbound stream and send initial data in the SYN frame.

Some peers appear to be strict about receiving the first payload alongside SYN. Returns {stream_id, out_bytes, session2}.

reset_stream(st, stream_id)

@spec reset_stream(t(), non_neg_integer()) :: {binary(), t()}

Hard reset a stream. Returns {out_bytes, st2}.

send_data(st, stream_id, data)

@spec send_data(t(), non_neg_integer(), binary()) :: {binary(), t()}

Send data on an open stream. Returns {out_bytes, st2}.