PcapFileEx.Merge (pcap_file_ex v0.5.5)
View SourceMulti-file PCAP/PCAPNG timeline merge with nanosecond precision.
This module provides functionality to merge multiple packet capture files into a single chronological stream. Packets are sorted by nanosecond-precision timestamps, making it ideal for correlating captures from multiple network taps or synchronized systems.
Clock Synchronization
For accurate multi-file merging, ensure all capture systems have synchronized clocks using NTP (Network Time Protocol) or chronyd. See the README for chronyd setup instructions.
Features
- Nanosecond precision: Preserves full timestamp accuracy
- Memory efficient: Streaming merge using priority queue (O(N files) memory)
- PCAP + PCAPNG: Supports both formats, with PCAPNG interface remapping
- Datalink validation: Ensures all files share compatible datalink types
- Source annotation: Optionally track which file each packet came from
- Clock validation: Optional validation of clock synchronization
- Configurable error handling:
:skip,:halt, or:collectmodes
Examples
# Basic merge of two PCAP files
{:ok, stream} = PcapFileEx.Merge.stream(["server1.pcap", "server2.pcap"])
packets = Enum.to_list(stream)
# Merge with source annotation
{:ok, stream} = PcapFileEx.Merge.stream(
["tap1.pcap", "tap2.pcap"],
annotate_source: true
)
Enum.each(stream, fn {packet, meta} ->
IO.puts("Packet from #{meta.source_file}")
end)
# Merge with clock validation
case PcapFileEx.Merge.validate_clocks(["server1.pcap", "server2.pcap"]) do
{:ok, stats} ->
IO.inspect(stats.max_drift_ms)
{:ok, stream} = PcapFileEx.Merge.stream(["server1.pcap", "server2.pcap"])
{:error, :excessive_drift, meta} ->
IO.puts("Clock drift too large: #{meta.max_drift_ms}ms")
end
# Bang variant (raises on errors)
stream = PcapFileEx.Merge.stream!(["server1.pcap", "server2.pcap"])
# Count total packets across files
count = PcapFileEx.Merge.count(["server1.pcap", "server2.pcap"])
Summary
Functions
Counts the total number of packets across multiple files without loading them.
Creates a lazy stream that merges packets from multiple PCAP/PCAPNG files in chronological order.
Same as stream/2 but raises on errors instead of returning error tuples.
Validates clock synchronization across multiple capture files.
Types
@type error_mode() :: :skip | :halt | :collect
@type merge_option() :: {:annotate_source, boolean()} | {:on_error, error_mode()} | {:validate_clocks, boolean()}
@type path() :: String.t()
Functions
@spec count([path()]) :: non_neg_integer()
Counts the total number of packets across multiple files without loading them.
This is more efficient than merging and counting, as it only reads packet headers without full parsing.
Examples
count = PcapFileEx.Merge.count(["server1.pcap", "server2.pcap"])
IO.puts("Total packets: #{count}")
@spec stream([path()], [merge_option()]) :: {:ok, Enumerable.t()} | {:error, term()}
Creates a lazy stream that merges packets from multiple PCAP/PCAPNG files in chronological order.
Parameters
paths- List of file paths to mergeopts- Keyword list of options::annotate_source(boolean, default:false) - Include source file metadata:on_error(:skip | :halt | :collect, default::halt) - Error handling mode:validate_clocks(boolean, default:false) - Validate clock synchronization
Returns
{:ok, stream}- Stream that emits merged packets{:error, reason}- If validation fails
Stream Item Types
The stream emits different item types depending on options:
# Default: bare packets
stream([paths])
# => %Packet{}, %Packet{}, ...
# With annotation
stream([paths], annotate_source: true)
# => {%Packet{}, %{source_file: ...}}, ...
# With :collect error mode
stream([paths], on_error: :collect)
# => {:ok, %Packet{}}, {:error, %{...}}, ...
# With annotation + :collect (nested)
stream([paths], annotate_source: true, on_error: :collect)
# => {:ok, {%Packet{}, %{source_file: ...}}}, {:error, %{...}}, ...
# With :skip mode
stream([paths], on_error: :skip)
# => %Packet{}, {:skipped_packet, %{count: 1, ...}}, %Packet{}, ...Examples
{:ok, stream} = PcapFileEx.Merge.stream(["server1.pcap", "server2.pcap"])
{:ok, stream} = PcapFileEx.Merge.stream(
["tap1.pcap", "tap2.pcap"],
annotate_source: true,
on_error: :collect
)
@spec stream!([path()], [merge_option()]) :: Enumerable.t()
Same as stream/2 but raises on errors instead of returning error tuples.
Examples
stream = PcapFileEx.Merge.stream!(["server1.pcap", "server2.pcap"])Raises
PcapFileEx.NoCommonDatalinkError- When files have incompatible datalink typesFile.Error- When a file cannot be openedArgumentError- When paths list is empty or invalid
Validates clock synchronization across multiple capture files.
This function performs a full scan of all files to collect timing statistics and detect potential clock drift between systems. It's useful for validating that captures were properly synchronized before merging.
Performance Note: This function performs a full scan of all files and is NOT included in the merge overhead target. Results are cached by (file_path, mtime, size) to avoid repeated scans.
Parameters
paths- List of file paths to validate
Returns
{:ok, stats}- Validation succeeded, returns statistics map{:error, :excessive_drift, meta}- Clock drift exceeds threshold
Statistics Map
%{
max_drift_ms: float(), # Maximum drift between any two files
files: [
%{
path: String.t(),
first_timestamp: Timestamp.t(),
last_timestamp: Timestamp.t(),
duration_ms: float()
}
]
}Examples
case PcapFileEx.Merge.validate_clocks(["server1.pcap", "server2.pcap"]) do
{:ok, stats} ->
IO.puts("Max drift: #{stats.max_drift_ms}ms")
{:error, :excessive_drift, meta} ->
IO.puts("Drift too large: #{meta.max_drift_ms}ms")
end