SnmpKit.SnmpMgr.Stream (snmpkit v0.6.4)
High-performance streaming SNMP operations for memory-efficient processing.
This module provides streaming APIs that process large SNMP datasets without loading everything into memory at once. It is suited to large tables and real-time monitoring scenarios.
Summary
Functions
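concurrent_stream(device_operations, opts)
Creates a concurrent stream that processes multiple devices in parallel.
filtered_stream(target, root_oid, filter_fn, opts)
Creates a filtered stream that applies predicates during data fetching.
monitor_stream(targets, opts)
Creates a real-time monitoring stream that polls devices at intervals.
table_stream(target, table_oid, opts)
Creates a stream for processing large SNMP tables.
walk_stream(target, root_oid, opts)
Creates a stream for walking large SNMP trees without memory overhead.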
Functions
concurrent_stream(device_operations, opts)
Creates a concurrent stream that processes multiple devices in parallel.
Combines results from multiple devices into a single stream with configurable concurrency and ordering.
Parameters
- device_operations: List of {target, operation, oid, opts} tuples
- opts: Options including :max_concurrent, :ordered, :timeout
Examples
# Concurrent table walks
operations = [
  {"switch1", :walk_table, "ifTable", []},
  {"switch2", :walk_table, "ifTable", []},
  {"router1", :walk, "ipRouteTable", []}
]
operations
|> SnmpKit.SnmpMgr.Stream.concurrent_stream(max_concurrent: 3)
|> Stream.each(&process_device_data/1)
|> Stream.run()
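The bounded concurrency and ordering described above can be approximated in plain Elixir with Task.async_stream/3. The following is a minimal sketch, not SnmpKit's implementation: ConcurrentSketch, fake_fetch/1, and the canned values are hypothetical stand-ins for real SNMP requests, and the option names simply mirror the ones listed in the parameters.

```elixir
defmodule ConcurrentSketch do
  # Hypothetical stand-in for a real SNMP request: returns canned data per target.
  def fake_fetch({target, operation, oid, _opts}) do
    {target, operation, oid, [{"#{oid}.1", "value from #{target}"}]}
  end

  # Runs at most :max_concurrent operations at a time and emits results lazily.
  def concurrent(operations, opts \\ []) do
    max = Keyword.get(opts, :max_concurrent, System.schedulers_online())
    ordered = Keyword.get(opts, :ordered, true)
    timeout = Keyword.get(opts, :timeout, 5_000)

    operations
    |> Task.async_stream(&fake_fetch/1,
      max_concurrency: max,
      ordered: ordered,
      timeout: timeout,
      on_timeout: :kill_task
    )
    |> Stream.flat_map(fn
      # Tag every varbind with the device it came from.
      {:ok, {target, _op, _oid, varbinds}} -> Enum.map(varbinds, &{target, &1})
      # A timed-out task contributes nothing instead of crashing the stream.
      {:exit, _reason} -> []
    end)
  end
end

operations = [
  {"switch1", :walk_table, "ifTable", []},
  {"switch2", :walk_table, "ifTable", []},
  {"router1", :walk, "ipRouteTable", []}
]

results =
  operations
  |> ConcurrentSketch.concurrent(max_concurrent: 2)
  |> Enum.to_list()
```

With ordered: true the results follow the order of the input list even when a later device answers first; with ordered: false they arrive as tasks finish, which usually lowers latency when order does not matter.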
filtered_stream(target, root_oid, filter_fn, opts)
Creates a filtered stream that applies predicates during data fetching.
This is more efficient than Stream.filter/2 for large datasets as it can skip unnecessary network requests based on OID patterns.
Parameters
- target: The target device
- root_oid: Starting OID
- filter_fn: Function to filter OIDs and values
- opts: Stream options
Examples
# Only fetch interface names (column 2)
filter_fn = fn {oid, _type, _value} ->
  case SnmpKit.SnmpLib.OID.string_to_list(oid) do
    # The column number is the second-to-last element of the OID
    {:ok, oid_list} -> Enum.at(oid_list, -2) == 2
    _ -> false
  end
end
"switch.local"
|> SnmpKit.SnmpMgr.Stream.filtered_stream("ifTable", filter_fn)
|> Enum.to_list()
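The predicate above keys on the column number embedded in each OID. A small sketch in plain Elixir shows how that column can be read from a dotted numeric OID, assuming entries of the form <entry>.<column>.<index> with a single-integer index. FilterSketch and the canned varbinds are hypothetical, and Stream.filter/2 is used here only to exercise the predicate; it does not reproduce the request skipping that filtered_stream is described as doing.

```elixir
defmodule FilterSketch do
  # Parses a dotted numeric OID string into a list of integers.
  def parse_oid(oid) do
    oid
    |> String.trim_leading(".")
    |> String.split(".")
    |> Enum.map(&String.to_integer/1)
  end

  # For <entry>.<column>.<index> with a single-integer index,
  # the column is the second-to-last element.
  def column(oid), do: oid |> parse_oid() |> Enum.at(-2)
end

# Canned varbinds shaped like the {oid, type, value} tuples used above.
varbinds = [
  {"1.3.6.1.2.1.2.2.1.1.1", :integer, 1},
  {"1.3.6.1.2.1.2.2.1.1.2", :integer, 2},
  {"1.3.6.1.2.1.2.2.1.2.1", :octet_string, "eth0"},
  {"1.3.6.1.2.1.2.2.1.2.2", :octet_string, "eth1"},
  {"1.3.6.1.2.1.2.2.1.3.1", :integer, 6}
]

names =
  varbinds
  |> Stream.filter(fn {oid, _type, _value} -> FilterSketch.column(oid) == 2 end)
  |> Enum.map(fn {_oid, _type, value} -> value end)
```

Tables with multi-part indexes place the column further from the end of the OID, so a fixed offset like this only suits single-index tables such as ifTable.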
monitor_stream(targets, opts)
Creates a real-time monitoring stream that polls devices at intervals.
Provides a continuous stream of SNMP data for real-time monitoring and alerting applications.
Parameters
- targets: List of {target, oid} tuples to monitor
- opts: Options including :interval, :buffer_size, :error_handling
Examples
# Monitor multiple devices
targets = [
  {"switch1", "ifInOctets.1"},
  {"switch2", "ifInOctets.1"},
  {"router1", "sysUpTime.0"}
]
targets
|> SnmpKit.SnmpMgr.Stream.monitor_stream(interval: 30_000)
|> Stream.each(&send_to_metrics_system/1)
|> Stream.run()
# Monitor with error handling
targets
|> SnmpKit.SnmpMgr.Stream.monitor_stream(
  interval: 10_000,
  error_handling: :skip_errors
)
|> Stream.filter(&is_successful_reading/1)
|> Stream.each(&update_dashboard/1)
|> Stream.run()
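An interval-driven polling stream of this kind can be sketched with Stream.interval/1 from the standard library. This is an illustration under assumptions, not the library's internals: MonitorSketch, fake_poll/1, and the shape of each reading are hypothetical.

```elixir
defmodule MonitorSketch do
  # Hypothetical poll: a real implementation would issue an SNMP GET here.
  def fake_poll({target, oid}) do
    {:ok, %{target: target, oid: oid, value: :rand.uniform(1_000)}}
  end

  # Emits one reading per target every `interval` milliseconds, indefinitely.
  def monitor(targets, opts \\ []) do
    interval = Keyword.get(opts, :interval, 30_000)

    interval
    |> Stream.interval()
    |> Stream.flat_map(fn _tick -> Enum.map(targets, &fake_poll/1) end)
  end
end

targets = [{"switch1", "ifInOctets.1"}, {"router1", "sysUpTime.0"}]

# The stream is infinite, so take a bounded number of readings here.
readings =
  targets
  |> MonitorSketch.monitor(interval: 10)
  |> Enum.take(4)
```

Stream.interval/1 sleeps for the interval before emitting each tick, so the first readings arrive one interval after the stream starts. Because polling happens inside the consumer's process, a slow consumer delays the next poll rather than letting readings pile up.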
table_stream(target, table_oid, opts)
Creates a stream for processing large SNMP tables.
Optimized for table structures with intelligent chunking based on table columns and indexes.
Parameters
- target: The target device
- table_oid: The table OID to stream
- opts: Options including :chunk_size, :columns, :indexes
Examples
# Stream interface table with column filtering
"switch.local"
|> SnmpKit.SnmpMgr.Stream.table_stream("ifTable", columns: [:ifDescr, :ifOperStatus])
|> Stream.filter(fn {_index, data} -> data[:ifOperStatus] == 1 end)
|> Stream.map(fn {index, data} -> {index, data[:ifDescr]} end)
|> Enum.to_list()
# Process table with custom chunk size
"device.local"
|> SnmpKit.SnmpMgr.Stream.table_stream("ipRouteTable", chunk_size: 200)
|> Stream.each(&process_route_entry/1)
|> Stream.run()
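The examples above treat each element as an {index, data} pair keyed by column name. As a rough sketch of how walked cells could be assembled into that shape, the following groups {column, index, value} cells into rows. TableSketch, the cell format, and the column map are assumptions made for illustration (2 and 8 are the ifDescr and ifOperStatus column numbers in IF-MIB); the sketch buffers all cells in memory and does not attempt the chunking the library describes.

```elixir
defmodule TableSketch do
  # Hypothetical column-number-to-name mapping for two ifTable columns.
  @columns %{2 => :ifDescr, 8 => :ifOperStatus}

  # Groups {column, index, value} cells into {index, %{column_name => value}} rows.
  def rows(cells) do
    cells
    |> Enum.filter(fn {col, _index, _value} -> Map.has_key?(@columns, col) end)
    |> Enum.group_by(fn {_col, index, _value} -> index end)
    |> Enum.sort_by(fn {index, _cells} -> index end)
    |> Enum.map(fn {index, row_cells} ->
      {index, Map.new(row_cells, fn {col, _index, value} -> {@columns[col], value} end)}
    end)
  end
end

# A walk returns cells column by column, so rows are only complete at the end.
cells = [
  {2, 1, "GigabitEthernet0/1"},
  {2, 2, "GigabitEthernet0/2"},
  {8, 1, 1},
  {8, 2, 2}
]

up =
  cells
  |> TableSketch.rows()
  |> Stream.filter(fn {_index, data} -> data[:ifOperStatus] == 1 end)
  |> Enum.map(fn {index, data} -> {index, data[:ifDescr]} end)
```

Because a plain walk is column-major, emitting complete rows lazily requires fetching the selected columns for a range of indexes together, which is presumably what the :columns and :chunk_size options are meant to control.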
walk_stream(target, root_oid, opts)
Creates a stream for walking large SNMP trees without memory overhead.
The stream lazily fetches data in chunks, allowing processing of arbitrarily large SNMP trees with constant memory usage.
Parameters
- target: The target device
- root_oid: Starting OID for the walk
- opts: Options including :chunk_size, :version, :adaptive
Examples
# Process a large table efficiently
"switch.local"
|> SnmpKit.SnmpMgr.Stream.walk_stream("ifTable")
|> Stream.filter(fn {_oid, value} -> is_binary(value) and String.contains?(value, "Gigabit") end)
|> Stream.map(&extract_interface_info/1)
|> Enum.to_list()
# Real-time processing with backpressure
"router.local"
|> SnmpKit.SnmpMgr.Stream.walk_stream("ipRouteTable", chunk_size: 100)
|> Stream.each(&update_routing_database/1)
|> Stream.run()
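Lazy, chunked fetching of this kind is commonly built on Stream.resource/3. The sketch below illustrates the idea with an in-memory tree, assuming a cursor-based fetch; WalkSketch, fetch_chunk/2, and the data are hypothetical, and the library's actual chunking may differ.

```elixir
defmodule WalkSketch do
  # In-memory stand-in for a device's OID subtree (hypothetical data).
  @tree Enum.map(1..10, fn i ->
          {"1.3.6.1.2.1.2.2.1.2.#{i}", "GigabitEthernet0/#{i}"}
        end)

  # Stand-in for one bulk request: returns up to `size` entries from `cursor`.
  # The message to self() only exists so fetches can be counted afterwards.
  def fetch_chunk(cursor, size) do
    send(self(), :chunk_fetched)
    Enum.slice(@tree, cursor, size)
  end

  # Requests the next chunk only when the consumer asks for more elements.
  def walk(opts \\ []) do
    size = Keyword.get(opts, :chunk_size, 3)

    Stream.resource(
      fn -> 0 end,
      fn cursor ->
        case fetch_chunk(cursor, size) do
          [] -> {:halt, cursor}
          chunk -> {chunk, cursor + length(chunk)}
        end
      end,
      fn _cursor -> :ok end
    )
  end
end

first_four =
  WalkSketch.walk(chunk_size: 3)
  |> Enum.take(4)
```

Taking four elements with a chunk size of three triggers two fetches and leaves the rest of the tree untouched. Only one chunk is held at a time, which is where the bounded memory use of this pattern comes from.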