ExMCP.Server.SSESession (ex_mcp v0.9.0)

View Source

Server-side SSE session manager for bidirectional MCP communication.

Manages the lifecycle of SSE sessions where the server needs to send requests to the client (elicitation, sampling) and receive responses.

Architecture

Each MCP session has:

  • A POST endpoint for client→server requests
  • A GET SSE stream for server→client messages (notifications, requests)
  • A pending request tracker for correlating server requests with client responses

Usage

# Initialize session state (call once at server startup)
SSESession.init()

# Register a GET SSE stream for a session
SSESession.register_sse_stream(session_id)

# Send a request to the client and wait for response
{:ok, result} = SSESession.send_request(session_id, "elicitation/create", params)

# Route a client response to the waiting request handler
SSESession.handle_response(session_id, request_id, result)

Summary

Functions

Clean up session state.

Route a client response back to the waiting request handler.

Check if a session has an active SSE stream.

Initialize the session ETS table.

Register the calling process as the SSE stream for a session.

Run the SSE event loop for a GET connection.

Send a JSON-RPC request to the client via the GET SSE stream.

Functions

cleanup(session_id)

@spec cleanup(String.t()) :: :ok

Clean up session state.

handle_response(request_id, result)

@spec handle_response(integer() | String.t(), {:ok, map()} | {:error, map()}) ::
  boolean()

Route a client response back to the waiting request handler.

Called when the server receives a POST with a JSON-RPC response (has id + result/error, no method).

has_sse_stream?(session_id)

@spec has_sse_stream?(String.t()) :: boolean()

Check if a session has an active SSE stream.

init()

@spec init() :: :ok

Initialize the session ETS table.

register_sse_stream(session_id)

@spec register_sse_stream(String.t()) :: :ok

Register the calling process as the SSE stream for a session.

run_sse_loop(conn, session_id, opts \\ [])

@spec run_sse_loop(Plug.Conn.t(), String.t(), keyword()) :: Plug.Conn.t()

Run the SSE event loop for a GET connection.

Forwards {:sse_send, data} messages as SSE events. Returns when :sse_close is received or timeout expires.

send_request(session_id, method, params, opts \\ [])

@spec send_request(String.t(), String.t(), map(), keyword()) ::
  {:ok, map()} | {:error, term()}

Send a JSON-RPC request to the client via the GET SSE stream.

Blocks until the client responds or timeout is reached. Returns {:ok, result} or {:error, reason}.