barrel_mcp_session (barrel_mcp v2.0.2)
View SourceMCP Session Management.
Provides ETS-based session management for MCP Streamable HTTP transport. Sessions track client connections, protocol versions, and activity.
Summary
Functions
Push a notifications/<kind>/list_changed envelope to every session that has an active SSE channel. Tolerates a missing session manager (e.g. during stdio-only operation).
Cancel an in-flight tool call. Sends {cancel, RequestId} to the worker and {cancelled, RequestId} to the waiter, then drops the entry. Idempotent: a missing entry returns ok.
Cleanup sessions older than TTL milliseconds. Routes through the gen_server (the table owner under the new protected visibility); the handler deletes expired entries inline.
Drop an in-flight entry (called by the waiter after a normal completion).
Create a new session.
Delete a session.
Deliver a JSON-RPC response from the client back to the waiting caller. Called by the HTTP handler when an inbound POST contains a result or error for a server-initiated id.
Send elicitation/create to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared elicitation capability in initialize.
Return SSE events newer than LastId (oldest first), or truncated when LastId is older than the oldest buffered event.
Generate a unique session ID.
Get a session by ID.
Read the current log level for a session. Defaults to info before any logging/setLevel is received.
Look up the negotiated protocol version for a session.
Whether a session declared elicitation capability in its initialize request.
Whether a session declared roots capability in its initialize request.
Whether a session declared sampling capability in its initialize request.
List all sessions.
List session ids whose client declared elicitation capability.
List session ids whose client declared roots capability.
List session ids whose client declared sampling capability.
Numeric priority for the eight RFC 5424 levels (debug=0, emergency=7). Higher = more severe. Used for filtering: a notification at priority N is delivered iff N >= configured level.
Push a notifications/progress envelope to a specific session over its SSE channel. Token is the progressToken the client supplied on the originating request.
Record an in-flight tool call so a later notifications/cancelled can find the worker and waiter.
Append an SSE event to the session's ring buffer for later replay via Last-Event-ID.
Send roots/list to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared roots capability in initialize.
Send sampling/createMessage to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared sampling capability in initialize.
Set the client_capabilities map for a session. Called from the protocol handler after parsing the initialize request.
Set the per-session log level (driven by logging/setLevel). Level is one of the eight RFC 5424 levels accepted by the MCP spec; rejects anything else with {error, invalid_level}.
Record the negotiated protocol version on a session. Called by the HTTP transport after a successful initialize so later requests on the same session can fall back to it when the client omits the MCP-Protocol-Version header.
Configure the maximum number of SSE events buffered per session for replay.
Set the SSE process pid for a session.
Start the session manager.
Subscribe a session to resource updates for a given URI.
Return all session ids that subscribed to a URI.
Update last activity timestamp.
Types
Functions
-spec broadcast_list_changed(handler_type()) -> ok.
Push a notifications/<kind>/list_changed envelope to every session that has an active SSE channel. Tolerates a missing session manager (e.g. during stdio-only operation).
Cancel an in-flight tool call. Sends {cancel, RequestId} to the worker and {cancelled, RequestId} to the waiter, then drops the entry. Idempotent: a missing entry returns ok.
-spec cleanup_expired(pos_integer()) -> non_neg_integer().
Cleanup sessions older than TTL milliseconds. Routes through the gen_server (the table owner under the new protected visibility); the handler deletes expired entries inline.
Drop an in-flight entry (called by the waiter after a normal completion).
-spec create(Opts) -> {ok, binary()} when Opts :: #{client_info => map(), protocol_version => binary()}.
Create a new session.
-spec delete(binary()) -> ok.
Delete a session.
Deliver a JSON-RPC response from the client back to the waiting caller. Called by the HTTP handler when an inbound POST contains a result or error for a server-initiated id.
-spec elicit_create(binary(), map(), map()) -> {ok, map()} | {error, timeout | not_supported | no_sse | not_found | term()}.
Send elicitation/create to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared elicitation capability in initialize.
-spec events_since(binary(), binary()) -> {ok, [{binary(), map()}]} | truncated | {error, not_found}.
Return SSE events newer than LastId (oldest first), or truncated when LastId is older than the oldest buffered event.
-spec generate_id() -> binary().
Generate a unique session ID.
Get a session by ID.
Read the current log level for a session. Defaults to info before any logging/setLevel is received.
Look up the negotiated protocol version for a session.
Whether a session declared elicitation capability in its initialize request.
Whether a session declared roots capability in its initialize request.
Whether a session declared sampling capability in its initialize request.
-spec list() -> [map()].
List all sessions.
-spec list_elicitation_capable() -> [binary()].
List session ids whose client declared elicitation capability.
-spec list_roots_capable() -> [binary()].
List session ids whose client declared roots capability.
-spec list_sampling_capable() -> [binary()].
List session ids whose client declared sampling capability.
Numeric priority for the eight RFC 5424 levels (debug=0, emergency=7). Higher = more severe. Used for filtering: a notification at priority N is delivered iff N >= configured level.
Push a notifications/progress envelope to a specific session over its SSE channel. Token is the progressToken the client supplied on the originating request.
Record an in-flight tool call so a later notifications/cancelled can find the worker and waiter.
Append an SSE event to the session's ring buffer for later replay via Last-Event-ID.
-spec roots_list(binary(), map()) -> {ok, [map()]} | {error, timeout | not_supported | no_sse | not_found | term()}.
Send roots/list to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared roots capability in initialize.
-spec sampling_create_message(binary(), map(), map()) -> {ok, map(), map()} | {error, timeout | not_supported | no_sse | not_found | term()}.
Send sampling/createMessage to the client behind a session and wait for the response. The session must (a) exist, (b) have an active sse_pid, and (c) have declared sampling capability in initialize.
Set the client_capabilities map for a session. Called from the protocol handler after parsing the initialize request.
Set the per-session log level (driven by logging/setLevel). Level is one of the eight RFC 5424 levels accepted by the MCP spec; rejects anything else with {error, invalid_level}.
Record the negotiated protocol version on a session. Called by the HTTP transport after a successful initialize so later requests on the same session can fall back to it when the client omits the MCP-Protocol-Version header.
-spec set_sse_buffer_max(binary(), pos_integer()) -> ok | {error, not_found}.
Configure the maximum number of SSE events buffered per session for replay.
Set the SSE process pid for a session.
Start the session manager.
Subscribe a session to resource updates for a given URI.
Return all session ids that subscribed to a URI.
-spec update_activity(binary()) -> ok | {error, not_found}.
Update last activity timestamp.