Multi-File PCAP Merge Guide
PcapFileEx provides chronological merging of multiple PCAP/PCAPNG files. This guide explains when and how to use the merge functionality effectively.
Overview
The PcapFileEx.Merge module merges packets from multiple capture files into a single chronologically-ordered stream using nanosecond-precision timestamps.
Key Features
| Feature | Description | Benefit |
|---|---|---|
| Nanosecond precision | Uses Timestamp.compare/2 for accurate ordering | Preserves microsecond/nanosecond timestamps |
| Memory efficient | O(N files) memory via min-heap | Handles unlimited file sizes |
| Mixed formats | PCAP + PCAPNG in same merge | No conversion needed |
| Interface remapping | Global interface ID assignment | Prevents PCAPNG interface collisions |
| Source annotation | Track packet origins | Debug and provenance tracking |
| Clock validation | Detect timestamp drift | Identify clock sync issues |
| Flexible error handling | :halt, :skip, :collect modes | Control error behavior |
Decision Tree: When to Use Merge
Need to combine multiple capture files?
├─ YES: Are files from synchronized clocks?
│ ├─ YES: Are files < 100MB each?
│ │ ├─ YES: Use Merge.stream/2 (simple)
│ │ └─ NO: Use Merge.stream/2 with lazy processing
│ └─ NO: Use Merge.validate_clocks/1 first
│ ├─ Drift < 10s: Use merge (with warning)
│ └─ Drift > 10s: Fix clock sync, then merge
└─ NO: Use standard PcapFileEx.stream/1
Need to track packet origins?
├─ YES: Use annotate_source: true option
└─ NO: Use default options
Files might have corrupt packets?
├─ YES: Use on_error: :collect or :skip
└─ NO: Use default on_error: :halt
Basic Usage
Simple Merge
# Merge multiple files chronologically
{:ok, stream} = PcapFileEx.Merge.stream([
"server1.pcap",
"server2.pcap",
"server3.pcap"
])
# Process as normal stream
packets = Enum.to_list(stream)
# Or use bang variant (raises on error)
stream = PcapFileEx.Merge.stream!([
"server1.pcap",
"server2.pcap"
])
Count Total Packets
# Fast packet count without loading packets
count = PcapFileEx.Merge.count([
"capture1.pcap",
"capture2.pcap",
"capture3.pcap"
])
IO.puts("Total packets: #{count}")
Validate Clock Synchronization
files = ["server1.pcap", "server2.pcap", "server3.pcap"]
case PcapFileEx.Merge.validate_clocks(files) do
{:ok, stats} ->
IO.puts("Clock drift: #{stats.max_drift_ms}ms - acceptable")
{:ok, stream} = PcapFileEx.Merge.stream(files)
# Process stream...
{:error, :excessive_drift, stats} ->
IO.puts("WARNING: Clock drift #{stats.max_drift_ms}ms exceeds threshold")
IO.puts("Files:")
Enum.each(stats.files, fn file_stats ->
IO.puts(" #{file_stats.path}: #{file_stats.count} packets")
IO.puts(" First: #{file_stats.first_timestamp}")
IO.puts(" Last: #{file_stats.last_timestamp}")
end)
# Decide whether to proceed with merge or fix clock sync
end
Advanced Features
Source Annotation
Track which file each packet came from:
{:ok, stream} = PcapFileEx.Merge.stream(
["server1.pcap", "server2.pcap"],
annotate_source: true
)
# Each item is now {packet, metadata}
stream
|> Enum.take(10)
|> Enum.each(fn {packet, metadata} ->
IO.puts("Packet from #{metadata.source_file}")
IO.puts(" File index: #{metadata.file_index}")
IO.puts(" Packet index: #{metadata.packet_index}")
IO.puts(" Timestamp: #{packet.timestamp_precise}")
# PCAPNG files include interface ID info
if Map.has_key?(metadata, :original_interface_id) do
IO.puts(" Original interface: #{metadata.original_interface_id}")
IO.puts(" Remapped interface: #{metadata.remapped_interface_id}")
end
end)
Error Handling Modes
Halt Mode (Default)
# Stop on first error - safest behavior
{:ok, stream} = PcapFileEx.Merge.stream(
files,
on_error: :halt # default
)
# Stream will halt if any packet fails to parse
packets = Enum.to_list(stream)
Skip Mode
# Skip corrupt packets, emit skip markers
{:ok, stream} = PcapFileEx.Merge.stream(
files,
on_error: :skip
)
stream
|> Enum.each(fn
%PcapFileEx.Packet{} = packet ->
# Normal packet
process_packet(packet)
{:skipped_packet, %{count: count, last_error: error}} ->
# Corrupt packet skipped
Logger.warning("Skipped #{count} packet(s): #{error.reason}")
end)
Collect Mode
# Wrap all items in result tuples
{:ok, stream} = PcapFileEx.Merge.stream(
files,
on_error: :collect
)
{packets, errors} = Enum.reduce(stream, {[], []}, fn
{:ok, packet}, {pkts, errs} ->
{[packet | pkts], errs}
{:error, metadata}, {pkts, errs} ->
{pkts, [metadata | errs]}
end)
IO.puts("Successfully parsed: #{length(packets)} packets")
IO.puts("Errors: #{length(errors)}")
Enum.each(errors, fn error ->
IO.puts("Error in #{error.source_file} at packet #{error.packet_index}")
IO.puts(" Reason: #{error.reason}")
end)Collect + Annotation (Nested Tuples)
# Combine error collection with source tracking
{:ok, stream} = PcapFileEx.Merge.stream(
files,
annotate_source: true,
on_error: :collect
)
# Items are now {:ok, {packet, metadata}} or {:error, metadata}
stream
|> Enum.take(100)
|> Enum.each(fn
{:ok, {packet, metadata}} ->
IO.puts("Packet from #{metadata.source_file}: #{byte_size(packet.data)} bytes")
{:error, metadata} ->
Logger.error("Failed to parse #{metadata.source_file} packet #{metadata.packet_index}")
end)
PCAPNG Interface Remapping
When merging multiple PCAPNG files, interface IDs are automatically remapped to prevent collisions:
# file1.pcapng has interfaces 0, 1
# file2.pcapng has interfaces 0, 1
# Merged stream has global interfaces 0, 1, 2, 3
{:ok, stream} = PcapFileEx.Merge.stream(
["file1.pcapng", "file2.pcapng"],
annotate_source: true
)
stream
|> Enum.take(5)
|> Enum.each(fn {packet, metadata} ->
# packet.interface_id is the global remapped ID
# metadata.original_interface_id is the file-local ID
# metadata.remapped_interface_id == packet.interface_id (always)
IO.puts("From #{metadata.source_file}")
IO.puts(" Original interface: #{metadata.original_interface_id}")
IO.puts(" Global interface: #{metadata.remapped_interface_id}")
# Invariant always holds: packet.interface_id == packet.interface.id
if packet.interface do
IO.puts(" Interface name: #{packet.interface.name}")
end
end)
Clock Synchronization Best Practices
Why Clock Sync Matters
Without synchronized clocks, packets will be merged in wrong order:
Server A clock: 2025-11-09 10:00:00.000 (fast by 5 seconds)
Server B clock: 2025-11-09 09:59:55.000 (accurate)
Actual timeline:
09:59:55.000 - Server B: HTTP request
09:59:55.100 - Server A: HTTP response (but timestamp says 10:00:00.100!)
Merged timeline WITHOUT sync:
09:59:55.000 - Server B: HTTP request
10:00:00.100 - Server A: HTTP response <-- WRONG ORDER! (5 seconds late)
Result: HTTP response appears 5 seconds after request (impossible!)
Recommended: chronyd (NTP)
See README.md section "Clock Synchronization for Multi-File Merge" for:
- Installation instructions (Linux/macOS)
- Configuration examples
- Verification commands
- Troubleshooting drift issues
Pre-Merge Validation
files = ["server1.pcap", "server2.pcap", "server3.pcap"]
case PcapFileEx.Merge.validate_clocks(files) do
{:ok, %{max_drift_ms: drift}} when drift < 1000 ->
# < 1 second drift is excellent
IO.puts("✅ Clocks well synchronized (#{drift}ms drift)")
{:ok, stream} = PcapFileEx.Merge.stream(files)
{:ok, %{max_drift_ms: drift}} when drift < 10_000 ->
# 1-10 seconds drift is acceptable for most use cases
IO.puts("⚠️ Moderate clock drift (#{drift}ms) - proceed with caution")
{:ok, stream} = PcapFileEx.Merge.stream(files)
{:error, :excessive_drift, %{max_drift_ms: drift}} ->
# > 10 seconds drift is problematic
IO.puts("❌ Excessive clock drift (#{drift}ms) - fix NTP sync before merging")
:error
end
Performance Characteristics
Memory Usage
# Merge is memory-efficient: O(N files) not O(M packets)
# Only one packet per file is buffered in memory
# Small files (3 files × 10MB each)
{:ok, stream} = PcapFileEx.Merge.stream([
"small1.pcap", # 10MB
"small2.pcap", # 10MB
"small3.pcap" # 10MB
])
# Memory: ~3 packets buffered (few KB)
# Total: 30MB of files, but only KB of memory used
# Large files (3 files × 1GB each)
{:ok, stream} = PcapFileEx.Merge.stream([
"large1.pcap", # 1GB
"large2.pcap", # 1GB
"large3.pcap" # 1GB
])
# Memory: Still ~3 packets buffered (few KB)
# Total: 3GB of files, but only KB of memory used
Time Complexity
- Merge algorithm: O(M log N) where M = total packets, N = number of files
- Heap operations: O(log N) per packet (push/pop from min-heap)
- Timestamp comparison: O(1) using Timestamp.compare/2
Lazy Evaluation
# Stream is lazy - packets only read when consumed
{:ok, stream} = PcapFileEx.Merge.stream([
"file1.pcap",
"file2.pcap",
"file3.pcap"
])
# No packets loaded yet! Files opened but not read.
# Take first 10 packets - only reads enough to produce 10 results
first_10 = Enum.take(stream, 10)
# Find first HTTP packet - stops as soon as found
first_http = Enum.find(stream, fn packet ->
:http in packet.protocols
end)
Common Patterns
Distributed Network Capture
# Capture from multiple network taps, merge chronologically
firewall_capture = "firewall.pcap"
web_server_capture = "webserver.pcap"
db_server_capture = "database.pcap"
{:ok, stream} = PcapFileEx.Merge.stream(
[firewall_capture, web_server_capture, db_server_capture],
annotate_source: true
)
# Trace HTTP request through all systems
stream
|> Stream.filter(fn {packet, _meta} -> :http in packet.protocols end)
|> Stream.take(100)
|> Enum.each(fn {packet, metadata} ->
location = Path.basename(metadata.source_file, ".pcap")
IO.puts("[#{location}] HTTP packet at #{packet.timestamp}")
end)
Time-Range Analysis
# Merge and filter to specific time window
files = ["capture1.pcap", "capture2.pcap", "capture3.pcap"]
{:ok, stream} = PcapFileEx.Merge.stream(files)
# Find packets in specific 10-second window
start_time = ~U[2025-11-09 10:30:00Z]
end_time = ~U[2025-11-09 10:30:10Z]
packets_in_window =
stream
|> Stream.filter(fn packet ->
DateTime.compare(packet.timestamp, start_time) in [:gt, :eq] and
DateTime.compare(packet.timestamp, end_time) in [:lt, :eq]
end)
|> Enum.to_list()
IO.puts("Found #{length(packets_in_window)} packets in time window")
Multi-Site Correlation
# Correlate events across multiple geographic locations
sites = [
{"us-east", "captures/us-east.pcap"},
{"us-west", "captures/us-west.pcap"},
{"eu-west", "captures/eu-west.pcap"},
{"ap-south", "captures/ap-south.pcap"}
]
file_paths = Enum.map(sites, fn {_site, path} -> path end)
{:ok, stream} = PcapFileEx.Merge.stream(
file_paths,
annotate_source: true
)
# Track HTTP requests across sites
stream
|> Stream.filter(fn {packet, _meta} -> :http in packet.protocols end)
|> Enum.each(fn {packet, metadata} ->
{site, _} = Enum.find(sites, fn {_, path} ->
path == metadata.source_file
end)
IO.puts("[#{site}] #{packet.timestamp} - HTTP #{byte_size(packet.data)} bytes")
end)
Error Recovery
# Merge with automatic error recovery
files = ["potentially_corrupt1.pcap", "potentially_corrupt2.pcap"]
{:ok, stream} = PcapFileEx.Merge.stream(
files,
annotate_source: true,
on_error: :skip
)
{success_count, skip_count} = Enum.reduce(stream, {0, 0}, fn
%PcapFileEx.Packet{}, {success, skip} ->
{success + 1, skip}
{:skipped_packet, meta}, {success, skip} ->
Logger.warning("Skipped #{meta.count} packet(s) in #{meta.last_error.source_file}")
{success, skip + meta.count}
end)
IO.puts("Successfully parsed: #{success_count}")
IO.puts("Skipped corrupt packets: #{skip_count}")
Troubleshooting
Problem: Files Not Merging in Expected Order
Symptom: Packets appear out of order despite using merge
Cause: Clock drift between capture systems
Solution:
# Validate clocks first
{:error, :excessive_drift, stats} = PcapFileEx.Merge.validate_clocks(files)
IO.puts("Clock drift: #{stats.max_drift_ms}ms")
Enum.each(stats.files, fn file_stats ->
IO.puts("#{file_stats.path}:")
IO.puts(" First: #{file_stats.first_timestamp}")
IO.puts(" Last: #{file_stats.last_timestamp}")
end)
# Fix: Synchronize clocks with chronyd/NTP before capturing
Problem: "file not found" Error
Symptom: {:error, {:file_not_found, path}}
Solution:
# Validate files exist before merging
files = ["file1.pcap", "file2.pcap", "file3.pcap"]
existing_files = Enum.filter(files, &File.exists?/1)
missing_files = files -- existing_files
if missing_files != [] do
IO.puts("Missing files: #{inspect(missing_files)}")
else
{:ok, stream} = PcapFileEx.Merge.stream(existing_files)
end
Problem: Memory Usage Growing
Symptom: Memory increases during merge
Cause: Using Enum.to_list/1 instead of streaming
Solution:
# ❌ Bad: Loads all packets into memory
{:ok, stream} = PcapFileEx.Merge.stream(files)
all_packets = Enum.to_list(stream) # Memory grows!
# ✅ Good: Process lazily
{:ok, stream} = PcapFileEx.Merge.stream(files)
stream
|> Stream.filter(fn packet -> :http in packet.protocols end)
|> Stream.take(100)
|> Enum.each(&process_packet/1)  # Constant memory
Problem: Interface IDs Don't Match
Symptom: For PCAPNG merges, packet.interface_id != packet.interface.id
This should never happen! If you see this, it's a bug in PcapFileEx. Please report with:
# Include this debugging info in bug report
{:ok, stream} = PcapFileEx.Merge.stream(files, annotate_source: true)
stream
|> Enum.take(10)
|> Enum.each(fn {packet, metadata} ->
if packet.interface && packet.interface_id != packet.interface.id do
IO.puts("BUG: Interface ID mismatch!")
IO.puts(" Source: #{metadata.source_file}")
IO.puts(" packet.interface_id: #{packet.interface_id}")
IO.puts(" packet.interface.id: #{packet.interface.id}")
IO.puts(" Original ID: #{metadata.original_interface_id}")
IO.puts(" Remapped ID: #{metadata.remapped_interface_id}")
end
end)
Related Documentation
- Usage Rules - Main usage guide with decision trees
- Performance Guide - Performance optimization strategies
- Format Guide - PCAP vs PCAPNG differences
- Examples - Complete working examples
- README.md - Clock synchronization setup instructions