Building Agents with ACPex
View SourceThis guide covers advanced topics for building AI coding agents using ACPex.
Table of Contents
- Overview
- The Agent Behaviour
- Protocol Lifecycle
- Session Management
- Streaming Updates
- Making Client Requests
- Error Handling
- Advanced Patterns
Overview
An ACP agent is an AI coding assistant that runs as a subprocess and communicates with a code editor (the client) over stdin/stdout using JSON-RPC. The agent:
- Receives prompts from the user via the client
- Processes those prompts (e.g., using an LLM)
- Sends back streaming updates and final responses
- Can request file system access and terminal operations from the client
The Agent Behaviour
Every agent must implement the ACPex.Agent
behaviour, which defines callbacks
for handling protocol messages:
defmodule MyAgent do
@behaviour ACPex.Agent
@impl true
def init(args), do: {:ok, %{}}
@impl true
def handle_initialize(request, state), do: {:ok, response, state}
@impl true
def handle_authenticate(request, state), do: {:ok, response, state}
@impl true
def handle_new_session(request, state), do: {:ok, response, state}
@impl true
def handle_load_session(request, state), do: {:error, error, state}
@impl true
def handle_prompt(request, state), do: {:ok, response, state}
@impl true
def handle_cancel(notification, state), do: {:noreply, state}
end
Protocol Lifecycle
1. Initialization
The first message your agent receives will be initialize
:
def handle_initialize(request, state) do
# request.protocol_version - The protocol version (currently 1)
# request.client_info - Information about the client (name, version, etc.)
# request.capabilities - What the client can do
response = %ACPex.Schema.Connection.InitializeResponse{
protocol_version: 1,
agent_capabilities: %{
"sessions" => %{
"new" => true, # Can create new sessions
"load" => false # Cannot load previous sessions
},
"mcp" => nil # No MCP support
},
meta: %{
"name" => "MyAgent",
"version" => "1.0.0",
"description" => "An example agent"
}
}
{:ok, response, state}
end
2. Authentication (Optional)
If your agent requires authentication, advertise it in capabilities and handle
the authenticate
request:
def handle_initialize(request, state) do
response = %ACPex.Schema.Connection.InitializeResponse{
protocol_version: 1,
agent_capabilities: %{...},
authentication_methods: [
%ACPex.Schema.Types.AuthMethod{
id: "api_key",
name: "API Key",
description: "Provide your API key"
}
]
}
{:ok, response, state}
end
def handle_authenticate(request, state) do
# request.method_id - Which auth method was chosen
# request.data - Authentication data (e.g., API key)
case validate_api_key(request.data["api_key"]) do
:ok ->
response = %ACPex.Schema.Connection.AuthenticateResponse{}
{:ok, response, state}
:error ->
{:error, %{code: -32001, message: "Invalid API key"}, state}
end
end
3. Session Creation
After initialization, the client will create a session:
def handle_new_session(request, state) do
# request.cwd - The current working directory from the client
# request.mcp_servers - List of MCP server configurations
# Generate a unique session ID
session_id = "session_#{:erlang.unique_integer([:positive])}"
# Initialize session state
session_state = %{
history: [],
context: %{},
cwd: request.cwd,
mcp_servers: request.mcp_servers
}
# Store session state
new_state = put_in(state, [:sessions, session_id], session_state)
response = %ACPex.Schema.Session.NewResponse{
session_id: session_id
}
{:ok, response, new_state}
end
Session Management
Multiple Sessions
An agent can maintain multiple concurrent sessions:
def init(_args) do
{:ok, %{sessions: %{}}}
end
def handle_prompt(request, state) do
session_id = request.session_id
session_state = get_in(state, [:sessions, session_id])
# Process the prompt with session context
# ...
# Update session state
new_session_state = Map.update!(session_state, :history, &[request | &1])
new_state = put_in(state, [:sessions, session_id], new_session_state)
{:ok, response, new_state}
end
Loading Previous Sessions
If your agent supports persisting sessions:
def handle_initialize(request, state) do
response = %ACPex.Schema.Connection.InitializeResponse{
# ...
agent_capabilities: %{
"sessions" => %{
"new" => true,
"load" => true # Advertise load support
}
}
}
{:ok, response, state}
end
def handle_load_session(request, state) do
# request.session_id - The session to load
case load_from_database(request.session_id) do
{:ok, session_data} ->
new_state = put_in(state, [:sessions, request.session_id], session_data)
response = %{session_id: request.session_id}
{:ok, response, new_state}
{:error, :not_found} ->
{:error, %{code: -32001, message: "Session not found"}, state}
end
end
Streaming Updates
Agents can send real-time updates while processing a prompt:
Message Chunks
Stream the LLM's response as it's generated:
def handle_prompt(request, state) do
# Start streaming
stream_llm_response(request.content, fn chunk ->
send_update(request.session_id, %{
kind: "agent_message_chunk",
content: %{
role: "assistant",
content: [%{text: chunk}]
}
})
end)
# Final response
response = %ACPex.Schema.Session.PromptResponse{
content: [%ACPex.Schema.Types.ContentBlock.Text{text: "Done"}],
stop_reason: "done"
}
{:ok, response, state}
end
defp send_update(session_id, update) do
ACPex.Protocol.Connection.send_notification(
self(),
"session/update",
%ACPex.Schema.Session.UpdateNotification{
session_id: session_id,
update: update
}
)
end
Thoughts
Show the agent's reasoning process:
def handle_prompt(request, state) do
send_update(request.session_id, %{
kind: "agent_thought_chunk",
content: %{
thought: "I need to analyze the code structure first..."
}
})
# Process...
send_update(request.session_id, %{
kind: "agent_thought_chunk",
content: %{
thought: "Found 3 potential issues. Let me fix them one by one."
}
})
# ...
end
Tool Calls
Inform the client about tool usage:
def handle_prompt(request, state) do
tool_call_id = "call_#{:erlang.unique_integer([:positive])}"
# Announce the tool call
send_update(request.session_id, %{
kind: "tool_call",
content: %{
tool_call_id: tool_call_id,
type: "function",
function: %{
name: "read_file",
arguments: Jason.encode!(%{path: "/src/main.ex"})
}
}
})
# Execute the tool (via client request)
{:ok, file_content} = read_file_from_client("/src/main.ex")
# Send tool result
send_update(request.session_id, %{
kind: "tool_call_update",
content: %{
tool_call_id: tool_call_id,
output: file_content
}
})
# Continue processing...
end
Plans
Show a multi-step plan:
def handle_prompt(request, state) do
send_update(request.session_id, %{
kind: "plan",
content: %{
steps: [
"1. Read the current implementation",
"2. Identify the bug",
"3. Propose a fix",
"4. Write tests"
]
}
})
# Execute the plan...
end
Making Client Requests
Agents can request operations from the client:
Reading Files
defp read_file_from_client(path) do
request = %ACPex.Schema.Client.FsReadTextFileRequest{path: path}
case ACPex.Protocol.Connection.send_request(self(), "fs/read_text_file", request) do
{:ok, %{content: content}} ->
{:ok, content}
{:error, error} ->
{:error, error}
end
end
Writing Files
defp write_file_to_client(path, content) do
request = %ACPex.Schema.Client.FsWriteTextFileRequest{
path: path,
content: content
}
case ACPex.Protocol.Connection.send_request(self(), "fs/write_text_file", request) do
{:ok, _response} ->
:ok
{:error, error} ->
{:error, error}
end
end
Terminal Operations
defp run_command(command, args) do
# Create terminal
create_req = %ACPex.Schema.Client.Terminal.CreateRequest{
command: command,
args: args,
env: [%{name: "PATH", value: System.get_env("PATH")}]
}
{:ok, %{terminal_id: terminal_id}} =
ACPex.Protocol.Connection.send_request(self(), "terminal/create", create_req)
# Wait for exit
wait_req = %ACPex.Schema.Client.Terminal.WaitForExitRequest{
terminal_id: terminal_id
}
{:ok, %{exit_status: exit_status}} =
ACPex.Protocol.Connection.send_request(self(), "terminal/wait_for_exit", wait_req)
# Get output
output_req = %ACPex.Schema.Client.Terminal.OutputRequest{
terminal_id: terminal_id
}
{:ok, %{output: output}} =
ACPex.Protocol.Connection.send_request(self(), "terminal/output", output_req)
# Release terminal
release_req = %ACPex.Schema.Client.Terminal.ReleaseRequest{
terminal_id: terminal_id
}
ACPex.Protocol.Connection.send_request(self(), "terminal/release", release_req)
{:ok, output, exit_status}
end
Error Handling
Returning Errors
def handle_prompt(request, state) do
case process_safely(request) do
{:ok, result} ->
response = %ACPex.Schema.Session.PromptResponse{
content: result,
stop_reason: "done"
}
{:ok, response, state}
{:error, reason} ->
# Return an error response
{:error, %{code: -32000, message: "Processing failed: #{reason}"}, state}
end
end
Handling Cancellation
def handle_prompt(request, state) do
# Start a long-running task
task = Task.async(fn ->
process_long_running_request(request)
end)
# Store the task so we can cancel it
new_state = Map.put(state, :current_task, task)
# Wait for result (will be cancelled if cancel notification arrives)
result = Task.await(task, :infinity)
{:ok, result, Map.delete(new_state, :current_task)}
end
def handle_cancel(notification, state) do
# Cancel the current task if one exists
if task = state[:current_task] do
Task.shutdown(task, :brutal_kill)
end
{:noreply, Map.delete(state, :current_task)}
end
Advanced Patterns
Async Processing with GenServer
For complex agents, use a GenServer to manage async work:
defmodule MyAgent.Worker do
use GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
def process_prompt(pid, request, callback) do
GenServer.cast(pid, {:process, request, callback})
end
@impl true
def init(_args) do
{:ok, %{}}
end
@impl true
def handle_cast({:process, request, callback}, state) do
# Do heavy processing
result = heavy_llm_call(request)
# Notify the agent via callback
callback.(result)
{:noreply, state}
end
end
# In your agent:
def init(_args) do
{:ok, worker} = MyAgent.Worker.start_link([])
{:ok, %{worker: worker}}
end
def handle_prompt(request, state) do
caller = self()
MyAgent.Worker.process_prompt(state.worker, request, fn result ->
# Send result back to connection process
send(caller, {:llm_result, result})
end)
# Wait for result
receive do
{:llm_result, result} ->
response = %ACPex.Schema.Session.PromptResponse{
content: result,
stop_reason: "done"
}
{:ok, response, state}
end
end
Context Management
Track conversation context across multiple prompts:
def handle_prompt(request, state) do
session_id = request.session_id
session = get_session(state, session_id)
# Build context from history
context = build_context(session.history, request)
# Add current prompt to history
new_session = update_in(session, [:history], &[request | &1])
new_state = put_session(state, session_id, new_session)
# Process with full context
response = process_with_context(context)
{:ok, response, new_state}
end
defp build_context(history, current_request) do
# Combine historical context with current request
history
|> Enum.reverse()
|> Enum.map(&extract_text/1)
|> Kernel.++([extract_text(current_request)])
|> Enum.join("\n")
end
Integration with MCP
Many agents will want to use MCP (Model Context Protocol) for tool access:
defmodule MyAgent do
@behaviour ACPex.Agent
def init(_args) do
# Connect to MCP servers
{:ok, mcp_client} = MCPex.Client.start_link(...)
{:ok, %{mcp: mcp_client}}
end
def handle_prompt(request, state) do
# List available MCP tools
{:ok, tools} = MCPex.Client.list_tools(state.mcp)
# Let LLM decide which tools to use
tool_calls = llm_choose_tools(request, tools)
# Execute tools via MCP
results = Enum.map(tool_calls, fn call ->
MCPex.Client.call_tool(state.mcp, call.name, call.arguments)
end)
# Generate final response
response = llm_synthesize(request, results)
{:ok, response, state}
end
end
Testing Your Agent
Unit Tests
defmodule MyAgentTest do
use ExUnit.Case
test "handles initialize" do
{:ok, state} = MyAgent.init([])
request = %ACPex.Schema.Connection.InitializeRequest{
protocol_version: 1,
capabilities: %{}
}
assert {:ok, response, _state} = MyAgent.handle_initialize(request, state)
assert response.protocol_version == 1
assert response.agent_capabilities["sessions"]["new"] == true
end
test "handles prompt" do
{:ok, state} = MyAgent.init([])
# Create a session first
new_req = %ACPex.Schema.Session.NewRequest{
cwd: "/tmp/test",
mcp_servers: []
}
{:ok, new_resp, state} = MyAgent.handle_new_session(new_req, state)
# Send a prompt
prompt_req = %ACPex.Schema.Session.PromptRequest{
session_id: new_resp.session_id,
content: [%ACPex.Schema.Types.ContentBlock.Text{text: "Hello"}]
}
assert {:ok, response, _state} = MyAgent.handle_prompt(prompt_req, state)
assert response.stop_reason == "done"
end
end
Integration Tests
test "full protocol flow" do
{:ok, agent_pid} = ACPex.start_agent(MyAgent, [])
# Send initialize
# Send new session
# Send prompt
# Verify responses
end
Best Practices
- Always validate input - Use Ecto changesets to validate request data
- Handle errors gracefully - Return proper JSON-RPC error responses
- Stream updates - Keep the client informed of progress
- Respect cancellation - Always implement
handle_cancel
properly - Keep state minimal - Store only what's necessary in state
- Use typed schemas - Leverage ACPex's schema system for type safety
- Log appropriately - Use
Logger
for debugging, but avoid excessive logging