PcapFileEx.Stream (pcap_file_ex v0.5.5)
View SourceStream protocol implementation for lazy packet reading.
Safe vs Raising Variants
This module provides two styles of stream APIs:
Safe variants (do not raise mid-stream)
packets/1- Returns{:ok, stream} | {:error, reason}, emits tagged tuplesfrom_reader/1- Returns stream that emits tagged tuples
These emit {:ok, packet} for successful reads and {:error, metadata} for
corrupted packets or read failures. The stream halts after emitting an error tuple.
Raising variants (raise on errors)
packets!/1- Raises on file open or mid-stream errorsfrom_reader!/1- Raises on mid-stream errors
These provide the simpler API but cannot gracefully handle corrupted files.
Error Handling
Safe streams emit error tuples with context:
{:error, %{reason: "parser error message", packet_index: 42}}You can handle these in several ways:
# Stop on first error
{:ok, stream} = Stream.packets("capture.pcap")
Enum.reduce_while(stream, [], fn
{:ok, packet}, acc -> {:cont, [packet | acc]}
{:error, %{reason: r, packet_index: i}}, _acc ->
{:halt, {:error, "Packet #{i} failed: #{r}"}}
end)
# Skip errors and continue
stream
|> Stream.filter(fn
{:ok, _} -> true
{:error, meta} -> Logger.warning("Skipped: #{inspect(meta)}"); false
end)
|> Stream.map(fn {:ok, packet} -> packet end)Migration from 0.2.x to 0.3.0
In v0.2.x, packets/1 validated at construction but raised mid-stream on errors.
In v0.3.0, it emits tagged tuples instead:
- Before:
{:ok, stream} = packets(path); packets = Enum.to_list(stream) - After: Handle tuples or use
packets!/1for raising behavior
Summary
Types
Error metadata emitted by safe streams when a packet read fails.
Tagged tuple emitted by safe streams. Either a successful packet read or an error.
Functions
Creates a lazy stream of packets from an already opened reader.
Creates a lazy stream of packets from an already opened reader, raising on errors.
Creates a lazy stream of packets from a PCAP file.
Creates a lazy stream of packets from a PCAP file, raising on errors.
Types
@type error_metadata() :: %{reason: String.t(), packet_index: non_neg_integer()}
Error metadata emitted by safe streams when a packet read fails.
Contains the error reason and the 0-based index of the packet that failed to read.
@type stream_item() :: {:ok, PcapFileEx.Packet.t()} | {:error, error_metadata()}
Tagged tuple emitted by safe streams. Either a successful packet read or an error.
Functions
@spec from_reader(PcapFileEx.Pcap.t() | PcapFileEx.PcapNg.t()) :: Enumerable.t(stream_item())
Creates a lazy stream of packets from an already opened reader.
This does NOT automatically close the reader when done. Works with both PCAP and PCAPNG readers.
The returned stream emits tagged tuples:
{:ok, packet}for successful packet reads{:error, metadata}for read failures (corrupted data, I/O errors, etc.)
The stream halts after emitting an error tuple. For raising behavior,
use from_reader!/1 instead.
Options
:hosts_map- Map of IP address strings to hostname strings for endpoint resolution
Examples
{:ok, reader} = PcapFileEx.Pcap.open("capture.pcap")
stream = PcapFileEx.Stream.from_reader(reader)
valid_packets = stream
|> Stream.filter(fn
{:ok, _} -> true
{:error, _} -> false
end)
|> Stream.map(fn {:ok, packet} -> packet end)
|> Enum.take(10)
PcapFileEx.Pcap.close(reader)
# PCAPNG example
{:ok, reader} = PcapFileEx.PcapNg.open("capture.pcapng")
stream = PcapFileEx.Stream.from_reader(reader)
# ... process stream ...
PcapFileEx.PcapNg.close(reader)
# With hosts mapping
hosts = %{"192.168.1.1" => "gateway"}
stream = PcapFileEx.Stream.from_reader(reader, hosts_map: hosts)
@spec from_reader( PcapFileEx.Pcap.t() | PcapFileEx.PcapNg.t(), keyword() ) :: Enumerable.t(stream_item())
@spec from_reader!(PcapFileEx.Pcap.t() | PcapFileEx.PcapNg.t()) :: Enumerable.t()
Creates a lazy stream of packets from an already opened reader, raising on errors.
This does NOT automatically close the reader when done. Works with both PCAP and PCAPNG readers.
Options
:hosts_map- Map of IP address strings to hostname strings for endpoint resolution
Examples
{:ok, reader} = PcapFileEx.Pcap.open("capture.pcap")
PcapFileEx.Stream.from_reader!(reader)
|> Enum.take(10)
PcapFileEx.Pcap.close(reader)
# With hosts mapping
hosts = %{"192.168.1.1" => "gateway"}
PcapFileEx.Stream.from_reader!(reader, hosts_map: hosts)
|> Enum.take(10)
@spec from_reader!( PcapFileEx.Pcap.t() | PcapFileEx.PcapNg.t(), keyword() ) :: Enumerable.t()
@spec packets(Path.t()) :: {:ok, Enumerable.t(stream_item())} | {:error, String.t()}
Creates a lazy stream of packets from a PCAP file.
Pre-validates that the file can be opened before returning the stream.
Returns {:ok, stream} on success or {:error, reason} if the file
cannot be opened.
The stream automatically handles opening and closing the file.
The returned stream emits tagged tuples:
{:ok, packet}for successful packet reads{:error, metadata}for read failures (corrupted data, I/O errors, etc.)
The stream halts after emitting an error tuple. To continue reading past
errors or to get raising behavior, use packets!/1 instead.
Options
:hosts_map- Map of IP address strings to hostname strings for endpoint resolution
Examples
# Basic usage with pattern matching
{:ok, stream} = PcapFileEx.Stream.packets("capture.pcap")
packets = stream
|> Stream.map(fn
{:ok, packet} -> packet
{:error, meta} -> raise "Error at packet #{meta.packet_index}: #{meta.reason}"
end)
|> Enum.to_list()
# Stop on first error
result = Enum.reduce_while(stream, [], fn
{:ok, packet}, acc -> {:cont, [packet | acc]}
{:error, %{packet_index: i, reason: r}}, _acc ->
{:halt, {:error, "Failed at packet #{i}: #{r}"}}
end)
# Skip errors and continue
valid_packets = stream
|> Stream.filter(fn
{:ok, _} -> true
{:error, _} -> false
end)
|> Stream.map(fn {:ok, packet} -> packet end)
|> Enum.to_list()
# Handle file open errors
case PcapFileEx.Stream.packets("nonexistent.pcap") do
{:ok, stream} -> process_stream(stream)
{:error, msg} -> IO.puts("Cannot open file: #{msg}")
end
# With hosts mapping
hosts = %{"192.168.1.1" => "gateway", "10.0.0.1" => "server"}
{:ok, stream} = PcapFileEx.Stream.packets("capture.pcap", hosts_map: hosts)
@spec packets( Path.t(), keyword() ) :: {:ok, Enumerable.t(stream_item())} | {:error, String.t()}
@spec packets!(Path.t()) :: Enumerable.t()
Creates a lazy stream of packets from a PCAP file, raising on errors.
This is the old behavior from version 0.1.x.
Options
:hosts_map- Map of IP address strings to hostname strings for endpoint resolution
Examples
PcapFileEx.Stream.packets!("capture.pcap")
|> Stream.filter(fn packet -> byte_size(packet.data) > 1000 end)
|> Enum.take(10)
# With hosts mapping
hosts = %{"192.168.1.1" => "gateway"}
PcapFileEx.Stream.packets!("capture.pcap", hosts_map: hosts)
|> Enum.take(10)
@spec packets!( Path.t(), keyword() ) :: Enumerable.t()