SnmpKit.SnmpMgr.Stream (snmpkit v0.6.3)

High-performance streaming SNMP operations for memory-efficient processing.

This module provides streaming APIs that allow processing of large SNMP datasets without loading everything into memory at once. Perfect for large tables and real-time monitoring scenarios.

Summary

Functions

Creates a concurrent stream that processes multiple devices in parallel.

Creates a filtered stream that applies predicates during data fetching.

Creates a real-time monitoring stream that polls devices at intervals.

Creates a stream for processing large SNMP tables.

Creates a stream for walking large SNMP trees without memory overhead.

Functions

concurrent_stream(device_operations, opts \\ [])

Creates a concurrent stream that processes multiple devices in parallel.

Combines results from multiple devices into a single stream with configurable concurrency and ordering.

Parameters

  • device_operations - List of {target, operation, oid, opts} tuples
  • opts - Options including :max_concurrent, :ordered, :timeout

Examples

# Concurrent table walks
operations = [
  {"switch1", :walk_table, "ifTable", []},
  {"switch2", :walk_table, "ifTable", []},
  {"router1", :walk, "ipRouteTable", []}
]

operations
|> SnmpKit.SnmpMgr.Stream.concurrent_stream(max_concurrent: 3)
|> Stream.each(&process_device_data/1)
|> Stream.run()

filtered_stream(target, root_oid, filter_fn, opts \\ [])

Creates a filtered stream that applies predicates during data fetching.

This is more efficient than Stream.filter/2 for large datasets as it can skip unnecessary network requests based on OID patterns.

Parameters

  • target - The target device
  • root_oid - Starting OID
  • filter_fn - Function to filter OIDs and values
  • opts - Stream options

Examples

# Only fetch interface names (column 2)
filter_fn = fn {oid, _type, _value} ->
  case SnmpKit.SnmpLib.OID.string_to_list(oid) do
    {:ok, oid_list} -> List.last(oid_list, 2) |> hd() == 2
    _ -> false
  end
end

"switch.local"
|> SnmpKit.SnmpMgr.Stream.filtered_stream("ifTable", filter_fn)
|> Enum.to_list()

monitor_stream(targets, opts \\ [])

Creates a real-time monitoring stream that polls devices at intervals.

Provides a continuous stream of SNMP data for real-time monitoring and alerting applications.

Parameters

  • targets - List of {target, oid} tuples to monitor
  • opts - Options including :interval, :buffer_size, :error_handling

Examples

# Monitor multiple devices
targets = [
  {"switch1", "ifInOctets.1"},
  {"switch2", "ifInOctets.1"},
  {"router1", "sysUpTime.0"}
]

targets
|> SnmpKit.SnmpMgr.Stream.monitor_stream(interval: 30_000)
|> Stream.each(&send_to_metrics_system/1)
|> Stream.run()

# Monitor with error handling
targets
|> SnmpKit.SnmpMgr.Stream.monitor_stream(
     interval: 10_000,
     error_handling: :skip_errors
   )
|> Stream.filter(&is_successful_reading/1)
|> Stream.each(&update_dashboard/1)
|> Stream.run()

table_stream(target, table_oid, opts \\ [])

Creates a stream for processing large SNMP tables.

Optimized for table structures with intelligent chunking based on table columns and indexes.

Parameters

  • target - The target device
  • table_oid - The table OID to stream
  • opts - Options including :chunk_size, :columns, :indexes

Examples

# Stream interface table with column filtering
"switch.local"
|> SnmpKit.SnmpMgr.Stream.table_stream("ifTable", columns: [:ifDescr, :ifOperStatus])
|> Stream.filter(fn {_index, data} -> data[:ifOperStatus] == 1 end)
|> Stream.map(fn {index, data} -> {index, data[:ifDescr]} end)
|> Enum.to_list()

# Process table with custom chunk size
"device.local"
|> SnmpKit.SnmpMgr.Stream.table_stream("ipRouteTable", chunk_size: 200)
|> Stream.each(&process_route_entry/1)
|> Stream.run()

walk_stream(target, root_oid, opts \\ [])

Creates a stream for walking large SNMP trees without memory overhead.

The stream lazily fetches data in chunks, allowing processing of arbitrarily large SNMP trees with constant memory usage.

Parameters

  • target - The target device
  • root_oid - Starting OID for the walk
  • opts - Options including :chunk_size, :version, :adaptive

Examples

# Process a large table efficiently
"switch.local"
|> SnmpKit.SnmpMgr.Stream.walk_stream("ifTable")
|> Stream.filter(fn {_oid, value} -> String.contains?(value, "Gigabit") end)
|> Stream.map(&extract_interface_info/1)
|> Enum.to_list()

# Real-time processing with backpressure
"router.local"
|> SnmpKit.SnmpMgr.Stream.walk_stream("ipRouteTable", chunk_size: 100)
|> Stream.each(&update_routing_database/1)
|> Stream.run()