Traffic Flows Analysis Guide

View Source

Overview

The PcapFileEx.Flows module provides a unified API to analyze PCAP files and identify traffic flows by protocol (HTTP/1, HTTP/2, UDP).

Quick Start

# Analyze a PCAP file
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng")

# Access flows by protocol
IO.puts("HTTP/1 flows: #{length(result.http1)}")
IO.puts("HTTP/2 flows: #{length(result.http2)}")
IO.puts("UDP flows: #{length(result.udp)}")

Key Concepts

AnalysisResult

The main result structure containing all flows:

%PcapFileEx.Flows.AnalysisResult{
  flows: %{FlowKey.t() => flow_ref()},   # O(1) lookup map
  http1: [HTTP1.Flow.t()],               # Sorted by first exchange timestamp
  http2: [HTTP2.Flow.t()],               # Sorted by first stream timestamp
  udp: [UDP.Flow.t()],                   # Sorted by first datagram timestamp
  timeline: [TimelineEvent.t()],         # Unified timeline
  stats: Stats.t(),                      # Aggregate statistics
  summary: Summary.t()                   # Pre-aggregated traffic for topology
}

FlowKey

Stable identity for O(1) flow lookups:

key = PcapFileEx.FlowKey.new(:http2, client_endpoint, server_endpoint)
flow = PcapFileEx.Flows.AnalysisResult.get_flow(result, key)

Flow

Base flow identity with display and authoritative fields:

%PcapFileEx.Flow{
  protocol: :http2,
  from: "web-client",           # Display: hostname (no port)
  server: "api-gateway:8080",   # Display: host:port
  client: "web-client:54321",   # Display: host:port
  server_endpoint: %Endpoint{}, # Authoritative
  client_endpoint: %Endpoint{}  # Authoritative
}

TimelineEvent

For unified playback across protocols:

Enum.each(result.timeline, fn event ->
  data = PcapFileEx.Flows.AnalysisResult.get_event(result, event)

  case data do
    %HTTP1.Exchange{} -> handle_http1(data)
    %HTTP2.Stream{} -> handle_http2(data)
    %UDP.Datagram{} -> handle_udp(data)
  end
end)

Protocol-Specific Flows

HTTP/1 Flows

Enum.each(result.http1, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  Enum.each(flow.exchanges, fn exchange ->
    IO.puts("  #{exchange.request.method} #{exchange.request.path}")

    if exchange.complete do
      IO.puts("    -> #{exchange.response.status} (#{exchange.response_delay_ms}ms)")
    end
  end)
end)

HTTP/2 Flows

HTTP/2 uses "streams" to match HTTP/2 spec terminology:

Enum.each(result.http2, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  # Complete streams
  Enum.each(flow.streams, fn stream ->
    ex = stream.exchange
    IO.puts("  #{ex.request.method} #{ex.request.path} -> #{ex.response.status}")
    IO.puts("    Response delay: #{stream.response_delay_ms}ms")
  end)

  # Incomplete streams (RST_STREAM, GOAWAY, truncated)
  Enum.each(flow.incomplete, fn inc ->
    IO.puts("  Incomplete stream #{inc.stream_id}: #{inc.reason}")
  end)
end)

UDP Flows

UDP flows are grouped by server (destination) only:

Enum.each(result.udp, fn flow ->
  # UDP flows have from: :any since sources can vary
  IO.puts("UDP to #{flow.flow.server}: #{length(flow.datagrams)} datagrams")

  Enum.each(flow.datagrams, fn dg ->
    IO.puts("  #{dg.from} -> #{dg.to}: #{dg.size} bytes @ +#{dg.relative_offset_ms}ms")
  end)
end)

Playback Timing

HTTP Response Delay

# HTTP/1
exchange.response_delay_ms  # Time from request to response

# HTTP/2
stream.response_delay_ms    # Time from request start to response completion

# Example playback
def playback_http1(exchange) do
  send_request(exchange.request)
  Process.sleep(exchange.response_delay_ms)
  send_response(exchange.response)
end

UDP Relative Offset

# First datagram in flow has relative_offset_ms = 0
datagram.relative_offset_ms  # Offset from flow start

# Example playback
def playback_udp(flow) do
  start_time = System.monotonic_time(:millisecond)

  Enum.each(flow.datagrams, fn dg ->
    elapsed = System.monotonic_time(:millisecond) - start_time
    remaining = dg.relative_offset_ms - elapsed
    if remaining > 0, do: Process.sleep(remaining)

    send_udp(dg.to, dg.payload)
  end)
end

Hosts Mapping

Resolve IP addresses to human-readable hostnames:

hosts = %{
  "192.168.1.10" => "api-gateway",
  "192.168.1.20" => "metrics-collector",
  "192.168.1.30" => "web-client"
}

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Now flows show friendly names
result.http2
|> Enum.map(fn f -> {f.flow.from, f.flow.server} end)
# => [{"web-client", "api-gateway:8080"}, ...]

Traffic Summary

The summary field provides pre-aggregated traffic data for network topology visualization:

%Summary{
  udp: [%UDPService{}, ...],    # UDP destinations with per-client stats
  http1: [%HTTPService{}, ...], # HTTP/1 servers with per-client stats
  http2: [%HTTPService{}, ...]  # HTTP/2 servers with per-client stats
}

Use Cases

  • Network diagrams - Show services and connected clients
  • Traffic aggregation - Total bytes/requests per service
  • Client analysis - Which clients connect to which services

Accessing Summary

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Services sorted by traffic volume (bytes desc)
result.summary.http2
|> Enum.each(fn service ->
  IO.puts("#{service.server_host || service.server}")
  IO.puts("  Total: #{service.total_requests} requests, #{service.total_response_bytes} bytes")
  IO.puts("  Methods: #{inspect(service.methods)}")
  IO.puts("  Status codes: #{inspect(service.status_codes)}")

  Enum.each(service.clients, fn client ->
    IO.puts("  - #{client.client_host || client.client}: #{client.request_count} requests")
  end)
end)

# UDP summary
result.summary.udp
|> Enum.each(fn service ->
  IO.puts("UDP #{service.server_host || service.server}:")
  IO.puts("  Total: #{service.total_packets} packets, #{service.total_bytes} bytes")

  Enum.each(service.clients, fn client ->
    IO.puts("  - #{client.client_host || client.client}: #{client.packet_count} packets")
  end)
end)

Summary Data Structures

HTTPService

%Summary.HTTPService{
  protocol: :http1 | :http2,
  server: "192.168.1.10:8080",       # IP:port string
  server_host: "api-gateway",        # Hostname (from hosts_map)
  clients: [%HTTPClientStats{}, ...],
  total_requests: 150,
  total_responses: 148,
  total_request_bytes: 45000,
  total_response_bytes: 1200000,
  methods: %{"GET" => 100, "POST" => 50},
  status_codes: %{200 => 140, 404 => 5, 500 => 3},
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}

HTTPClientStats

%Summary.HTTPClientStats{
  client: "10.0.0.5",               # Client IP (no port - ephemeral)
  client_host: "web-client",        # Hostname (from hosts_map)
  connection_count: 3,              # TCP connections
  stream_count: 45,                 # HTTP/2 streams (nil for HTTP/1)
  request_count: 45,
  response_count: 44,
  request_bytes: 12000,
  response_bytes: 350000,
  methods: %{"GET" => 40, "POST" => 5},
  status_codes: %{200 => 42, 404 => 2},
  avg_response_time_ms: 75,
  min_response_time_ms: 12,
  max_response_time_ms: 450,
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}

UDPService

%Summary.UDPService{
  server: "192.168.1.20:5005",      # IP:port string
  server_host: "metrics-collector", # Hostname (from hosts_map)
  clients: [%UDPClientStats{}, ...],
  total_packets: 5000,
  total_bytes: 2500000,
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}

UDPClientStats

%Summary.UDPClientStats{
  client: "10.0.0.5",               # Client IP (no port)
  client_host: "sensor-node",       # Hostname (from hosts_map)
  packet_count: 1200,
  total_bytes: 600000,
  avg_size: 500,
  min_size: 64,
  max_size: 1400,
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}

Rendering Summary

Use Summary.Render to generate markdown tables or Mermaid flowcharts:

alias PcapFileEx.Flows.Summary.Render

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Markdown tables
markdown = Render.to_markdown(result.summary)
IO.puts(markdown)

# Mermaid flowchart
mermaid = Render.to_mermaid(result.summary)
IO.puts(mermaid)

to_markdown/2 Options

Render.to_markdown(summary,
  title: true,           # Add "## HTTP Traffic" / "## UDP Traffic" headers
  humanize_bytes: false, # Format as "1.5 MB" instead of "1500000"
  protocol: :all         # :all, :http1, :http2, or :udp
)

to_mermaid/2 Options

Render.to_mermaid(summary,
  style: :host,          # :host (default) or :service
  direction: :lr,        # :lr (left-right), :tb (top-bottom), :rl, :bt
  group_by: :protocol    # :protocol (subgraphs per protocol) or :none (only for :service)
)

Styles:

  • :host (default) - Unified host nodes, protocol/port on edges. Hosts that act as both client AND server appear as a single node.
  • :service - Each service (host:port) is a separate node, grouped by protocol with Clients subgraph

Example Mermaid Output (style: :host - default)

flowchart LR
  web_client[web-client]
  api_gateway[api-gateway]
  metrics[metrics]
  web_client -->|"HTTP/2 :8080 (45 req)"| api_gateway
  web_client -->|"UDP :5005 (100 pkts)"| api_gateway

The default host-centric view:

  • Uses unified node IDs (hosts that are both client and server appear once)
  • No subgraphs - roles are clear from arrow direction
  • Protocol and port information shown on edges

Example Mermaid Output (style: :service)

Render.to_mermaid(summary, style: :service)
flowchart LR
  subgraph Clients
    c_web_client[web-client]
    c_sensor_1[sensor-1]
  end
  subgraph HTTP/2
    shttp2_0[api-gateway:8080]
  end
  subgraph UDP
    sudp_0[metrics:5005]
  end
  c_web_client -->|"45 req"| shttp2_0
  c_sensor_1 -->|"1200 pkts"| sudp_0

The service-centric view groups nodes by protocol with separate client/server nodes.

Protocol Detection

TCP flows are classified by content inspection:

  • HTTP/2: Connection preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
  • HTTP/1: Request methods (GET, POST, etc.) or HTTP/ response
alias PcapFileEx.Flows.ProtocolDetector

ProtocolDetector.detect("GET / HTTP/1.1\r\n")  # => :http1
ProtocolDetector.detect("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")  # => :http2
ProtocolDetector.detect(<<0, 1, 2, 3>>)  # => :unknown

Options

PcapFileEx.Flows.analyze("capture.pcapng",
  hosts_map: %{...},      # IP to hostname mapping
  decode_content: true,   # Decode HTTP bodies (default: true)
  decoders: [...],        # Custom decoder specs (see flows-decoders.md)
  keep_binary: false,     # Preserve original binary after decoding (default: false)
  unwrap_custom: true,    # Return custom decoder results directly (default: true)
  tcp_port: 8080,         # Filter TCP to specific port
  udp_port: 5005          # Filter UDP to specific port
)

unwrap_custom Option

Controls how custom decoder results are returned:

  • unwrap_custom: true (default) - Decoder results returned directly
  • unwrap_custom: false - Results wrapped in {:custom, ...} tuples
# Default behavior: decoder results are unwrapped
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", decoders: [my_decoder])
datagram.payload  # => {:my_telemetry, %{sensor: 1, temp: 23.5}}

# With unwrap_custom: false: wrapped in {:custom, ...}
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng",
  decoders: [my_decoder],
  unwrap_custom: false
)
datagram.payload  # => {:custom, {:my_telemetry, %{sensor: 1, temp: 23.5}}}

See flows-decoders.md for full custom decoder documentation.

Common Patterns

Filter by Client

result.http2
|> Enum.filter(fn f -> f.flow.from == "web-client" end)
|> Enum.flat_map(& &1.streams)

Get All Requests

all_requests =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.map(& &1.request)

http2_requests =
  result.http2
  |> Enum.flat_map(& &1.streams)
  |> Enum.map(& &1.exchange.request)

Find Errors

# HTTP errors
errors =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.filter(fn ex -> ex.complete and ex.response.status >= 400 end)

# Incomplete HTTP/2 streams
incomplete =
  result.http2
  |> Enum.flat_map(& &1.incomplete)

Calculate Statistics

# Total bytes across all flows
total_bytes =
  result.http1
  |> Enum.map(& &1.stats.byte_count)
  |> Enum.sum()

# Duration of a flow
flow = hd(result.http2)
IO.puts("Duration: #{flow.stats.duration_ms}ms")

Data Structures

HTTP1.Exchange

%HTTP1.Exchange{
  flow_seq: 0,                   # Index within flow's exchange list
  request: %{
    method: "GET",
    path: "/api/users",
    version: "1.1",
    headers: %{"host" => "api.example.com"},
    body: "",
    decoded_body: nil,
    timestamp: %Timestamp{}
  },
  response: %{
    status: 200,
    reason: "OK",
    version: "1.1",
    headers: %{"content-type" => "application/json"},
    body: "{...}",
    decoded_body: {:json, %{...}},
    timestamp: %Timestamp{}
  },
  start_timestamp: %Timestamp{},
  end_timestamp: %Timestamp{},
  response_delay_ms: 150,
  complete: true
}

HTTP2.Stream

%HTTP2.Stream{
  flow_seq: 0,                   # Index within flow's stream list
  exchange: %HTTP2.Exchange{},   # Full HTTP/2 exchange
  start_timestamp: %Timestamp{}, # Converted from DateTime
  response_delay_ms: 75          # Exchange duration (see Known Limitations)
}

UDP.Datagram

%UDP.Datagram{
  flow_seq: 0,                   # Index within flow's datagram list
  from: %Endpoint{},
  to: %Endpoint{},
  payload: <<...>>,
  timestamp: %Timestamp{},
  relative_offset_ms: 0,         # Offset from flow start
  size: 1024
}

Best Practices

  1. Use FlowKey for lookups - O(1) access instead of iterating

  2. Check complete for HTTP - Incomplete exchanges have nil response

  3. Use streams for HTTP/2 - Matches HTTP/2 spec terminology

  4. Use timeline for playback - Maintains chronological order across protocols

  5. Apply hosts_map early - Makes logs and debugging more readable

  6. Understand flow_seq vs seq_num - flow_seq is the index within a flow's event list; seq_num is only in TimelineEvent for timeline position

Known Limitations

HTTP/1 Timestamp Coarseness

HTTP/1 request/response timestamps use the first TCP segment timestamp for each direction. This means:

  • Multiple pipelined requests share the same start_timestamp
  • response_delay_ms may not reflect true per-request latency for pipelined traffic

Workaround: For precise timing, analyze flows with single request/response exchanges.

HTTP/2 response_delay_ms

HTTP2.Stream.response_delay_ms is the full exchange duration (request start → response complete), not time-to-first-byte (TTFB). For large response bodies, this over-estimates actual response latency.

Workaround: For TTFB approximations, consider using the underlying exchange.start_timestamp and exchange.end_timestamp along with response body size.

FlowKey Host Independence

FlowKey lookups ignore the host field in endpoints. This means you can look up flows using keys built with or without hosts_map applied - both will find the same flow.