PcapFileEx

High-performance Elixir library for reading and parsing PCAP (Packet Capture) files.

Features

  • Fast Binary Parsing - Rust NIF implementation for high performance
  • Pre-Filtering - BPF-style filtering in Rust layer (10-100x speedup for selective queries)
  • Memory Efficient - Lazy streaming support for large files
  • Type Safe - Elixir structs with proper typespecs
  • Simple API - Easy-to-use functions for common tasks
  • PCAP Support - Read legacy PCAP format files (microsecond and nanosecond precision)
  • PCAPNG Support - Read next-generation PCAPNG format files
  • Interface Metadata - Surface interface descriptors and timestamp resolution from PCAPNG captures
  • Timestamp Precision - Automatic detection and support for both microsecond and nanosecond timestamp formats
  • Auto-Detection - Automatic format detection based on magic numbers
  • Cross-Platform - Works with PCAP files from macOS (microsecond) and Linux (nanosecond) without conversion
  • TCP Reassembly - Reassemble HTTP messages split across multiple TCP packets
  • HTTP Body Decoding - Automatic decoding of JSON, ETF, form data, and text bodies
  • HTTP/2 Analysis - Reconstruct HTTP/2 cleartext (h2c) request/response exchanges from PCAP files
  • Traffic Flow Analysis - Unified API to identify and group traffic by protocol (HTTP/1, HTTP/2, UDP)
  • Hosts Mapping - Map IP addresses to human-readable hostnames for easier analysis
  • Statistics - Compute packet counts, sizes, time ranges, and distributions
  • Filtering - Rich DSL for filtering packets by size, time, content
  • Multi-File Merge - Merge multiple captures by nanosecond-precision timestamps with clock validation
  • PCAP/PCAPNG Writing - Create, export, filter, and convert captures with format auto-detection
  • Validation - File format validation and accessibility checks
  • Property-Based Testing - 94 property tests with StreamData for comprehensive edge case coverage

Supported Platforms

PcapFileEx ships with precompiled NIFs for the following platforms (inspired by elixir-explorer/explorer):

ARM Architectures

  • macOS (Apple Silicon): aarch64-apple-darwin - M1, M2, M3, M4 chips
  • Linux (ARM64): aarch64-unknown-linux-gnu - Raspberry Pi 4/5, ARM servers

x86_64 Architectures

  • Linux (Intel/AMD): x86_64-unknown-linux-gnu - Ubuntu, Debian, Fedora, RHEL, etc.
  • Windows (MSVC): x86_64-pc-windows-msvc - Visual Studio toolchain
  • Windows (GNU): x86_64-pc-windows-gnu - MinGW/MSYS2 toolchain
  • FreeBSD: x86_64-unknown-freebsd - FreeBSD 12+

CPU Variants

For x86_64 platforms (Linux, Windows, FreeBSD), two binary variants are available:

  1. Default - Optimized with modern CPU features (AVX, FMA, SSE4.2, POPCNT)

    • Best performance on CPUs from ~2013 onwards (Intel Haswell, AMD Excavator or newer)
  2. Legacy CPU - Compatible with older processors

    • Use when you see "Illegal instruction" errors on older hardware
    • Automatically selected on Linux based on CPU detection
    • Manually enable with: PCAP_FILE_EX_USE_LEGACY_ARTIFACTS=1

Total precompiled binaries: 10 (6 base targets + 4 legacy variants)

Build from Source

If your platform isn't listed or you prefer to compile locally:

# Force local compilation
PCAP_FILE_EX_BUILD=1 mix deps.compile pcap_file_ex

Requirements: Rust toolchain (cargo, rustc) - tested with 1.91.0

Intel Mac Users: Precompiled binaries for x86_64-apple-darwin were removed in v0.5.2. You can either compile from source using PCAP_FILE_EX_BUILD=1 or stay on v0.5.1.

Installation

From Git (Current)

Add pcap_file_ex as a Git dependency in your mix.exs:

def deps do
  [
    {:pcap_file_ex, git: "https://github.com/lucian/pcap_file_ex.git"}
  ]
end

Then fetch dependencies and compile:

mix deps.get
mix compile

Requirements:

  • Elixir ~> 1.19 (tested with 1.19.2)
  • Erlang/OTP 28+ (tested with 28.1.1)
  • Rust toolchain (cargo, rustc) - Only required when:
    • Using as a Git dependency (not yet published to Hex)
    • Forcing local build with PCAP_FILE_EX_BUILD=1
    • Platform not in the supported platforms list above

Note: When using as a Git dependency, the native code will be compiled automatically during mix compile. Once published to Hex, precompiled binaries will be used automatically for supported platforms.

From Hex

def deps do
  [
    {:pcap_file_ex, "~> 0.5.0"}
  ]
end

Precompiled binaries are downloaded automatically for supported platforms.

Getting Started

New to this project? Get up and running in seconds:

Quick Setup

# Clone the repository
git clone https://github.com/lucian/pcap_file_ex.git
cd pcap_file_ex

# One-command setup (installs deps, tools, git hooks)
mix setup

# Verify your environment
mix check.doctor

What mix setup does:

  1. Fetches Elixir dependencies
  2. Installs cargo-outdated and cargo-deny (Rust security tools)
  3. Compiles the project (including Rust NIFs)
  4. Installs git hooks for quality checks

What mix check.doctor verifies:

  • ✓ Elixir version (>= 1.18)
  • ✓ Erlang/OTP version
  • ✓ Rust/Cargo installed
  • ✓ cargo-outdated installed
  • ✓ cargo-deny installed
  • ✓ Git hooks configured

Prerequisites

Before running mix setup, ensure you have:

  • Elixir ~> 1.19 and Erlang/OTP 28+
  • Rust toolchain (cargo, rustc) for compiling the native NIF
  • Optionally, dumpcap and Python 3 for generating test fixtures

Development Workflow

# Run tests
mix test

# Run all quality checks locally (format, lint, test)
mix ci

# Check for outdated dependencies
mix deps.check

# Format code
mix format

Git Hooks

Git hooks run automatically to catch issues before CI:

Pre-commit (fast ~5-10s):

  • Format checks (Elixir + Rust)
  • Linting (Credo)

Pre-push (slower ~30-60s):

  • Full test suite
  • Dialyzer type checking
  • Rust linting (Clippy)
  • Security audit (cargo-deny)

Skip hooks when needed:

git commit --no-verify
git push --no-verify

First Steps

  1. Run the test suite to ensure everything works:

    mix test
    
  2. Explore the examples in this README

  3. Read the architecture in the project's development documentation

  4. Check the roadmap to see what's planned

Troubleshooting Setup

If mix setup fails or mix check.doctor shows errors:

Missing Rust:

# Install Rust via rustup
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
source $HOME/.cargo/env

Cargo tools installation fails:

# Install manually
cargo install cargo-outdated cargo-deny

Git hooks not installed:

# Install manually
mix git_hooks.install

See the Development Setup section below for detailed environment configuration including dumpcap for test fixture generation.

AI-Assisted Development

This library includes comprehensive usage rules for LLM-based coding assistants. If you're using AI tools like Claude Code, GitHub Copilot, or Cursor, the library provides detailed guidance to help generate correct, performant code.

For AI Assistants: See usage-rules.md for complete API guidance, common patterns, and performance best practices.

Key guidance includes:

  • Automatic format detection (always use PcapFileEx.open/1)
  • Filtering strategy selection (PreFilter for large files = 10-100x faster)
  • Resource management patterns
  • HTTP body auto-decoding
  • Performance optimization techniques

To integrate with your AI workflow using the usage_rules package:

# In your mix.exs
{:usage_rules, "~> 0.1", only: [:dev]}

# Then sync to your project's AI instructions
mix usage_rules.sync CLAUDE.md pcap_file_ex

Development Setup

Prerequisites

For developing and testing PcapFileEx, you'll need:

  • Elixir ~> 1.19 (tested with 1.19.2)
  • Erlang/OTP 28+ (tested with 28.1.1)
  • Rust toolchain (cargo, rustc) - For compiling native extensions (tested with 1.91.0)
  • dumpcap - For generating test fixtures (optional but recommended)
  • Python 3 - For test traffic generation scripts

Tidewave MCP Integration (Optional)

This project supports Tidewave MCP for enhanced development with live code evaluation and documentation access.

Setup:

  1. Add Tidewave to your dependencies (if not already present):

    # mix.exs
    def deps do
      [
        {:tidewave, "~> 0.1", only: :dev}
      ]
    end
  2. Start the Tidewave MCP server (choose one):

Option A: Background server (no IEx shell)

mix tidewave

Option B: Interactive IEx shell with MCP server

iex -S mix tidewave-iex

Both options start a Bandit server on port 4000 with the Tidewave plug. Use Option B when you want both MCP access and an interactive Elixir shell for manual testing.

  3. MCP configuration (.mcp.json - already configured in this project):

    {
      "mcpServers": {
        "tidewave": {
          "type": "http",
          "url": "http://localhost:4000/tidewave/mcp"
        }
      }
    }

Available Tools:

  • mcp__tidewave__project_eval - Run Elixir code in project context
  • mcp__tidewave__get_docs - Access module/function documentation
  • mcp__tidewave__get_source_location - Find source definitions
  • mcp__tidewave__get_logs - View application logs
  • mcp__tidewave__search_package_docs - Search dependency documentation

Example Usage:

# Test a function
mcp__tidewave__project_eval({
  code: "PcapFileEx.Packet.new(1234567890, 0, 100, <<1,2,3>>)"
})

# Get documentation
mcp__tidewave__get_docs({reference: "PcapFileEx.Pcap.open/1"})

# Find source location
mcp__tidewave__get_source_location({reference: "PcapFileEx.Stream"})

This is particularly useful when working with AI coding assistants like Claude Code, as it provides live introspection of your running Elixir project.

Installing dumpcap

dumpcap is used to generate test fixtures. While optional, some tests will be skipped without it.

macOS

brew install wireshark

This installs dumpcap with ChmodBPF, allowing packet capture without sudo.

Linux (Ubuntu/Debian)

# Install dumpcap
sudo apt-get install tshark

# Setup non-root packet capture (recommended)
sudo dpkg-reconfigure wireshark-common  # Select "Yes"
sudo usermod -aG wireshark $USER
newgrp wireshark  # Or logout/login to activate group

Linux (Fedora/RHEL)

sudo dnf install wireshark-cli
sudo usermod -aG wireshark $USER
newgrp wireshark

Linux (Arch)

sudo pacman -S wireshark-cli
sudo usermod -aG wireshark $USER
newgrp wireshark

Running Tests

# Clone repository
git clone https://github.com/lucian/pcap_file_ex.git
cd pcap_file_ex

# Fetch dependencies
mix deps.get

# Compile (includes Rust NIF)
mix compile

# Run tests (auto-generates fixtures on first run)
mix test

Manual fixture generation:

# Generate all fixtures
mix test.fixtures

# Or manually
cd test/fixtures
./capture_test_traffic.sh

Verifying dumpcap Setup

Check if dumpcap has proper permissions:

dumpcap -D

This should list available network interfaces. If you see a permission error, see the Troubleshooting section below.

Quick Start

Read all packets

# Works with both PCAP and PCAPNG (auto-detected)
{:ok, packets} = PcapFileEx.read_all("capture.pcap")
{:ok, packets} = PcapFileEx.read_all("capture.pcapng")

Enum.each(packets, fn packet ->
  IO.puts("#{packet.timestamp}: #{byte_size(packet.data)} bytes")
end)

# Opt out of automatic decoding when you only need raw payloads
{:ok, raw_packets} = PcapFileEx.read_all("capture.pcapng", decode: false)

Stream large files

# Works with both formats - automatically detected
# v0.2.0+: stream/1 returns {:ok, stream} | {:error, reason}
{:ok, stream} = PcapFileEx.stream("large_capture.pcap")

# v0.2.0+: Safe streams emit {:ok, packet} and {:error, metadata} tuples
# Extract packets with pattern matching
stream
|> Stream.map(fn {:ok, packet} -> packet end)
|> Stream.filter(fn packet -> byte_size(packet.data) > 1000 end)
|> Stream.map(fn packet -> parse_packet(packet.data) end)
|> Enum.take(100)

# Or use stream!/1 for convenience (raises on errors)
PcapFileEx.stream!("large_capture.pcapng")
|> Enum.count()

# Disable automatic decoder attachment for performance-sensitive pipelines
{:ok, stream} = PcapFileEx.stream("large_capture.pcapng", decode: false)
stream
|> Stream.map(fn {:ok, packet} -> byte_size(packet.data) end)
|> Enum.sum()

Error Handling in Streams (v0.2.0+)

Safe stream variants emit tagged tuples, allowing graceful handling of corrupted files:

{:ok, stream} = PcapFileEx.stream("possibly_corrupted.pcap")

# Stop on first error
result = Enum.reduce_while(stream, [], fn
  {:ok, packet}, acc -> {:cont, [packet | acc]}
  {:error, %{packet_index: i, reason: r}}, _acc ->
    {:halt, {:error, "Failed at packet #{i}: #{r}"}}
end)

case result do
  packets when is_list(packets) -> {:ok, Enum.reverse(packets)}
  {:error, reason} -> IO.puts("Error: #{reason}")
end

# Skip errors and continue (collect partial results)
valid_packets =
  stream
  |> Stream.filter(fn
    {:ok, _} -> true
    {:error, %{packet_index: i, reason: r}} ->
      Logger.warning("Skipping packet #{i}: #{r}")
      false
  end)
  |> Stream.map(fn {:ok, packet} -> packet end)
  |> Enum.to_list()

# Collect both packets and errors
{packets, errors} = Enum.reduce(stream, {[], []}, fn
  {:ok, packet}, {pkts, errs} -> {[packet | pkts], errs}
  {:error, meta}, {pkts, errs} -> {pkts, [meta | errs]}
end)

IO.puts("Processed #{length(packets)} packets, #{length(errors)} errors")

Manual control

{:ok, reader} = PcapFileEx.open("capture.pcap")

# Access file header
IO.inspect(reader.header.datalink)      # "ethernet"
IO.inspect(reader.header.snaplen)       # 65535

# Read packets one by one
{:ok, packet} = PcapFileEx.Pcap.next_packet(reader)
IO.inspect(packet.timestamp)
IO.inspect(packet.orig_len)

# Close when done
PcapFileEx.Pcap.close(reader)

Inspect PCAPNG interfaces

{:ok, reader} = PcapFileEx.open("capture.pcapng")
{:ok, interfaces} = PcapFileEx.PcapNg.interfaces(reader)
Enum.each(interfaces, fn iface ->
  IO.puts("#{iface.id}: #{iface.name || iface.linktype} (#{iface.timestamp_resolution})")
end)

Each packet from a PCAPNG capture also carries interface_id, interface, and timestamp_resolution fields so you can attribute traffic to specific capture interfaces.
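For instance, these fields make it straightforward to attribute traffic per interface — a minimal sketch, assuming read_all/1 and the interface_id field described above:

```elixir
# Count packets per capture interface (assumes the PCAPNG packet fields
# described above; "capture.pcapng" is a placeholder path)
{:ok, packets} = PcapFileEx.read_all("capture.pcapng")

packets
|> Enum.group_by(& &1.interface_id)
|> Enum.each(fn {id, pkts} ->
  IO.puts("interface #{id}: #{length(pkts)} packets")
end)
```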

Examples

Filter by packet size

{:ok, stream} = PcapFileEx.stream("capture.pcap")

large_packets =
  stream
  |> Stream.filter(fn packet -> byte_size(packet.data) > 1500 end)
  |> Enum.to_list()

Count packets

{:ok, stream} = PcapFileEx.stream("capture.pcap")

count = stream |> Enum.count()

IO.puts("Total packets: #{count}")

Time range analysis

start_time = ~U[2025-11-02 10:00:00Z]
end_time = ~U[2025-11-02 11:00:00Z]

{:ok, stream} = PcapFileEx.stream("capture.pcap")

packets_in_range =
  stream
  |> Stream.filter(fn packet ->
    DateTime.compare(packet.timestamp, start_time) != :lt and
    DateTime.compare(packet.timestamp, end_time) != :gt
  end)
  |> Enum.to_list()

Process in batches

{:ok, stream} = PcapFileEx.stream("capture.pcap")

stream
|> Stream.chunk_every(1000)
|> Enum.each(fn batch ->
  # Process 1000 packets at a time
  analyze_batch(batch)
end)

Compute statistics

{:ok, stats} = PcapFileEx.Stats.compute("capture.pcap")
IO.puts("Packets: #{stats.packet_count}")
IO.puts("Total bytes: #{stats.total_bytes}")
IO.puts("Duration: #{stats.duration_seconds}s")
IO.puts("Avg packet size: #{stats.avg_packet_size}")

# For large files (>100MB), use streaming (constant memory)
{:ok, stats} = PcapFileEx.Stats.compute_streaming("huge_10gb.pcap")

# Combine with filtering
{:ok, stream} = PcapFileEx.stream("capture.pcap")

tcp_stats =
  stream
  |> Stream.filter(fn p -> :tcp in p.protocols end)
  |> PcapFileEx.Stats.compute_streaming()

Filter packets

# Chain multiple filters
{:ok, stream} = PcapFileEx.stream("capture.pcap")

stream
|> PcapFileEx.Filter.by_size(100..1500)
|> PcapFileEx.Filter.larger_than(500)
|> PcapFileEx.Filter.contains("HTTP")
|> Enum.take(10)

# Time-based filtering
start_time = ~U[2025-11-02 10:00:00Z]
end_time = ~U[2025-11-02 11:00:00Z]

{:ok, stream} = PcapFileEx.stream("capture.pcap")

stream
|> PcapFileEx.Filter.by_time_range(start_time, end_time)
|> Enum.to_list()

Pre-filtering (High Performance)

Pre-filtering applies filters in the Rust layer before packets are deserialized to Elixir, providing 10-100x speedup for selective queries on large files.

alias PcapFileEx.PreFilter

# Open a reader and set pre-filters
{:ok, reader} = PcapFileEx.Pcap.open("large_capture.pcap")

# Filter for TCP traffic on port 80
filters = [
  PreFilter.protocol("tcp"),
  PreFilter.port_dest(80)
]
:ok = PcapFileEx.Pcap.set_filter(reader, filters)

# Stream only matching packets (filtered in Rust!)
{:ok, stream} = PcapFileEx.Stream.from_reader(reader)
packets = stream |> Enum.take(100)

PcapFileEx.Pcap.close(reader)

# Also works with PCAPNG
{:ok, reader} = PcapFileEx.PcapNg.open("capture.pcapng")
:ok = PcapFileEx.PcapNg.set_filter(reader, [
  PreFilter.ip_source_cidr("192.168.1.0/24"),
  PreFilter.size_min(1000)
])
{:ok, stream} = PcapFileEx.Stream.from_reader(reader)
packets = stream |> Enum.to_list()
PcapFileEx.PcapNg.close(reader)

# Available filter types:
# - PreFilter.ip_source("1.2.3.4")
# - PreFilter.ip_dest("1.2.3.4")
# - PreFilter.ip_source_cidr("192.168.0.0/16")
# - PreFilter.ip_dest_cidr("10.0.0.0/8")
# - PreFilter.port_source(8080)
# - PreFilter.port_dest(443)
# - PreFilter.port_source_range(8000, 9000)
# - PreFilter.port_dest_range(80, 443)
# - PreFilter.protocol("tcp") # tcp, udp, icmp, ipv4, ipv6
# - PreFilter.size_min(100)
# - PreFilter.size_max(1500)
# - PreFilter.size_range(100, 1500)
# - PreFilter.timestamp_min(unix_seconds)
# - PreFilter.timestamp_max(unix_seconds)
# - PreFilter.all([filter1, filter2]) # AND
# - PreFilter.any([filter1, filter2]) # OR
# - PreFilter.negate(filter) # NOT

Performance: Pre-filters skip non-matching packets before creating Elixir terms, dramatically reducing memory allocation, GC pressure, and CPU usage. Benchmarks show 7-52x speedup depending on filter selectivity.

Filter by protocol

# Pull only HTTP application payloads
{:ok, stream} = PcapFileEx.stream("capture.pcapng")

http_packets =
  stream
  |> PcapFileEx.Filter.by_protocol(:http)
  |> Enum.to_list()

# Transport-level filtering works the same way
PcapFileEx.stream!("capture.pcapng")
|> PcapFileEx.Filter.by_protocol(:tcp)
|> Enum.take(5)

# Decode filtered packets into structured HTTP messages
decoded_http =
  PcapFileEx.stream!("capture.pcapng")
  |> PcapFileEx.Filter.by_protocol(:http)
  |> Enum.map(&PcapFileEx.Packet.decode_http!/1)

# Keep packet metadata + decoded payloads
packets_with_decoded =
  PcapFileEx.stream!("capture.pcapng")
  |> Enum.map(&PcapFileEx.Packet.attach_decoded/1)

Enum.each(packets_with_decoded, fn packet ->
  IO.inspect(%{
    timestamp: packet.timestamp,
    src: PcapFileEx.Packet.endpoint_to_string(packet.src),
    dst: PcapFileEx.Packet.endpoint_to_string(packet.dst),
    protocol: packet.protocol,
    decoded: packet.decoded
  })
end)

Decode with the pkt library

{:ok, packets} = PcapFileEx.read_all("capture.pcapng")
packet = hd(packets)
decoded = PcapFileEx.Packet.pkt_decode!(packet)
IO.inspect(decoded)

# Inspect supported protocol atoms
IO.inspect(PcapFileEx.Packet.known_protocols())

# Try application decoders registered at runtime
case PcapFileEx.Packet.decode_registered(packet) do
  {:ok, {protocol, value}} -> IO.inspect({protocol, value})
  :no_match -> :noop
  {:error, reason} -> IO.warn("decoder failed: #{inspect(reason)}")
end

decode_registered/1 leaves the packet untouched; call PcapFileEx.DecoderRegistry.unregister/1 when you want to remove a custom decoder.

Display filters

PcapFileEx.stream!("capture.pcapng")
|> PcapFileEx.DisplayFilter.filter("ip.src == 127.0.0.1 && http.request.method == \"GET\"")
|> Enum.to_list()

# Precompile when reusing across streams
{:ok, filter} = PcapFileEx.DisplayFilter.compile("tcp.srcport == 8899")

PcapFileEx.stream!("capture.pcapng")
|> PcapFileEx.DisplayFilter.run(filter)
|> Enum.take(5)

# Inspect available fields
PcapFileEx.DisplayFilter.FieldRegistry.fields()

Validate files

{:ok, :pcap} = PcapFileEx.Validator.validate("capture.pcap")
true = PcapFileEx.Validator.pcap?("capture.pcap")
{:ok, size} = PcapFileEx.Validator.file_size("capture.pcap")

Multi-file timeline merge

Merge multiple PCAP/PCAPNG files captured on different machines into a single chronological stream. Ideal for correlating traffic from multiple network taps or distributed systems.

# Basic merge - chronologically sorted by nanosecond-precision timestamps
{:ok, stream} = PcapFileEx.Merge.stream(["server1.pcap", "server2.pcap"])
packets = Enum.to_list(stream)

# Track which file each packet came from
{:ok, stream} = PcapFileEx.Merge.stream(
  ["tap1.pcap", "tap2.pcap"],
  annotate_source: true
)

Enum.each(stream, fn {packet, metadata} ->
  IO.puts("Packet from #{metadata.source_file} at #{metadata.packet_index}")
end)

# Validate clock synchronization before merging
case PcapFileEx.Merge.validate_clocks(["server1.pcap", "server2.pcap"]) do
  {:ok, stats} ->
    IO.puts("Max clock drift: #{stats.max_drift_ms}ms")
    {:ok, stream} = PcapFileEx.Merge.stream(["server1.pcap", "server2.pcap"])
  {:error, :excessive_drift, meta} ->
    IO.puts("Clock drift too large: #{meta.max_drift_ms}ms - check NTP sync")
end

# Count total packets across multiple files
count = PcapFileEx.Merge.count(["server1.pcap", "server2.pcap"])

Important: For accurate multi-file merging, synchronize clocks on all capture systems using NTP (Network Time Protocol) or chronyd. See Clock Synchronization for Multi-File Merge below for setup instructions.

Features:

  • Nanosecond precision - Preserves full timestamp accuracy
  • Memory efficient - O(N files) memory using streaming priority queue
  • Mixed formats - Merges PCAP and PCAPNG files together
  • Datalink validation - Ensures compatible link-layer protocols
  • PCAPNG interface remapping - Handles multi-interface captures automatically
  • Source annotation - Optional tracking of source file for each packet
  • Clock validation - Detects excessive clock drift

Clock Synchronization for Multi-File Merge

When merging PCAP files from multiple machines, accurate clock synchronization is critical. Without synchronized clocks, packets may be merged in the wrong order, breaking protocol flows and making analysis unreliable.

Why Clock Synchronization Matters

  • Chronological accuracy: Packets must be ordered by actual capture time, not local clock time
  • Protocol reconstruction: TCP reassembly requires correct packet ordering
  • Distributed tracing: Correlating events across systems needs synchronized timestamps
  • Forensic analysis: Timeline accuracy is essential for incident investigation

chronyd is a modern, high-performance NTP implementation that provides better clock synchronization than the older ntpd. It's especially effective on systems with:

  • Intermittent network connectivity
  • Virtual machines
  • Systems that suspend/resume frequently

Installation

Linux (Ubuntu/Debian)

# Install chronyd
sudo apt-get update
sudo apt-get install chrony

# Start and enable service
sudo systemctl start chronyd
sudo systemctl enable chronyd

Linux (Fedora/RHEL/CentOS)

# Install chronyd (usually pre-installed)
sudo dnf install chrony

# Start and enable service
sudo systemctl start chronyd
sudo systemctl enable chronyd

macOS

# macOS ships with a built-in NTP client (the timed service)
# No additional installation needed - managed via System Preferences

# Verify NTP is enabled
sudo systemsetup -getusingnetworktime

# Enable if needed
sudo systemsetup -setusingnetworktime on

Configuration

Basic chronyd configuration (/etc/chrony/chrony.conf):

# Use public NTP pool servers (default)
pool 2.pool.ntp.org iburst

# Or use specific time servers (recommended for production)
server time.cloudflare.com iburst
server time.google.com iburst
server time.apple.com iburst

# Record system clock drift
driftfile /var/lib/chrony/drift

# Allow system clock to be stepped in first three updates
# if offset > 1 second (good for VMs or systems with inaccurate clocks)
makestep 1.0 3

# Enable kernel synchronization of real-time clock (RTC)
rtcsync

After editing configuration:

sudo systemctl restart chronyd

Verification

Check chronyd status:

# View synchronization status
chronyc tracking

# Expected output:
# Reference ID    : A29FC87B (time.cloudflare.com)
# Stratum         : 3
# Ref time (UTC)  : Sat Nov 09 17:30:00 2025
# System time     : 0.000012389 seconds fast of NTP time
# Last offset     : +0.000005123 seconds
# RMS offset      : 0.000008234 seconds
# ...

# View NTP sources
chronyc sources

# Expected output shows multiple time sources with * indicating current sync:
# MS Name/IP address         Stratum Poll Reach LastRx Last sample
# ===============================================================================
# ^* time.cloudflare.com           1   6   377    23   +123us[ +156us] +/-   15ms
# ^- time.google.com               1   6   377    24   +234us[ +267us] +/-   20ms
# ^+ time.apple.com                1   6   377    25   +345us[ +378us] +/-   18ms

Good synchronization indicators:

  • System time offset < 1ms (ideally < 100µs)
  • Stratum ≤ 3 (distance from reference clock)
  • Last offset small (< 1ms recent drift)
  • Multiple sources reachable (* or + markers)

Check for excessive drift:

# On each capture system
chronyc tracking | grep "System time"

# If offset > 10ms between systems, wait for convergence or investigate:
# - Network issues
# - Firewall blocking NTP (UDP port 123)
# - Local time zone misconfiguration
# - Hardware clock issues

Validation in PcapFileEx

Before merging files, validate clock synchronization:

case PcapFileEx.Merge.validate_clocks(["server1.pcap", "server2.pcap", "server3.pcap"]) do
  {:ok, stats} ->
    IO.puts("✓ Clock validation passed")
    IO.puts("  Max drift: #{Float.round(stats.max_drift_ms, 2)}ms")

    # Show per-file timing stats
    Enum.each(stats.files, fn file ->
      IO.puts("  #{file.path}:")
      IO.puts("    First packet: #{file.first_timestamp}")
      IO.puts("    Duration: #{Float.round(file.duration_ms, 2)}ms")
    end)

    # Proceed with merge
    {:ok, stream} = PcapFileEx.Merge.stream([
      "server1.pcap",
      "server2.pcap",
      "server3.pcap"
    ])

  {:error, :excessive_drift, meta} ->
    IO.puts("✗ Clock validation failed")
    IO.puts("  Max drift: #{Float.round(meta.max_drift_ms, 2)}ms (threshold: 1000ms)")
    IO.puts("\nRecommendations:")
    IO.puts("  1. Verify chronyd is running on all capture systems")
    IO.puts("  2. Check chronyc tracking on each system")
    IO.puts("  3. Ensure NTP traffic (UDP 123) is not blocked")
    IO.puts("  4. Wait for clock convergence (may take 5-10 minutes)")
end

Best Practices

  1. Start chronyd before captures: Let clocks synchronize for 5-10 minutes before starting packet capture
  2. Use consistent NTP servers: Configure all systems to use the same NTP pool or servers
  3. Monitor during capture: Check chronyc tracking periodically during long captures
  4. Validate before merge: Always use PcapFileEx.Merge.validate_clocks/1 before merging
  5. Document time source: Record NTP configuration in capture metadata
  6. Use nanosecond precision: Prefer PCAP-ng format with nanosecond timestamps when possible

Acceptable Clock Drift

  • < 1ms: Excellent - suitable for high-precision protocol analysis
  • 1-10ms: Good - acceptable for most distributed system analysis
  • 10-100ms: Fair - may affect fine-grained timing analysis
  • 100-1000ms: Poor - noticeable ordering issues possible
  • > 1000ms: Unacceptable - PcapFileEx.Merge.validate_clocks/1 will fail

If drift exceeds 1000ms, the merge operation will fail by default to prevent incorrect chronological ordering.

Export and Write PCAP Files

Create new PCAP files, filter existing captures, or convert between formats.

Quick Export (Filter and Write)

# Extract HTTP traffic to new file
PcapFileEx.export_filtered!(
  "full_capture.pcap",
  "http_only.pcap",
  fn packet -> :http in packet.protocols end
)

# Time range extraction
start_time = ~U[2025-11-09 10:00:00Z]
end_time = ~U[2025-11-09 11:00:00Z]

PcapFileEx.export_filtered!(
  "full_day.pcapng",
  "incident_window.pcapng",
  fn packet ->
    DateTime.compare(packet.timestamp, start_time) != :lt and
    DateTime.compare(packet.timestamp, end_time) != :gt
  end
)

# Filter by packet size (>1000 bytes)
PcapFileEx.export_filtered!(
  "capture.pcap",
  "large_packets.pcap",
  fn packet -> byte_size(packet.data) > 1000 end
)

Format Conversion

# Convert PCAP to PCAPNG (preserves all packets)
PcapFileEx.copy("legacy.pcap", "modern.pcapng", format: :pcapng)

# Convert PCAPNG to PCAP
PcapFileEx.copy("capture.pcapng", "legacy.pcap", format: :pcap)

# Auto-detect format from extension
PcapFileEx.copy("input.pcap", "output.pcapng")  # Detects .pcapng extension

Timestamp Manipulation

# Shift all timestamps to start at Unix epoch (anonymization)
{:ok, packets} = PcapFileEx.read_all("original.pcap")
normalized = PcapFileEx.TimestampShift.normalize_to_epoch(packets)
{:ok, header} = PcapFileEx.get_header("original.pcap")
PcapFileEx.write!("anonymized.pcap", header, normalized)

# Shift by specific offset (e.g., +1 hour in nanoseconds)
one_hour_ns = 3_600_000_000_000
shifted = PcapFileEx.TimestampShift.shift_all(packets, one_hour_ns)
PcapFileEx.write!("time_shifted.pcap", header, shifted)

Manual Control (Streaming Writes)

For large files or when you need fine-grained control:

# Low-level PCAP writing
{:ok, header} = PcapFileEx.get_header("input.pcap")
{:ok, writer} = PcapFileEx.PcapWriter.open("output.pcap", header)

PcapFileEx.stream!("input.pcap")
|> Stream.filter(fn packet -> byte_size(packet.data) > 1000 end)
|> Enum.each(fn packet ->
  :ok = PcapFileEx.PcapWriter.write_packet(writer, packet)
end)

:ok = PcapFileEx.PcapWriter.close(writer)

Batch vs Streaming

# ✅ Small datasets (<1000 packets) - batch write
{:ok, packets} = PcapFileEx.read_all("small.pcap")
filtered = Enum.filter(packets, fn p -> :tcp in p.protocols end)
{:ok, header} = PcapFileEx.get_header("small.pcap")
PcapFileEx.write!("tcp_only.pcap", header, filtered)

# ✅ Large datasets (>1GB) - use export_filtered (streaming)
PcapFileEx.export_filtered!(
  "huge_50gb.pcapng",
  "filtered.pcap",
  fn p -> :tcp in p.protocols end
)

PCAPNG Multi-Interface Writing

# Create PCAPNG with multiple interfaces
interfaces = [
  %PcapFileEx.Interface{
    id: 0,
    linktype: "ethernet",
    snaplen: 65535,
    name: "eth0",
    timestamp_resolution: :microsecond,
    timestamp_resolution_raw: "microsecond",
    timestamp_offset_secs: 0
  },
  %PcapFileEx.Interface{
    id: 1,
    linktype: "wifi",
    snaplen: 65535,
    name: "wlan0",
    timestamp_resolution: :nanosecond,
    timestamp_resolution_raw: "nanosecond",
    timestamp_offset_secs: 0
  }
]

# Packets must have interface_id set for PCAPNG
packets = [
  %PcapFileEx.Packet{
    timestamp_precise: PcapFileEx.Timestamp.new(1000, 0),
    orig_len: 100,
    data: <<...>>,
    interface_id: 0  # Uses eth0 interface
  },
  %PcapFileEx.Packet{
    timestamp_precise: PcapFileEx.Timestamp.new(1001, 0),
    orig_len: 200,
    data: <<...>>,
    interface_id: 1  # Uses wlan0 interface
  }
]

{:ok, count} = PcapFileEx.PcapNgWriter.write_all(
  "multi_interface.pcapng",
  interfaces,
  packets
)

Note: Append mode has limitations in v0.4.0:

  • PCAP append: Not supported by upstream crate (returns clear error)
  • PCAPNG append: Not implemented in MVP
  • Future versions will add PCAPNG append support

Timestamp Precision Support

PcapFileEx automatically detects and supports both microsecond and nanosecond timestamp precision in PCAP files:

PCAP Magic Numbers

PCAP files identify their format and timestamp precision via magic numbers in the file header:

| Magic Number | Endianness    | Timestamp Precision | Default Platform |
|--------------|---------------|---------------------|------------------|
| 0xD4C3B2A1   | Little-endian | Microsecond (µs)    | macOS dumpcap    |
| 0xA1B2C3D4   | Big-endian    | Microsecond (µs)    | -                |
| 0x4D3CB2A1   | Little-endian | Nanosecond (ns)     | Linux dumpcap    |
| 0xA1B23C4D   | Big-endian    | Nanosecond (ns)     | -                |
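The detection itself is plain binary pattern matching on the first four bytes of the file. A stdlib-only sketch, independent of this library's own auto-detection (the module name MagicSketch is illustrative):

```elixir
# Classify a capture file by its leading magic bytes (stdlib only, no NIF needed)
defmodule MagicSketch do
  # Big-endian microsecond PCAP: file starts with bytes A1 B2 C3 D4
  def classify(<<0xA1, 0xB2, 0xC3, 0xD4, _::binary>>), do: {:pcap, :big_endian, :microsecond}
  # Little-endian microsecond PCAP: the same magic, byte-swapped on disk
  def classify(<<0xD4, 0xC3, 0xB2, 0xA1, _::binary>>), do: {:pcap, :little_endian, :microsecond}
  def classify(<<0xA1, 0xB2, 0x3C, 0x4D, _::binary>>), do: {:pcap, :big_endian, :nanosecond}
  def classify(<<0x4D, 0x3C, 0xB2, 0xA1, _::binary>>), do: {:pcap, :little_endian, :nanosecond}
  # PCAPNG Section Header Block type 0x0A0D0D0A reads the same in both byte orders
  def classify(<<0x0A, 0x0D, 0x0D, 0x0A, _::binary>>), do: :pcapng
  def classify(_), do: :unknown
end

MagicSketch.classify(<<0xD4, 0xC3, 0xB2, 0xA1, 0, 0>>)
# => {:pcap, :little_endian, :microsecond}
```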

Cross-Platform Compatibility

All formats are automatically detected and supported without configuration:

# macOS PCAP (microsecond precision)
{:ok, macos_reader} = PcapFileEx.Pcap.open("capture_macos.pcap")
assert macos_reader.header.ts_resolution == "microsecond"

# Linux PCAP (nanosecond precision)
{:ok, linux_reader} = PcapFileEx.Pcap.open("capture_linux.pcap")
assert linux_reader.header.ts_resolution == "nanosecond"

# Both formats read packets identically
{:ok, packets} = PcapFileEx.Pcap.read_all("any_pcap_file.pcap")

No Timestamp Conversion

Timestamps are preserved in their original precision - there is no automatic conversion between microsecond and nanosecond formats. This ensures:

  • ✅ Data integrity - original capture precision maintained
  • ✅ Lossless processing - no rounding or truncation
  • ✅ Cross-platform consistency - files from different OSes work identically

PCAPNG Format

PCAPNG files have their own timestamp resolution metadata and are fully supported on all platforms.

Nanosecond Precision Timestamps

New in v0.2.0: Full nanosecond precision support for accurate time analysis and packet sorting.

Elixir's DateTime type has a limitation: it only supports microsecond precision (6 decimal places), not nanosecond precision (9 decimal places). This means timestamps from nanosecond-resolution PCAP files get truncated.

To solve this, PcapFileEx now provides two timestamp fields on each packet:

%PcapFileEx.Packet{
  timestamp: ~U[2024-11-08 11:24:09.735188Z],  # DateTime (microsecond precision)
  timestamp_precise: %PcapFileEx.Timestamp{     # FULL nanosecond precision
    secs: 1731065049,
    nanos: 735188123  # All 9 digits preserved!
  },
  # ... other fields
}

When to use which field:

  • timestamp (DateTime) - Use for display, logging, and when microsecond precision is sufficient
  • timestamp_precise (Timestamp) - Use for sorting, merging multiple files, or precise time analysis

Example: Merging packets from multiple files chronologically

# Read packets from multiple PCAP files
files = ["capture1.pcapng", "capture2.pcapng", "capture3.pcapng"]

all_packets =
  files
  |> Enum.flat_map(fn file ->
    {:ok, packets} = PcapFileEx.read_all(file)
    packets
  end)
  |> Enum.sort_by(& &1.timestamp_precise, PcapFileEx.Timestamp)

# Now all packets are in chronological order with nanosecond precision

Example: Calculate precise time differences

{:ok, packets} = PcapFileEx.read_all("capture.pcapng")
[first, second | _] = packets

# Get difference in nanoseconds
diff_nanos = PcapFileEx.Timestamp.diff(second.timestamp_precise, first.timestamp_precise)
IO.puts("Time between packets: #{diff_nanos} nanoseconds")

# Convert to other units
diff_micros = div(diff_nanos, 1000)
diff_millis = div(diff_nanos, 1_000_000)

Timestamp API:

alias PcapFileEx.Timestamp

# Create a timestamp
ts = Timestamp.new(secs, nanos)

# Convert to total nanoseconds (useful for comparisons)
total_ns = Timestamp.to_unix_nanos(ts)
# => 1731065049735188123

# Convert to DateTime (loses nanosecond precision)
dt = Timestamp.to_datetime(ts)
# => ~U[2024-11-08 11:24:09.735188Z]

# Compare timestamps
Timestamp.compare(ts1, ts2)  # => :lt | :eq | :gt

# Calculate difference in nanoseconds
Timestamp.diff(ts1, ts2)  # => integer (nanoseconds)
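
As a mental model, the comparison and diff semantics above can be sketched in a few lines of plain Elixir (illustrative only; `TsSketch` is not the library's implementation):

```elixir
# Illustrative sketch of the Timestamp semantics, not PcapFileEx internals.
defmodule TsSketch do
  # Collapse {secs, nanos} into one integer, like Timestamp.to_unix_nanos/1.
  def to_nanos(%{secs: s, nanos: n}), do: s * 1_000_000_000 + n

  # :lt | :eq | :gt, like Timestamp.compare/2.
  def compare(a, b) do
    case {to_nanos(a), to_nanos(b)} do
      {x, y} when x < y -> :lt
      {x, y} when x > y -> :gt
      _ -> :eq
    end
  end

  # Signed difference in nanoseconds (first minus second), like Timestamp.diff/2.
  def diff(a, b), do: to_nanos(a) - to_nanos(b)
end
```

Because the {secs, nanos} pair collapses losslessly into a single integer, sorting by total nanoseconds and sorting with compare/2 always agree.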

Backward Compatibility:

Existing code continues to work unchanged - the timestamp field is still a DateTime for convenience:

# Your existing code still works!
packet.timestamp.year  # => 2024
packet.timestamp.month  # => 11
DateTime.compare(packet.timestamp, some_datetime)  # => :lt

See PcapFileEx.Timestamp module documentation for complete API details.

Data Structures

Packet

%PcapFileEx.Packet{
  timestamp: ~U[2025-11-02 12:34:56.123456Z],               # DateTime (microsecond precision)
  timestamp_precise: %PcapFileEx.Timestamp{...},            # Full nanosecond precision (v0.2.0+)
  orig_len: 1514,                                           # Original packet length
  data: <<0x00, 0x01, 0x02, ...>>,                          # Raw packet data (binary)
  datalink: "ethernet",                                      # Link-layer type for the packet
  protocols: [:ether, :ipv4, :tcp, :http],                  # Ordered protocol stack
  protocol: :tcp,                                            # Highest decoded protocol (:tcp, :udp, ...)
  src: %PcapFileEx.Endpoint{ip: "127.0.0.1", port: 55014},
  dst: %PcapFileEx.Endpoint{ip: "127.0.0.1", port: 8899},
  layers: [:ipv4, :tcp, :http],                             # Protocol layers (cached)
  payload: "GET /hello ...",                                 # Payload used during decoding
  decoded: %{http: %PcapFileEx.HTTP{...}}                    # Cached decoded payloads
}

Loopback captures are normalized automatically: the 4-byte pseudo-header is removed and datalink is remapped to "ipv4"/"ipv6" so that protocol decoders operate directly on the payload.

Call PcapFileEx.Packet.pkt_decode/1 or pkt_decode!/1 to hand packets to the pkt library with the correct link type. Discover supported protocol atoms via PcapFileEx.Packet.known_protocols/0. Use PcapFileEx.Packet.attach_decoded/1 to stash decoded payloads back on the packet struct, or call PcapFileEx.Packet.decode_registered!/1 to fetch them directly.

Packets are decoded automatically using registered decoders. Pass decode: false to PcapFileEx.read_all/2 or PcapFileEx.stream/2 when you only need raw payloads without attaching decoded metadata.

Pattern matching on endpoints is now straightforward:

case packet.src do
  %PcapFileEx.Endpoint{ip: "127.0.0.1", port: 8899} -> :ok
  _ -> :other
end

Custom Decoders

You can extend the application-layer protocol support by registering additional decoders.

New API (v0.5.0+) - Matchers can return context to decoders:

PcapFileEx.DecoderRegistry.register(%{
  protocol: :my_proto,
  matcher: fn layers, payload ->
    # Extract context from layers when matching
    if Enum.any?(layers, &match?({:udp, _, _, _, _, _}, &1)) do
      case MyProto.decode(IO.iodata_to_binary(payload)) do
        {:ok, decoded} -> {:match, decoded}  # Cache decoded result
        _ -> false
      end
    else
      false
    end
  end,
  decoder: fn cached_decoded, _payload ->
    # Use cached result from matcher
    {:ok, cached_decoded}
  end,
  fields: [
    %{id: "myproto.value", type: :integer, extractor: fn decoded -> decoded["value"] end},
    %{id: "myproto.sensor", type: :string, extractor: fn decoded -> decoded["sensor"] end}
  ]
})

Legacy API (still supported with deprecation warnings):

PcapFileEx.DecoderRegistry.register(%{
  protocol: :my_proto,
  matcher: fn layers, payload ->
    Enum.any?(layers, &match?({:udp, _, _, _, _, _}, &1)) and
      MyProto.match?(IO.iodata_to_binary(payload))
  end,
  decoder: fn payload -> {:ok, MyProto.decode(IO.iodata_to_binary(payload))} end,
  fields: [...]
})

# Read packets using the custom decoder
{:ok, packets} = PcapFileEx.read_all("capture.pcapng")
packet = Enum.find(packets, &(:my_proto in &1.protocols))
{:ok, {:my_proto, decoded}} = PcapFileEx.Packet.decode_registered(packet)

# Persist the decoded payload on the packet struct
packet = PcapFileEx.Packet.attach_decoded(packet)
decoded = packet.decoded[:my_proto]

# Or get the decoded value directly (raises on decoder error)
decoded = PcapFileEx.Packet.decode_registered!(packet)

# Use the fields in display filters
PcapFileEx.stream!("capture.pcapng")
|> Enum.map(&PcapFileEx.Packet.attach_decoded/1)
|> PcapFileEx.DisplayFilter.filter("myproto.value >= 25")
|> Enum.to_list()

Remove a decoder with PcapFileEx.DecoderRegistry.unregister/1. Inspiration for protocol analysis logic can be taken from Wireshark dissectors (see the Lua dissector example).

Reassemble HTTP streams

# Lazily reconstruct HTTP requests with payloads that span multiple packets
PcapFileEx.TCP.stream_http_messages("captures/fixture.pcapng", types: [:request])
|> Enum.each(fn message ->
  IO.puts("#{message.http.method} #{message.http.uri} -> #{byte_size(message.http.body)} bytes")

  # Access automatically decoded body
  case message.http.decoded_body do
    map when is_map(map) -> IO.inspect(map, label: "JSON/ETF data")
    text when is_binary(text) -> IO.puts("Text: #{text}")
    nil -> IO.puts("Empty body")
  end
end)

# Responses are available too
PcapFileEx.TCP.stream_http_messages("captures/fixture.pcapng", types: [:response])
|> Enum.take(3)

# Filter by decoded content
PcapFileEx.TCP.stream_http_messages("capture.pcapng")
|> Stream.filter(fn msg ->
  is_map(msg.http.decoded_body) and msg.http.decoded_body["status"] == "error"
end)
|> Enum.to_list()

The helper buffers TCP payloads per direction until the full HTTP message is assembled (based on Content-Length when present) and returns %PcapFileEx.TCP.HTTPMessage{} structs with the decoded %PcapFileEx.HTTP{} payload.

HTTP Message with Automatic Body Decoding

%PcapFileEx.HTTP{
  type: :response,
  version: "1.0",
  status_code: 200,
  reason_phrase: "OK",
  headers: %{"content-type" => "application/json", "server" => "SimpleHTTP/0.6 Python/3.13.5"},
  body: "{\"message\":\"Hello, World!\"}",
  body_length: 28,
  complete?: true,
  raw: "HTTP/1.0 200 OK...",
  decoded_body: %{"message" => "Hello, World!"}  # Automatically decoded!
}

Automatic Body Decoding

HTTP bodies are automatically decoded based on content-type and magic bytes:

  • Erlang Term Format (ETF) - Detected by magic byte 131, decoded with :erlang.binary_to_term/1
  • JSON - When Content-Type contains "json", decoded with Jason (if available)
  • Form data - application/x-www-form-urlencoded decoded to a map
  • Text - text/* content-types returned as-is
  • Binary - Unknown types returned as raw binary

If decoding fails (e.g., malformed JSON), the raw binary is preserved. The decoded_body field is nil for empty bodies.
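
The dispatch rules above can be summarized in a short sketch (hypothetical `BodySketch` module, not PcapFileEx internals; the JSON branch assumes Jason is available):

```elixir
# Hypothetical sketch of the body-decoding dispatch, not PcapFileEx internals.
defmodule BodySketch do
  # ETF is detected by its magic byte 131, regardless of content-type.
  def decode(<<131, _::binary>> = body, _content_type) do
    :erlang.binary_to_term(body, [:safe])
  end

  def decode(body, content_type) do
    cond do
      content_type =~ "json" -> Jason.decode!(body)
      content_type == "application/x-www-form-urlencoded" -> URI.decode_query(body)
      String.starts_with?(content_type, "text/") -> body
      true -> body  # unknown content-type: return the raw binary
    end
  end
end
```

Unlike the library, this sketch raises on malformed JSON; PcapFileEx instead preserves the raw binary when decoding fails.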

# Example: Filter JSON responses by decoded content
"capture.pcapng"
|> PcapFileEx.TCP.stream_http_responses()
|> Stream.filter(fn msg ->
  is_map(msg.http.decoded_body) and
  Map.get(msg.http.decoded_body, "status") == "success"
end)
|> Enum.to_list()

# Example: Inspect Erlang terms from ETF-encoded requests
"capture.pcapng"
|> PcapFileEx.TCP.stream_http_requests()
|> Enum.each(fn msg ->
  case msg.http.decoded_body do
    term when not is_binary(term) ->
      IO.inspect(term, label: "Decoded ETF term")
    _ -> :skip
  end
end)

Use PcapFileEx.Packet.decode_http/1 (or decode_http!/1) to obtain this structure directly from TCP payloads.

Hosts Mapping

Map IP addresses to human-readable hostnames for easier analysis:

# Define your hosts mapping
hosts = %{
  "172.25.0.4" => "api-gateway",
  "172.65.251.78" => "client-service",
  "10.0.0.1" => "database"
}

# Apply to streaming
{:ok, stream} = PcapFileEx.stream("capture.pcap", hosts_map: hosts)

stream
|> Stream.map(fn {:ok, packet} -> packet end)
|> Enum.each(fn packet ->
  # Endpoints now show hostnames when available
  IO.puts("#{packet.src} -> #{packet.dst}")
  # Output: "client-service:39604 -> api-gateway:9091"
end)

# Apply to read_all
{:ok, packets} = PcapFileEx.read_all("capture.pcap", hosts_map: hosts)

# Apply to HTTP/2 analysis
{:ok, complete, _incomplete} = PcapFileEx.HTTP2.analyze("capture.pcap", hosts_map: hosts)

Enum.each(complete, fn ex ->
  if PcapFileEx.HTTP2.Exchange.client_identified?(ex) do
    IO.puts("#{ex.client} -> #{ex.server}: #{ex.request.method} #{ex.request.path}")
    # Output: "client-service:39604 -> api-gateway:9091: GET /api/users"
  else
    {ep_a, ep_b} = PcapFileEx.HTTP2.Exchange.endpoints(ex)
    IO.puts("#{ep_a} <-> #{ep_b}")
  end
end)

# Use Endpoint struct directly
alias PcapFileEx.Endpoint

endpoint = Endpoint.new("172.25.0.4", 9091)
endpoint = Endpoint.with_hosts(endpoint, hosts)
IO.puts("#{endpoint}")  # "api-gateway:9091"

# Create endpoint from IP tuple (useful for custom analysis)
endpoint = Endpoint.from_tuple({{172, 25, 0, 4}, 9091}, hosts)
IO.puts("#{endpoint}")  # "api-gateway:9091"

HTTP/2 Analysis

Analyze HTTP/2 cleartext (h2c) traffic to reconstruct complete request/response exchanges:

# Analyze PCAP file for HTTP/2 exchanges
{:ok, complete, incomplete} = PcapFileEx.HTTP2.analyze("capture.pcap")

# Print complete exchanges
Enum.each(complete, fn ex ->
  IO.puts("#{ex.request.method} #{ex.request.path} -> #{ex.response.status}")
end)

# Filter by port for h2c traffic
{:ok, complete, _} = PcapFileEx.HTTP2.analyze("capture.pcap", port: 8080)

# Find error responses
errors = Enum.filter(complete, fn ex -> ex.response.status >= 400 end)

# Access request/response details
exchange = hd(complete)
exchange.request.method      # "GET"
exchange.request.path        # "/api/users"
exchange.response.status     # 200
exchange.response.body       # "{\"users\": [...]}"

# Access auto-decoded body (based on Content-Type)
case exchange.response.decoded_body do
  {:json, data} -> IO.inspect(data)  # Parsed JSON
  {:text, text} -> IO.puts(text)     # UTF-8 text
  {:multipart, parts} -> Enum.each(parts, &IO.inspect/1)  # Multipart parts
  {:binary, bin} -> IO.puts("Binary: #{byte_size(bin)} bytes")
  nil -> IO.puts("No body")
end

# Disable content decoding for raw binary access
{:ok, complete, _} = PcapFileEx.HTTP2.analyze("capture.pcap", decode_content: false)

# Check incomplete exchanges (RST_STREAM, GOAWAY, truncated)
Enum.each(incomplete, fn ex ->
  IO.puts("Stream #{ex.stream_id}: #{inspect(ex.reason)}")
end)

Limitations:

  • Cleartext only: TLS-encrypted HTTP/2 (h2) is not supported
  • Prior-knowledge h2c only: the HTTP/1.1 Upgrade flow is not supported
  • Analysis only: no playback server implementation is included

See the PcapFileEx.HTTP2 module documentation for complete patterns and best practices.

Traffic Flows Analysis

Analyze PCAP files to identify and group traffic by protocol (HTTP/1, HTTP/2, UDP):

# Analyze a PCAP file for all traffic flows
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng")

# Access flows by protocol
IO.puts("HTTP/1 flows: #{length(result.http1)}")
IO.puts("HTTP/2 flows: #{length(result.http2)}")
IO.puts("UDP flows: #{length(result.udp)}")

# Query specific flows
result.http2
|> Enum.filter(fn f -> f.flow.from == "web-client" end)
|> Enum.flat_map(& &1.streams)
|> Enum.each(fn stream ->
  IO.puts("#{stream.exchange.request.method} #{stream.exchange.request.path}")
end)

# Playback in timeline order
Enum.each(result.timeline, fn event ->
  data = PcapFileEx.Flows.AnalysisResult.get_event(result, event)
  playback(data)
end)

# With hosts mapping
hosts = %{
  "192.168.1.10" => "api-gateway",
  "192.168.1.20" => "metrics-collector"
}
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# O(1) lookup by FlowKey
key = PcapFileEx.FlowKey.new(:http2, client_endpoint, server_endpoint)
flow = PcapFileEx.Flows.AnalysisResult.get_flow(result, key)

Protocol Detection:

  • HTTP/2: Connection preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
  • HTTP/1: Request methods (GET, POST, etc.) or HTTP/ response
  • UDP: Collected separately and grouped by destination server
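
These detection rules can be sketched as simple prefix checks on a flow's first client payload (illustrative `ProtocolGuess` module, not PcapFileEx internals):

```elixir
# Illustrative sketch of the protocol-detection rules, not PcapFileEx internals.
defmodule ProtocolGuess do
  @h2_preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
  @http1_methods ~w(GET POST PUT DELETE HEAD OPTIONS PATCH)

  # HTTP/2: connection preface sent by the client.
  def classify(@h2_preface <> _rest), do: :http2

  # HTTP/1 response line, e.g. "HTTP/1.1 200 OK".
  def classify("HTTP/" <> _rest), do: :http1

  # HTTP/1 request line, e.g. "GET /path HTTP/1.1"; otherwise unknown.
  def classify(payload) do
    if Enum.any?(@http1_methods, &String.starts_with?(payload, &1 <> " ")),
      do: :http1,
      else: :unknown
  end
end
```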

Features:

  • Unified timeline - All events sorted chronologically with nanosecond precision
  • O(1) flow lookups - FlowKey for efficient flow access by protocol and endpoints
  • Playback timing - response_delay_ms for HTTP, relative_offset_ms for UDP
  • Hosts mapping - Resolve IPs to human-readable hostnames
  • HTTP/1 reconstruction - Request/response pairing with chunked encoding support
  • HTTP/2 integration - Wraps existing HTTP/2 analyzer with flow metadata
  • Custom Decoders - Decode domain-specific protocols (UDP telemetry, 5G SBI multipart, etc.)

Custom Decoders for Flows

Decode protocol-specific payloads with custom decoders:

# Define a decoder for custom binary protocol on UDP port 5005
udp_decoder = %{
  protocol: :udp,
  match: %{port: 5005},
  decoder: &MyTelemetry.decode/1
}

# Define a decoder for 5G SBI multipart parts
ngap_decoder = %{
  protocol: :http1,
  match: %{scope: :multipart_part, content_type: "application/vnd.3gpp.ngap"},
  decoder: fn %{content_id: id}, payload ->
    {:ok, {:ngap, id, NGAP.parse(payload)}}
  end
}

# Analyze with custom decoders
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng",
  decoders: [udp_decoder, ngap_decoder]
)

# Access decoded UDP payload
datagram = hd(hd(result.udp).datagrams)
case datagram.payload do
  {:custom, data} -> IO.inspect(data)
  {:decode_error, reason} -> IO.puts("Failed: #{inspect(reason)}")
  raw when is_binary(raw) -> IO.puts("No decoder matched")
end

# Access decoded HTTP multipart part
exchange = hd(hd(result.http1).exchanges)
case exchange.response.decoded_body do
  {:multipart, parts} ->
    Enum.each(parts, fn part ->
      case part.body do
        {:custom, data} -> IO.inspect(data)
        _ -> :skip
      end
    end)
  _ -> :skip
end

Key features:

  • Binary-only: Custom decoders run only when built-in decoding yields {:binary, payload}
  • Result wrapping: Decoded values wrapped as {:custom, term} to distinguish from built-in decoding
  • Error handling: Decoder failures stored as {:decode_error, reason}
  • Match criteria: Port, content-type, scope, path, method, content-id

Binary Preservation for Playback

When you need both decoded data (for analysis) and original binary (for replay):

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng",
  decoders: [my_decoder],
  keep_binary: true  # Preserve original binary alongside decoded content
)

# UDP: payload_binary contains original when custom decoder was invoked
datagram = hd(hd(result.udp).datagrams)
case datagram.payload do
  {:custom, decoded} ->
    IO.inspect(decoded)  # Decoded for analysis
    replay(datagram.payload_binary)  # Original for playback
  raw when is_binary(raw) ->
    replay(raw)  # No decoder matched
end

# HTTP multipart: body_binary contains original when custom decoder was invoked
case part.body do
  {:custom, decoded} ->
    replay(part.body_binary)  # Original for playback
  _ ->
    :skip
end

Warning: keep_binary: true doubles memory for decoded content.

See PcapFileEx.Flows.Decoder module for complete documentation and decoder templates.

Header

%PcapFileEx.Header{
  version_major: 2,
  version_minor: 4,
  snaplen: 65535,
  datalink: "ethernet",
  ts_resolution: "microsecond",
  endianness: "little"
}

Generating Test Files

Use the included test scripts to generate both PCAP and PCAPNG files with known traffic:

cd test/fixtures
./capture_test_traffic.sh

This generates:

  • sample.pcap - Legacy PCAP format
  • sample.pcapng - Next-generation PCAPNG format

Both files contain the same HTTP traffic for consistent testing.

For large benchmark datasets that mix TCP and UDP across multiple interfaces:

cd test/fixtures
./capture_heavy_traffic.sh --duration 120 --interfaces lo0,en0

This produces large_capture.pcapng (and optionally large_capture.pcap) plus logs detailing the generated HTTP/UDP load.

Or use dumpcap directly:

# PCAPNG format (default)
dumpcap -i any -w capture.pcapng -c 100

# PCAP format (legacy)
dumpcap -i any -w capture.pcap -c 100 -P

See test/fixtures/README.md for more details.

Benchmarks

Benchee benchmarks quantify parsing throughput (packets per second) and filter performance.

  1. Generate a large capture (see capture_heavy_traffic.sh above) or provide your own path.
  2. Install dependencies: mix deps.get
  3. Run the benchmarks:
mix run bench/pcap_parsing.exs
# or specify a custom capture
PCAP_BENCH_FILE=/path/to/capture.pcapng mix run bench/pcap_parsing.exs

Benchmarks cover:

  • Streaming parse throughput with and without automatic decoder attachment
  • UDP-only filtering performance
  • HTTP POST filtering using application-level decoding

Benchee reports iterations-per-second (IPS), average/median runtimes, and memory usage for each scenario. Adjust the capture size, duration, or Benchee options inside bench/pcap_parsing.exs to explore additional workloads.

Architecture

PcapFileEx is a hybrid Elixir/Rust project:

  • Elixir Layer (lib/) - Public API, structs, and Stream protocol
  • Rust Layer (native/pcap_file_ex/) - Fast binary parsing via NIFs
  • Underlying Parser - Wraps the pcap-file Rust crate

This architecture provides:

  • Performance - Rust handles intensive binary parsing
  • Safety - Rustler ensures memory safety across the FFI boundary
  • Ergonomics - Idiomatic Elixir API with proper structs and typespecs

Performance

Streaming allows processing of arbitrarily large PCAP files with minimal memory usage:

# Process a 10GB file with constant memory usage
PcapFileEx.stream!("huge_10gb.pcap")
|> Stream.filter(&interesting?/1)
|> Stream.map(&analyze/1)
|> Enum.take(1000)

Roadmap

Completed Features

  • [x] PCAP format reading
  • [x] PCAPNG format reading
  • [x] Automatic format detection
  • [x] Lazy streaming API
  • [x] Type-safe structs
  • [x] Statistics and analysis
  • [x] Packet filtering DSL
  • [x] File validation
  • [x] Comprehensive tests (352 tests: 227 example-based, 109 property-based, 16 doctests)
  • [x] Property-based testing with StreamData for edge case coverage
  • [x] High-performance pre-filtering in Rust layer
  • [x] HTTP/DNS protocol decoding
  • [x] Nanosecond timestamp precision support
  • [x] Multi-file timeline merge - Chronologically merge multiple PCAP/PCAPNG files with nanosecond precision, interface remapping, source annotation, and clock validation
  • [x] PCAP/PCAPNG writer API - Create, export, filter, and convert captures with format auto-detection, timestamp manipulation, and streaming writes (v0.4.0)
  • [x] HTTP/2 cleartext analysis - Reconstruct HTTP/2 (h2c) request/response exchanges with HPACK header decompression
  • [x] Traffic Flows API - Unified API to identify and group traffic by protocol (HTTP/1, HTTP/2, UDP) with timeline playback support

Planned Features

  • [ ] Display filter → PreFilter compiler - Convert Wireshark-style display filters into PreFilter tuples for familiar syntax
  • [ ] Telemetry hooks - Emit :telemetry events for packet decode, HTTP parsing, and PreFilter hits for observability
  • [ ] Higher-level protocol decoders - TLS, DNS (enhanced) decoders as optional dependencies

Troubleshooting

Tests failing: "No such device" error

Symptoms:

Error: Interface 'lo0' not found

Cause: Interface name mismatch between platforms.

Solution:

On macOS, loopback is lo0. On Linux, it's lo. The scripts auto-detect this, but if you're specifying interfaces manually:

# List available interfaces
cd test/fixtures
./capture_test_traffic.sh --list-interfaces

# Use specific interface
./capture_test_traffic.sh --interfaces en0  # macOS ethernet
./capture_test_traffic.sh --interfaces eth0  # Linux ethernet

Tests failing: "Permission denied" error

Symptoms:

dumpcap: You don't have permission to capture on that device

Cause: dumpcap requires elevated privileges for packet capture.

macOS Solutions

Option 1: Install via Homebrew (Recommended)

brew install wireshark

Wireshark includes ChmodBPF, which grants packet capture permissions automatically.

Option 2: Grant Terminal Permission

  1. Open System Preferences
  2. Go to Security & Privacy → Privacy → Input Monitoring
  3. Click the lock to make changes
  4. Add Terminal.app (or iTerm.app)

Verify it works:

dumpcap -D  # Should list interfaces without error

Linux Solutions

Option 1: Wireshark Group (Recommended)

# Configure Wireshark for non-root capture
sudo dpkg-reconfigure wireshark-common  # Select "Yes"

# Add your user to the wireshark group
sudo usermod -aG wireshark $USER

# Activate the group (or logout/login)
newgrp wireshark

# Verify it works
dumpcap -D  # Should list interfaces without error

Option 2: Set Capabilities Manually

# Give dumpcap specific capabilities
sudo setcap cap_net_raw,cap_net_admin=eip $(which dumpcap)

# Verify
dumpcap -D

Option 3: Run with sudo (Least Secure)

cd test/fixtures
sudo ./capture_test_traffic.sh

This works but requires entering your password and running the entire script as root.

Tests skipped: "Missing dumpcap"

If dumpcap isn't installed, tests that require generated fixtures will be skipped. This is normal.

To fix, install dumpcap (see Development Setup above) and run:

mix test.fixtures

Fixture generation fails

Debug steps:

  1. Check dumpcap is in PATH:

    which dumpcap
    dumpcap -v
    
  2. Check permissions:

    dumpcap -D  # Should list interfaces
    
  3. Try manual generation:

    cd test/fixtures
    ./capture_test_traffic.sh --list-interfaces
    ./capture_test_traffic.sh
    
  4. Check Python is available:

    python3 --version
    
  5. Look at script output: The capture scripts provide detailed error messages.

Still Having Issues?

If the steps above don't resolve your problem, please open an issue on the project's issue tracker and include the script output and your platform details.

Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes with tests
  4. Submit a pull request

Testing

PcapFileEx has a comprehensive test suite including property-based tests:

# Run all tests (303 tests total)
mix test

# Run only property-based tests (94 properties)
mix test test/property_test/

# Run specific property test file
mix test test/property_test/timestamp_property_test.exs

# Generate test capture file
cd test/fixtures
./capture_test_traffic.sh sample.pcapng

Property-Based Testing

The library uses StreamData for property-based testing, automatically testing thousands of edge cases:

Test Coverage:

  • Timestamp operations (18 properties) - Comparison transitivity, diff commutativity, monotonicity
  • Packet structures (14 properties) - Invariants like orig_len >= data_size, timestamp validity
  • Filter operations (20 properties) - Count preservation, idempotence, composition correctness
  • Stream behaviors (16 properties) - Lazy evaluation, filter equivalence, pagination
  • Decoding robustness (13 properties) - Never raises, endpoint validation, protocol consistency
  • Edge cases - Boundary timestamps (epoch, year 2038), truncated packets, empty streams

Environment-Aware:

  • Local development: 100 iterations per property (~0.9s)
  • CI environment: 1000 iterations per property (set CI=true)

Example property test:

# From test/property_test/timestamp_property_test.exs
property "timestamp comparison is transitive" do
  check all ts1 <- timestamp_generator(),
            ts2 <- timestamp_generator(),
            ts3 <- timestamp_generator() do
    # If ts1 < ts2 and ts2 < ts3, then ts1 < ts3
    if Timestamp.compare(ts1, ts2) == :lt and
       Timestamp.compare(ts2, ts3) == :lt do
      assert Timestamp.compare(ts1, ts3) == :lt
    end
  end
end

See test/property_test/ for all property tests and test/support/generators.ex for reusable generators.

License

MIT License - See LICENSE for details.

Credits