Snakepit

A high-performance, generalized process pooler and session manager for external language integrations in Elixir

Features

  • High-performance process pooling with concurrent worker initialization
  • Session affinity for stateful operations across requests
  • gRPC streaming for real-time progress updates and large data transfers
  • Bidirectional tool bridge allowing Python to call Elixir functions and vice versa
  • Production-ready process management with automatic orphan cleanup
  • Hardware detection for ML accelerators (CUDA, MPS, ROCm)
  • Fault tolerance with circuit breakers, retry policies, and crash barriers
  • Comprehensive telemetry with OpenTelemetry support
  • Dual worker profiles (process isolation or threaded parallelism)
  • Zero-copy data interop via DLPack and Arrow

Installation

Add snakepit to your dependencies in mix.exs:

def deps do
  [
    {:snakepit, "~> 0.8.7"}
  ]
end

Then run:

mix deps.get
mix snakepit.setup    # Install Python dependencies and generate gRPC stubs
mix snakepit.doctor   # Verify environment is correctly configured

Quick Start

# Execute a command on any available worker
{:ok, result} = Snakepit.execute("ping", %{})

# Execute with session affinity (same worker for related requests)
{:ok, result} = Snakepit.execute_in_session("session_123", "process_data", %{input: data})

# Stream results for long-running operations
Snakepit.execute_stream("batch_process", %{items: items}, fn chunk ->
  IO.puts("Progress: #{chunk["progress"]}%")
end)

Configuration

Simple Configuration

# config/config.exs
config :snakepit,
  pooling_enabled: true,
  adapter_module: Snakepit.Adapters.GRPCPython,
  adapter_args: ["--adapter", "your_adapter_module"],
  pool_size: 10,
  log_level: :error

Multi-Pool Configuration (v0.6+)

config :snakepit,
  pools: [
    %{
      name: :default,
      worker_profile: :process,
      pool_size: 10,
      adapter_module: Snakepit.Adapters.GRPCPython,
      adapter_args: ["--adapter", "my_app.adapters.MainAdapter"]
    },
    %{
      name: :compute,
      worker_profile: :thread,
      pool_size: 4,
      threads_per_worker: 8,
      adapter_args: ["--adapter", "my_app.adapters.ComputeAdapter"]
    }
  ]

Logging Configuration

Snakepit logs only errors by default:

config :snakepit, log_level: :error          # Default - errors only
config :snakepit, log_level: :info           # Include info messages
config :snakepit, log_level: :debug          # Verbose debugging
config :snakepit, log_level: :none           # Complete silence

# Filter to specific categories
config :snakepit, log_level: :debug, log_categories: [:grpc, :pool]

Core API

Basic Execution

# Simple command execution
{:ok, result} = Snakepit.execute("command_name", %{param: "value"})

# With timeout
{:ok, result} = Snakepit.execute("slow_command", %{}, timeout: 30_000)

# Target specific pool
{:ok, result} = Snakepit.execute("ml_inference", %{}, pool: :compute)

Session Affinity

Sessions route related requests to the same worker, enabling stateful operations:

session_id = "user_#{user.id}"

# First call establishes worker affinity
{:ok, _} = Snakepit.execute_in_session(session_id, "load_model", %{model: "gpt-4"})

# Subsequent calls go to the same worker
{:ok, result} = Snakepit.execute_in_session(session_id, "generate", %{prompt: "Hello"})
{:ok, result} = Snakepit.execute_in_session(session_id, "generate", %{prompt: "Continue"})

Streaming Operations

# Stream with callback for progress updates
Snakepit.execute_stream("train_model", %{epochs: 100}, fn chunk ->
  case chunk do
    %{"type" => "progress", "epoch" => n, "loss" => loss} ->
      IO.puts("Epoch #{n}: loss=#{loss}")
    %{"type" => "complete", "model_path" => path} ->
      IO.puts("Training complete: #{path}")
  end
end)

# Stream with session affinity
Snakepit.execute_in_session_stream(session_id, "process_batch", %{}, fn chunk ->
  handle_chunk(chunk)
end)

Pool Statistics

# Get pool statistics
stats = Snakepit.get_stats()
# => %{requests: 1523, errors: 2, queued: 0, queue_timeouts: 0}

# List worker IDs
workers = Snakepit.list_workers()
# => ["worker_1", "worker_2", "worker_3"]

# Wait for pool to be ready (useful in tests)
:ok = Snakepit.Pool.await_ready(:default, 10_000)

Worker Profiles

Process Profile (Default)

One Python process per worker. Full process isolation, works with all Python versions.

%{
  name: :default,
  worker_profile: :process,
  pool_size: 10,
  startup_batch_size: 4,      # Spawn 4 workers at a time
  startup_batch_delay_ms: 500 # Wait between batches
}

Best for: I/O-bound workloads, maximum isolation, Python < 3.13.

Thread Profile (Python 3.13+)

Multiple threads within each Python process. Shared memory, true parallelism with free-threaded Python.

%{
  name: :compute,
  worker_profile: :thread,
  pool_size: 4,
  threads_per_worker: 8,
  thread_safety_checks: true
}

Best for: CPU-bound ML workloads, large shared models, Python 3.13+ with free-threading.

Hardware Detection

Snakepit detects available ML accelerators for intelligent device selection:

# Detect all hardware
info = Snakepit.Hardware.detect()
# => %{accelerator: :cuda, cpu: %{cores: 8, ...}, cuda: %{devices: [...]}}

# Check capabilities
caps = Snakepit.Hardware.capabilities()
# => %{cuda: true, mps: false, rocm: false, avx2: true}

# Select device with fallback chain
{:ok, device} = Snakepit.Hardware.select_with_fallback([:cuda, :mps, :cpu])
# => {:ok, {:cuda, 0}}

# Generate hardware identity for lock files
identity = Snakepit.Hardware.identity()
File.write!("hardware.lock", Jason.encode!(identity))

Supported accelerators: CPU (with AVX/AVX2/AVX-512 detection), NVIDIA CUDA, Apple MPS, AMD ROCm.
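
The selected device can feed straight into a request. A minimal sketch, assuming the {:cuda, index} result shape shown above; the :mps/:cpu result shapes and the device parameter name are illustrative:

# Pick the best available device, falling back to CPU.
device =
  case Snakepit.Hardware.select_with_fallback([:cuda, :mps, :cpu]) do
    {:ok, {:cuda, index}} -> "cuda:#{index}"
    {:ok, :mps} -> "mps"            # assumed result shape
    {:ok, _other} -> "cpu"
    {:error, _reason} -> "cpu"
  end

# Pass the chosen device with the request (parameter name is illustrative).
{:ok, _result} = Snakepit.execute("ml_inference", %{device: device}, pool: :compute)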

Fault Tolerance

Circuit Breaker

Prevents cascading failures by temporarily blocking requests after repeated failures:

{:ok, cb} = Snakepit.CircuitBreaker.start_link(
  failure_threshold: 5,
  reset_timeout_ms: 30_000
)

# Execute through circuit breaker
case Snakepit.CircuitBreaker.call(cb, fn -> external_api_call() end) do
  {:ok, result} -> handle_result(result)
  {:error, :circuit_open} -> {:error, :service_unavailable}
  {:error, reason} -> {:error, reason}
end

# Check state
Snakepit.CircuitBreaker.state(cb)  # => :closed | :open | :half_open

Retry Policies

policy = Snakepit.RetryPolicy.new(
  max_attempts: 4,
  backoff_ms: [100, 200, 400, 800],
  jitter: true,
  retriable_errors: [:timeout, :unavailable]
)

# Or pass retry options directly to the Executor
Snakepit.Executor.execute_with_retry(
  fn -> flaky_operation() end,
  max_attempts: 3,
  backoff_ms: [100, 200, 400]
)

Health Monitoring

{:ok, hm} = Snakepit.HealthMonitor.start_link(
  pool: :default,
  max_crashes: 5,
  crash_window_ms: 60_000
)

Snakepit.HealthMonitor.healthy?(hm)  # => true | false
Snakepit.HealthMonitor.stats(hm)     # => %{total_crashes: 2, ...}

Executor Helpers

# With timeout
Snakepit.Executor.execute_with_timeout(fn -> slow_op() end, timeout_ms: 5000)

# With retry
Snakepit.Executor.execute_with_retry(fn -> flaky_op() end, max_attempts: 3)

# Combined retry + circuit breaker
Snakepit.Executor.execute_with_protection(circuit_breaker, fn ->
  risky_operation()
end, max_attempts: 3)

# Batch execution
Snakepit.Executor.execute_batch(
  [fn -> op1() end, fn -> op2() end, fn -> op3() end],
  max_concurrency: 2
)

Python Adapters

Creating an Adapter

Adapters follow a per-request lifecycle: a new instance is created for each RPC request, initialize() is called at the start, the tool executes, then cleanup() is called (even on error).

# my_adapter.py
from snakepit_bridge import BaseAdapter, tool

# Module-level cache for expensive resources (shared across requests)
_model_cache = {}

class MyAdapter(BaseAdapter):
    def __init__(self):
        super().__init__()
        self.model = None

    def initialize(self):
        """Called at the start of each request."""
        # Load from cache or disk (cache persists across requests)
        if "model" not in _model_cache:
            _model_cache["model"] = load_model()
        self.model = _model_cache["model"]

    def cleanup(self):
        """Called at the end of each request (even on error)."""
        # Release request-specific resources (not the cached model)
        pass

    @tool(description="Run inference on input data")
    def predict(self, input_data: dict) -> dict:
        result = self.model.predict(input_data["text"])
        return {"prediction": result, "confidence": 0.95}

    @tool(description="Process with progress updates", supports_streaming=True)
    def batch_process(self, items: list):
        for i, item in enumerate(items):
            result = self.process_item(item)
            yield {"progress": (i + 1) / len(items) * 100, "result": result}

Thread-Safe Adapters (Python 3.13+)

import threading
from snakepit_bridge import ThreadSafeAdapter, tool, thread_safe_method

# Module-level shared resources (thread-safe access required)
_shared_model = None
_model_lock = threading.Lock()

class ThreadedAdapter(ThreadSafeAdapter):
    __thread_safe__ = True

    def __init__(self):
        super().__init__()
        self.model = None
        self.config = {}  # shared mutable state, guarded by acquire_lock()

    def initialize(self):
        """Load shared model (with thread-safe initialization)."""
        global _shared_model
        with _model_lock:
            if _shared_model is None:
                _shared_model = load_model()
        self.model = _shared_model

    @tool
    @thread_safe_method
    def predict(self, text: str) -> dict:
        # Thread-local cache
        cache = self.get_thread_local("cache", default={})
        if text in cache:
            return cache[text]

        result = self.model.predict(text)
        cache[text] = result
        self.set_thread_local("cache", cache)
        return result

    @tool
    @thread_safe_method
    def update_config(self, config: dict):
        # Protect shared mutable state
        with self.acquire_lock():
            self.config.update(config)

Session Context and Elixir Tool Calls

@tool(description="Process using Elixir tools")
def hybrid_process(self, data: dict) -> dict:
    # Access session information
    session_id = self.session_context.session_id

    # Call a registered Elixir tool
    validation = self.session_context.call_elixir_tool(
        "validate_data",
        {"data": data, "schema": "user"}
    )

    if validation["valid"]:
        return self.process(data)
    else:
        return {"error": validation["errors"]}

Bidirectional Tool Bridge

Registering Elixir Tools

# Register an Elixir function callable from Python
Snakepit.Bridge.ToolRegistry.register_elixir_tool(
  session_id,
  "calculate_hash",
  fn params ->
    hash = :crypto.hash(:sha256, params["data"]) |> Base.encode16()
    %{hash: hash, algorithm: "sha256"}
  end,
  %{
    description: "Calculate SHA256 hash of data",
    exposed_to_python: true
  }
)

Calling from Python

# Python adapter can call registered Elixir tools
result = self.session_context.call_elixir_tool("calculate_hash", {"data": "hello"})
# => {"hash": "2CF24DBA5FB0A30E...", "algorithm": "sha256"}

# Or use the proxy
hash_tool = self.session_context.elixir_tools["calculate_hash"]
result = hash_tool(data="hello")

Telemetry & Observability

Attaching Handlers

require Logger

# Worker lifecycle events
:telemetry.attach("worker-spawned", [:snakepit, :pool, :worker, :spawned], fn
  _event, %{duration: duration}, %{worker_id: id}, _config ->
    Logger.info("Worker #{id} spawned in #{duration}ms")
end, nil)

# Python execution events
:telemetry.attach("python-call", [:snakepit, :grpc_worker, :execute, :stop], fn
  _event, %{duration_ms: ms}, %{command: cmd}, _config ->
    Logger.info("#{cmd} completed in #{ms}ms")
end, nil)

Python Telemetry API

from snakepit_bridge import telemetry

# Emit custom events
telemetry.emit(
    "model.inference",
    measurements={"latency_ms": 45, "tokens": 128},
    metadata={"model": "gpt-4", "batch_size": 1}
)

# Automatic timing with spans
with telemetry.span("data_processing", {"stage": "preprocessing"}):
    processed = preprocess(data)  # Automatically timed

OpenTelemetry Integration

config :snakepit, :opentelemetry, %{enabled: true}
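
Spans can also be opened by hand around individual calls. A minimal sketch, assuming the opentelemetry_api package is installed; the span name and attribute keys are illustrative:

require OpenTelemetry.Tracer, as: Tracer

Tracer.with_span "snakepit.ml_inference" do
  {:ok, result} = Snakepit.execute("ml_inference", %{input: "example"}, pool: :compute)
  Tracer.set_attributes(%{"snakepit.pool" => "compute"})
  result
end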

Process Management

Automatic Cleanup

Snakepit automatically tracks and cleans up Python processes:

  • Each BEAM run gets a unique run ID stored in the process registry
  • On application restart, orphaned processes from previous runs are killed
  • Graceful shutdown sends SIGTERM, followed by SIGKILL if needed
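
A normal application stop is therefore enough to tear workers down cleanly. A minimal sketch of that lifecycle, using only standard Application calls:

# Boot Snakepit and its configured pools.
{:ok, _started} = Application.ensure_all_started(:snakepit)

{:ok, _result} = Snakepit.execute("ping", %{})

# Graceful stop: workers receive SIGTERM, with SIGKILL as the fallback.
:ok = Application.stop(:snakepit)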

Manual Cleanup

# Force cleanup of all worker processes
Snakepit.cleanup()

Script Mode

For Mix tasks and scripts, use Snakepit.run_as_script/2 for automatic lifecycle management:

Snakepit.run_as_script(fn ->
  {:ok, result} = Snakepit.execute("process_data", %{})
  IO.inspect(result)
end, halt: true)

Mix Tasks

Task                             Description
mix snakepit.setup               Install Python dependencies and generate gRPC stubs
mix snakepit.doctor              Verify environment is correctly configured
mix snakepit.status              Show pool status and worker information
mix snakepit.gen.adapter NAME    Generate adapter scaffolding

Examples

The examples/ directory contains working demonstrations:

Example                         Description
grpc_basic.exs                  Basic execute, ping, echo, add operations
grpc_sessions.exs               Session affinity and isolation
grpc_streaming.exs              Pool scaling and concurrent execution
hardware_detection.exs          Hardware detection for ML workloads
crash_recovery.exs              Circuit breaker, retry, health monitoring
bidirectional_tools_demo.exs    Cross-language tool calls
telemetry_basic.exs             Telemetry event handling
threaded_profile_demo.exs       Python 3.13+ thread profile

Run examples:

mix run examples/grpc_basic.exs
mix run examples/hardware_detection.exs

Documentation

Guide                 Description
Getting Started       Installation and first steps
Configuration         All configuration options
Worker Profiles       Process vs thread profiles
Hardware Detection    ML accelerator detection
Fault Tolerance       Circuit breaker, retry, health
Streaming             gRPC streaming operations
Python Adapters       Writing Python adapters
Observability         Telemetry and logging
Production            Deployment and troubleshooting

Requirements

  • Elixir 1.18+
  • Erlang/OTP 27+
  • Python 3.9+ (3.13+ for thread profile)
  • gRPC Python packages (grpcio, grpcio-tools)

License

MIT License - see LICENSE for details.