SnmpKit.SnmpLib (snmpkit v0.6.3)

Unified SNMP library providing RFC-compliant PDU encoding/decoding, OID manipulation, and SNMP utilities.

This library consolidates common SNMP functionality from multiple projects into a single, well-tested, and performant library suitable for both SNMP managers and simulators. Phase 2 provides complete RFC compliance including SNMPv2c exception values and proper multibyte OID encoding.

Phase 2 Core Modules

  • SnmpKit.SnmpLib.PDU - SNMP PDU encoding/decoding with full RFC compliance

    • SNMPv1 and SNMPv2c protocol support
    • SNMPv2c exception values (noSuchObject, noSuchInstance, endOfMibView)
    • High-performance encoding/decoding
    • Comprehensive error handling
  • SnmpKit.SnmpLib.ASN1 - Low-level ASN.1 BER encoding/decoding

    • RFC-compliant OID multibyte encoding (values ≥ 128)
    • Complete integer, string, null, sequence support
    • Optimized length handling for large values
    • Robust error handling and validation
  • SnmpKit.SnmpLib.OID - OID string/list conversion and manipulation utilities

    • Fast string/list conversions with validation
    • Tree operations and comparisons
    • Table index parsing and construction
    • Enterprise OID utilities
  • SnmpKit.SnmpLib.Types - SNMP data type validation and formatting

    • Complete SNMP type system support
    • SNMPv2c exception value handling
    • Human-readable formatting
    • Range checking and validation
  • SnmpKit.SnmpLib.Transport - UDP socket management for SNMP communications

    • Socket creation and management
    • Address resolution and validation
    • Performance optimizations

Key Features

  • 100% RFC Compliance: Passes comprehensive RFC test suite (30/30 tests)
  • SNMPv2c Exception Values: Proper encoding/decoding of special response values
  • Multibyte OID Support: Correct handling of OID components ≥ 128
  • High Performance: Optimized encoding/decoding with fast paths
  • Comprehensive Testing: Extensive test coverage with edge cases
  • Production Ready: Used in real SNMP management systems

Phase 3B: Advanced Features

Phase 3B adds enterprise-grade capabilities for high-scale SNMP deployments:

  • SnmpKit.SnmpLib.Pool - Connection pooling and session management

    • FIFO, round-robin, and device-affinity strategies
    • Automatic overflow handling and health monitoring
    • 60-80% reduction in socket creation overhead
    • Support for 100+ concurrent device operations
  • SnmpKit.SnmpLib.ErrorHandler - Intelligent error handling and recovery

    • Exponential backoff with jitter for retry operations
    • Circuit breaker patterns for failing device management
    • Error classification (transient, permanent, degraded)
    • Adaptive timeout calculation based on device performance
  • SnmpKit.SnmpLib.Monitor - Performance monitoring and analytics

    • Real-time operation metrics and device statistics
    • Configurable alerting system with callback support
    • Data export in JSON, CSV, and Prometheus formats
    • Health scoring and trend analysis
  • SnmpKit.SnmpLib.Manager - High-level SNMP management operations

    • Simple API for GET, GETBULK, SET operations
    • Connection reuse and performance optimizations
    • Comprehensive error handling with meaningful messages
    • Timeout management and community support

Phase 4: Real-World Integration & Optimization

Phase 4 provides production-ready integration and optimization features:

  • SnmpKit.SnmpLib.Config - Configuration management system

    • Environment-aware configuration (dev/test/prod)
    • Hot-reload capabilities and validation
    • Multi-tenant deployment support
    • Secrets management and security
  • SnmpKit.SnmpLib.Dashboard - Real-time monitoring and visualization

    • Live performance dashboards and metrics
    • Alert management and notification routing
    • Prometheus/Grafana integration
    • Historical analytics and capacity planning
  • SnmpKit.SnmpLib.Cache - Intelligent caching system

    • Multi-level caching (L1/L2/L3) with compression
    • Adaptive TTL based on data volatility
    • Smart invalidation and cache warming
    • 50-80% reduction in redundant queries

Quick Start

Basic SNMP Operations

# Simple SNMP GET operation
{:ok, {type, value}} = SnmpKit.SnmpLib.Manager.get("192.168.1.1", [1, 3, 6, 1, 2, 1, 1, 1, 0])

# SNMP GETBULK for efficient bulk retrieval
{:ok, results} = SnmpKit.SnmpLib.Manager.get_bulk("192.168.1.1", [1, 3, 6, 1, 2, 1, 2, 2],
                                           max_repetitions: 20)

# SNMP SET operation
{:ok, :success} = SnmpKit.SnmpLib.Manager.set("192.168.1.1", [1, 3, 6, 1, 2, 1, 1, 5, 0],
                                      {:string, "New System Name"})

High-Performance Connection Pooling

# Start a connection pool for network monitoring
{:ok, _pid} = SnmpKit.SnmpLib.Pool.start_pool(:network_monitor,
  strategy: :device_affinity,
  size: 20,
  max_overflow: 10
)

# Use pooled connections for improved performance
SnmpKit.SnmpLib.Pool.with_connection(:network_monitor, "192.168.1.1", fn conn ->
  SnmpKit.SnmpLib.Manager.get_multi(conn.socket, "192.168.1.1", oids, conn.opts)
end)

Intelligent Error Handling

# Retry operations with exponential backoff
result = SnmpKit.SnmpLib.ErrorHandler.with_retry(fn ->
  SnmpKit.SnmpLib.Manager.get("unreliable.device.local", [1, 3, 6, 1, 2, 1, 1, 1, 0])
end, max_attempts: 5, base_delay: 2000)

# Circuit breaker for problematic devices
{:ok, breaker} = SnmpKit.SnmpLib.ErrorHandler.start_circuit_breaker("192.168.1.1")
result = SnmpKit.SnmpLib.ErrorHandler.call_through_breaker(breaker, fn ->
  SnmpKit.SnmpLib.Manager.get_bulk("192.168.1.1", [1, 3, 6, 1, 2, 1, 2, 2])
end)

Performance Monitoring and Analytics

# Start monitoring system
{:ok, _pid} = SnmpKit.SnmpLib.Monitor.start_link()

# Record operation metrics
SnmpKit.SnmpLib.Monitor.record_operation(%{
  device: "192.168.1.1",
  operation: :get,
  duration: 245,
  result: :success
})

# Get device statistics and health scores
stats = SnmpKit.SnmpLib.Monitor.get_device_stats("192.168.1.1")
IO.puts("Device health score: " <> to_string(stats.health_score))

# Set up automated alerting
SnmpKit.SnmpLib.Monitor.set_alert_threshold("192.168.1.1", :response_time, 5000)

Configuration Management

# Load production configuration
{:ok, _pid} = SnmpKit.SnmpLib.Config.start_link(
  config_file: "/etc/snmp_lib/production.exs",
  environment: :prod
)

# Get configuration values with fallbacks
timeout = SnmpKit.SnmpLib.Config.get(:snmp, :default_timeout, 5000)
pool_size = SnmpKit.SnmpLib.Config.get(:pool, :default_size, 10)

# Hot-reload configuration
:ok = SnmpKit.SnmpLib.Config.reload()

Real-Time Dashboard and Monitoring

# Start dashboard with Prometheus integration
{:ok, _pid} = SnmpKit.SnmpLib.Dashboard.start_link(
  port: 4000,
  prometheus_enabled: true,
  retention_days: 14
)

# Record custom metrics
SnmpKit.SnmpLib.Dashboard.record_metric(:snmp_response_time, 125, %{
  device: "192.168.1.1",
  operation: "get"
})

# Create alerts for monitoring
SnmpKit.SnmpLib.Dashboard.create_alert(:device_unreachable, :critical, %{
  device: "192.168.1.1",
  consecutive_failures: 5
})

# Export metrics for external systems
prometheus_data = SnmpKit.SnmpLib.Dashboard.export_prometheus()

Intelligent Caching

# Start cache with compression and adaptive TTL
{:ok, _pid} = SnmpKit.SnmpLib.Cache.start_link(
  max_size: 50_000,
  compression_enabled: true,
  adaptive_ttl_enabled: true
)

# Cache SNMP responses with adaptive TTL
SnmpKit.SnmpLib.Cache.put_adaptive("device_1:sysDescr", description,
  base_ttl: 3_600_000,
  volatility: :low
)

# Retrieve from cache with fallback
device_desc = case SnmpKit.SnmpLib.Cache.get("device_1:sysDescr") do
  {:ok, cached_desc} -> cached_desc
  :miss ->
    {:ok, desc} = SnmpKit.SnmpLib.Manager.get("device_1", [1,3,6,1,2,1,1,1,0])
    SnmpKit.SnmpLib.Cache.put("device_1:sysDescr", desc, ttl: 3_600_000)
    desc
end

# Warm cache for predictable access patterns
SnmpKit.SnmpLib.Cache.warm_cache("device_1", :auto, strategy: :predictive)

Low-Level PDU Operations

# Encode a GET request PDU
iex> pdu = SnmpKit.SnmpLib.PDU.build_get_request([1, 3, 6, 1, 2, 1, 1, 1, 0], 12345)
iex> message = SnmpKit.SnmpLib.PDU.build_message(pdu, "public", :v2c)
iex> {:ok, encoded} = SnmpKit.SnmpLib.PDU.encode_message(message)
iex> is_binary(encoded)
true

# Build GETBULK request (SNMPv2c)
iex> bulk_pdu = SnmpKit.SnmpLib.PDU.build_get_bulk_request([1, 3, 6, 1, 2, 1, 2, 2], 456, 0, 10)
iex> bulk_pdu.type
:get_bulk_request

# OID manipulation with multibyte values
iex> {:ok, oid_list} = SnmpKit.SnmpLib.OID.string_to_list("1.3.6.1.4.1.200.1")
iex> oid_list
[1, 3, 6, 1, 4, 1, 200, 1]
iex> {:ok, oid_string} = SnmpKit.SnmpLib.OID.list_to_string([1, 3, 6, 1, 4, 1, 200, 1])
iex> oid_string
"1.3.6.1.4.1.200.1"

# Handle SNMPv2c exception values
iex> {:ok, exception_val} = SnmpKit.SnmpLib.Types.coerce_value(:no_such_object, nil)
iex> exception_val
{:no_such_object, nil}

Real-World Integration Examples

Network Monitoring System

# Monitor multiple devices with error handling
defmodule NetworkMonitor do
  def poll_devices(device_list, community \ "public") do
    device_list
    |> Task.async_stream(fn device ->
      case SnmpKit.SnmpLib.Manager.get(device, "1.3.6.1.2.1.1.3.0",
                               community: community, timeout: 5000) do
        {:ok, uptime} -> {device, :ok, uptime}
        {:error, reason} -> {device, :error, reason}
      end
    end, max_concurrency: 10, timeout: 10_000)
    |> Enum.map(fn {:ok, result} -> result end)
  end

  def get_interface_stats(device, community \ "public") do
    base_oid = [1, 3, 6, 1, 2, 1, 2, 2, 1]

    # Get interface table using GETBULK
    case SnmpKit.SnmpLib.Manager.get_bulk(device, base_oid,
                                  community: community,
                                  max_repetitions: 50) do
      {:ok, varbinds} ->
        varbinds
        |> Enum.group_by(fn {oid, _value} ->
          # Group by interface index (last component)
          List.last(oid)
        end)
        |> Enum.map(fn {if_index, binds} ->
          %{
            interface: if_index,
            stats: parse_interface_binds(binds)
          }
        end)

      {:error, reason} -> {:error, reason}
    end
  end

  defp parse_interface_binds(binds) do
    Enum.reduce(binds, %{}, fn {oid, value}, acc ->
      case oid do
        [1, 3, 6, 1, 2, 1, 2, 2, 1, 10, _] -> Map.put(acc, :in_octets, value)
        [1, 3, 6, 1, 2, 1, 2, 2, 1, 16, _] -> Map.put(acc, :out_octets, value)
        [1, 3, 6, 1, 2, 1, 2, 2, 1, 2, _] -> Map.put(acc, :description, value)
        _ -> acc
      end
    end)
  end
end

# Usage example
devices = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
results = NetworkMonitor.poll_devices(devices, "monitoring")

SNMP Agent Simulator

# Build custom SNMP responses for testing
defmodule SnmpSimulator do
  def create_system_response(request_id, community) do
    # Build response with system information
    varbinds = [
      {[1, 3, 6, 1, 2, 1, 1, 1, 0], "Linux Test Server"},
      {[1, 3, 6, 1, 2, 1, 1, 2, 0], [1, 3, 6, 1, 4, 1, 8072]},
      {[1, 3, 6, 1, 2, 1, 1, 3, 0], 123456789}
    ]

    response_pdu = SnmpKit.SnmpLib.PDU.build_response(request_id, 0, 0, varbinds)
    message = SnmpKit.SnmpLib.PDU.build_message(response_pdu, community, :v2c)

    case SnmpKit.SnmpLib.PDU.encode_message(message) do
      {:ok, encoded} -> {:ok, encoded}
      {:error, reason} -> {:error, reason}
    end
  end

  def handle_bulk_request(request_pdu, community) do
    # Simulate interface table response
    base_oid = [1, 3, 6, 1, 2, 1, 2, 2, 1]
    max_reps = request_pdu.max_repetitions

    varbinds = for i <- 1..max_reps do
      [
        {base_oid ++ [2, i], "eth" <> Integer.to_string(i)},           # ifDescr
        {base_oid ++ [10, i], :rand.uniform(1000000)}, # ifInOctets
        {base_oid ++ [16, i], :rand.uniform(1000000)}  # ifOutOctets
      ]
    end |> List.flatten()

    response_pdu = SnmpKit.SnmpLib.PDU.build_response(
      request_pdu.request_id, 0, 0, varbinds
    )

    message = SnmpKit.SnmpLib.PDU.build_message(response_pdu, community, :v2c)
    SnmpKit.SnmpLib.PDU.encode_message(message)
  end
end

High-Performance Data Collection

# Efficient bulk data collection with connection reuse
defmodule PerformanceCollector do
  def collect_interface_data(devices, opts \ []) do
    concurrency = Keyword.get(opts, :concurrency, 20)
    timeout = Keyword.get(opts, :timeout, 5000)
    community = Keyword.get(opts, :community, "public")

    start_time = System.monotonic_time(:microsecond)

    results = devices
    |> Task.async_stream(fn device ->
      collect_device_interfaces(device, community, timeout)
    end, max_concurrency: concurrency, timeout: timeout + 1000)
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, reason} -> {:error, {:timeout, reason}}
    end)

    end_time = System.monotonic_time(:microsecond)
    duration_ms = (end_time - start_time) / 1000

    %{
      results: results,
      performance: %{
        total_devices: length(devices),
        duration_ms: duration_ms,
        devices_per_second: length(devices) / (duration_ms / 1000),
        success_rate: calculate_success_rate(results)
      }
    }
  end

  defp collect_device_interfaces(device, community, timeout) do
    # Use GETBULK for efficient table walking
    case SnmpKit.SnmpLib.Manager.get_bulk(
      device,
      [1, 3, 6, 1, 2, 1, 2, 2, 1, 2], # ifDescr table
      community: community,
      timeout: timeout,
      max_repetitions: 100
    ) do
      {:ok, varbinds} ->
        {:ok, %{device: device, interface_count: length(varbinds), data: varbinds}}
      {:error, reason} ->
        {:error, %{device: device, reason: reason}}
    end
  end

  defp calculate_success_rate(results) do
    total = length(results)
    successes = Enum.count(results, fn
      {:ok, _} -> true
      _ -> false
    end)

    if total > 0, do: (successes / total) * 100, else: 0
  end
end

Performance Benchmarking Examples

Encoding/Decoding Performance

# Benchmark PDU encoding performance
defmodule SnmpBenchmark do
  def benchmark_encoding(iterations \ 10_000) do
    # Prepare test data
    pdu = SnmpKit.SnmpLib.PDU.build_get_request([1, 3, 6, 1, 2, 1, 1, 1, 0], 12345)
    message = SnmpKit.SnmpLib.PDU.build_message(pdu, "public", :v2c)

    # Benchmark encoding
    {encode_time, _} = :timer.tc(fn ->
      for _ <- 1..iterations do
        {:ok, _encoded} = SnmpKit.SnmpLib.PDU.encode_message(message)
      end
    end)

    # Encode once for decoding benchmark
    {:ok, encoded} = SnmpKit.SnmpLib.PDU.encode_message(message)

    # Benchmark decoding
    {decode_time, _} = :timer.tc(fn ->
      for _ <- 1..iterations do
        {:ok, _decoded} = SnmpKit.SnmpLib.PDU.decode_message(encoded)
      end
    end)

    %{
      iterations: iterations,
      encode_time_ms: encode_time / 1000,
      decode_time_ms: decode_time / 1000,
      encode_ops_per_sec: iterations / (encode_time / 1_000_000),
      decode_ops_per_sec: iterations / (decode_time / 1_000_000),
      encode_time_per_op_us: encode_time / iterations,
      decode_time_per_op_us: decode_time / iterations
    }
  end

  def benchmark_bulk_operations(device_count \ 100) do
    devices = for i <- 1..device_count, do: "192.168.1." <> Integer.to_string(i)

    # Benchmark sequential operations
    {seq_time, seq_results} = :timer.tc(fn ->
      Enum.map(devices, fn device ->
        SnmpKit.SnmpLib.Manager.get(device, [1, 3, 6, 1, 2, 1, 1, 3, 0], timeout: 100)
      end)
    end)

    # Benchmark concurrent operations
    {conc_time, conc_results} = :timer.tc(fn ->
      devices
      |> Task.async_stream(fn device ->
        SnmpKit.SnmpLib.Manager.get(device, [1, 3, 6, 1, 2, 1, 1, 3, 0], timeout: 100)
      end, max_concurrency: 50, timeout: 1000)
      |> Enum.map(fn {:ok, result} -> result end)
    end)

    %{
      device_count: device_count,
      sequential: %{
        time_ms: seq_time / 1000,
        ops_per_sec: device_count / (seq_time / 1_000_000),
        success_count: count_successes(seq_results)
      },
      concurrent: %{
        time_ms: conc_time / 1000,
        ops_per_sec: device_count / (conc_time / 1_000_000),
        success_count: count_successes(conc_results),
        speedup: seq_time / conc_time
      }
    }
  end

  def benchmark_oid_operations(iterations \ 100_000) do
    test_oids = [
      "1.3.6.1.2.1.1.1.0",
      "1.3.6.1.4.1.8072.1.3.2.3.1.2.8.110.101.116.45.115.110.109.112",
      "1.3.6.1.2.1.2.2.1.10.1000"
    ]

    results = for oid_string <- test_oids do
      # Benchmark string to list conversion
      {str_to_list_time, _} = :timer.tc(fn ->
        for _ <- 1..iterations do
          {:ok, _list} = SnmpKit.SnmpLib.OID.string_to_list(oid_string)
        end
      end)

      # Convert once for reverse benchmark
      {:ok, oid_list} = SnmpKit.SnmpLib.OID.string_to_list(oid_string)

      # Benchmark list to string conversion
      {list_to_str_time, _} = :timer.tc(fn ->
        for _ <- 1..iterations do
          {:ok, _string} = SnmpKit.SnmpLib.OID.list_to_string(oid_list)
        end
      end)

      %{
        oid: oid_string,
        oid_length: length(oid_list),
        str_to_list_us_per_op: str_to_list_time / iterations,
        list_to_str_us_per_op: list_to_str_time / iterations,
        str_to_list_ops_per_sec: iterations / (str_to_list_time / 1_000_000),
        list_to_str_ops_per_sec: iterations / (list_to_str_time / 1_000_000)
      }
    end

    %{
      iterations: iterations,
      oid_benchmarks: results,
      average_str_to_list_us: Enum.reduce(results, 0, &(&1.str_to_list_us_per_op + &2)) / length(results),
      average_list_to_str_us: Enum.reduce(results, 0, &(&1.list_to_str_us_per_op + &2)) / length(results)
    }
  end

  defp count_successes(results) do
    Enum.count(results, fn
      {:ok, _} -> true
      _ -> false
    end)
  end
end

# Example usage:
# encoding_perf = SnmpBenchmark.benchmark_encoding(50_000)
# IO.puts("Encoding: " <> Integer.to_string(trunc(encoding_perf.encode_ops_per_sec)) <> " ops/sec")
# IO.puts("Decoding: " <> Integer.to_string(trunc(encoding_perf.decode_ops_per_sec)) <> " ops/sec")

# bulk_perf = SnmpBenchmark.benchmark_bulk_operations(200)
# IO.puts("Sequential: " <> Float.to_string(bulk_perf.sequential.time_ms) <> "ms")
# IO.puts("Concurrent: " <> Float.to_string(bulk_perf.concurrent.time_ms) <> "ms (" <> Float.to_string(bulk_perf.concurrent.speedup) <> "x faster)")

# oid_perf = SnmpBenchmark.benchmark_oid_operations(100_000)
# IO.puts("Average OID conversion: " <> Float.to_string(oid_perf.average_str_to_list_us) <> "μs per operation")

RFC Compliance

This library achieves 100% compliance with:

  • RFC 1157 (SNMPv1)
  • RFC 1905 (SNMPv2c Protocol Operations)
  • RFC 3416 (SNMPv2c Enhanced Operations)
  • ITU-T X.690 (ASN.1 BER Encoding Rules)

Summary

Functions

Returns comprehensive information about the SnmpLib library capabilities.

Returns the version of the SnmpLib library.

Functions

info()

@spec info() :: map()

Returns comprehensive information about the SnmpLib library capabilities.

Useful for debugging, configuration validation, and feature discovery.

Returns

A map containing:

  • :version: Library version
  • :features: Available features and capabilities
  • :modules: Core modules and their descriptions
  • :compliance: RFC compliance information

Examples

info = SnmpLib.info()
IO.puts("SNMP Library v" <> info.version)
IO.puts("Features: " <> Enum.join(info.features, ", "))

version()

Returns the version of the SnmpLib library.

Examples

iex> is_binary(SnmpLib.version())
true

iex> SnmpLib.version() |> String.contains?(".")
true