SnmpKit.SnmpMgr.Stream (snmpkit v1.2.0)
High-performance streaming SNMP operations for memory-efficient processing.
This module provides streaming APIs that allow processing of large SNMP datasets without loading everything into memory at once. Perfect for large tables and real-time monitoring scenarios.
Summary
Functions
Creates a concurrent stream that processes multiple devices in parallel.
Creates a filtered stream that applies predicates during data fetching.
Creates a real-time monitoring stream that polls devices at intervals.
Creates a stream for processing large SNMP tables.
Creates a stream for walking large SNMP trees without memory overhead.
Functions
Creates a concurrent stream that processes multiple devices in parallel.
Combines results from multiple devices into a single stream with configurable concurrency and ordering.
Parameters
- device_operations- List of {target, operation, oid, opts} tuples
- opts- Options including :max_concurrent, :ordered, :timeout
Examples
# Concurrent table walks
operations = [
  {"switch1", :walk_table, "ifTable", []},
  {"switch2", :walk_table, "ifTable", []},
  {"router1", :walk, "ipRouteTable", []}
]
operations
|> SnmpKit.SnmpMgr.Stream.concurrent_stream(max_concurrent: 3)
|> Stream.each(&process_device_data/1)
|> Stream.run()Creates a filtered stream that applies predicates during data fetching.
This is more efficient than Stream.filter/2 for large datasets as it can skip unnecessary network requests based on OID patterns.
Parameters
- target- The target device
- root_oid- Starting OID
- filter_fn- Function to filter OIDs and values
- opts- Stream options
Examples
# Only fetch interface names (column 2)
filter_fn = fn {oid, _type, _value} ->
  case SnmpKit.SnmpLib.OID.string_to_list(oid) do
    {:ok, oid_list} -> List.last(oid_list, 2) |> hd() == 2
    _ -> false
  end
end
"switch.local"
|> SnmpKit.SnmpMgr.Stream.filtered_stream("ifTable", filter_fn)
|> Enum.to_list()Creates a real-time monitoring stream that polls devices at intervals.
Provides a continuous stream of SNMP data for real-time monitoring and alerting applications.
Parameters
- targets- List of {target, oid} tuples to monitor
- opts- Options including :interval, :buffer_size, :error_handling
Examples
# Monitor multiple devices
targets = [
  {"switch1", "ifInOctets.1"},
  {"switch2", "ifInOctets.1"},
  {"router1", "sysUpTime.0"}
]
targets
|> SnmpKit.SnmpMgr.Stream.monitor_stream(interval: 30_000)
|> Stream.each(&send_to_metrics_system/1)
|> Stream.run()
# Monitor with error handling
targets
|> SnmpKit.SnmpMgr.Stream.monitor_stream(
     interval: 10_000,
     error_handling: :skip_errors
   )
|> Stream.filter(&is_successful_reading/1)
|> Stream.each(&update_dashboard/1)
|> Stream.run()Creates a stream for processing large SNMP tables.
Optimized for table structures with intelligent chunking based on table columns and indexes.
Parameters
- target- The target device
- table_oid- The table OID to stream
- opts- Options including :chunk_size, :columns, :indexes
Examples
# Stream interface table with column filtering
"switch.local"
|> SnmpKit.SnmpMgr.Stream.table_stream("ifTable", columns: [:ifDescr, :ifOperStatus])
|> Stream.filter(fn {_index, data} -> data[:ifOperStatus] == 1 end)
|> Stream.map(fn {index, data} -> {index, data[:ifDescr]} end)
|> Enum.to_list()
# Process table with custom chunk size
"device.local"
|> SnmpKit.SnmpMgr.Stream.table_stream("ipRouteTable", chunk_size: 200)
|> Stream.each(&process_route_entry/1)
|> Stream.run()Creates a stream for walking large SNMP trees without memory overhead.
The stream lazily fetches data in chunks, allowing processing of arbitrarily large SNMP trees with constant memory usage.
Parameters
- target- The target device
- root_oid- Starting OID for the walk
- opts- Options including :chunk_size, :version, :adaptive
Examples
# Process a large table efficiently
"switch.local"
|> SnmpKit.SnmpMgr.Stream.walk_stream("ifTable")
|> Stream.filter(fn {_oid, value} -> String.contains?(value, "Gigabit") end)
|> Stream.map(&extract_interface_info/1)
|> Enum.to_list()
# Real-time processing with backpressure
"router.local"
|> SnmpKit.SnmpMgr.Stream.walk_stream("ipRouteTable", chunk_size: 100)
|> Stream.each(&update_routing_database/1)
|> Stream.run()