ClaudeCodeSDK.Streaming.Session (claude_code_sdk v0.2.2)

View Source

GenServer managing a persistent bidirectional streaming session with Claude.

Maintains a long-lived subprocess with stdin/stdout pipes for interactive conversations with character-level streaming support via --include-partial-messages.

Architecture

Session GenServer
  |
  > erlexec subprocess (claude CLI)
      stdin  (send messages)
      stdout (receive streaming events)
      stderr (capture errors)
  |
  > Subscriber Map
       ref1 => {pid1, current_stream_ref}
       ref2 => {pid2, current_stream_ref}
       ...

State

  • subprocess - {erlexec_pid, os_pid} tuple
  • session_id - Claude session ID (extracted from first message)
  • options - ClaudeCodeSDK.Options for configuration
  • subscribers - Map of ref => {subscriber_pid, active: boolean}
  • message_buffer - Incomplete JSON buffer from stdout
  • accumulated_text - Current message text being assembled

Lifecycle

  1. init/1 - Spawn subprocess with streaming flags
  2. :read_output loop - Continuously read from stdout
  3. Parse events via EventParser
  4. Broadcast to subscribers
  5. terminate/2 - Clean shutdown of subprocess

Summary

Functions

Returns a specification to start this module under a supervisor.

Closes the streaming session and terminates the subprocess.

Gets the Claude session ID.

Sends a message to the Claude session and returns a stream of events.

Starts a new streaming session.

Types

subscriber_pid()

@type subscriber_pid() :: pid()

subscriber_ref()

@type subscriber_ref() :: reference()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

close(session)

@spec close(pid()) :: :ok

Closes the streaming session and terminates the subprocess.

Parameters

  • session - Session PID

Examples

{:ok, session} = Session.start_link()
# ... use session ...
:ok = Session.close(session)

get_session_id(session)

@spec get_session_id(pid()) :: {:ok, String.t()} | {:error, :no_session_id}

Gets the Claude session ID.

The session ID is extracted from the first message_start event.

Returns

  • {:ok, session_id} - Session ID available
  • {:error, :no_session_id} - Session not yet initialized

Examples

{:ok, session} = Session.start_link()
{:ok, session_id} = Session.get_session_id(session)

send_message(session, message)

@spec send_message(pid(), String.t()) :: Enumerable.t()

Sends a message to the Claude session and returns a stream of events.

The returned stream yields events as they arrive from Claude, enabling real-time typewriter effects and incremental UI updates.

Parameters

  • session - Session PID
  • message - Message text to send

Returns

Stream of event maps (see EventParser for event types)

Examples

{:ok, session} = Session.start_link()

# Get streaming response
Session.send_message(session, "Hello")
|> Stream.each(fn
  %{type: :text_delta, text: text} -> IO.write(text)
  %{type: :message_stop} -> IO.puts("")
end)
|> Stream.run()

start_link(options \\ nil)

@spec start_link(ClaudeCodeSDK.Options.t() | nil) :: GenServer.on_start()

Starts a new streaming session.

Spawns a Claude CLI subprocess with streaming flags enabled and begins listening for events.

Parameters

Returns

  • {:ok, pid} - Session started successfully
  • {:error, reason} - Failed to start subprocess

Examples

{:ok, session} = Session.start_link()
{:ok, session} = Session.start_link(%Options{model: "opus", max_turns: 10})