Snakepit.Config (Snakepit v0.13.0)


Configuration management for Snakepit pools.

Handles validation and normalization of pool configurations, supporting both legacy single-pool and new multi-pool configurations.

Backward Compatibility

Existing v0.5.x configurations continue to work:

# Legacy config (v0.5.x) - still works!
config :snakepit,
  pooling_enabled: true,
  adapter_module: Snakepit.Adapters.GRPCPython,
  pool_size: 100

New Multi-Pool Configuration (v0.6.0+)

config :snakepit,
  pools: [
    %{
      name: :default,
      worker_profile: :process,
      pool_size: 100,
      adapter_module: Snakepit.Adapters.GRPCPython
    },
    %{
      name: :hpc,
      worker_profile: :thread,
      pool_size: 4,
      threads_per_worker: 16,
      adapter_module: Snakepit.Adapters.GRPCPython
    }
  ]

gRPC Listener Configuration

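Snakepit exposes a gRPC server for Python workers. Configure it with :grpc_listener (preferred) or the legacy :grpc_port / :grpc_host keys. The examples below show the three listener modes: :internal, :external, and :external_pool.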

config :snakepit,
  grpc_listener: [mode: :internal]

config :snakepit,
  grpc_listener: [mode: :external, host: "localhost", port: 50_051]

config :snakepit,
  grpc_listener: [mode: :external_pool, host: "localhost", base_port: 50_051, pool_size: 8]

Listener tuning knobs (all optional):

  • :grpc_listener_ready_timeout_ms
  • :grpc_listener_port_check_interval_ms
  • :grpc_listener_reuse_attempts
  • :grpc_listener_reuse_wait_timeout_ms
  • :grpc_listener_reuse_retry_delay_ms
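A sketch of setting the tuning knobs alongside a listener, assuming they are read as top-level :snakepit application environment keys (this page lists them but does not say where they live). The values are purely illustrative and are not Snakepit's defaults.

```elixir
# config/runtime.exs (illustrative values only; NOT Snakepit's defaults)
import Config

config :snakepit,
  grpc_listener: [mode: :external, host: "localhost", port: 50_051],
  # Assumption: the knobs are top-level :snakepit keys; units follow the _ms suffix.
  grpc_listener_ready_timeout_ms: 10_000,
  grpc_listener_port_check_interval_ms: 50,
  grpc_listener_reuse_attempts: 3,
  grpc_listener_reuse_wait_timeout_ms: 5_000,
  grpc_listener_reuse_retry_delay_ms: 100
```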

Instance Isolation

Snakepit can scope runtime resources to avoid cross-instance collisions. Use :instance_name (or SNAKEPIT_INSTANCE_NAME) for a human-readable name, and :instance_token (or SNAKEPIT_INSTANCE_TOKEN) for strong isolation across concurrent runs. Runtime state is persisted under :data_dir.

config :snakepit,
  instance_name: "my_app",
  instance_token: "run_a",
  data_dir: "/var/lib/snakepit"

Configuration Schema

Per-pool configuration options:

Required

  • name - Pool identifier (atom)
  • adapter_module - Adapter module

Profile Selection

  • worker_profile - :process or :thread (default: :process)

Common Options

  • pool_size - Number of workers
  • adapter_args - CLI arguments for adapter
  • adapter_env - Environment variables
  • capacity_strategy - :pool, :profile, or :hybrid (default: :pool)
  • affinity - :hint, :strict_queue, or :strict_fail_fast (default: :hint)

Process Profile Specific

  • startup_batch_size - Workers per batch (default: 8)
  • startup_batch_delay_ms - Delay between batches (default: 750)

Thread Profile Specific

  • threads_per_worker - Thread pool size per worker
  • thread_safety_checks - Enable runtime thread-safety checks (boolean)

Lifecycle Management

  • worker_ttl - Time-to-live (:infinity or {value, :seconds/:minutes/:hours})
  • worker_max_requests - Max requests before recycling (:infinity or integer)
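The two lifecycle options can be read as independent recycling limits. A minimal sketch, assuming a worker is recycled as soon as either limit is reached; WorkerLifecycleSketch and its functions are hypothetical helpers, not part of Snakepit.

```elixir
defmodule WorkerLifecycleSketch do
  # Hypothetical helper (not part of Snakepit) interpreting the documented
  # worker_ttl and worker_max_requests shapes.

  # Convert a worker_ttl value into milliseconds.
  def ttl_ms(:infinity), do: :infinity
  def ttl_ms({value, :seconds}) when is_integer(value), do: value * 1_000
  def ttl_ms({value, :minutes}) when is_integer(value), do: value * 60_000
  def ttl_ms({value, :hours}) when is_integer(value), do: value * 3_600_000

  # Assumption: recycle when EITHER limit is reached; missing keys mean :infinity.
  def recycle?(config, age_ms, request_count) do
    ttl_expired?(ttl_ms(Map.get(config, :worker_ttl, :infinity)), age_ms) or
      requests_exhausted?(Map.get(config, :worker_max_requests, :infinity), request_count)
  end

  defp ttl_expired?(:infinity, _age_ms), do: false
  defp ttl_expired?(limit_ms, age_ms), do: age_ms >= limit_ms

  defp requests_exhausted?(:infinity, _count), do: false
  defp requests_exhausted?(max, count), do: count >= max
end
```

For example, with %{worker_ttl: {30, :minutes}, worker_max_requests: 1_000}, a worker that is one second old but has served 1,000 requests would be recycled because of the request limit, even though its 1,800,000 ms TTL has not expired.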

Heartbeat options (the :heartbeat map in the normalized configuration) are mirrored in snakepit_bridge.heartbeat.HeartbeatConfig on the Python side. Any new key added here must also be added to that Python struct and documented in the heartbeat guides so both sides stay in sync.

Normalized Shape

Snakepit.Config.normalize_pool_config/1 converts user input into a canonical map that downstream components rely on. The resulting structure (documented under normalized_pool_config/0) always includes heartbeat defaults, adapter metadata, and profile-specific knobs so pool, worker supervisor, and diagnostics modules never have to pattern-match on partial user input.

Summary

Functions

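  • adapter_args(pool_config \\ %{}) - Resolve adapter args from normalized pool config or legacy single-pool config.
  • adapter_module(pool_config \\ %{}, opts \\ []) - Resolve adapter module with consistent precedence: explicit override, pool config, legacy pool_config, global adapter, then optional default.
  • capacity_strategy(pool_config \\ %{}) - Resolve capacity strategy with consistent precedence: pool config, legacy pool_config, global strategy, then defaults.
  • data_dir() - Resolve the data directory used for runtime persistence.
  • get_pool_config(pool_name) - Get configuration for a specific named pool.
  • get_pool_configs() - Get and validate pool configurations from the application environment.
  • get_profile_module(config) - Get the profile module for a pool configuration.
  • grpc_listener_config() - Load and validate the gRPC listener configuration.
  • grpc_listener_config!() - Load the gRPC listener configuration or raise on error.
  • grpc_listener_port_check_interval_ms() - Interval (ms) between port readiness checks when reusing an existing gRPC listener.
  • grpc_listener_ready_timeout_ms() - Timeout (ms) for waiting on the gRPC listener to publish its assigned port.
  • grpc_listener_reuse_attempts() - Number of attempts to reuse or rebind a gRPC listener before failing.
  • grpc_listener_reuse_retry_delay_ms() - Delay (ms) between gRPC listener reuse retries.
  • grpc_listener_reuse_wait_timeout_ms() - Max wait (ms) for an already-started gRPC listener to publish its port before retrying.
  • grpc_stream_control_timeout_ms() - Timeout (ms) for sending telemetry control messages to Python workers.
  • grpc_stream_open_timeout_ms() - Timeout (ms) for opening telemetry streams to Python workers.
  • grpc_worker_health_check_timeout_ms() - Timeout (ms) for GRPCWorker periodic health-check RPC calls.
  • heartbeat_defaults() - Returns the normalized default heartbeat configuration, merged with application env overrides.
  • instance_name() - Resolve the instance name used for runtime isolation.
  • instance_name_identifier() - Resolve a CLI-safe instance name identifier for process scoping.
  • instance_token() - Resolve the instance token used for runtime isolation.
  • instance_token_identifier() - Resolve a CLI-safe instance token identifier for process scoping.
  • lifecycle_check_max_concurrency() - Maximum concurrent worker checks during lifecycle scan.
  • lifecycle_worker_action_timeout_ms() - Timeout (ms) per worker action during lifecycle checks.
  • multi_pool_mode?() - Returns true when the application is configured with explicit multi-pool mode.
  • normalize_pool_config(config) - Normalize a pool configuration by filling in defaults.
  • pool_max_workers(pool_config \\ %{}) - Resolve max workers for a pool, preserving legacy :pool_config fallback.
  • pool_startup_batch_delay_ms(pool_config \\ %{}) - Resolve startup batch delay (ms) for process-profile worker initialization.
  • pool_startup_batch_size(pool_config \\ %{}) - Resolve startup batch size for process-profile worker initialization.
  • process_registry_dets_flush_interval_ms() - Interval (ms) for batching DETS flushes in Snakepit.Pool.ProcessRegistry.
  • rogue_cleanup_config() - Normalized rogue cleanup configuration for process registry startup cleanup.
  • thread_profile?(config) - Check if a pool configuration is using the thread profile.
  • validate_pool_config(config) - Validate a single pool configuration.
  • worker_supervisor_cleanup_max_retries() - Worker restart cleanup maximum retry attempts.
  • worker_supervisor_cleanup_retry_interval_ms() - Worker restart cleanup polling interval in milliseconds.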

Types

grpc_listener_config()

@type grpc_listener_config() :: %{
  mode: grpc_listener_mode(),
  host: String.t(),
  bind_host: String.t(),
  port: non_neg_integer() | nil,
  base_port: pos_integer() | nil,
  pool_size: pos_integer() | nil
}

grpc_listener_mode()

@type grpc_listener_mode() :: :internal | :external | :external_pool

normalized_pool_config()

@type normalized_pool_config() :: map()

pool_config()

@type pool_config() :: map()

Normalized pool configuration returned by normalize_pool_config/1.

%{
  name: atom(),
  worker_profile: :process | :thread,
  pool_size: pos_integer(),
  adapter_module: module(),
  adapter_args: list(),
  adapter_env: list(),
  capacity_strategy: :pool | :profile | :hybrid,
  affinity: :hint | :strict_queue | :strict_fail_fast,
  pool_identifier: atom() | nil,
  worker_ttl: :infinity | {integer(), :seconds | :minutes | :hours},
  worker_max_requests: :infinity | pos_integer(),
  heartbeat: map(),
  # Profile-specific fields:
  startup_batch_size: pos_integer(),
  startup_batch_delay_ms: non_neg_integer(),
  threads_per_worker: pos_integer(),
  thread_safety_checks: boolean()
}

validation_result()

@type validation_result() :: {:ok, [pool_config()]} | {:error, term()}

Functions

adapter_args(pool_config \\ %{})

@spec adapter_args(map()) :: list()

Resolve adapter args from normalized pool config or legacy single-pool config.

adapter_module(pool_config \\ %{}, opts \\ [])

@spec adapter_module(
  map(),
  keyword()
) :: module() | nil

Resolve adapter module with consistent precedence: explicit override, pool config, legacy pool_config, global adapter, then optional default.
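The precedence chain reads as "the first non-nil source wins". A minimal sketch, assuming the explicit override and the optional default arrive as hypothetical :override and :default options, the legacy single-pool settings live under a :pool_config map, and the global adapter under :adapter_module. The application environment is passed in as a keyword list so the example runs without Snakepit; AdapterResolutionSketch is not Snakepit's implementation.

```elixir
defmodule AdapterResolutionSketch do
  # Hypothetical illustration of "first non-nil source wins".
  # `env` stands in for Application.get_all_env(:snakepit);
  # the :override and :default option names are assumptions.
  def adapter_module(pool_config, opts, env) do
    legacy = Keyword.get(env, :pool_config, %{})

    [
      Keyword.get(opts, :override),
      Map.get(pool_config, :adapter_module),
      Map.get(legacy, :adapter_module),
      Keyword.get(env, :adapter_module),
      Keyword.get(opts, :default)
    ]
    # Returns nil when no source provides an adapter.
    |> Enum.find(&(not is_nil(&1)))
  end
end
```

Listing the sources in one ordered list keeps the precedence visible in a single place, so reordering or adding a source is a one-line change.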

capacity_strategy(pool_config \\ %{})

@spec capacity_strategy(map()) :: :pool | :profile | :hybrid

Resolve capacity strategy with consistent precedence: pool config, legacy pool_config, global strategy, then defaults.
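Capacity strategy resolution follows a similar chain but ends in a fixed default and should only accept the three documented atoms. A sketch, assuming a single :pool default (as stated in the schema above) and assuming unrecognized values are skipped rather than rejected; CapacityStrategySketch and that skipping behaviour are hypothetical.

```elixir
defmodule CapacityStrategySketch do
  # Hypothetical sketch; not Snakepit's implementation.
  @valid [:pool, :profile, :hybrid]
  @default :pool

  # `env` stands in for the :snakepit application environment.
  def capacity_strategy(pool_config, env) do
    legacy = Keyword.get(env, :pool_config, %{})

    [
      Map.get(pool_config, :capacity_strategy),
      Map.get(legacy, :capacity_strategy),
      Keyword.get(env, :capacity_strategy)
    ]
    # Assumption: nil and unknown values fall through to the next source.
    |> Enum.find(@default, &(&1 in @valid))
  end
end
```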

data_dir()

@spec data_dir() :: String.t()

Resolve the data directory used for runtime persistence.

get_pool_config(pool_name)

@spec get_pool_config(atom()) :: {:ok, pool_config()} | {:error, term()}

Get configuration for a specific named pool.

Returns {:ok, config} or {:error, reason}.

The reason is :pool_not_found if no pool with that name is configured, or any error returned by get_pool_configs/0 if the configuration itself is invalid.

Examples

iex> Snakepit.Config.get_pool_config(:default)
{:ok, %{name: :default, worker_profile: :process, ...}}
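The relationship between get_pool_config/1 and get_pool_configs/0 can be sketched as a lookup over the validated list that passes configuration errors through unchanged. To run without Snakepit, this hypothetical PoolLookupSketch takes the result of get_pool_configs/0 as an argument.

```elixir
defmodule PoolLookupSketch do
  # Hypothetical sketch. `configs_result` has the shape returned by
  # get_pool_configs/0: {:ok, [pool_config]} | {:error, term}.
  def get_pool_config(configs_result, pool_name) do
    # `with` returns any {:error, reason} from the configs unchanged.
    with {:ok, configs} <- configs_result do
      case Enum.find(configs, &(&1.name == pool_name)) do
        nil -> {:error, :pool_not_found}
        config -> {:ok, config}
      end
    end
  end
end
```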

get_pool_configs()

@spec get_pool_configs() :: validation_result()

Get and validate pool configurations from application environment.

Supports both legacy single-pool and new multi-pool configurations.

Returns {:ok, [pool_configs]} or {:error, reason}.

Examples

# With legacy config
{:ok, [%{name: :default, worker_profile: :process, ...}]}

# With multi-pool config
{:ok, [%{name: :default, ...}, %{name: :hpc, ...}]}
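Supporting both shapes can be sketched as: use :pools when present, otherwise wrap the legacy top-level keys into a single pool named :default. LegacyConfigSketch is hypothetical, and the keys it copies are chosen from the legacy example at the top of this page; the real function also validates and normalizes each pool.

```elixir
defmodule LegacyConfigSketch do
  # Hypothetical sketch. `env` stands in for Application.get_all_env(:snakepit).
  def pool_configs(env) do
    case Keyword.fetch(env, :pools) do
      # Multi-pool mode: use the explicit list as given.
      {:ok, pools} when is_list(pools) ->
        {:ok, pools}

      # Legacy mode: synthesize a single :default pool from top-level keys.
      :error ->
        {:ok,
         [
           %{
             name: :default,
             worker_profile: :process,
             pool_size: Keyword.get(env, :pool_size),
             adapter_module: Keyword.get(env, :adapter_module)
           }
         ]}

      {:ok, other} ->
        {:error, {:invalid_pools, other}}
    end
  end
end
```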

get_profile_module(config)

@spec get_profile_module(pool_config()) :: module()

Get the profile module for a pool configuration.

Returns the module that implements the WorkerProfile behaviour.

Examples

iex> Snakepit.Config.get_profile_module(%{worker_profile: :process})
Snakepit.WorkerProfile.Process

iex> Snakepit.Config.get_profile_module(%{worker_profile: :thread})
Snakepit.WorkerProfile.Thread

grpc_listener_config()

@spec grpc_listener_config() :: {:ok, grpc_listener_config()} | {:error, term()}

Load and validate the gRPC listener configuration.

Returns {:ok, config} or {:error, reason}.

grpc_listener_config!()

@spec grpc_listener_config!() :: grpc_listener_config()

Load the gRPC listener configuration or raise on error.
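A sketch of the kind of per-mode validation a listener config loader might perform, with a raising variant in the style of grpc_listener_config!/0. The requirements (a :port for :external, a :base_port and :pool_size for :external_pool) are inferred from the configuration examples near the top of this page; the "localhost" host default, the error tuples, and ListenerConfigSketch itself are hypothetical.

```elixir
defmodule ListenerConfigSketch do
  # Hypothetical validator; not Snakepit's implementation.

  def validate(opts) when is_list(opts) do
    # Assumed default host when none is given.
    host = Keyword.get(opts, :host, "localhost")

    case Keyword.get(opts, :mode) do
      :internal ->
        {:ok, %{mode: :internal, host: host}}

      :external ->
        with {:ok, port} <- fetch_port(opts, :port) do
          {:ok, %{mode: :external, host: host, port: port}}
        end

      :external_pool ->
        with {:ok, base_port} <- fetch_port(opts, :base_port),
             {:ok, pool_size} <- fetch_pos_int(opts, :pool_size) do
          {:ok, %{mode: :external_pool, host: host, base_port: base_port, pool_size: pool_size}}
        end

      other ->
        {:error, {:invalid_mode, other}}
    end
  end

  # Raising variant: returns the config map or raises ArgumentError.
  def validate!(opts) do
    case validate(opts) do
      {:ok, config} -> config
      {:error, reason} -> raise ArgumentError, "invalid gRPC listener config: #{inspect(reason)}"
    end
  end

  defp fetch_port(opts, key) do
    case Keyword.get(opts, key) do
      port when is_integer(port) and port in 1..65_535 -> {:ok, port}
      other -> {:error, {:invalid, key, other}}
    end
  end

  defp fetch_pos_int(opts, key) do
    case Keyword.get(opts, key) do
      n when is_integer(n) and n > 0 -> {:ok, n}
      other -> {:error, {:invalid, key, other}}
    end
  end
end
```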

grpc_listener_port_check_interval_ms()

@spec grpc_listener_port_check_interval_ms() :: pos_integer()

Interval (ms) between port readiness checks when reusing an existing gRPC listener.

grpc_listener_ready_timeout_ms()

@spec grpc_listener_ready_timeout_ms() :: pos_integer()

Timeout (ms) for waiting on the gRPC listener to publish its assigned port.

grpc_listener_reuse_attempts()

@spec grpc_listener_reuse_attempts() :: pos_integer()

Number of attempts to reuse or rebind a gRPC listener before failing.

grpc_listener_reuse_retry_delay_ms()

@spec grpc_listener_reuse_retry_delay_ms() :: pos_integer()

Delay (ms) between gRPC listener reuse retries.

grpc_listener_reuse_wait_timeout_ms()

@spec grpc_listener_reuse_wait_timeout_ms() :: pos_integer()

Max wait (ms) for an already-started gRPC listener to publish its port before retrying.

grpc_stream_control_timeout_ms()

@spec grpc_stream_control_timeout_ms() :: pos_integer()

Timeout (ms) for sending telemetry control messages to Python workers.

grpc_stream_open_timeout_ms()

@spec grpc_stream_open_timeout_ms() :: pos_integer()

Timeout (ms) for opening telemetry streams to Python workers.

grpc_worker_health_check_timeout_ms()

@spec grpc_worker_health_check_timeout_ms() :: pos_integer()

Timeout (ms) for GRPCWorker periodic health-check RPC calls.

heartbeat_defaults()

@spec heartbeat_defaults() :: map()

Returns the normalized default heartbeat configuration, merged with application env overrides.

This shape is shared with snakepit_bridge.heartbeat.HeartbeatConfig. When adding new keys, update both this module and the Python HeartbeatConfig to keep the cross-language schema aligned.

instance_name()

@spec instance_name() :: String.t() | nil

Resolve the instance name used for runtime isolation.

instance_name_identifier()

@spec instance_name_identifier() :: String.t() | nil

Resolve a CLI-safe instance name identifier for process scoping.

instance_token()

@spec instance_token() :: String.t() | nil

Resolve the instance token used for runtime isolation.

instance_token_identifier()

@spec instance_token_identifier() :: String.t() | nil

Resolve a CLI-safe instance token identifier for process scoping.
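"CLI-safe" is not defined further on this page. One plausible reading is that characters outside a conservative set are replaced so the value can be embedded in command-line arguments. A sketch under that assumption; the allowed character set, the underscore replacement, returning nil for blank input, and InstanceIdSketch itself are all hypothetical.

```elixir
defmodule InstanceIdSketch do
  # Hypothetical rule: keep ASCII letters, digits, "_" and "-";
  # replace every other character with "_".
  def cli_safe(nil), do: nil

  def cli_safe(value) when is_binary(value) do
    case value |> String.trim() |> String.replace(~r/[^A-Za-z0-9_-]/, "_") do
      # Assumption: blank input yields no identifier.
      "" -> nil
      safe -> safe
    end
  end
end
```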

lifecycle_check_max_concurrency()

@spec lifecycle_check_max_concurrency() :: pos_integer()

Maximum concurrent worker checks during lifecycle scan.

lifecycle_worker_action_timeout_ms()

@spec lifecycle_worker_action_timeout_ms() :: pos_integer()

Timeout (ms) per worker action during lifecycle checks.

multi_pool_mode?()

@spec multi_pool_mode?() :: boolean()

Returns true when the application is configured with explicit multi-pool mode.

normalize_pool_config(config)

@spec normalize_pool_config(map()) :: pool_config()

Normalize a pool configuration by filling in defaults.

Examples

iex> Snakepit.Config.normalize_pool_config(%{name: :test})
%{
  name: :test,
  worker_profile: :process,
  pool_size: 16,
  # ... other defaults
}
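Filling in defaults amounts to merging user input over a base map, then adding the knobs for the selected profile. A minimal sketch; the defaults for worker_profile, capacity_strategy, affinity, startup_batch_size, and startup_batch_delay_ms come from the schema above and pool_size 16 mirrors the example output, while the empty adapter args/env, the :infinity lifecycle limits, and the thread-profile values are placeholders. NormalizeSketch is hypothetical.

```elixir
defmodule NormalizeSketch do
  # Hypothetical sketch; not Snakepit's implementation.
  @common %{
    worker_profile: :process,
    pool_size: 16,
    # Placeholders below: not stated on this page.
    adapter_args: [],
    adapter_env: [],
    capacity_strategy: :pool,
    affinity: :hint,
    worker_ttl: :infinity,
    worker_max_requests: :infinity
  }

  @process_defaults %{startup_batch_size: 8, startup_batch_delay_ms: 750}

  # Placeholder values: this page does not state thread-profile defaults.
  @thread_defaults %{threads_per_worker: 4, thread_safety_checks: false}

  def normalize(config) when is_map(config) do
    base = Map.merge(@common, config)

    profile_defaults =
      case base.worker_profile do
        :thread -> @thread_defaults
        _process -> @process_defaults
      end

    # User-supplied keys always win over defaults.
    Map.merge(profile_defaults, base)
  end
end
```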

pool_max_workers(pool_config \\ %{})

@spec pool_max_workers(map()) :: pos_integer()

Resolve max workers for a pool, preserving legacy :pool_config fallback.

pool_startup_batch_delay_ms(pool_config \\ %{})

@spec pool_startup_batch_delay_ms(map()) :: non_neg_integer()

Resolve startup batch delay (ms) for process-profile worker initialization.

pool_startup_batch_size(pool_config \\ %{})

@spec pool_startup_batch_size(map()) :: pos_integer()

Resolve startup batch size for process-profile worker initialization.
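With the documented defaults (batches of 8 workers, 750 ms apart), the start-up schedule is straightforward arithmetic: a pool of 100 workers needs ceil(100 / 8) = 13 batches, and the last batch starts 12 × 750 = 9,000 ms after the first. A sketch of that calculation, assuming the delay is applied only between consecutive batches; StartupScheduleSketch is hypothetical.

```elixir
defmodule StartupScheduleSketch do
  # Hypothetical sketch: returns [{start_offset_ms, worker_indices}] per batch,
  # assuming delay_ms separates consecutive batches.
  def schedule(pool_size, batch_size \\ 8, delay_ms \\ 750)
      when pool_size > 0 and batch_size > 0 and delay_ms >= 0 do
    1..pool_size
    |> Enum.chunk_every(batch_size)
    |> Enum.with_index()
    |> Enum.map(fn {workers, index} -> {index * delay_ms, workers} end)
  end
end
```

Larger batches or shorter delays bring workers up faster at the cost of more simultaneous Python interpreter start-ups.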

process_registry_dets_flush_interval_ms()

@spec process_registry_dets_flush_interval_ms() :: pos_integer()

Interval (ms) for batching DETS flushes in Snakepit.Pool.ProcessRegistry.

rogue_cleanup_config()

@spec rogue_cleanup_config() :: map()

Normalized rogue cleanup configuration for process registry startup cleanup.

thread_profile?(config)

@spec thread_profile?(pool_config()) :: boolean()

Check if a pool configuration is using the thread profile.

Examples

iex> Snakepit.Config.thread_profile?(%{worker_profile: :thread})
true

iex> Snakepit.Config.thread_profile?(%{worker_profile: :process})
false

validate_pool_config(config)

@spec validate_pool_config(map()) :: {:ok, pool_config()} | {:error, term()}

Validate a single pool configuration.

Returns {:ok, normalized_config} or {:error, reason}.
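Validation can be layered on top of normalization: check required keys and enumerated values, then return the normalized map. A sketch; the checks come from the schema above (name is an atom, adapter_module is required, worker_profile is :process or :thread), while the error tuples, the minimal stand-in for normalization, and ValidateSketch itself are hypothetical.

```elixir
defmodule ValidateSketch do
  # Hypothetical sketch; not Snakepit's implementation.
  @profiles [:process, :thread]

  def validate(config) when is_map(config) do
    name = Map.get(config, :name)
    profile = Map.get(config, :worker_profile, :process)

    cond do
      is_nil(name) or not is_atom(name) ->
        {:error, {:invalid_name, name}}

      is_nil(Map.get(config, :adapter_module)) ->
        {:error, {:missing_adapter_module, name}}

      profile not in @profiles ->
        {:error, {:invalid_worker_profile, profile}}

      true ->
        # Stand-in for normalize_pool_config/1: only fills in the profile.
        {:ok, Map.put(config, :worker_profile, profile)}
    end
  end

  def validate(other), do: {:error, {:not_a_map, other}}
end
```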

worker_supervisor_cleanup_max_retries()

@spec worker_supervisor_cleanup_max_retries() :: pos_integer()

Worker restart cleanup maximum retry attempts.

worker_supervisor_cleanup_retry_interval_ms()

@spec worker_supervisor_cleanup_retry_interval_ms() :: pos_integer()

Worker restart cleanup polling interval in milliseconds.