Hermes.Server.Transport.StreamableHTTP (hermes_mcp v0.10.0)

StreamableHTTP transport implementation for MCP servers.

This module provides an HTTP-based transport layer that supports multiple concurrent client sessions through Server-Sent Events (SSE). It enables web-based MCP clients to communicate with the server using standard HTTP protocols.

Features

  • Multiple concurrent client sessions
  • Server-Sent Events for real-time server-to-client communication
  • HTTP POST endpoint for client-to-server messages
  • Automatic session cleanup on disconnect
  • Integration with Phoenix/Plug applications

Usage

StreamableHTTP is typically started through the server supervisor:

Hermes.Server.start_link(MyServer, [],
  transport: :streamable_http,
  streamable_http: [port: 4000]
)

For integration with existing Phoenix/Plug applications:

# In your router
forward "/mcp", Hermes.Server.Transport.StreamableHTTP.Plug,
  server: MyApp.MCPServer

Message Flow

  1. The client connects to the /sse endpoint and receives a session ID
  2. The client sends messages via POST to /messages, passing the session ID in a header
  3. Server responses are pushed through the SSE connection
  4. The connection closes on client disconnect or server shutdown
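
The bookkeeping behind this flow can be pictured with a small in-process model. The sketch below is not the Hermes implementation: SessionRegistrySketch and its register/lookup/route functions are hypothetical names that only loosely mirror register_sse_handler/2, get_sse_handler/2 and route_to_session/3, and the {:sse_message, message} tuple and :no_sse_handler reason are assumptions made for illustration.

```elixir
defmodule SessionRegistrySketch do
  @moduledoc "Toy model of per-session SSE handler bookkeeping (hypothetical, not Hermes code)."
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  # Step 1: the calling process becomes the handler for `session_id`.
  def register(server, session_id),
    do: GenServer.call(server, {:register, session_id, self()})

  # Returns the handler pid, or nil when the session has no handler.
  def lookup(server, session_id), do: GenServer.call(server, {:lookup, session_id})

  # Step 3: push a message to the handler of one session.
  def route(server, session_id, message),
    do: GenServer.call(server, {:route, session_id, message})

  @impl true
  def init(sessions), do: {:ok, sessions}

  @impl true
  def handle_call({:register, session_id, pid}, _from, sessions) do
    # Monitor the handler so the session can be dropped when it exits.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(sessions, session_id, {pid, ref})}
  end

  def handle_call({:lookup, session_id}, _from, sessions) do
    reply =
      case Map.fetch(sessions, session_id) do
        {:ok, {pid, _ref}} -> pid
        :error -> nil
      end

    {:reply, reply, sessions}
  end

  def handle_call({:route, session_id, message}, _from, sessions) do
    case Map.fetch(sessions, session_id) do
      {:ok, {pid, _ref}} ->
        send(pid, {:sse_message, message})
        {:reply, :ok, sessions}

      :error ->
        {:reply, {:error, :no_sse_handler}, sessions}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, sessions) do
    # Step 4: forget every session whose handler process has gone away.
    remaining = Enum.reject(sessions, fn {_id, {_pid, r}} -> r == ref end)
    {:noreply, Map.new(remaining)}
  end
end
```

In this model a handler process calls register/2 once and then waits in a receive loop for {:sse_message, message} tuples. Monitoring the handler is one way to get cleanup on disconnect without an explicit unregister call.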

Configuration

  • :port - HTTP server port (default: 4000)
  • :server - The MCP server process to connect to
  • :name - Process registration name

Summary

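Types

option()

StreamableHTTP transport options

t()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

get_schema(atom)

get_sse_handler(transport, session_id)

Gets the SSE handler process for a session.

handle_message(transport, session_id, message, context)

Handles an incoming message from a client with request context.

handle_message_for_sse(transport, session_id, message, context)

Handles an incoming message with context and returns {:sse, response} if an SSE handler exists.

parse_options(data)

parse_options!(data)

register_sse_handler(transport, session_id)

Registers an SSE handler process for a session.

route_to_session(transport, session_id, message)

Routes a message to a specific session's SSE handler.

send_message(transport, message)

Sends a message to the client via the active SSE connection.

shutdown(transport)

Shuts down the transport connection.

start_link(opts)

Starts the StreamableHTTP transport.

unregister_sse_handler(transport, session_id)

Unregisters an SSE handler process for a session.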

Types

option()

@type option() ::
  {:server, GenServer.server()} | {:name, GenServer.name()} | GenServer.option()

StreamableHTTP transport options

  • :server - The server process (required)
  • :name - Name for registering the GenServer (required)
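
Since both options are marked as required, a caller may want to check for them before starting the transport. The module exposes parse_options/1 and parse_options!/1, but their behaviour is not documented here, so the sketch below is a stand-in written with plain Keyword functions. TransportOptionsSketch and the :missing_options error shape are hypothetical.

```elixir
defmodule TransportOptionsSketch do
  # Keys the option() documentation marks as required.
  @required [:server, :name]

  @doc "Returns {:ok, opts} when every required key is present, otherwise an error tuple."
  def check(opts) when is_list(opts) do
    case Enum.reject(@required, &Keyword.has_key?(opts, &1)) do
      [] -> {:ok, opts}
      missing -> {:error, {:missing_options, missing}}
    end
  end
end
```

For example, TransportOptionsSketch.check(server: MyApp.MCPServer) reports :name as missing, where MyApp.MCPServer stands for the server module used in the Usage section.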

t()

@type t() :: GenServer.server()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_schema(atom)

get_sse_handler(transport, session_id)

@spec get_sse_handler(GenServer.server(), String.t()) :: pid() | nil

Gets the SSE handler process for a session.

Returns the pid of the process handling SSE for this session, or nil if no SSE connection exists.

handle_message(transport, session_id, message, context)

@spec handle_message(GenServer.server(), String.t(), map() | [map()], map()) ::
  {:ok, binary() | nil} | {:error, term()}

Handles an incoming message from a client with request context.

Called by the Plug when a message is received via HTTP POST.
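
The three result shapes in the spec suggest how a caller might build the HTTP reply. The sketch below is an illustration only: PostReplySketch is a hypothetical module, and the status codes are assumptions (202 for an empty result follows the usual convention for accepted requests with no body), not values taken from this library.

```elixir
defmodule PostReplySketch do
  # Maps a handle_message/4 result to a {status, body} pair.
  # All status codes here are assumptions made for illustration.

  # No payload to return, e.g. after a notification: acknowledge with an empty body.
  def to_http({:ok, nil}), do: {202, ""}

  # An encoded response is returned to the client as the reply body.
  def to_http({:ok, body}) when is_binary(body), do: {200, body}

  # Any transport error becomes a generic server error.
  def to_http({:error, _reason}), do: {500, "internal error"}
end
```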

handle_message_for_sse(transport, session_id, message, context)

@spec handle_message_for_sse(GenServer.server(), String.t(), map(), map()) ::
  {:ok, binary()} | {:sse, binary()} | {:error, term()}

Handles an incoming message with context and returns {:sse, response} if an SSE handler exists.

This allows the Plug to know whether to stream the response via SSE or return it as a regular HTTP response.
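
A caller could branch on the tagged result roughly as follows. This is a sketch under stated assumptions: ResponseModeSketch, plan/1 and the :stream, :respond and :fail tags are hypothetical, and sse_frame/1 uses the standard Server-Sent Events wire format (one "data:" line per payload line, ended by a blank line) rather than whatever framing the Plug applies internally.

```elixir
defmodule ResponseModeSketch do
  # Formats a payload as one SSE event: each payload line gets a "data: " prefix,
  # and a blank line terminates the event.
  def sse_frame(payload) when is_binary(payload) do
    lines =
      payload
      |> String.split("\n")
      |> Enum.map(&["data: ", &1, "\n"])

    IO.iodata_to_binary([lines, "\n"])
  end

  # {:sse, body}: an SSE handler exists, so the response goes out as an event.
  def plan({:sse, body}), do: {:stream, sse_frame(body)}

  # {:ok, body}: no SSE handler, so the response is a regular HTTP body.
  def plan({:ok, body}), do: {:respond, body}

  # {:error, reason}: nothing to deliver.
  def plan({:error, reason}), do: {:fail, reason}
end
```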

parse_options(data)

parse_options!(data)

register_sse_handler(transport, session_id)

@spec register_sse_handler(GenServer.server(), String.t()) :: :ok | {:error, term()}

Registers an SSE handler process for a session.

Called by the Plug when establishing an SSE connection. The calling process becomes the SSE handler for the session.

route_to_session(transport, session_id, message)

@spec route_to_session(GenServer.server(), String.t(), binary()) ::
  :ok | {:error, term()}

Routes a message to a specific session's SSE handler.

Used for targeted server notifications to specific clients.

send_message(transport, message)

@spec send_message(GenServer.server(), binary()) :: :ok | {:error, term()}

Sends a message to the client via the active SSE connection.

This function is used for server-initiated notifications. The message is broadcast to every active SSE connection, so all connected clients receive it; use route_to_session/3 to target a single session.

Parameters

  • transport - The transport process
  • message - The message to send

Returns

  • :ok if the message was sent successfully
  • {:error, reason} otherwise

shutdown(transport)

@spec shutdown(GenServer.server()) :: :ok

Shuts down the transport connection.

This terminates all active sessions managed by this transport.

Parameters

  • transport - The transport process

start_link(opts)

@spec start_link(Enumerable.t(option())) :: GenServer.on_start()

Starts the StreamableHTTP transport.

unregister_sse_handler(transport, session_id)

@spec unregister_sse_handler(GenServer.server(), String.t()) :: :ok

Unregisters an SSE handler process for a session.

Called when the SSE connection is closed.