SnmpKit.SnmpLib (snmpkit v0.6.6)
Unified SNMP library providing RFC-compliant PDU encoding/decoding, OID manipulation, and SNMP utilities.
This library consolidates common SNMP functionality from multiple projects into a single, well-tested, and performant library suitable for both SNMP managers and simulators. Phase 2 provides complete RFC compliance including SNMPv2c exception values and proper multibyte OID encoding.
Phase 2 Core Modules
SnmpKit.SnmpLib.PDU
- SNMP PDU encoding/decoding with full RFC compliance- SNMPv1 and SNMPv2c protocol support
- SNMPv2c exception values (noSuchObject, noSuchInstance, endOfMibView)
- High-performance encoding/decoding
- Comprehensive error handling
SnmpKit.SnmpLib.ASN1
- Low-level ASN.1 BER encoding/decoding- RFC-compliant OID multibyte encoding (values ≥ 128)
- Complete integer, string, null, sequence support
- Optimized length handling for large values
- Robust error handling and validation
SnmpKit.SnmpLib.OID
- OID string/list conversion and manipulation utilities- Fast string/list conversions with validation
- Tree operations and comparisons
- Table index parsing and construction
- Enterprise OID utilities
SnmpKit.SnmpLib.Types
- SNMP data type validation and formatting- Complete SNMP type system support
- SNMPv2c exception value handling
- Human-readable formatting
- Range checking and validation
SnmpKit.SnmpLib.Transport
- UDP socket management for SNMP communications- Socket creation and management
- Address resolution and validation
- Performance optimizations
Key Features
- 100% RFC Compliance: Passes comprehensive RFC test suite (30/30 tests)
- SNMPv2c Exception Values: Proper encoding/decoding of special response values
- Multibyte OID Support: Correct handling of OID components ≥ 128
- High Performance: Optimized encoding/decoding with fast paths
- Comprehensive Testing: Extensive test coverage with edge cases
- Production Ready: Used in real SNMP management systems
Phase 3B: Advanced Features
Phase 3B adds enterprise-grade capabilities for high-scale SNMP deployments:
SnmpKit.SnmpLib.Pool
- Connection pooling and session management- FIFO, round-robin, and device-affinity strategies
- Automatic overflow handling and health monitoring
- 60-80% reduction in socket creation overhead
- Support for 100+ concurrent device operations
SnmpKit.SnmpLib.ErrorHandler
- Intelligent error handling and recovery- Exponential backoff with jitter for retry operations
- Circuit breaker patterns for failing device management
- Error classification (transient, permanent, degraded)
- Adaptive timeout calculation based on device performance
SnmpKit.SnmpLib.Monitor
- Performance monitoring and analytics- Real-time operation metrics and device statistics
- Configurable alerting system with callback support
- Data export in JSON, CSV, and Prometheus formats
- Health scoring and trend analysis
SnmpKit.SnmpLib.Manager
- High-level SNMP management operations- Simple API for GET, GETBULK, SET operations
- Connection reuse and performance optimizations
- Comprehensive error handling with meaningful messages
- Timeout management and community support
Phase 4: Real-World Integration & Optimization
Phase 4 provides production-ready integration and optimization features:
SnmpKit.SnmpLib.Config
- Configuration management system- Environment-aware configuration (dev/test/prod)
- Hot-reload capabilities and validation
- Multi-tenant deployment support
- Secrets management and security
SnmpKit.SnmpLib.Dashboard
- Real-time monitoring and visualization- Live performance dashboards and metrics
- Alert management and notification routing
- Prometheus/Grafana integration
- Historical analytics and capacity planning
SnmpKit.SnmpLib.Cache
- Intelligent caching system- Multi-level caching (L1/L2/L3) with compression
- Adaptive TTL based on data volatility
- Smart invalidation and cache warming
- 50-80% reduction in redundant queries
Quick Start
Basic SNMP Operations
# Simple SNMP GET operation
{:ok, {type, value}} = SnmpKit.SnmpLib.Manager.get("192.168.1.1", [1, 3, 6, 1, 2, 1, 1, 1, 0])
# SNMP GETBULK for efficient bulk retrieval
{:ok, results} = SnmpKit.SnmpLib.Manager.get_bulk("192.168.1.1", [1, 3, 6, 1, 2, 1, 2, 2],
max_repetitions: 20)
# SNMP SET operation
{:ok, :success} = SnmpKit.SnmpLib.Manager.set("192.168.1.1", [1, 3, 6, 1, 2, 1, 1, 5, 0],
{:string, "New System Name"})
High-Performance Connection Pooling
# Start a connection pool for network monitoring
{:ok, _pid} = SnmpKit.SnmpLib.Pool.start_pool(:network_monitor,
strategy: :device_affinity,
size: 20,
max_overflow: 10
)
# Use pooled connections for improved performance
SnmpKit.SnmpLib.Pool.with_connection(:network_monitor, "192.168.1.1", fn conn ->
SnmpKit.SnmpLib.Manager.get_multi(conn.socket, "192.168.1.1", oids, conn.opts)
end)
Intelligent Error Handling
# Retry operations with exponential backoff
result = SnmpKit.SnmpLib.ErrorHandler.with_retry(fn ->
SnmpKit.SnmpLib.Manager.get("unreliable.device.local", [1, 3, 6, 1, 2, 1, 1, 1, 0])
end, max_attempts: 5, base_delay: 2000)
# Circuit breaker for problematic devices
{:ok, breaker} = SnmpKit.SnmpLib.ErrorHandler.start_circuit_breaker("192.168.1.1")
result = SnmpKit.SnmpLib.ErrorHandler.call_through_breaker(breaker, fn ->
SnmpKit.SnmpLib.Manager.get_bulk("192.168.1.1", [1, 3, 6, 1, 2, 1, 2, 2])
end)
Performance Monitoring and Analytics
# Start monitoring system
{:ok, _pid} = SnmpKit.SnmpLib.Monitor.start_link()
# Record operation metrics
SnmpKit.SnmpLib.Monitor.record_operation(%{
device: "192.168.1.1",
operation: :get,
duration: 245,
result: :success
})
# Get device statistics and health scores
stats = SnmpKit.SnmpLib.Monitor.get_device_stats("192.168.1.1")
IO.puts("Device health score: " <> to_string(stats.health_score))
# Set up automated alerting
SnmpKit.SnmpLib.Monitor.set_alert_threshold("192.168.1.1", :response_time, 5000)
Configuration Management
# Load production configuration
{:ok, _pid} = SnmpKit.SnmpLib.Config.start_link(
config_file: "/etc/snmp_lib/production.exs",
environment: :prod
)
# Get configuration values with fallbacks
timeout = SnmpKit.SnmpLib.Config.get(:snmp, :default_timeout, 5000)
pool_size = SnmpKit.SnmpLib.Config.get(:pool, :default_size, 10)
# Hot-reload configuration
:ok = SnmpKit.SnmpLib.Config.reload()
Real-Time Dashboard and Monitoring
# Start dashboard with Prometheus integration
{:ok, _pid} = SnmpKit.SnmpLib.Dashboard.start_link(
port: 4000,
prometheus_enabled: true,
retention_days: 14
)
# Record custom metrics
SnmpKit.SnmpLib.Dashboard.record_metric(:snmp_response_time, 125, %{
device: "192.168.1.1",
operation: "get"
})
# Create alerts for monitoring
SnmpKit.SnmpLib.Dashboard.create_alert(:device_unreachable, :critical, %{
device: "192.168.1.1",
consecutive_failures: 5
})
# Export metrics for external systems
prometheus_data = SnmpKit.SnmpLib.Dashboard.export_prometheus()
Intelligent Caching
# Start cache with compression and adaptive TTL
{:ok, _pid} = SnmpKit.SnmpLib.Cache.start_link(
max_size: 50_000,
compression_enabled: true,
adaptive_ttl_enabled: true
)
# Cache SNMP responses with adaptive TTL
SnmpKit.SnmpLib.Cache.put_adaptive("device_1:sysDescr", description,
base_ttl: 3_600_000,
volatility: :low
)
# Retrieve from cache with fallback
device_desc = case SnmpKit.SnmpLib.Cache.get("device_1:sysDescr") do
{:ok, cached_desc} -> cached_desc
:miss ->
{:ok, desc} = SnmpKit.SnmpLib.Manager.get("device_1", [1,3,6,1,2,1,1,1,0])
SnmpKit.SnmpLib.Cache.put("device_1:sysDescr", desc, ttl: 3_600_000)
desc
end
# Warm cache for predictable access patterns
SnmpKit.SnmpLib.Cache.warm_cache("device_1", :auto, strategy: :predictive)
Low-Level PDU Operations
# Encode a GET request PDU
iex> pdu = SnmpKit.SnmpLib.PDU.build_get_request([1, 3, 6, 1, 2, 1, 1, 1, 0], 12345)
iex> message = SnmpKit.SnmpLib.PDU.build_message(pdu, "public", :v2c)
iex> {:ok, encoded} = SnmpKit.SnmpLib.PDU.encode_message(message)
iex> is_binary(encoded)
true
# Build GETBULK request (SNMPv2c)
iex> bulk_pdu = SnmpKit.SnmpLib.PDU.build_get_bulk_request([1, 3, 6, 1, 2, 1, 2, 2], 456, 0, 10)
iex> bulk_pdu.type
:get_bulk_request
# OID manipulation with multibyte values
iex> {:ok, oid_list} = SnmpKit.SnmpLib.OID.string_to_list("1.3.6.1.4.1.200.1")
iex> oid_list
[1, 3, 6, 1, 4, 1, 200, 1]
iex> {:ok, oid_string} = SnmpKit.SnmpLib.OID.list_to_string([1, 3, 6, 1, 4, 1, 200, 1])
iex> oid_string
"1.3.6.1.4.1.200.1"
# Handle SNMPv2c exception values
iex> {:ok, exception_val} = SnmpKit.SnmpLib.Types.coerce_value(:no_such_object, nil)
iex> exception_val
{:no_such_object, nil}
Real-World Integration Examples
Network Monitoring System
# Monitor multiple devices with error handling
defmodule NetworkMonitor do
def poll_devices(device_list, community \ "public") do
device_list
|> Task.async_stream(fn device ->
case SnmpKit.SnmpLib.Manager.get(device, "1.3.6.1.2.1.1.3.0",
community: community, timeout: 5000) do
{:ok, uptime} -> {device, :ok, uptime}
{:error, reason} -> {device, :error, reason}
end
end, max_concurrency: 10, timeout: 10_000)
|> Enum.map(fn {:ok, result} -> result end)
end
def get_interface_stats(device, community \ "public") do
base_oid = [1, 3, 6, 1, 2, 1, 2, 2, 1]
# Get interface table using GETBULK
case SnmpKit.SnmpLib.Manager.get_bulk(device, base_oid,
community: community,
max_repetitions: 50) do
{:ok, varbinds} ->
varbinds
|> Enum.group_by(fn {oid, _value} ->
# Group by interface index (last component)
List.last(oid)
end)
|> Enum.map(fn {if_index, binds} ->
%{
interface: if_index,
stats: parse_interface_binds(binds)
}
end)
{:error, reason} -> {:error, reason}
end
end
defp parse_interface_binds(binds) do
Enum.reduce(binds, %{}, fn {oid, value}, acc ->
case oid do
[1, 3, 6, 1, 2, 1, 2, 2, 1, 10, _] -> Map.put(acc, :in_octets, value)
[1, 3, 6, 1, 2, 1, 2, 2, 1, 16, _] -> Map.put(acc, :out_octets, value)
[1, 3, 6, 1, 2, 1, 2, 2, 1, 2, _] -> Map.put(acc, :description, value)
_ -> acc
end
end)
end
end
# Usage example
devices = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
results = NetworkMonitor.poll_devices(devices, "monitoring")
SNMP Agent Simulator
# Build custom SNMP responses for testing
defmodule SnmpSimulator do
def create_system_response(request_id, community) do
# Build response with system information
varbinds = [
{[1, 3, 6, 1, 2, 1, 1, 1, 0], "Linux Test Server"},
{[1, 3, 6, 1, 2, 1, 1, 2, 0], [1, 3, 6, 1, 4, 1, 8072]},
{[1, 3, 6, 1, 2, 1, 1, 3, 0], 123456789}
]
response_pdu = SnmpKit.SnmpLib.PDU.build_response(request_id, 0, 0, varbinds)
message = SnmpKit.SnmpLib.PDU.build_message(response_pdu, community, :v2c)
case SnmpKit.SnmpLib.PDU.encode_message(message) do
{:ok, encoded} -> {:ok, encoded}
{:error, reason} -> {:error, reason}
end
end
def handle_bulk_request(request_pdu, community) do
# Simulate interface table response
base_oid = [1, 3, 6, 1, 2, 1, 2, 2, 1]
max_reps = request_pdu.max_repetitions
varbinds = for i <- 1..max_reps do
[
{base_oid ++ [2, i], "eth" <> Integer.to_string(i)}, # ifDescr
{base_oid ++ [10, i], :rand.uniform(1000000)}, # ifInOctets
{base_oid ++ [16, i], :rand.uniform(1000000)} # ifOutOctets
]
end |> List.flatten()
response_pdu = SnmpKit.SnmpLib.PDU.build_response(
request_pdu.request_id, 0, 0, varbinds
)
message = SnmpKit.SnmpLib.PDU.build_message(response_pdu, community, :v2c)
SnmpKit.SnmpLib.PDU.encode_message(message)
end
end
High-Performance Data Collection
# Efficient bulk data collection with connection reuse
defmodule PerformanceCollector do
def collect_interface_data(devices, opts \ []) do
concurrency = Keyword.get(opts, :concurrency, 20)
timeout = Keyword.get(opts, :timeout, 5000)
community = Keyword.get(opts, :community, "public")
start_time = System.monotonic_time(:microsecond)
results = devices
|> Task.async_stream(fn device ->
collect_device_interfaces(device, community, timeout)
end, max_concurrency: concurrency, timeout: timeout + 1000)
|> Enum.map(fn
{:ok, result} -> result
{:exit, reason} -> {:error, {:timeout, reason}}
end)
end_time = System.monotonic_time(:microsecond)
duration_ms = (end_time - start_time) / 1000
%{
results: results,
performance: %{
total_devices: length(devices),
duration_ms: duration_ms,
devices_per_second: length(devices) / (duration_ms / 1000),
success_rate: calculate_success_rate(results)
}
}
end
defp collect_device_interfaces(device, community, timeout) do
# Use GETBULK for efficient table walking
case SnmpKit.SnmpLib.Manager.get_bulk(
device,
[1, 3, 6, 1, 2, 1, 2, 2, 1, 2], # ifDescr table
community: community,
timeout: timeout,
max_repetitions: 100
) do
{:ok, varbinds} ->
{:ok, %{device: device, interface_count: length(varbinds), data: varbinds}}
{:error, reason} ->
{:error, %{device: device, reason: reason}}
end
end
defp calculate_success_rate(results) do
total = length(results)
successes = Enum.count(results, fn
{:ok, _} -> true
_ -> false
end)
if total > 0, do: (successes / total) * 100, else: 0
end
end
Performance Benchmarking Examples
Encoding/Decoding Performance
# Benchmark PDU encoding performance
defmodule SnmpBenchmark do
def benchmark_encoding(iterations \ 10_000) do
# Prepare test data
pdu = SnmpKit.SnmpLib.PDU.build_get_request([1, 3, 6, 1, 2, 1, 1, 1, 0], 12345)
message = SnmpKit.SnmpLib.PDU.build_message(pdu, "public", :v2c)
# Benchmark encoding
{encode_time, _} = :timer.tc(fn ->
for _ <- 1..iterations do
{:ok, _encoded} = SnmpKit.SnmpLib.PDU.encode_message(message)
end
end)
# Encode once for decoding benchmark
{:ok, encoded} = SnmpKit.SnmpLib.PDU.encode_message(message)
# Benchmark decoding
{decode_time, _} = :timer.tc(fn ->
for _ <- 1..iterations do
{:ok, _decoded} = SnmpKit.SnmpLib.PDU.decode_message(encoded)
end
end)
%{
iterations: iterations,
encode_time_ms: encode_time / 1000,
decode_time_ms: decode_time / 1000,
encode_ops_per_sec: iterations / (encode_time / 1_000_000),
decode_ops_per_sec: iterations / (decode_time / 1_000_000),
encode_time_per_op_us: encode_time / iterations,
decode_time_per_op_us: decode_time / iterations
}
end
def benchmark_bulk_operations(device_count \ 100) do
devices = for i <- 1..device_count, do: "192.168.1." <> Integer.to_string(i)
# Benchmark sequential operations
{seq_time, seq_results} = :timer.tc(fn ->
Enum.map(devices, fn device ->
SnmpKit.SnmpLib.Manager.get(device, [1, 3, 6, 1, 2, 1, 1, 3, 0], timeout: 100)
end)
end)
# Benchmark concurrent operations
{conc_time, conc_results} = :timer.tc(fn ->
devices
|> Task.async_stream(fn device ->
SnmpKit.SnmpLib.Manager.get(device, [1, 3, 6, 1, 2, 1, 1, 3, 0], timeout: 100)
end, max_concurrency: 50, timeout: 1000)
|> Enum.map(fn {:ok, result} -> result end)
end)
%{
device_count: device_count,
sequential: %{
time_ms: seq_time / 1000,
ops_per_sec: device_count / (seq_time / 1_000_000),
success_count: count_successes(seq_results)
},
concurrent: %{
time_ms: conc_time / 1000,
ops_per_sec: device_count / (conc_time / 1_000_000),
success_count: count_successes(conc_results),
speedup: seq_time / conc_time
}
}
end
def benchmark_oid_operations(iterations \ 100_000) do
test_oids = [
"1.3.6.1.2.1.1.1.0",
"1.3.6.1.4.1.8072.1.3.2.3.1.2.8.110.101.116.45.115.110.109.112",
"1.3.6.1.2.1.2.2.1.10.1000"
]
results = for oid_string <- test_oids do
# Benchmark string to list conversion
{str_to_list_time, _} = :timer.tc(fn ->
for _ <- 1..iterations do
{:ok, _list} = SnmpKit.SnmpLib.OID.string_to_list(oid_string)
end
end)
# Convert once for reverse benchmark
{:ok, oid_list} = SnmpKit.SnmpLib.OID.string_to_list(oid_string)
# Benchmark list to string conversion
{list_to_str_time, _} = :timer.tc(fn ->
for _ <- 1..iterations do
{:ok, _string} = SnmpKit.SnmpLib.OID.list_to_string(oid_list)
end
end)
%{
oid: oid_string,
oid_length: length(oid_list),
str_to_list_us_per_op: str_to_list_time / iterations,
list_to_str_us_per_op: list_to_str_time / iterations,
str_to_list_ops_per_sec: iterations / (str_to_list_time / 1_000_000),
list_to_str_ops_per_sec: iterations / (list_to_str_time / 1_000_000)
}
end
%{
iterations: iterations,
oid_benchmarks: results,
average_str_to_list_us: Enum.reduce(results, 0, &(&1.str_to_list_us_per_op + &2)) / length(results),
average_list_to_str_us: Enum.reduce(results, 0, &(&1.list_to_str_us_per_op + &2)) / length(results)
}
end
defp count_successes(results) do
Enum.count(results, fn
{:ok, _} -> true
_ -> false
end)
end
end
# Example usage:
# encoding_perf = SnmpBenchmark.benchmark_encoding(50_000)
# IO.puts("Encoding: " <> Integer.to_string(trunc(encoding_perf.encode_ops_per_sec)) <> " ops/sec")
# IO.puts("Decoding: " <> Integer.to_string(trunc(encoding_perf.decode_ops_per_sec)) <> " ops/sec")
# bulk_perf = SnmpBenchmark.benchmark_bulk_operations(200)
# IO.puts("Sequential: " <> Float.to_string(bulk_perf.sequential.time_ms) <> "ms")
# IO.puts("Concurrent: " <> Float.to_string(bulk_perf.concurrent.time_ms) <> "ms (" <> Float.to_string(bulk_perf.concurrent.speedup) <> "x faster)")
# oid_perf = SnmpBenchmark.benchmark_oid_operations(100_000)
# IO.puts("Average OID conversion: " <> Float.to_string(oid_perf.average_str_to_list_us) <> "μs per operation")
RFC Compliance
This library achieves 100% compliance with:
- RFC 1157 (SNMPv1)
- RFC 1905 (SNMPv2c Protocol Operations)
- RFC 3416 (SNMPv2c Enhanced Operations)
- ITU-T X.690 (ASN.1 BER Encoding Rules)
Summary
Functions
@spec info() :: map()
Returns comprehensive information about the SnmpLib library capabilities.
Useful for debugging, configuration validation, and feature discovery.
Returns
A map containing:
:version
: Library version:features
: Available features and capabilities:modules
: Core modules and their descriptions:compliance
: RFC compliance information
Examples
info = SnmpLib.info()
IO.puts("SNMP Library v" <> info.version)
IO.puts("Features: " <> Enum.join(info.features, ", "))
Returns the version of the SnmpLib library.
Examples
iex> is_binary(SnmpLib.version())
true
iex> SnmpLib.version() |> String.contains?(".")
true