Snakepit Telemetry
Snakepit provides a comprehensive distributed telemetry system that enables observability across your Elixir cluster and Python workers. All telemetry flows through Elixir's standard :telemetry library, providing a unified interface for monitoring, metrics, and tracing.
Features
- Distributed by Design - All events include node metadata for cluster-wide visibility
- Python-to-Elixir Event Folding - Python worker metrics appear as Elixir :telemetry events
- Bidirectional gRPC Stream - Real-time event streaming with runtime control
- Atom Safety - Curated event catalog prevents atom table exhaustion
- Runtime Control - Adjust sampling rates, filter events, and toggle telemetry without restarting workers
- Zero External Dependencies - Core system uses only the standard library plus :telemetry
- High Performance - <10μs overhead per event, <1% CPU impact
Quick Start
Attaching Event Handlers
# In your application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    # Attach telemetry handlers
    :telemetry.attach(
      "my-app-python-monitor",
      [:snakepit, :python, :call, :stop],
      &MyApp.Telemetry.handle_python_call/4,
      nil
    )

    children = [
      # ... your supervision tree
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

defmodule MyApp.Telemetry do
  require Logger

  def handle_python_call(_event, measurements, metadata, _config) do
    duration_ms = measurements.duration / 1_000_000

    Logger.info("Python call completed",
      command: metadata.command,
      duration_ms: duration_ms,
      worker_id: metadata.worker_id,
      node: metadata.node
    )
  end
end

Emitting from Python
from snakepit_bridge import telemetry

def my_tool(ctx, data):
    # Use span for automatic timing
    with telemetry.span("tool.execution", {"tool": "my_tool"}):
        result = expensive_operation(data)

        # Emit custom metrics
        telemetry.emit(
            "tool.result_size",
            {"bytes": len(result)},
            {"tool": "my_tool"}
        )

        return result

Event Catalog
Snakepit emits events across three layers:
Layer 1: Infrastructure Events (Elixir)
Pool Management:
- [:snakepit, :pool, :initialized] - Pool initialization complete
- [:snakepit, :pool, :status] - Periodic pool status snapshot
- [:snakepit, :pool, :queue, :enqueued] - Request queued (no workers available)
- [:snakepit, :pool, :queue, :dequeued] - Request dequeued (worker available)
- [:snakepit, :pool, :queue, :timeout] - Request timed out in queue
Worker Lifecycle:
- [:snakepit, :pool, :worker, :spawn_started] - Worker spawn initiated
- [:snakepit, :pool, :worker, :spawned] - Worker ready and connected
- [:snakepit, :pool, :worker, :spawn_failed] - Worker failed to start
- [:snakepit, :pool, :worker, :terminated] - Worker terminated
- [:snakepit, :pool, :worker, :restarted] - Worker restarted by supervisor
Session Management:
- [:snakepit, :session, :created] - New session created
- [:snakepit, :session, :destroyed] - Session destroyed
- [:snakepit, :session, :affinity, :assigned] - Session assigned to worker
- [:snakepit, :session, :affinity, :broken] - Session affinity broken
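For the pool and session events above, a single handler attached to several event names is usually enough for basic visibility. The following is a minimal sketch, not part of the library itself; it assumes the worker_id and node metadata keys shown in the Metadata example later in this document.

:telemetry.attach_many(
  "worker-lifecycle-logger",
  [
    [:snakepit, :pool, :worker, :spawned],
    [:snakepit, :pool, :worker, :spawn_failed],
    [:snakepit, :pool, :worker, :terminated]
  ],
  fn [:snakepit, :pool, :worker, action], _measurements, metadata, _config ->
    # Log each worker lifecycle transition with its originating node
    Logger.info("Worker lifecycle event",
      action: action,
      worker_id: metadata[:worker_id],
      node: metadata[:node]
    )
  end,
  nil
)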
Layer 2: Python Execution Events (Folded from Python)
Call Lifecycle:
- [:snakepit, :python, :call, :start] - Python command started
- [:snakepit, :python, :call, :stop] - Python command completed successfully
- [:snakepit, :python, :call, :exception] - Python command raised exception
Tool Execution:
- [:snakepit, :python, :tool, :execution, :start] - Tool execution started
- [:snakepit, :python, :tool, :execution, :stop] - Tool execution completed
- [:snakepit, :python, :tool, :execution, :exception] - Tool execution failed
- [:snakepit, :python, :tool, :result_size] - Tool result size metric
Resource Metrics:
- [:snakepit, :python, :memory, :sampled] - Python process memory usage
- [:snakepit, :python, :cpu, :sampled] - Python process CPU usage
- [:snakepit, :python, :gc, :completed] - Python garbage collection completed
- [:snakepit, :python, :error, :occurred] - Python error detected
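Because resource metrics are folded into ordinary :telemetry events, they can be handled exactly like the infrastructure events. A minimal sketch, assuming the memory sample carries a memory_bytes measurement (as in the Measurements example later in this document) and a worker_id metadata key:

:telemetry.attach(
  "python-memory-monitor",
  [:snakepit, :python, :memory, :sampled],
  fn _event, measurements, metadata, _config ->
    # Flag any Python worker whose resident memory exceeds ~1 GiB
    if Map.get(measurements, :memory_bytes, 0) > 1_073_741_824 do
      Logger.warning("Python worker memory high",
        worker_id: metadata[:worker_id],
        memory_bytes: measurements[:memory_bytes]
      )
    end
  end,
  nil
)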
Layer 3: gRPC Bridge Events (Elixir)
Call Events:
- [:snakepit, :grpc, :call, :start] - gRPC call initiated
- [:snakepit, :grpc, :call, :stop] - gRPC call completed
- [:snakepit, :grpc, :call, :exception] - gRPC call failed
Stream Events:
- [:snakepit, :grpc, :stream, :opened] - Streaming RPC opened
- [:snakepit, :grpc, :stream, :message] - Stream message sent/received
- [:snakepit, :grpc, :stream, :closed] - Stream closed
Connection Events:
- [:snakepit, :grpc, :connection, :established] - gRPC channel connected
- [:snakepit, :grpc, :connection, :lost] - gRPC connection lost
- [:snakepit, :grpc, :connection, :reconnected] - gRPC reconnected after failure
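Connection events are a convenient hook for alerting on bridge instability before calls start failing. A sketch only, reusing the hypothetical MyApp.Alerts module from the examples below and assuming a worker_id metadata key:

:telemetry.attach(
  "grpc-connection-monitor",
  [:snakepit, :grpc, :connection, :lost],
  fn _event, _measurements, metadata, _config ->
    # Surface dropped gRPC channels immediately rather than waiting for call failures
    Logger.error("gRPC connection to Python worker lost",
      worker_id: metadata[:worker_id],
      node: metadata[:node]
    )

    MyApp.Alerts.send_alert(:grpc_connection_lost, metadata)
  end,
  nil
)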
Usage Patterns
Monitoring Python Call Performance
:telemetry.attach(
  "python-perf-monitor",
  [:snakepit, :python, :call, :stop],
  fn _event, %{duration: duration}, metadata, _ ->
    duration_ms = duration / 1_000_000

    if duration_ms > 1000 do
      Logger.warning("Slow Python call detected",
        command: metadata.command,
        duration_ms: duration_ms,
        worker_id: metadata.worker_id
      )
    end
  end,
  nil
)

Tracking Worker Health
:telemetry.attach(
  "worker-health-monitor",
  [:snakepit, :pool, :worker, :restarted],
  fn _event, %{restart_count: count}, metadata, _ ->
    if count > 5 do
      Logger.error("Worker restarting frequently",
        worker_id: metadata.worker_id,
        restart_count: count,
        reason: metadata.reason
      )

      # Alert ops team
      MyApp.Alerts.send_alert(:worker_flapping, metadata)
    end
  end,
  nil
)

Monitoring Queue Depth
:telemetry.attach(
  "queue-depth-monitor",
  [:snakepit, :pool, :status],
  fn _event, %{queue_depth: depth}, metadata, _ ->
    if depth > 50 do
      Logger.error("High queue depth detected",
        pool: metadata.pool_name,
        queue_depth: depth,
        available_workers: metadata.available_workers
      )

      # Trigger autoscaling
      MyApp.Autoscaler.scale_up(metadata.pool_name)
    end
  end,
  nil
)

Distributed Tracing with Correlation IDs
:telemetry.attach_many(
  "distributed-tracer",
  [
    [:snakepit, :pool, :queue, :enqueued],
    [:snakepit, :python, :call, :start],
    [:snakepit, :python, :call, :stop]
  ],
  fn event, measurements, %{correlation_id: id} = metadata, _ ->
    # All events for the same request share the same correlation_id
    MyApp.Tracing.record_span(id, event, measurements, metadata)
  end,
  nil
)

Integration with Metrics Systems
Prometheus
# mix.exs
def deps do
  [
    {:snakepit, "~> 0.7"},
    {:telemetry_metrics_prometheus, "~> 1.1"}
  ]
end

# lib/myapp/telemetry.ex
defmodule MyApp.Telemetry do
  use Supervisor
  import Telemetry.Metrics

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    children = [
      {TelemetryMetricsPrometheus, metrics: metrics()}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  defp metrics do
    [
      # Pool metrics
      last_value("snakepit.pool.status.queue_depth",
        tags: [:node, :pool_name]
      ),
      last_value("snakepit.pool.status.available_workers",
        tags: [:node, :pool_name]
      ),

      # Python call metrics
      summary("snakepit.python.call.stop.duration",
        unit: {:native, :millisecond},
        tags: [:node, :pool_name, :command]
      ),
      counter("snakepit.python.call.exception.count",
        tags: [:node, :error_type]
      ),

      # Worker lifecycle
      counter("snakepit.pool.worker.spawned.count",
        tags: [:node, :pool_name]
      )
    ]
  end
end

StatsD
# mix.exs
{:telemetry_metrics_statsd, "~> 0.7"}

# In your telemetry module
children = [
  {TelemetryMetricsStatsd,
   metrics: metrics(),
   host: "statsd.local",
   port: 8125}
]

OpenTelemetry
# mix.exs
{:opentelemetry_telemetry, "~> 1.0"}

# Attach OTEL handlers
:telemetry.attach_many(
  "otel-tracer",
  [
    [:snakepit, :python, :call, :start],
    [:snakepit, :python, :call, :stop],
    [:snakepit, :python, :call, :exception]
  ],
  &OpentelemetryTelemetry.handle_event/4,
  %{span_name: "snakepit.python.call"}
)

Runtime Control
Adjusting Sampling Rates
# Reduce to 10% sampling for high-frequency events
Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.1)

# Apply to specific event patterns
Snakepit.Telemetry.GrpcStream.update_sampling(
  "worker_1",
  0.1,
  ["python.call.*"]
)

Toggling Telemetry
# Disable telemetry for a specific worker
Snakepit.Telemetry.GrpcStream.toggle("worker_1", false)

# Re-enable
Snakepit.Telemetry.GrpcStream.toggle("worker_1", true)

Event Filtering
# Only allow specific events
Snakepit.Telemetry.GrpcStream.update_filter("worker_1",
  allow: ["python.call.*", "python.tool.*"]
)

# Block specific events
Snakepit.Telemetry.GrpcStream.update_filter("worker_1",
  deny: ["python.memory.sampled"]
)

Python API
Basic Event Emission
import time

from snakepit_bridge import telemetry

# Emit a simple event
telemetry.emit(
    "tool.execution.start",
    {"system_time": time.time_ns()},
    {"tool": "my_tool", "operation": "predict"},
    correlation_id="abc-123"
)

Span Context Manager
from snakepit_bridge import telemetry

def my_tool(ctx, data):
    # Automatically emits start/stop/exception events
    with telemetry.span("tool.execution", {"tool": "my_tool"}):
        result = process_data(data)

        # Emit additional metrics within the span
        telemetry.emit(
            "tool.result_size",
            {"bytes": len(result)},
            {"tool": "my_tool"}
        )

        return result

Correlation ID Propagation
from snakepit_bridge import telemetry

def my_tool(ctx, data):
    # Get correlation ID from context
    correlation_id = telemetry.get_correlation_id()

    # All events within this tool will share the same correlation_id
    with telemetry.span("tool.execution", {"tool": "my_tool"}, correlation_id):
        result = do_work(data)
        return result

Event Structure
All telemetry events follow the standard :telemetry structure:
:telemetry.execute(
  event_name :: [atom()],                # [:snakepit, :component, :resource, :action]
  measurements :: %{atom() => number()},
  metadata :: %{atom() => term()}
)

Measurements
Numeric data about the event:
%{
  duration: 1_234_567,              # Native time units
  system_time: 1_698_234_567_890,   # System.system_time()
  queue_depth: 5,
  memory_bytes: 1_048_576
}

Metadata
Contextual information about the event:
%{
  node: :node1@localhost,           # Always included
  pool_name: :default,
  worker_id: "worker_1",
  command: "predict",
  correlation_id: "abc-123",        # For distributed tracing
  result: :success
}

Performance Considerations
Event Frequency
High Frequency (consider sampling):
- [:snakepit, :python, :call, :*] - Every command execution
- [:snakepit, :grpc, :call, :*] - Every gRPC call
Medium Frequency:
- [:snakepit, :pool, :worker, :*] - Worker lifecycle
- [:snakepit, :session, :*] - Session lifecycle
Low Frequency:
- [:snakepit, :pool, :status] - Periodic (every 5-60s)
- [:snakepit, :python, :memory, :sampled] - Periodic
Sampling Strategy
# For high-frequency events, use sampling
config :snakepit, :telemetry,
  sampling_rate: 0.1  # 10% of events

# Or sample per-worker at runtime
Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.1)

Performance Impact
- Event emission: ~1-5 μs
- gRPC serialization: ~1-2 μs
- Validation: ~2-3 μs
- Total overhead: <10 μs per event
- CPU impact: <1% at 100% sampling, <0.1% at 10% sampling
Troubleshooting
No Events Received
# Check if telemetry is enabled
Application.get_env(:snakepit, :telemetry, [])

# List attached handlers
:telemetry.list_handlers([:snakepit, :python, :call, :stop])

# Test emission manually
:telemetry.execute(
  [:snakepit, :python, :call, :stop],
  %{duration: 1000},
  %{command: "test"}
)

Handler Crashes
# Always wrap handlers with error handling:
# :telemetry detaches a handler if it raises, so a crash silently stops your monitoring
def handle_event(event, measurements, metadata, _config) do
  try do
    actual_handler(event, measurements, metadata)
  rescue
    e ->
      Logger.error("Telemetry handler crashed: #{inspect(e)}")
  end
end

High Memory Usage
Check Python telemetry queue:
# In Python worker
backend = telemetry.get_backend()
if hasattr(backend, 'stream'):
    dropped = backend.stream.dropped_count
    if dropped > 0:
        logger.warning(f"Telemetry dropped {dropped} events")

Performance Issues
# Reduce sampling rate
Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.1)

# Or disable entirely for specific workers
Snakepit.Telemetry.GrpcStream.toggle("worker_1", false)

Architecture
The telemetry system consists of three main components:
- Event Catalog (Snakepit.Telemetry.Naming) - Validates event names and prevents atom table exhaustion
- Metadata Safety (Snakepit.Telemetry.SafeMetadata) - Sanitizes metadata from Python
- gRPC Stream Manager (Snakepit.Telemetry.GrpcStream) - Manages bidirectional streams with Python workers
Events flow: Python → gRPC Stream → Validation → :telemetry.execute() → Your Handlers
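One way to watch this flow end to end during development is to attach a temporary handler to one event from each layer and inspect what arrives on the Elixir side. This is a debugging sketch, not something to leave attached in production:

:telemetry.attach_many(
  "snakepit-flow-debug",
  [
    [:snakepit, :pool, :worker, :spawned],   # Layer 1: infrastructure
    [:snakepit, :python, :call, :stop],      # Layer 2: folded from Python
    [:snakepit, :grpc, :call, :stop]         # Layer 3: gRPC bridge
  ],
  fn event, measurements, metadata, _config ->
    IO.inspect({event, measurements, metadata}, label: "snakepit telemetry")
  end,
  nil
)

# Detach when finished
:telemetry.detach("snakepit-flow-debug")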
Additional Resources
- Detailed Design: See docs/20251028/telemetry/00_ARCHITECTURE.md
- Event Catalog: See docs/20251028/telemetry/01_EVENT_CATALOG.md
- Python Integration: See docs/20251028/telemetry/02_PYTHON_INTEGRATION.md
- Client Guide: See docs/20251028/telemetry/03_CLIENT_GUIDE.md
- gRPC Stream Details: See docs/20251028/telemetry/04_GRPC_STREAM.md
License
Same as Snakepit - see LICENSE file.