Architecture


Beamlens uses an autonomous operator architecture: specialized operators run LLM-driven loops to monitor skills and detect anomalies. Use Beamlens.Operator.run/2 for triggered analysis that blocks and returns results synchronously. Notifications are emitted via telemetry.

Supervision Tree

Add Beamlens to your application's supervision tree:

{Beamlens, []}

This starts the following components:

graph TD
    S[Beamlens.Supervisor]
    S --> TS[TaskSupervisor]
    S --> OR[OperatorRegistry]
    S --> LS[LogStore]
    S --> ES[ExceptionStore]
    S --> VES["VmEvents.EventStore*"]
    S --> EGS["Ets.GrowthStore*"]
    S --> BAS["Beam.AtomStore*"]
    S --> TR["Tracer*"]
    S --> AS["Anomaly.Supervisor*"]
    S --> C[Coordinator]
    S --> OS[Operator.Supervisor]

* Conditional children - started based on enabled skills
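As a sketch, enabling skills with conditional children starts those stores alongside the static processes. The skills: option appears elsewhere in this document; the exact skill combination here is illustrative:

```elixir
# Enabling Tracer and VmEvents starts their conditional children
# (Tracer*, VmEvents.EventStore*) in addition to the static processes.
{Beamlens, [
  skills: [
    Beamlens.Skill.Beam,
    Beamlens.Skill.Tracer,
    Beamlens.Skill.VmEvents
  ]
]}
```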

Operators and Coordinator are static, always-running processes invoked via Beamlens.Operator.run/2 or Beamlens.Coordinator.run/2.

Inter-Process Communication

Operator → Coordinator Communication

Beamlens uses direct Erlang message passing for operator-coordinator communication:

# Coordinator invokes static operator
case Registry.lookup(Beamlens.OperatorRegistry, skill_module) do
  [{pid, _}] ->
    ref = Process.monitor(pid)
    Operator.run_async(pid, %{reason: context}, notify_pid: self())
end

# Operator sends notifications to coordinator
send(notify_pid, {:operator_notification, self(), notification})

# Operator signals completion
send(notify_pid, {:operator_complete, self(), Beamlens.Skill.Beam, result})
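A caller that passes notify_pid: self() can consume these messages with a plain receive loop. The message shapes come from this document; the collector module, accumulator, and 30-second timeout are illustrative assumptions, not Beamlens's actual Coordinator implementation:

```elixir
defmodule NotificationCollector do
  # Illustrative sketch: accumulate operator notifications until the
  # operator signals completion or crashes.
  def await(acc \\ []) do
    receive do
      {:operator_notification, _operator_pid, notification} ->
        await([notification | acc])

      {:operator_complete, _operator_pid, _skill, result} ->
        {:ok, result, Enum.reverse(acc)}

      {:DOWN, _ref, :process, _pid, reason} ->
        # Monitor fired: the operator crashed mid-invocation
        {:error, reason, Enum.reverse(acc)}
    after
      30_000 -> {:timeout, Enum.reverse(acc)}
    end
  end
end
```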

Process Lifecycle

            
Beamlens.Supervisor
├── Coordinator (idle)
├── ... (stores)
└── Operator.Supervisor
    ├── Operator (idle)
    └── Operator (idle)

The Coordinator invokes operators via run_async() and receives messages back from them:

  {:operator_notification, pid, data}
  {:operator_complete, pid, skill, result}

Crash Propagation:

  • Coordinator crash → Operators continue running (separate supervisor)
  • Operator crash at rest → Operator.Supervisor restarts it
  • Operator crash during invocation → Coordinator receives {:DOWN, ref, ...}

Message Protocol

Notification Message

{:operator_notification, operator_pid, %Notification{
  id: "abc123",
  operator: Beamlens.Skill.Beam,
  anomaly_type: "memory_threshold_exceeded",
  severity: :warning,
  context: "Node running for 3 days, 500 processes",
  observation: "Memory usage at 92%, exceeding threshold",
  hypothesis: "Likely ETS table growth",
  snapshots: [...]
}}

See Beamlens.Operator for implementation.

Intent Decomposition

Notifications separate facts from speculation to enable better correlation:

Field        Type         Description
context      Fact         System state at detection time
observation  Fact         What anomaly was detected
hypothesis   Speculation  What might be causing it (optional)

The Coordinator correlates on context + observation only. Hypotheses require corroboration.

Insight Grounding

When producing insights, the Coordinator tracks how hypotheses were validated:

Field                 Description
matched_observations  Exact observation text that correlated
hypothesis_grounded   Whether speculation was corroborated by multiple operators

Completion Message

{:operator_complete, operator_pid, Beamlens.Skill.Beam, %Beamlens.Operator.CompletionResult{
  state: :healthy,
  notifications: [...],
  snapshots: [...]
}}

See lib/beamlens/operator.ex:732 for implementation.

Crash Handling

# Coordinator handles operator crashes
{:DOWN, ref, :process, pid, reason}
{:EXIT, pid, reason}

# Removes operator from running_operators map
# Emits telemetry: [:beamlens, :coordinator, :operator_crashed]

See lib/beamlens/coordinator.ex handle_info callbacks for crash handling.

On-Demand vs Continuous Mode

On-Demand (Operator.run/2):

  • Invokes static operator process
  • Operator transitions from idle to running, then back to idle
  • Caller receives result directly

Async (Operator.run_async/3):

  • Invokes static operator without blocking
  • Caller receives {:operator_notification, ...} and {:operator_complete, ...} messages
  • Multiple callers can queue invocations
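The two styles can be sketched side by side. This combines the calls shown elsewhere in this document; the %{reason: ...} payload is illustrative:

```elixir
# On-demand: blocks until the operator finishes, returns notifications
{:ok, notifications} =
  Beamlens.Operator.run(Beamlens.Skill.Beam, %{reason: "routine check"})

# Async: look up the static operator, invoke without blocking, and
# receive the completion as a message instead of a return value
[{pid, _}] = Registry.lookup(Beamlens.OperatorRegistry, Beamlens.Skill.Beam)
Beamlens.Operator.run_async(pid, %{reason: "routine check"}, notify_pid: self())

receive do
  {:operator_complete, ^pid, _skill, result} -> result
end
```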

On-Demand Execution Sequence

  1. User calls Coordinator.run(context)
  2. GenServer.call is made to the static Coordinator
  3. Coordinator performs a Registry.lookup for the Operator
  4. Coordinator calls Operator.run_async (invokes the static Operator)
  5. Operator runs its LLM analysis loop
  6. Operator sends {:operator_complete, ...}
  7. Coordinator receives the completion message and replies to the caller
  8. The caller blocks until timeout or completion, then receives {:ok, result}

Alternative: Standalone Operators

Operators can run without a coordinator:

# No notify_pid — operator doesn't send messages
{:ok, pid} = Operator.start_link(skill: Beamlens.Skill.Beam)

# Query status directly
status = Operator.status(pid)

# Notifications stored in operator state (not forwarded)

Telemetry Integration

While operators use direct messaging, Beamlens still emits telemetry events for observability:

:telemetry.execute(
  [:beamlens, :coordinator, :operator_notification_received],
  %{},
  %{notification_id: id, operator_pid: pid}
)

External systems subscribe to telemetry, not PubSub. See Telemetry Events for full list.

Operator Loop

Each operator is a GenServer running an LLM-driven analysis loop:

flowchart TD
    START[Start] --> SNAP[Build snapshot context]
    SNAP --> LLM[LLM selects tool]
    LLM --> EXEC[Execute tool]
    EXEC --> |SetState| STATE[Update state]
    EXEC --> |SendNotification| NOTIFY[Emit telemetry]
    EXEC --> |TakeSnapshot| CAPTURE[Store snapshot]
    EXEC --> |Execute| LUA[Run Lua code]
    EXEC --> |Wait| SLEEP[Sleep ms]
    STATE --> LLM
    NOTIFY --> LLM
    CAPTURE --> LLM
    LUA --> LLM
    SLEEP --> LLM

The LLM controls the loop timing via the wait tool. There are no fixed schedules.

On-Demand Analysis

For scheduled or triggered analysis (e.g., Oban workers):

{:ok, notifications} = Beamlens.Operator.run(Beamlens.Skill.Beam, %{reason: "high memory detected"})

The LLM investigates and calls done() when finished, returning notifications generated during analysis.
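For example, a scheduled trigger could be wrapped in a job. This is a hypothetical Oban worker; MyApp.HealthCheckWorker and the :monitoring queue are assumptions, and only Beamlens.Operator.run/2 comes from this document:

```elixir
defmodule MyApp.HealthCheckWorker do
  use Oban.Worker, queue: :monitoring

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"reason" => reason}}) do
    # Blocks until the LLM calls done(), then returns notifications
    case Beamlens.Operator.run(Beamlens.Skill.Beam, %{reason: reason}) do
      {:ok, _notifications} -> :ok
      error -> error
    end
  end
end
```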

State Model

Operators maintain one of four states reflecting current assessment:

State      Description
healthy    Everything is normal
observing  Something looks off, gathering more data
warning    Elevated concern, not yet critical
critical   Active issue requiring attention

State transitions are driven by the LLM via the set_state tool.
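A telemetry handler can react to these transitions. The event name comes from the Telemetry Events section; the metadata keys (:state here) are an assumption about the payload shape:

```elixir
# Sketch: alert when any operator escalates to :critical.
:telemetry.attach("alert-on-critical", [:beamlens, :operator, :state_change],
  fn _event, _measurements, meta, _config ->
    if meta[:state] == :critical do
      Logger.warning("Operator went critical: #{inspect(meta)}")
    end
  end, nil)
```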

Coordinator

The Coordinator is a GenServer that correlates operator notifications into unified insights. When you call Beamlens.Coordinator.run/2, it spawns operators, collects their notifications, and runs an LLM-driven analysis to produce insights.

Notification States

State         Description
unread        New notification, not yet processed
acknowledged  Currently being analyzed
resolved      Processed (correlated into insight or dismissed)

Coordinator Tools

Tool                          Description
get_notifications             Query notifications, optionally filtered by status
update_notification_statuses  Set status on multiple notifications
produce_insight               Create insight correlating notifications (auto-resolves them)
done                          Signal analysis completion and return results
think                         Reason through complex decisions before acting
invoke_operators              Spawn multiple operators in parallel
message_operator              Send message to running operator, get LLM response
get_operator_statuses         Check status of running operators
wait                          Pause loop for specified duration

Correlation Types

When producing insights, the Coordinator classifies how notifications are related:

Type         Description
temporal     Notifications occurred close in time, possibly related
causal       One notification directly caused another (A → B)
symptomatic  Notifications share a common hidden cause (A ← X → B)

Subscribe to Insights

:telemetry.attach("my-insights", [:beamlens, :coordinator, :insight_produced], fn
  _event, _measurements, %{insight: insight}, _config ->
    Logger.info("Insight: #{insight.summary}")
end, nil)

Available Tools

Tool               Description
set_state          Update operator state with reason
send_notification  Create notification with referenced snapshots
get_notifications  Retrieve previous notifications for correlation
take_snapshot      Capture current metrics with unique ID
get_snapshot       Retrieve specific snapshot by ID
get_snapshots      Retrieve multiple snapshots with pagination
execute            Run Lua code with metric callbacks
wait               Sleep before next iteration (LLM-controlled timing)
think              Reason through complex decisions before acting
done               Signal analysis completion

Lua Callbacks

The execute tool runs Lua code in a sandbox with access to skill-specific callbacks. Each skill provides its own prefixed callbacks (e.g., beam_get_memory for the BEAM skill).

Example Lua code for the BEAM skill:

local mem = beam_get_memory()
local procs = beam_top_processes(5, "memory")
return {memory = mem, top_procs = procs}

See the skill sections below for available callbacks per skill.

Base Callbacks

All operators have access to these common callbacks regardless of their skill:

Callback            Description
get_current_time()  Returns UTC timestamp (ISO 8601 format and Unix milliseconds)
get_node_info()     Returns node name, uptime in seconds, OS type and name

These callbacks are provided by Beamlens.Skill.Base and are automatically available in the Lua sandbox for all operators.

Telemetry Events

Operators and the Coordinator emit telemetry events for observability. Key events:

Event                                              Description
[:beamlens, :operator, :started]                   Operator initialized
[:beamlens, :operator, :state_change]              State transitioned
[:beamlens, :operator, :notification_sent]         Notification created
[:beamlens, :operator, :iteration_start]           Loop iteration began
[:beamlens, :operator, :done]                      On-demand analysis completed
[:beamlens, :coordinator, :started]                Coordinator initialized
[:beamlens, :coordinator, :notification_received]  Notification queued for correlation
[:beamlens, :coordinator, :iteration_start]        Analysis loop iteration began
[:beamlens, :coordinator, :insight_produced]       Insight created from correlated notifications
[:beamlens, :coordinator, :done]                   Analysis loop completed
[:beamlens, :llm, :start]                          LLM call started
[:beamlens, :llm, :stop]                           LLM call completed
[:beamlens, :compaction, :start]                   Context compaction started
[:beamlens, :compaction, :stop]                    Context compaction completed
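To log several of these events with a single handler, :telemetry.attach_many/4 (standard telemetry API) can be used. The event names come from the table above; the handler id and selection of events are illustrative:

```elixir
events = [
  [:beamlens, :coordinator, :started],
  [:beamlens, :coordinator, :insight_produced],
  [:beamlens, :coordinator, :done]
]

:telemetry.attach_many("coordinator-logger", events,
  fn event, _measurements, meta, _config ->
    Logger.info("#{inspect(event)} metadata keys: #{inspect(Map.keys(meta))}")
  end, nil)
```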

Subscribe to notifications:

:telemetry.attach("my-notifications", [:beamlens, :operator, :notification_sent], fn
  _event, _measurements, %{notification: notification}, _config ->
    Logger.warning("Notification: #{notification.observation}")
end, nil)

See Beamlens.Telemetry for the complete event list.

LLM Integration

Beamlens uses BAML for type-safe LLM prompts via Puck. Two BAML functions handle the agent loops:

  • OperatorRun: Operator analysis loop (uses done() to signal completion)
  • CoordinatorRun: Coordinator analysis with operator invocation capabilities

Default LLM: Anthropic Claude Haiku (claude-haiku-4-5-20251001)

LLM Client Configuration

Configure alternative LLM providers via :client_registry when running analysis:

{:ok, result} = Beamlens.Coordinator.run(%{reason: "health check"},
  client_registry: %{
    primary: "Ollama",
    clients: [
      %{name: "Ollama", provider: "openai-generic",
        options: %{base_url: "http://localhost:11434/v1", model: "llama3"}}
    ]
  }
)

See providers.md for configuration examples.

Compaction

Operators and the Coordinator use context compaction to run indefinitely without exceeding the LLM's context window. When the context grows beyond a configurable token threshold, Puck's summarization strategy compacts the conversation while preserving essential information.

Configuration Options:

Option                  Default  Description
:compaction_max_tokens  50,000   Token threshold before compaction triggers
:compaction_keep_last   5        Recent messages to keep verbatim after compaction

The compaction prompt preserves:

  • Anomalies detected and trend direction
  • Snapshot IDs (exact values required for notification references)
  • Key metric values that informed decisions
  • Notifications sent and their reasons

Compaction events are emitted via telemetry: [:beamlens, :compaction, :start] and [:beamlens, :compaction, :stop].

Sizing Guidance: Set :compaction_max_tokens to roughly 10% of your model's context window. This leaves ample room for the compacted summary, new incoming messages, and system prompts. For a 200k context window, 20k is reasonable. For smaller windows (e.g., 32k), reduce to 3k.
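As a configuration sketch, the sizing guidance above might look like this. The option names are from the table in this section; passing them as Coordinator.run/2 options is an assumption about where they are accepted:

```elixir
{:ok, result} =
  Beamlens.Coordinator.run(%{reason: "health check"},
    compaction_max_tokens: 20_000,  # ~10% of a 200k-token context window
    compaction_keep_last: 5
  )
```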

Built-in Skills

Module                     Description
Beamlens.Skill.Allocator   Memory allocator monitoring (mbuf, binary, driver, etc.)
Beamlens.Skill.Beam        BEAM VM metrics (memory, processes, schedulers, atoms)
Beamlens.Skill.Ets         ETS table monitoring
Beamlens.Skill.Gc          Garbage collection statistics
Beamlens.Skill.Logger      Application log monitoring
Beamlens.Skill.Anomaly     Statistical anomaly detection with auto-trigger
Beamlens.Skill.Overload    Message queue overload analysis and bottleneck detection
Beamlens.Skill.Ports       Port monitoring (file descriptors, sockets)
Beamlens.Skill.Supervisor  Supervisor tree monitoring
Beamlens.Skill.Os          OS-level metrics (CPU, memory, disk via os_mon)
Beamlens.Skill.VmEvents    System event monitoring (long GC, large heap)
Beamlens.Skill.Tracer      Process tracing for debugging

BEAM Skill (Beamlens.Skill.Beam)

Monitors BEAM VM runtime health.

Snapshot Metrics:

  • Process utilization %
  • Port utilization %
  • Atom utilization %
  • Scheduler run queue depth
  • Schedulers online

Lua Callbacks:

Callback                            Description
beam_get_memory()                   Memory breakdown by category
beam_get_processes()                Process/port counts and limits
beam_get_schedulers()               Scheduler stats and run queue
beam_get_atoms()                    Atom table statistics
beam_get_system()                   Node info, OTP version, uptime
beam_get_persistent_terms()         Persistent term count and memory
beam_top_processes(limit, sort_by)  Top processes by memory/queue/reductions

ETS Skill (Beamlens.Skill.Ets)

Monitors ETS table health and memory usage.

Snapshot Metrics:

  • Table count
  • Total memory (MB)
  • Largest table memory (MB)

Lua Callbacks:

Callback                        Description
ets_list_tables()               All tables: name, type, protection, size, memory
ets_table_info(table_name)      Single table details
ets_top_tables(limit, sort_by)  Top N tables by "memory" or "size"

GC Skill (Beamlens.Skill.Gc)

Monitors garbage collection activity.

Snapshot Metrics:

  • Total GCs
  • Words reclaimed
  • Bytes reclaimed (MB)

Lua Callbacks:

Callback                 Description
gc_stats()               Global GC statistics
gc_top_processes(limit)  Processes with largest heaps

Logger Skill (Beamlens.Skill.Logger)

Monitors application logs via Erlang's :logger handler system.

Important: The Logger skill captures application log messages and makes them available for LLM analysis. Ensure your application logs do not contain sensitive data (PII, secrets, tokens) before enabling this operator. Review your logging configuration to verify log messages are safe for analysis.

Snapshot Metrics:

  • Total log count (1 minute window)
  • Error count (1 minute window)
  • Warning count (1 minute window)
  • Error rate %
  • Unique error modules

Lua Callbacks:

Callback                              Description
logger_stats()                        Log statistics: counts by level, error rate
logger_recent(limit, level)           Recent logs, optionally filtered by level
logger_errors(limit)                  Recent error-level logs
logger_search(pattern, limit)         Search logs by regex pattern
logger_by_module(module_name, limit)  Logs from modules matching name

Ports Skill (Beamlens.Skill.Ports)

Monitors BEAM ports (file descriptors, sockets).

Snapshot Metrics:

  • Port count
  • Port limit
  • Port utilization %

Lua Callbacks:

Callback                   Description
ports_list()               All ports: id, name, connected_pid
ports_info(port_id)        Port details: I/O bytes, memory
ports_top(limit, sort_by)  Top N ports by "input", "output", or "memory"

Supervisor Skill (Beamlens.Skill.Supervisor)

Monitors supervisor tree structure.

Snapshot Metrics:

  • Supervisor count
  • Total children

Lua Callbacks:

Callback                              Description
sup_list()                            All supervisors: name, pid, child_count, active_children
sup_children(supervisor_name)         Direct children: id, pid, type
sup_tree(supervisor_name)             Full supervision tree (recursive, depth-limited)
sup_unlinked_processes()              Processes with no links or monitors (potential leaks)
sup_orphaned_processes()              Processes whose parent/ancestor has died
sup_tree_integrity(supervisor_name)   Supervision tree health check with anomaly detection
sup_zombie_children(supervisor_name)  Children of dead supervisors (indicates supervisor crash without cleanup)

Os Skill (Beamlens.Skill.Os)

Monitors OS-level system health via Erlang's os_mon application.

Requirement: Add :os_mon to your application's extra_applications.

Platform Notes: cpu_sup metrics are only available on Unix.

Snapshot Metrics:

  • CPU load (1m, 5m, 15m averages)
  • Memory used %
  • Disk max used %

Lua Callbacks:

Callback             Description
system_get_cpu()     CPU load averages and process count
system_get_memory()  System memory stats
system_get_disks()   Disk usage per mount point

Anomaly Skill (Beamlens.Skill.Anomaly)

Statistical anomaly detection with automatic Coordinator triggering based on learned baselines.

Configuration:

{Beamlens, [
  skills: [
    {Beamlens.Skill.Anomaly, [
      collection_interval_ms: 60_000,
      learning_duration_ms: 300_000,
      z_threshold: 3.0,
      consecutive_required: 3,
      cooldown_duration_ms: 900_000,
      auto_trigger: true,         # default: true
      max_triggers_per_hour: 3    # default: 3
    ]}
  ]
]}

Snapshot Metrics:

  • Current state (learning, active, cooldown)
  • Baseline statistics (mean, std_dev, percentiles)
  • Anomaly count
  • Metrics collected

Lua Callbacks:

Callback              Description
monitor_get_state()   Current detector state (:learning, :active, or :cooldown)
monitor_get_status()  Detailed status including learning progress, consecutive anomaly count, and configuration

Overload Skill (Beamlens.Skill.Overload)

Message queue overload detection, bottleneck analysis, and cascade detection.

Snapshot Metrics:

  • Total queue size
  • Processes with queue > 1000
  • Processes with queue > 10000
  • Max queue size

Lua Callbacks:

Callback                             Description
overload_get_queue_status()          Overall queue health and severity
overload_get_overloaded_processes()  Processes with large queues
overload_detect_bottlenecks()        Identify blocked processes and causes
overload_detect_cascades()           Detect cascading queue failures
overload_predict_growth()            Predict queue saturation times

VmEvents Skill (Beamlens.Skill.VmEvents)

Monitors OS-level system events via Erlang's :system_monitor.

Snapshot Metrics:

  • Long GC events (5m)
  • Long schedule events (5m)
  • Busy port events (5m)
  • Busy dist port events (5m)
  • Max GC duration (ms)
  • Max schedule duration (ms)
  • Affected process count
  • Affected port count

Lua Callbacks:

Callback                    Description
sysmon_stats()              Event counts and max durations
sysmon_events(type, limit)  Recent events, optionally filtered by type

Tracer Skill (Beamlens.Skill.Tracer)

Production-safe function call tracing powered by Recon.

Safety Guarantees:

  • Rate limiting: 5 traces per second max
  • Message limit: 50 traces before auto-stop
  • Time limit: 60 seconds max before auto-stop
  • Blocked hot paths: High-frequency stdlib functions rejected
  • Arity required: Must specify function arity (no wildcards)

Snapshot Metrics:

  • Active (boolean)
  • Trace count
  • Trace spec (module, function, arity)
  • Elapsed time (ms)

Skill Callbacks:

Callback                                Description
trace_start({module, function, arity})  Start tracing a specific function (tuple argument)
trace_stop()                            Stop the active trace session
trace_get()                             Get collected trace events

Allocator Skill (Beamlens.Skill.Allocator)

Memory allocator monitoring (mbuf, binary, driver, etc.).

Snapshot Metrics:

  • Allocator types
  • Memory usage by allocator type

Lua Callbacks:

Callback                Description
allocator_get_info()    Allocator information by type
allocator_get_mbuf()    Mbuf allocator stats
allocator_get_binary()  Binary allocator stats

Ecto Skill (Beamlens.Skill.Ecto)

Monitors Ecto database health. Requires a custom skill module and supporting infrastructure.

Step 1: Create a skill module:

defmodule MyApp.EctoSkill do
  use Beamlens.Skill.Ecto, repo: MyApp.Repo
end

Step 2: Add the required components to your supervision tree:

children = [
  # Ecto skill infrastructure (must start before Beamlens)
  {Registry, keys: :unique, name: Beamlens.Skill.Ecto.Registry},
  {Beamlens.Skill.Ecto.TelemetryStore, repo: MyApp.Repo},

  # Configure Beamlens with Ecto skill
  {Beamlens, skills: [MyApp.EctoSkill]}
]

# Trigger investigation
{:ok, result} = Beamlens.Coordinator.run(%{reason: "slow queries detected"})

Snapshot Metrics:

  • Query count (1 minute window)
  • Average query time (ms)
  • Max query time (ms)
  • P95 query time (ms)
  • Slow query count
  • Error count

Lua Callbacks:

Callback                     Description
ecto_query_stats()           Query statistics from telemetry
ecto_slow_queries(limit)     Recent slow queries from telemetry
ecto_pool_stats()            Connection pool health
ecto_db_slow_queries(limit)  Slow queries from pg_stat_statements (PostgreSQL)
ecto_index_usage()           Index scan statistics (PostgreSQL)
ecto_unused_indexes()        Indexes with zero scans (PostgreSQL)
ecto_table_sizes(limit)      Table sizes (PostgreSQL)
ecto_cache_hit()             Buffer cache hit ratios (PostgreSQL)
ecto_locks()                 Active database locks (PostgreSQL)
ecto_long_running()          Long-running queries (PostgreSQL)
ecto_bloat(limit)            Table/index bloat (PostgreSQL)
ecto_connections()           Database connections (PostgreSQL)

PostgreSQL-specific callbacks require {:ecto_psql_extras, "~> 0.8"} as an optional dependency.
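In mix.exs, that dependency is declared alongside your other deps. The requirement string is from this document; the surrounding function is the standard deps/0 shape:

```elixir
defp deps do
  [
    # Enables the PostgreSQL-specific Ecto callbacks above
    {:ecto_psql_extras, "~> 0.8"}
  ]
end
```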

Exception Skill (Beamlens.Skill.Exception)

Monitors application exceptions via Tower's reporter system.

Important: The Exception skill captures exception messages and stacktraces which may contain sensitive data (file paths, variable values). Ensure your exception handling does not expose PII before enabling this operator.

Requirement: Requires Tower to be installed and configured:

# In mix.exs deps
{:tower, "~> 0.8.6"}

# In config/config.exs
config :tower,
  reporters: [Beamlens.Skill.Exception.ExceptionStore]

Snapshot Metrics:

  • Total exceptions (5 minute window)
  • Counts by kind (error, exit, throw)
  • Counts by level
  • Top exception types
  • Unique exception type count

Lua Callbacks:

Callback                                  Description
exception_stats()                         Exception statistics: total_count, by_kind, by_level, top_types
exception_recent(limit, kind)             Recent exceptions, optionally filtered by kind
exception_by_type(exception_type, limit)  Exceptions matching type name (e.g., "ArgumentError")
exception_search(pattern, limit)          Search exception messages by regex pattern
exception_stacktrace(exception_id)        Get full stacktrace for specific exception by ID

Custom Skills

Implement the Beamlens.Skill behaviour to create custom monitoring skills:

defmodule MyApp.Skills.Postgres do
  @behaviour Beamlens.Skill

  @impl true
  def title, do: "PostgreSQL"

  @impl true
  def description, do: "PostgreSQL database: connections, query performance, pool health"

  @impl true
  def system_prompt do
    """
    You are a PostgreSQL database monitor. Track connection pool health,
    query performance, and database resource utilization.

    ## What to Watch For
    - Pool exhaustion (checked_out approaching pool_size)
    - Slow query accumulation
    - Connection queue buildup
    """
  end

  @impl true
  def snapshot do
    %{
      active_connections: count_active(),
      pool_size: pool_size(),
      query_queue_depth: queue_depth()
    }
  end

  @impl true
  def callbacks do
    %{
      "postgres_slow_queries" => &slow_queries/0,
      "postgres_pool_stats" => &pool_stats/0
    }
  end

  @impl true
  def callback_docs do
    """
    ### postgres_slow_queries()
    Returns queries exceeding threshold: query, duration_ms, calls

    ### postgres_pool_stats()
    Connection pool stats: size, available, checked_out, waiting
    """
  end
end

Register in supervision tree:

{Beamlens, skills: [Beamlens.Skill.Beam, MyApp.Skills.Postgres]}

# Trigger investigation
{:ok, result} = Beamlens.Coordinator.run(%{reason: "database performance check"})