A step-by-step guide for integrating CommBus into your Elixir application to add intelligent context management for LLM interactions.
Prerequisites
- Elixir ~> 1.17
- Mix project (Phoenix, Nerves, or standalone Elixir app)
- For Ecto adapter: Ecto ~> 3.11
- Basic familiarity with LLM prompt engineering
Installation Steps
1. Add Dependency
Add CommBus to your mix.exs dependencies:
def deps do
[
{:comm_bus, "~> 0.1.0"}
]
end
Then fetch dependencies:
mix deps.get
mix compile
2. Choose Storage Strategy
CommBus supports multiple storage adapters depending on your needs.
Option A: In-Memory (Development/Testing)
Best for: Development, testing, proof-of-concept
# config/dev.exs
config :comm_bus,
storage: CommBus.Storage.InMemory
Pros: Zero setup, fast, no database required
Cons: Data lost on restart, not suitable for production
Option B: Ecto Adapter (Production)
Best for: Production applications with existing Ecto setup
# config/prod.exs
config :comm_bus,
storage: CommBus.Storage.EctoAdapter,
repo: MyApp.Repo
Migration Setup:
mix ecto.gen.migration add_comm_bus_tables
Edit the generated migration file:
defmodule MyApp.Repo.Migrations.AddCommBusTables do
use Ecto.Migration
def change do
create table(:comm_bus_entries, primary_key: false) do
add :id, :string, primary_key: true
add :content, :text, null: false
add :keywords, {:array, :string}, default: []
add :section, :string, null: false
add :mode, :string, null: false
add :priority, :integer, default: 50
add :weight, :float, default: 1.0
add :token_count, :integer
add :enabled, :boolean, default: true
add :tags, {:array, :string}, default: []
add :metadata, :map, default: %{}
timestamps()
end
create table(:comm_bus_conversations, primary_key: false) do
add :id, :string, primary_key: true
add :depth, :integer, default: 0
add :metadata, :map, default: %{}
timestamps()
end
create table(:comm_bus_messages) do
add :conversation_id, references(:comm_bus_conversations, type: :string, on_delete: :delete_all)
add :role, :string, null: false
add :content, :text, null: false
add :token_count, :integer
add :metadata, :map, default: %{}
add :position, :integer, null: false
timestamps()
end
create index(:comm_bus_entries, [:enabled])
create index(:comm_bus_entries, [:section])
create index(:comm_bus_entries, [:mode])
create index(:comm_bus_entries, [:tags], using: :gin)
create index(:comm_bus_messages, [:conversation_id])
create index(:comm_bus_messages, [:conversation_id, :position])
end
end
Run migration:
mix ecto.migrate
Pros: Production-ready, persistent, scales with your database
Cons: Requires Ecto setup and migrations
Option C: DevMan Adapter (SQLite)
Best for: Integration with DevMan workflow tool
# config/config.exs
config :comm_bus,
storage: CommBus.Storage.Devman,
db_path: Path.expand("~/.devman/devman.db")
Requires DevMan SQLite schema already set up.
Option D: HuMan Adapter (PostgreSQL)
Best for: Integration with HuMan reasoning infrastructure
# config/config.exs
config :comm_bus,
storage: CommBus.Storage.Human,
repo: HuMan.Repo
# HuMan repo configuration
config :human, HuMan.Repo,
database: "human_dev",
hostname: "localhost",
pool_size: 10
Requires HuMan PostgreSQL schema already set up.
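If you prefer to keep the adapter choice in one place, the options above could be selected per environment from config/runtime.exs. This is a minimal sketch, assuming the `:storage` and `:repo` keys behave exactly as shown in Options A and B; `MyApp.Repo` stands in for your own repo module.

```elixir
# config/runtime.exs
import Config

# Pick the storage adapter per environment, reusing the keys shown above.
case config_env() do
  :prod ->
    config :comm_bus,
      storage: CommBus.Storage.EctoAdapter,
      repo: MyApp.Repo

  _dev_or_test ->
    config :comm_bus, storage: CommBus.Storage.InMemory
end
```

Keeping the per-environment files from Options A and B works just as well; this variant only centralizes the decision.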
3. Configure Template Engine
CommBus supports two Mustache engines:
# config/config.exs
config :comm_bus,
# Option 1: BbMustache (default, faster, Erlang-based)
template_engine: CommBus.Template.Engine.BbMustache,
# Option 2: ExMustache (pure Elixir)
# template_engine: CommBus.Template.Engine.ExMustache,
# Prompt directory
prompt_root: Path.expand("config/comm_bus/prompts", File.cwd!()),
# Methodology directory
methodology_root: Path.expand("config/comm_bus/methodologies", File.cwd!())
4. Set Up Directory Structure
Create directories for prompts and methodologies:
mkdir -p config/comm_bus/prompts
mkdir -p config/comm_bus/methodologies
Example prompt file (config/comm_bus/prompts/greeting.md):
---
name: greeting
description: Friendly greeting template
variables:
- name
- role
---
Hello {{name}}! I'm here to help you with {{role}}.
Example methodology file (config/comm_bus/methodologies/custom.yml):
name: "Custom Workflow"
description: "Context entries for custom workflow"
slug: "custom"
tags: ["workflow", "custom"]
entries:
  - id: "step-1"
    content: "Step 1 instructions..."
    keywords: ["start", "begin"]
    section: pre_history
    mode: triggered
    priority: 80
5. Configure Tokenizer
CommBus uses a pluggable tokenizer. The default, CommBus.Tokenizer.Simple, is heuristic-based:
# config/config.exs
config :comm_bus,
tokenizer: CommBus.Tokenizer.Simple
Custom Tokenizer (optional):
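A heuristic tokenizer trades accuracy for speed. As a rough illustration of the idea, the sketch below estimates tokens from character count. It is not CommBus's actual Simple implementation, which this guide does not show; the module name `MyApp.HeuristicTokenizer` is hypothetical and the four-characters-per-token ratio is an assumed rule of thumb for English text.

```elixir
defmodule MyApp.HeuristicTokenizer do
  # Hypothetical module; it mirrors the count_tokens/1 shape used in this guide.
  # Assumes roughly four characters per token and rounds up, so any
  # non-empty string costs at least one token.
  @chars_per_token 4

  def count_tokens(text) when is_binary(text) do
    text
    |> String.length()
    |> Kernel.+(@chars_per_token - 1)
    |> div(@chars_per_token)
  end
end
```

An estimate like this can drift noticeably from a model's real tokenizer, especially for code or non-English text, which is why budgets built on it should leave some headroom.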
If you want accurate token counting for specific models, implement the CommBus.Tokenizer behaviour:
defmodule MyApp.GPT4Tokenizer do
@behaviour CommBus.Tokenizer
def count_tokens(text) do
# Use tiktoken_elixir or similar
MyApp.Tiktoken.count(text, model: "gpt-4")
end
end
# Configure
config :comm_bus, tokenizer: MyApp.GPT4Tokenizer
Usage Patterns
Pattern 1: Basic Assembly in Controllers/Services
Phoenix Controller Example:
defmodule MyAppWeb.ChatController do
use MyAppWeb, :controller
alias CommBus.{Assembler, Conversation, Entry, Message}
def create(conn, %{"message" => user_message, "session_id" => session_id}) do
# 1. Get or create conversation
conversation = get_or_create_conversation(session_id, user_message)
# 2. Load context entries
{:ok, entries} = CommBus.Storage.list_entries(filters: [enabled: true])
# 3. Assemble prompt
packet = Assembler.assemble_prompt(
conversation,
entries,
budget: %{total: 4000}
)
# 4. Send to LLM (via llm_core or direct API)
{:ok, response} = call_llm(packet.messages)
# 5. Save assistant response
updated_conversation = %{conversation | messages: conversation.messages ++ [
%Message{role: :assistant, content: response}
]}
CommBus.Storage.upsert_conversation(updated_conversation)
json(conn, %{response: response})
end
defp get_or_create_conversation(session_id, user_message) do
case CommBus.Storage.get_conversation(session_id) do
{:ok, conv} ->
%{conv | messages: conv.messages ++ [
%Message{role: :user, content: user_message}
]}
{:error, :not_found} ->
%Conversation{
id: session_id,
messages: [%Message{role: :user, content: user_message}]
}
end
end
end
Pattern 2: Background Jobs with Oban
Process long conversations asynchronously:
defmodule MyApp.Workers.ConversationProcessor do
use Oban.Worker, queue: :llm, max_attempts: 3
alias CommBus.{Assembler, Message, Methodologies}
@impl Oban.Worker
def perform(%Oban.Job{args: %{"conversation_id" => conv_id, "methodology" => methodology}}) do
# 1. Load conversation from storage
{:ok, conversation} = CommBus.Storage.get_conversation(conv_id)
# 2. Load methodology entries
method_entries = Methodologies.entries_for(methodology)
# 3. Load stored entries
{:ok, stored_entries} = CommBus.Storage.list_entries(filters: [enabled: true])
# 4. Combine entries
all_entries = method_entries ++ stored_entries
# 5. Assemble with generous budget
packet = Assembler.assemble_prompt(
conversation,
all_entries,
budget: %{total: 8000}
)
# 6. Process with LLM
{:ok, response} = MyApp.LLM.complete(packet.messages)
# 7. Save result
updated = %{conversation | messages: conversation.messages ++ [
%Message{role: :assistant, content: response}
]}
CommBus.Storage.upsert_conversation(updated)
:ok
end
end
# Enqueue job
MyApp.Workers.ConversationProcessor.new(%{
"conversation_id" => "conv-123",
"methodology" => "bug_triage"
})
|> Oban.insert()
Pattern 3: Real-time Streaming with Phoenix LiveView
Stream LLM responses with context assembly:
defmodule MyAppWeb.ChatLive do
use MyAppWeb, :live_view
alias CommBus.{Assembler, Conversation, Message}
def mount(%{"session_id" => session_id}, _session, socket) do
{:ok, conversation} = CommBus.Storage.get_conversation(session_id)
socket =
socket
|> assign(:conversation, conversation)
|> assign(:streaming, false)
{:ok, socket}
end
def handle_event("send_message", %{"message" => content}, socket) do
# Add user message
conversation = socket.assigns.conversation
updated = %{conversation |
messages: conversation.messages ++ [
%Message{role: :user, content: content}
]
}
# Assemble context
{:ok, entries} = CommBus.Storage.list_entries(filters: [enabled: true])
packet = Assembler.assemble_prompt(updated, entries)
# Start streaming
{:ok, stream_pid} = MyApp.LLM.stream(packet.messages, self())
socket =
socket
|> assign(:conversation, updated)
|> assign(:streaming, true)
|> assign(:stream_pid, stream_pid)
{:noreply, socket}
end
def handle_info({:stream_chunk, chunk}, socket) do
# Append chunk to UI
{:noreply, push_event(socket, "chunk", %{data: chunk})}
end
def handle_info(:stream_complete, socket) do
{:noreply, assign(socket, :streaming, false)}
end
end
Pattern 4: Dynamic Entry Management
Update context entries at runtime:
defmodule MyApp.ContextManager do
alias CommBus.{Entry, Storage}
def add_project_context(project_id) do
project = MyApp.Projects.get!(project_id)
entry = %Entry{
id: "project-#{project_id}",
content: """
Project: #{project.name}
Tech Stack: #{Enum.join(project.technologies, ", ")}
Status: #{project.status}
""",
keywords: [project.name, "project", "codebase"],
section: :pre_history,
mode: :triggered,
priority: 70,
enabled: true,
tags: ["project", "context"]
}
Storage.insert_entry(entry)
end
def disable_outdated_entries do
{:ok, entries} = Storage.list_entries(filters: [enabled: true])
entries
|> Enum.filter(&outdated?/1)
|> Enum.each(fn entry ->
Storage.update_entry(entry.id, enabled: false)
end)
end
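defp outdated?(entry) do
# Metadata stored as JSON returns "expires_at" as an ISO 8601 string, so parse it before comparing
with %{"expires_at" => expires_at} when is_binary(expires_at) <- entry.metadata,
{:ok, expires, _offset} <- DateTime.from_iso8601(expires_at) do
DateTime.compare(DateTime.utc_now(), expires) == :gt
else
_ -> false
end
end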
end
Testing
Test with In-Memory Storage
# test/test_helper.exs
Application.put_env(:comm_bus, :storage, CommBus.Storage.InMemory)
# test/my_app/chat_test.exs
defmodule MyApp.ChatTest do
use MyApp.DataCase
alias CommBus.{Assembler, Conversation, Entry, Message}
setup do
# Clear in-memory storage between tests
:ets.delete_all_objects(:comm_bus_entries)
:ets.delete_all_objects(:comm_bus_conversations)
:ok
end
test "assembles context for bug report" do
conversation = %Conversation{
messages: [
%Message{role: :user, content: "Bug in auth system"}
]
}
entries = [
%Entry{
id: "auth-docs",
keywords: ["auth*", "login"],
section: :pre_history,
content: "Auth system documentation...",
mode: :triggered
}
]
result = Assembler.assemble_prompt(conversation, entries)
assert length(result.included_entries) == 1
assert hd(result.included_entries).id == "auth-docs"
end
end
Production Considerations
Token Budget Tuning
Adjust budgets based on your LLM provider and model:
# For GPT-4 (8K context)
budget = %{
total: 6000, # Leave room for response
sections: %{
system: 500,
pre_history: 2000,
history: 2500,
post_history: 1000
}
}
# For Claude 3 Opus (200K context)
budget = %{
total: 150_000,
sections: %{
system: 1000,
pre_history: 50_000,
history: 80_000,
post_history: 19_000
}
}
Prompt Override Storage
For runtime prompt overrides without redeploying:
defmodule MyApp.PromptOverrideStore do
@behaviour CommBus.Prompts.OverrideStore
def get_override(prompt_name) do
case MyApp.Repo.get_by(PromptOverride, name: prompt_name) do
%{content: content} -> {:ok, content}
nil -> :not_found
end
end
def set_override(prompt_name, content) do
%PromptOverride{name: prompt_name, content: content}
|> MyApp.Repo.insert!(
on_conflict: {:replace, [:content, :updated_at]},
conflict_target: :name
)
:ok
end
end
# Configure
config :comm_bus,
prompt_override_store: MyApp.PromptOverrideStore
Telemetry Monitoring
Monitor assembly performance:
:telemetry.attach_many(
"comm-bus-handler",
[
[:comm_bus, :assembly, :start],
[:comm_bus, :assembly, :stop],
[:comm_bus, :assembly, :exception]
],
&MyApp.Telemetry.handle_event/4,
nil
)
defmodule MyApp.Telemetry do
require Logger
def handle_event([:comm_bus, :assembly, :start], _measurements, metadata, _config) do
Logger.debug("Assembly started for conversation #{metadata.conversation_id}")
end
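def handle_event([:comm_bus, :assembly, :stop], measurements, metadata, _config) do
# Assumes duration is reported in native time units, as :telemetry.span/3 does
duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
Logger.info("Assembly completed in #{duration_ms}ms, tokens: #{metadata.token_usage}")
end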
def handle_event([:comm_bus, :assembly, :exception], _measurements, metadata, _config) do
Logger.error("Assembly failed: #{inspect(metadata.error)}")
end
end
Performance Optimization
1. Entry Caching:
defmodule MyApp.CachedEntryLoader do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def get_entries do
GenServer.call(__MODULE__, :get_entries)
end
def init(_) do
schedule_refresh()
{:ok, load_entries()}
end
def handle_call(:get_entries, _from, entries) do
{:reply, entries, entries}
end
def handle_info(:refresh, _entries) do
schedule_refresh()
{:noreply, load_entries()}
end
defp load_entries do
{:ok, entries} = CommBus.Storage.list_entries(filters: [enabled: true])
entries
end
defp schedule_refresh do
Process.send_after(self(), :refresh, :timer.minutes(5))
end
end
2. Prompt Preloading:
CommBus automatically caches prompts in :persistent_term, but you can preload them on application start:
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Preload all prompts
CommBus.Prompts.Runtime.preload_all()
children = [
# ... other children
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
Migration from Other Solutions
From Direct OpenAI API Calls
Before:
messages = [
%{role: "system", content: "You are a helpful assistant."},
%{role: "user", content: user_input}
]
OpenAI.chat_completion(messages, model: "gpt-4")
After (with CommBus):
conversation = %Conversation{
messages: [%Message{role: :user, content: user_input}]
}
entries = [
%Entry{id: "system", mode: :constant, section: :system, content: "You are a helpful assistant."}
]
packet = Assembler.assemble_prompt(conversation, entries)
OpenAI.chat_completion(packet.messages, model: "gpt-4")
Benefits: Dynamic context injection, token budget management, keyword-triggered entries
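To make the interaction between keyword triggers and a token budget concrete, here is a self-contained sketch of the general idea. It is not CommBus's implementation: the `ContextSketch` module, the trailing `*` prefix-wildcard rule, the case-insensitive word matching, and the highest-priority-first selection are all assumptions made for illustration. Entries are plain maps with a precomputed `token_count`.

```elixir
defmodule ContextSketch do
  # Illustrative only: pick the entries that apply to a message, then keep
  # the highest-priority ones that fit within a token budget.

  # :constant entries always apply; :triggered entries apply when any
  # keyword matches a word in the text. Other modes never apply here.
  def active?(%{mode: :constant}, _text), do: true

  def active?(%{mode: :triggered, keywords: keywords}, text) do
    words = text |> String.downcase() |> String.split(~r/[^a-z0-9_]+/, trim: true)
    Enum.any?(keywords, fn keyword -> Enum.any?(words, &keyword_match?(keyword, &1)) end)
  end

  def active?(_entry, _text), do: false

  # Assumed rule: a trailing "*" makes the keyword a prefix wildcard.
  defp keyword_match?(keyword, word) do
    keyword = String.downcase(keyword)

    if String.ends_with?(keyword, "*") do
      String.starts_with?(word, String.trim_trailing(keyword, "*"))
    else
      keyword == word
    end
  end

  # Walk entries from highest to lowest priority and skip any entry that
  # would push the running total past the budget.
  def select(entries, text, budget) do
    {kept, _used} =
      entries
      |> Enum.filter(&active?(&1, text))
      |> Enum.sort_by(& &1.priority, :desc)
      |> Enum.reduce({[], 0}, fn entry, {kept, used} ->
        if used + entry.token_count <= budget do
          {[entry | kept], used + entry.token_count}
        else
          {kept, used}
        end
      end)

    Enum.reverse(kept)
  end
end

entries = [
  %{id: "system", mode: :constant, keywords: [], priority: 100, token_count: 7},
  %{id: "auth-docs", mode: :triggered, keywords: ["auth*", "login"], priority: 70, token_count: 10},
  %{id: "billing", mode: :triggered, keywords: ["invoice"], priority: 60, token_count: 5}
]

selected_ids = ContextSketch.select(entries, "Bug in auth system", 20) |> Enum.map(& &1.id)
```

With a budget of 20, the constant entry and the triggered auth entry both fit and the billing entry is never triggered. With a budget of 10, only the constant entry survives, because adding the auth entry would exceed the budget.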
From LangChain
CommBus complements LangChain by handling context assembly before chains execute:
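# Assemble context with CommBus
packet = CommBus.Assembler.assemble_prompt(conversation, entries)
# Convert to LangChain messages (assumes each packet message has :role and :content)
messages = Enum.map(packet.messages, &LangChain.Message.new!(%{role: &1.role, content: &1.content}))
# Use in LangChain
%{llm: ChatOpenAI.new!(%{model: "gpt-4"})}
|> LLMChain.new!()
|> LLMChain.add_messages(messages)
|> LLMChain.run()
Next Steps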
- Read the Integration Guide for DevMan, HuMan, and llm_core integration patterns
- Explore the API documentation for detailed module references
- Review CHANGELOG.md for version history and updates