Building Clients with ACPex
View SourceThis guide covers building ACP clients (typically code editors or IDE plugins) using ACPex.
Table of Contents
- Overview
- The Client Behaviour
- Connecting to an Agent
- Handling Session Updates
- File System Operations
- Terminal Management
- User Interface Integration
- Advanced Patterns
Overview
An ACP client is typically a code editor or IDE plugin that:
- Spawns an AI agent as a subprocess
- Initializes the connection and negotiates capabilities
- Creates sessions and sends user prompts to the agent
- Handles streaming updates from the agent
- Responds to agent requests for file access and terminal operations
The Client Behaviour
Every client must implement the ACPex.Client
behaviour:
defmodule MyEditor.ACPClient do
@behaviour ACPex.Client
@impl true
def init(args), do: {:ok, %{}}
@impl true
def handle_session_update(notification, state), do: {:noreply, state}
@impl true
def handle_fs_read_text_file(request, state), do: {:ok, response, state}
@impl true
def handle_fs_write_text_file(request, state), do: {:ok, response, state}
@impl true
def handle_terminal_create(request, state), do: {:ok, response, state}
@impl true
def handle_terminal_output(request, state), do: {:ok, response, state}
@impl true
def handle_terminal_wait_for_exit(request, state), do: {:ok, response, state}
@impl true
def handle_terminal_kill(request, state), do: {:ok, response, state}
@impl true
def handle_terminal_release(request, state), do: {:ok, response, state}
end
Connecting to an Agent
Starting a Connection
# Start a client and connect to an agent
{:ok, connection_pid} = ACPex.start_client(
MyEditor.ACPClient,
[], # Init args for your client
agent_path: "/usr/local/bin/my-agent",
agent_args: ["--model", "claude-3-5-sonnet"] # Optional agent-specific args
)
Initialization Flow
The client automatically handles the initialization handshake:
- Client sends
initialize
request with capabilities - Agent responds with its capabilities
- If needed, client sends
authenticate
request - Agent responds with authentication result
def init(_args) do
initial_state = %{
# Client capabilities we'll advertise
capabilities: %{
"fileSystem" => %{
"readTextFile" => true,
"writeTextFile" => true
},
"terminal" => %{
"create" => true,
"output" => true,
"waitForExit" => true,
"kill" => true,
"release" => true
}
},
# Track active terminals
terminals: %{},
# Track active sessions
sessions: %{}
}
{:ok, initial_state}
end
Handling Session Updates
The agent sends real-time updates during prompt processing:
Message Chunks
Display the agent's response as it's being generated:
def handle_session_update(notification, state) do
%{session_id: session_id, update: update} = notification
case update["kind"] do
"agent_message_chunk" ->
# Stream the response to the UI
content = update["content"]
text = get_in(content, ["content", Access.at(0), "text"])
# Update your UI (pseudo-code)
UI.append_to_chat(session_id, text)
{:noreply, state}
_ ->
{:noreply, state}
end
end
Thoughts
Display the agent's reasoning process:
def handle_session_update(notification, state) do
case notification.update["kind"] do
"agent_thought_chunk" ->
thought = notification.update["content"]["thought"]
# Show in UI (e.g., a collapsible "thinking" section)
UI.show_thought(notification.session_id, thought)
{:noreply, state}
_ ->
{:noreply, state}
end
end
Tool Calls
Show what tools the agent is using:
def handle_session_update(notification, state) do
case notification.update["kind"] do
"tool_call" ->
%{
"tool_call_id" => tool_id,
"function" => %{"name" => name, "arguments" => args}
} = notification.update["content"]
# Show in UI
UI.show_tool_call(notification.session_id, "Using #{name}...")
{:noreply, state}
"tool_call_update" ->
%{
"tool_call_id" => tool_id,
"output" => output
} = notification.update["content"]
# Show result
UI.update_tool_call(notification.session_id, tool_id, output)
{:noreply, state}
_ ->
{:noreply, state}
end
end
Plans
Display multi-step plans:
def handle_session_update(notification, state) do
case notification.update["kind"] do
"plan" ->
steps = notification.update["content"]["steps"]
# Show plan in UI
UI.show_plan(notification.session_id, steps)
{:noreply, state}
_ ->
{:noreply, state}
end
end
Complete Update Handler
def handle_session_update(notification, state) do
%{session_id: session_id, update: update} = notification
case update["kind"] do
"user_message_chunk" ->
# Echo of user's message (optional to display)
{:noreply, state}
"agent_message_chunk" ->
content = get_in(update, ["content", "content"])
text = extract_text(content)
UI.append_message(session_id, text)
{:noreply, state}
"agent_thought_chunk" ->
thought = update["content"]["thought"]
UI.show_thought(session_id, thought)
{:noreply, state}
"tool_call" ->
name = get_in(update, ["content", "function", "name"])
UI.show_tool_usage(session_id, name)
{:noreply, state}
"tool_call_update" ->
output = update["content"]["output"]
UI.show_tool_output(session_id, output)
{:noreply, state}
"plan" ->
steps = update["content"]["steps"]
UI.show_plan(session_id, steps)
{:noreply, state}
_ ->
# Unknown update type - log but don't crash
require Logger
Logger.warning("Unknown update kind: #{update["kind"]}")
{:noreply, state}
end
end
defp extract_text(content) when is_list(content) do
content
|> Enum.filter(&match?(%{"type" => "text"}, &1))
|> Enum.map(& &1["text"])
|> Enum.join("")
end
File System Operations
Reading Files
def handle_fs_read_text_file(request, state) do
%{path: path} = request
case File.read(expand_path(path, state)) do
{:ok, content} ->
response = %ACPex.Schema.Client.FsReadTextFileResponse{
content: content
}
{:ok, response, state}
{:error, :enoent} ->
{:error, %{code: -32001, message: "File not found: #{path}"}, state}
{:error, :eacces} ->
{:error, %{code: -32002, message: "Permission denied: #{path}"}, state}
{:error, reason} ->
{:error, %{code: -32000, message: "Error reading file: #{reason}"}, state}
end
end
defp expand_path(path, state) do
# Resolve relative to workspace root
Path.join(state.workspace_root, path)
end
Writing Files
def handle_fs_write_text_file(request, state) do
%{path: path, content: content} = request
full_path = expand_path(path, state)
# Optional: Ask user for permission
case ask_user_permission("Write to #{path}?") do
:allow ->
case File.write(full_path, content) do
:ok ->
# Optional: Refresh file in editor
UI.refresh_file(path)
response = %ACPex.Schema.Client.FsWriteTextFileResponse{}
{:ok, response, state}
{:error, reason} ->
{:error, %{code: -32000, message: "Error writing file: #{reason}"}, state}
end
:deny ->
{:error, %{code: -32003, message: "User denied permission"}, state}
end
end
Security Considerations
def handle_fs_read_text_file(request, state) do
%{path: path} = request
# Validate path is within workspace
if path_allowed?(path, state) do
# ... proceed with read
else
{:error, %{code: -32004, message: "Path outside workspace"}, state}
end
end
defp path_allowed?(path, state) do
full_path = Path.expand(path, state.workspace_root)
workspace = Path.expand(state.workspace_root)
String.starts_with?(full_path, workspace)
end
Terminal Management
Creating Terminals
def handle_terminal_create(request, state) do
%{command: command, args: args, env: env_vars} = request
# Generate terminal ID
terminal_id = "term_#{:erlang.unique_integer([:positive])}"
# Build environment
env = build_env(env_vars, state)
# Spawn the command
port = Port.open(
{:spawn_executable, command},
[
{:args, args},
{:env, env},
:binary,
:exit_status,
{:line, 4096},
:use_stdio,
:hide
]
)
# Track the terminal
terminal = %{
id: terminal_id,
port: port,
command: command,
output: [],
exit_status: nil
}
new_state = put_in(state, [:terminals, terminal_id], terminal)
response = %ACPex.Schema.Client.Terminal.CreateResponse{
terminal_id: terminal_id
}
{:ok, response, new_state}
end
defp build_env(env_vars, state) do
# Convert from schema format to Port format
Enum.map(env_vars, fn %{name: name, value: value} ->
{String.to_charlist(name), String.to_charlist(value)}
end)
end
Handling Terminal Output
def handle_terminal_output(request, state) do
%{terminal_id: terminal_id} = request
case get_in(state, [:terminals, terminal_id]) do
nil ->
{:error, %{code: -32001, message: "Terminal not found"}, state}
terminal ->
# Collect all output so far
output = Enum.join(terminal.output, "")
response = %ACPex.Schema.Client.Terminal.OutputResponse{
output: output
}
{:ok, response, state}
end
end
# Handle port messages in a separate process or GenServer
def handle_info({port, {:data, {:eol, line}}}, state) do
# Find terminal by port
{terminal_id, terminal} =
Enum.find(state.terminals, fn {_id, t} -> t.port == port end)
# Append output
new_terminal = update_in(terminal, [:output], &[&1 | [line <> "\n"]])
new_state = put_in(state, [:terminals, terminal_id], new_terminal)
{:noreply, new_state}
end
def handle_info({port, {:exit_status, code}}, state) do
# Find terminal and update exit status
{terminal_id, terminal} =
Enum.find(state.terminals, fn {_id, t} -> t.port == port end)
new_terminal = put_in(terminal, [:exit_status], code)
new_state = put_in(state, [:terminals, terminal_id], new_terminal)
{:noreply, new_state}
end
Waiting for Terminal Exit
def handle_terminal_wait_for_exit(request, state) do
%{terminal_id: terminal_id} = request
case get_in(state, [:terminals, terminal_id]) do
nil ->
{:error, %{code: -32001, message: "Terminal not found"}, state}
terminal ->
# If already exited, return immediately
if terminal.exit_status do
response = %ACPex.Schema.Client.Terminal.WaitForExitResponse{
exit_status: %{code: terminal.exit_status}
}
{:ok, response, state}
else
# Wait for exit (this is blocking - see advanced patterns for async)
wait_for_exit(terminal.port)
# Port will send {:exit_status, code} message
# This gets handled in handle_info above
receive do
{^port, {:exit_status, code}} ->
response = %ACPex.Schema.Client.Terminal.WaitForExitResponse{
exit_status: %{code: code}
}
{:ok, response, state}
after
30_000 ->
{:error, %{code: -32002, message: "Wait timeout"}, state}
end
end
end
end
Killing and Releasing Terminals
def handle_terminal_kill(request, state) do
%{terminal_id: terminal_id} = request
case get_in(state, [:terminals, terminal_id]) do
nil ->
{:error, %{code: -32001, message: "Terminal not found"}, state}
terminal ->
# Kill the port
Port.close(terminal.port)
response = %ACPex.Schema.Client.Terminal.KillResponse{}
{:ok, response, state}
end
end
def handle_terminal_release(request, state) do
%{terminal_id: terminal_id} = request
# Remove terminal from tracking
new_state = update_in(state, [:terminals], &Map.delete(&1, terminal_id))
response = %ACPex.Schema.Client.Terminal.ReleaseResponse{}
{:ok, response, new_state}
end
User Interface Integration
Phoenix LiveView Example
defmodule MyEditorWeb.ChatLive do
use Phoenix.LiveView
def mount(_params, _session, socket) do
# Start ACP client
{:ok, client_pid} = ACPex.start_client(
MyEditor.ACPClient,
[ui_pid: self()], # Pass LiveView PID to client
agent_path: "/usr/local/bin/agent"
)
{:ok, assign(socket, client_pid: client_pid, messages: [])}
end
def handle_event("send_message", %{"text" => text}, socket) do
# Send to agent (via client)
send_prompt(socket.assigns.client_pid, text)
{:noreply, socket}
end
# Receive updates from client
def handle_info({:agent_update, update}, socket) do
new_messages = socket.assigns.messages ++ [update]
{:noreply, assign(socket, messages: new_messages)}
end
end
# In your client:
def init([ui_pid: ui_pid]) do
{:ok, %{ui_pid: ui_pid}}
end
def handle_session_update(notification, state) do
# Forward to LiveView
send(state.ui_pid, {:agent_update, notification.update})
{:noreply, state}
end
Desktop App Example (with Scenic)
defmodule MyEditor.Scene.Chat do
use Scenic.Scene
def init(scene, _params, _opts) do
# Start ACP client
{:ok, client_pid} = ACPex.start_client(
MyEditor.ACPClient,
[scene_pid: self()],
agent_path: "/usr/local/bin/agent"
)
graph = build_graph([])
{:ok, assign(scene, client_pid: client_pid, messages: [], graph: graph)}
end
def handle_event({:send_message, text}, _from, scene) do
send_prompt(scene.assigns.client_pid, text)
{:noreply, scene}
end
def handle_info({:agent_update, update}, scene) do
# Update graph with new message
new_messages = scene.assigns.messages ++ [update]
new_graph = build_graph(new_messages)
{:noreply, assign(scene, messages: new_messages, graph: new_graph)}
end
end
Advanced Patterns
Async Terminal Operations
Don't block the client while waiting for terminal exit:
def handle_terminal_wait_for_exit(request, state) do
%{terminal_id: terminal_id} = request
terminal = get_in(state, [:terminals, terminal_id])
if terminal.exit_status do
# Already exited
response = %ACPex.Schema.Client.Terminal.WaitForExitResponse{
exit_status: %{code: terminal.exit_status}
}
{:ok, response, state}
else
# Spawn a task to wait
caller = self()
Task.start(fn ->
wait_for_terminal_exit(terminal.port, caller, terminal_id)
end)
# Return immediately - response will be sent later
{:pending, state}
end
end
defp wait_for_terminal_exit(port, caller, terminal_id) do
receive do
{^port, {:exit_status, code}} ->
# Send response to connection process
ACPex.Protocol.Connection.send_response(
caller,
%ACPex.Schema.Client.Terminal.WaitForExitResponse{
exit_status: %{code: code}
}
)
end
end
Request Queuing
Handle multiple file operations efficiently:
def init(_args) do
{:ok, %{
file_queue: :queue.new(),
file_queue_worker: spawn_link(&file_queue_worker/0)
}}
end
def handle_fs_read_text_file(request, state) do
# Add to queue instead of processing immediately
new_queue = :queue.in({:read, request, self()}, state.file_queue)
# Notify worker
send(state.file_queue_worker, :process_queue)
{:pending, %{state | file_queue: new_queue}}
end
defp file_queue_worker() do
receive do
:process_queue ->
# Process queued file operations
# ...
file_queue_worker()
end
end
Permission System
Implement a comprehensive permission system:
def handle_fs_write_text_file(request, state) do
case check_permission(:write, request.path, state) do
{:allowed, reason} ->
# Proceed with write
do_write_file(request, state)
{:denied, reason} ->
{:error, %{code: -32003, message: "Permission denied: #{reason}"}, state}
{:ask_user, reason} ->
case UI.ask_permission("Allow write to #{request.path}?", reason) do
:allow ->
# Remember this decision
new_state = grant_permission(:write, request.path, state)
do_write_file(request, new_state)
:deny ->
{:error, %{code: -32003, message: "User denied permission"}, state}
end
end
end
defp check_permission(action, path, state) do
cond do
# Check if permanently allowed
Map.has_key?(state.permissions, {action, path}) ->
{:allowed, "previously granted"}
# Check if in safe directory
safe_path?(path, state) ->
{:allowed, "safe directory"}
# Ask user
true ->
{:ask_user, "outside safe directories"}
end
end
Testing Your Client
Unit Tests
defmodule MyEditor.ACPClientTest do
use ExUnit.Case
test "reads files" do
{:ok, state} = MyEditor.ACPClient.init([])
request = %ACPex.Schema.Client.FsReadTextFileRequest{
path: "test.txt"
}
# Mock file system
with_mock File, [read: fn _ -> {:ok, "content"} end] do
assert {:ok, response, _state} =
MyEditor.ACPClient.handle_fs_read_text_file(request, state)
assert response.content == "content"
end
end
end
Integration Tests
test "end-to-end agent interaction" do
# Start a mock agent
{:ok, agent} = start_mock_agent()
# Start client
{:ok, client} = ACPex.start_client(
MyEditor.ACPClient,
[],
agent_path: agent.path
)
# Send prompt
# Verify responses
end
Best Practices
- Always validate agent requests - Don't trust the agent blindly
- Implement permission systems - Ask users before file writes or terminal commands
- Handle updates efficiently - Batch UI updates to avoid flickering
- Provide good UX - Show progress, thoughts, and tool usage
- Handle errors gracefully - Show user-friendly error messages
- Sandbox operations - Restrict file/terminal access to workspace
- Log security events - Track what the agent requested and what was allowed