Anubis.Server.Transport.StreamableHTTP (anubis_mcp v1.6.1)

Copy Markdown

StreamableHTTP transport implementation for MCP servers.

This module manages SSE (Server-Sent Events) connections for server-to-client communication. In the refactored architecture, request handling is done directly by Session processes - this module only manages SSE handlers and notifications.

Features

  • SSE handler registration for server-to-client push
  • Automatic handler cleanup on disconnect
  • Keepalive messages to maintain connections
  • Notification broadcasting to connected clients

Usage

StreamableHTTP is typically started through the server supervisor:

Anubis.Server.start_link(MyServer, [],
  transport: :streamable_http,
  streamable_http: [port: 4000]
)

For integration with existing Phoenix/Plug applications:

# In your router
forward "/mcp", Anubis.Server.Transport.StreamableHTTP.Plug,
  server: MyApp.MCPServer

Summary

Types

StreamableHTTP transport options

t()

Functions

Returns a specification to start this module under a supervisor.

Returns the SSE handler pid for a session, or nil if none is connected.

Registers the calling process as the SSE handler for a session.

Routes a message to a specific session's SSE handler for server-to-client push.

Unregisters the SSE handler for a session. Called when the SSE connection closes.

Types

option()

@type option() ::
  {:server, GenServer.server()} | {:name, GenServer.name()} | GenServer.option()

StreamableHTTP transport options

  • :server - The server module (required)
  • :name - Name for registering the GenServer (required)

t()

@type t() :: GenServer.server()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_schema(atom)

get_sse_handler(transport, session_id)

@spec get_sse_handler(GenServer.server(), String.t()) :: pid() | nil

Returns the SSE handler pid for a session, or nil if none is connected.

parse_options(data)

parse_options!(data)

register_sse_handler(transport, session_id)

@spec register_sse_handler(GenServer.server(), String.t()) :: :ok | {:error, term()}

Registers the calling process as the SSE handler for a session.

Called by the Plug when establishing an SSE connection.

route_to_session(transport, session_id, message)

@spec route_to_session(GenServer.server(), String.t(), binary()) ::
  :ok | {:error, term()}

Routes a message to a specific session's SSE handler for server-to-client push.

unregister_sse_handler(transport, session_id, expected_pid \\ nil)

@spec unregister_sse_handler(GenServer.server(), String.t(), pid() | nil) :: :ok

Unregisters the SSE handler for a session. Called when the SSE connection closes.