LangChain.WebSocket (LangChain v0.8.4)


A generic WebSocket client GenServer built on Mint.WebSocket.

Provides a persistent WebSocket connection that can send text frames and collect responses. This module is provider-agnostic -- it handles connection lifecycle, frame encoding/decoding, and ping/pong, but has no knowledge of any specific API protocol.

Usage

{:ok, ws} = LangChain.WebSocket.start_link(
  url: "wss://api.openai.com/v1/responses",
  headers: [{"authorization", "Bearer sk-..."}]
)

# payload is the text frame body as a binary, for example a JSON-encoded request.
# Send it and collect events until done_fn returns true.
done_fn = fn event -> event["type"] == "response.completed" end
{:ok, events} = LangChain.WebSocket.send_and_collect(ws, payload, done_fn)

# When finished
LangChain.WebSocket.close(ws)

Options

  • :url (required) -- WebSocket URL (e.g. "wss://example.com/ws")
  • :headers -- additional HTTP headers for the upgrade request (default: [])
  • :receive_timeout -- timeout in ms for receiving responses (default: 60_000)
  • :connect_timeout -- timeout in ms for initial connection (default: 10_000)

Lifecycle Management

The application is responsible for managing the WebSocket lifecycle.

start_link/1 links the WebSocket process to the caller and connects immediately during init/1. The connection stays open until explicitly closed with close/1, the linked process exits, or the server disconnects.

There is no built-in supervisor, reconnection logic, or health monitoring. The underlying mint_web_socket library is intentionally low-level and leaves these concerns to the application.

Key things to be aware of:

  • Process linking: The WebSocket is linked to the process that calls start_link/1. If that process exits, the WebSocket is terminated.
  • No reconnection: If the server closes the connection or the network drops, the GenServer transitions to :disconnected status. Subsequent send_and_collect/4 or send_and_stream/5 calls will return {:error, :not_connected}. The application must detect this and start a new WebSocket.
  • No retry logic: Failed sends are not retried. The application should implement retry or fallback behavior as needed.
  • Not serializable: The WebSocket is identified by its PID. If the PID is stored in a struct that gets serialized (e.g. to a database), it will be stale when restored.
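  • Server-side timeouts: Remote servers may close idle connections at any time. Use connected?/1 to check status before sending, but treat the result as a hint only: the connection can still drop between the check and the send, so callers should also handle {:error, :not_connected}.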

For higher-level usage with ChatOpenAIResponses, see ChatOpenAIResponses.connect_websocket!/1, which wraps start_link/1 with the correct URL and headers.
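
The "No reconnection" point above can be handled with a thin wrapper around send_and_collect. The following is a minimal sketch, not part of LangChain: the module name MyApp.WSReconnect, the function send_with_reconnect/5, and the injectable ws_mod argument are all hypothetical and exist only for illustration. It retries exactly once on a fresh connection when a send returns {:error, :not_connected}.

```elixir
defmodule MyApp.WSReconnect do
  @moduledoc """
  Hypothetical helper (not part of LangChain): sends a request and, if the
  socket reports :not_connected, starts a new connection and retries once.
  """

  # `ws_mod` is an assumption added so the logic can be exercised without a
  # network connection; it defaults to LangChain.WebSocket.
  def send_with_reconnect(ws, payload, done_fn, start_opts, ws_mod \\ LangChain.WebSocket) do
    case ws_mod.send_and_collect(ws, payload, done_fn) do
      {:ok, events} ->
        # Normal path: keep using the same process.
        {:ok, events, ws}

      {:error, :not_connected} ->
        # The old process cannot be reused; start a replacement and retry once.
        with {:ok, new_ws} <- ws_mod.start_link(start_opts),
             {:ok, events} <- ws_mod.send_and_collect(new_ws, payload, done_fn) do
          {:ok, events, new_ws}
        end

      {:error, _reason} = error ->
        # Any other failure is returned unchanged.
        error
    end
  end
end
```

The caller should keep the PID returned in the third tuple element for later sends, and decide what to do with the stale process (for example, calling close/1 on it).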

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

close(pid)

Close the WebSocket connection and stop the GenServer.

connected?(pid)

Check if the WebSocket connection is alive and connected.

send_and_collect(pid, payload, done_fn, opts \\ [])

Send a text frame and collect all decoded JSON events until done_fn returns true.

send_and_stream(pid, payload, callback_fn, done_fn, opts \\ [])

Send a text frame and stream each decoded JSON event to callback_fn until done_fn returns true.

start_link(opts)

Start a WebSocket connection.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
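
As an illustration of how child_spec/1 is typically used, the sketch below places the WebSocket under an application-owned supervisor. MyApp.SocketTree and MyApp.Socket are hypothetical names. It relies on the standard {module, arg} child tuple, which makes the supervisor call child_spec/1 on the module, and on :name being passed through to GenServer.start_link/3 as described under start_link/1.

```elixir
defmodule MyApp.SocketTree do
  # Hypothetical module: MyApp.SocketTree and MyApp.Socket are illustrative
  # names, not part of LangChain.

  # Builds the child list. The {module, arg} tuple makes the supervisor call
  # LangChain.WebSocket.child_spec(arg) to obtain the child specification.
  def children(opts) do
    [{LangChain.WebSocket, Keyword.put_new(opts, :name, MyApp.Socket)}]
  end

  # Starts a one_for_one supervisor that owns the WebSocket process.
  def start_link(opts) do
    Supervisor.start_link(children(opts), strategy: :one_for_one)
  end
end
```

Two caveats follow from the lifecycle notes above. Because the connection is made during init/1, a failed connect will presumably surface as a failed child start. And a supervisor only restarts a process that exits: a socket that has moved to :disconnected status but is still running is not restarted, so the application still has to detect that case itself.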

close(pid)

@spec close(GenServer.server()) :: :ok

Close the WebSocket connection and stop the GenServer.

connected?(pid)

@spec connected?(GenServer.server()) :: boolean()

Check if the WebSocket connection is alive and connected.

send_and_collect(pid, payload, done_fn, opts \\ [])

@spec send_and_collect(GenServer.server(), binary(), (map() -> boolean()), keyword()) ::
  {:ok, [map()]} | {:error, term()}

Send a text frame and collect all decoded JSON events until done_fn returns true.

Returns {:ok, [decoded_events]} on success, or {:error, reason} on failure.

Options

  • :timeout — GenServer call timeout in ms (default: the configured receive_timeout)
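
A minimal sketch of a call that overrides :timeout and handles both return shapes. MyApp.Collect, done_fn/1, and run/2 are hypothetical names, and the 120_000 ms value is an arbitrary choice. The only event type taken from this page is "response.completed"; any others passed in are up to the caller.

```elixir
defmodule MyApp.Collect do
  # Hypothetical helper module, not part of LangChain.

  # Builds a done_fn that returns true for any event whose "type" is in
  # `terminal_types`. Only "response.completed" comes from the docs above.
  def done_fn(terminal_types \\ ["response.completed"]) do
    fn event -> event["type"] in terminal_types end
  end

  # Sends `payload` (a binary) and waits up to 120 seconds for the call,
  # instead of the configured receive_timeout.
  def run(ws, payload) when is_binary(payload) do
    case LangChain.WebSocket.send_and_collect(ws, payload, done_fn(), timeout: 120_000) do
      {:ok, events} -> {:ok, events}
      {:error, reason} -> {:error, reason}
    end
  end
end
```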

send_and_stream(pid, payload, callback_fn, done_fn, opts \\ [])

@spec send_and_stream(
  GenServer.server(),
  binary(),
  (map() -> term()),
  (map() -> boolean()),
  keyword()
) ::
  {:ok, [term()]} | {:error, term()}

Send a text frame and stream each decoded JSON event to callback_fn until done_fn returns true.

Returns {:ok, [callback_results]} with the return values from each callback_fn invocation, or {:error, reason} on failure.

Options

  • :timeout — GenServer call timeout in ms (default: the configured receive_timeout)
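
One way to use the callback is to forward each event to another process as it arrives. The sketch below assumes hypothetical names (MyApp.Stream, forward_to/1, run/3, and the {:ws_event, event} message shape). The callback returns the event's "type", so the final {:ok, list} would contain one type string per event.

```elixir
defmodule MyApp.Stream do
  # Hypothetical helper module, not part of LangChain.

  # Returns a callback that forwards each decoded event to `pid` as a
  # {:ws_event, event} message and returns the event's "type" value.
  def forward_to(pid) when is_pid(pid) do
    fn event ->
      send(pid, {:ws_event, event})
      event["type"]
    end
  end

  # Streams events to `pid` until a "response.completed" event is seen.
  def run(ws, payload, pid) do
    done_fn = fn event -> event["type"] == "response.completed" end
    LangChain.WebSocket.send_and_stream(ws, payload, forward_to(pid), done_fn)
  end
end
```

Sending messages works regardless of which process runs the callback, which this page does not specify.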

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a WebSocket connection.

Options

  • :url (required) — WebSocket URL
  • :headers — HTTP headers for the upgrade request (default: [])
  • :receive_timeout — timeout for collecting responses in ms (default: 60_000)
  • :connect_timeout — timeout for initial connection in ms (default: 10_000)

Any other options are passed through to GenServer.start_link/3 (e.g. :name).