Traffic Flows Analysis Guide

View Source

Overview

The PcapFileEx.Flows module provides a unified API to analyze PCAP files and identify traffic flows by protocol (HTTP/1, HTTP/2, UDP).

Quick Start

# Analyze a PCAP file
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng")

# Access flows by protocol
IO.puts("HTTP/1 flows: #{length(result.http1)}")
IO.puts("HTTP/2 flows: #{length(result.http2)}")
IO.puts("UDP flows: #{length(result.udp)}")

Key Concepts

AnalysisResult

The main result structure containing all flows:

%PcapFileEx.Flows.AnalysisResult{
  flows: %{FlowKey.t() => flow_ref()},   # O(1) lookup map
  http1: [HTTP1.Flow.t()],               # Sorted by first exchange timestamp
  http2: [HTTP2.Flow.t()],               # Sorted by first stream timestamp
  udp: [UDP.Flow.t()],                   # Sorted by first datagram timestamp
  timeline: [TimelineEvent.t()],         # Unified timeline
  stats: Stats.t()                       # Aggregate statistics
}

FlowKey

Stable identity for O(1) flow lookups:

key = PcapFileEx.FlowKey.new(:http2, client_endpoint, server_endpoint)
flow = PcapFileEx.Flows.AnalysisResult.get_flow(result, key)

Flow

Base flow identity with display and authoritative fields:

%PcapFileEx.Flow{
  protocol: :http2,
  from: "web-client",           # Display: hostname (no port)
  server: "api-gateway:8080",   # Display: host:port
  client: "web-client:54321",   # Display: host:port
  server_endpoint: %Endpoint{}, # Authoritative
  client_endpoint: %Endpoint{}  # Authoritative
}

TimelineEvent

For unified playback across protocols:

Enum.each(result.timeline, fn event ->
  data = PcapFileEx.Flows.AnalysisResult.get_event(result, event)

  case data do
    %HTTP1.Exchange{} -> handle_http1(data)
    %HTTP2.Stream{} -> handle_http2(data)
    %UDP.Datagram{} -> handle_udp(data)
  end
end)

Protocol-Specific Flows

HTTP/1 Flows

Enum.each(result.http1, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  Enum.each(flow.exchanges, fn exchange ->
    IO.puts("  #{exchange.request.method} #{exchange.request.path}")

    if exchange.complete do
      IO.puts("    -> #{exchange.response.status} (#{exchange.response_delay_ms}ms)")
    end
  end)
end)

HTTP/2 Flows

HTTP/2 uses "streams" to match HTTP/2 spec terminology:

Enum.each(result.http2, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  # Complete streams
  Enum.each(flow.streams, fn stream ->
    ex = stream.exchange
    IO.puts("  #{ex.request.method} #{ex.request.path} -> #{ex.response.status}")
    IO.puts("    Response delay: #{stream.response_delay_ms}ms")
  end)

  # Incomplete streams (RST_STREAM, GOAWAY, truncated)
  Enum.each(flow.incomplete, fn inc ->
    IO.puts("  Incomplete stream #{inc.stream_id}: #{inc.reason}")
  end)
end)

UDP Flows

UDP flows are grouped by server (destination) only:

Enum.each(result.udp, fn flow ->
  # UDP flows have from: :any since sources can vary
  IO.puts("UDP to #{flow.flow.server}: #{length(flow.datagrams)} datagrams")

  Enum.each(flow.datagrams, fn dg ->
    IO.puts("  #{dg.from} -> #{dg.to}: #{dg.size} bytes @ +#{dg.relative_offset_ms}ms")
  end)
end)

Playback Timing

HTTP Response Delay

# HTTP/1
exchange.response_delay_ms  # Time from request to response

# HTTP/2
stream.response_delay_ms    # Time from request start to response completion

# Example playback
def playback_http1(exchange) do
  send_request(exchange.request)
  Process.sleep(exchange.response_delay_ms)
  send_response(exchange.response)
end

UDP Relative Offset

# First datagram in flow has relative_offset_ms = 0
datagram.relative_offset_ms  # Offset from flow start

# Example playback
def playback_udp(flow) do
  start_time = System.monotonic_time(:millisecond)

  Enum.each(flow.datagrams, fn dg ->
    elapsed = System.monotonic_time(:millisecond) - start_time
    remaining = dg.relative_offset_ms - elapsed
    if remaining > 0, do: Process.sleep(remaining)

    send_udp(dg.to, dg.payload)
  end)
end

Hosts Mapping

Resolve IP addresses to human-readable hostnames:

hosts = %{
  "192.168.1.10" => "api-gateway",
  "192.168.1.20" => "metrics-collector",
  "192.168.1.30" => "web-client"
}

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Now flows show friendly names
result.http2
|> Enum.map(fn f -> {f.flow.from, f.flow.server} end)
# => [{"web-client", "api-gateway:8080"}, ...]

Protocol Detection

TCP flows are classified by content inspection:

  • HTTP/2: Connection preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
  • HTTP/1: Request methods (GET, POST, etc.) or HTTP/ response
alias PcapFileEx.Flows.ProtocolDetector

ProtocolDetector.detect("GET / HTTP/1.1\r\n")  # => :http1
ProtocolDetector.detect("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")  # => :http2
ProtocolDetector.detect(<<0, 1, 2, 3>>)  # => :unknown

Options

PcapFileEx.Flows.analyze("capture.pcapng",
  hosts_map: %{...},      # IP to hostname mapping
  decode_content: true,   # Decode HTTP bodies (default: true)
  tcp_port: 8080,         # Filter TCP to specific port
  udp_port: 5005          # Filter UDP to specific port
)

Common Patterns

Filter by Client

result.http2
|> Enum.filter(fn f -> f.flow.from == "web-client" end)
|> Enum.flat_map(& &1.streams)

Get All Requests

all_requests =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.map(& &1.request)

http2_requests =
  result.http2
  |> Enum.flat_map(& &1.streams)
  |> Enum.map(& &1.exchange.request)

Find Errors

# HTTP errors
errors =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.filter(fn ex -> ex.complete and ex.response.status >= 400 end)

# Incomplete HTTP/2 streams
incomplete =
  result.http2
  |> Enum.flat_map(& &1.incomplete)

Calculate Statistics

# Total bytes across all flows
total_bytes =
  result.http1
  |> Enum.map(& &1.stats.byte_count)
  |> Enum.sum()

# Duration of a flow
flow = hd(result.http2)
IO.puts("Duration: #{flow.stats.duration_ms}ms")

Data Structures

HTTP1.Exchange

%HTTP1.Exchange{
  flow_seq: 0,                   # Index within flow's exchange list
  request: %{
    method: "GET",
    path: "/api/users",
    version: "1.1",
    headers: %{"host" => "api.example.com"},
    body: "",
    decoded_body: nil,
    timestamp: %Timestamp{}
  },
  response: %{
    status: 200,
    reason: "OK",
    version: "1.1",
    headers: %{"content-type" => "application/json"},
    body: "{...}",
    decoded_body: {:json, %{...}},
    timestamp: %Timestamp{}
  },
  start_timestamp: %Timestamp{},
  end_timestamp: %Timestamp{},
  response_delay_ms: 150,
  complete: true
}

HTTP2.Stream

%HTTP2.Stream{
  flow_seq: 0,                   # Index within flow's stream list
  exchange: %HTTP2.Exchange{},   # Full HTTP/2 exchange
  start_timestamp: %Timestamp{}, # Converted from DateTime
  response_delay_ms: 75          # Exchange duration (see Known Limitations)
}

UDP.Datagram

%UDP.Datagram{
  flow_seq: 0,                   # Index within flow's datagram list
  from: %Endpoint{},
  to: %Endpoint{},
  payload: <<...>>,
  timestamp: %Timestamp{},
  relative_offset_ms: 0,         # Offset from flow start
  size: 1024
}

Best Practices

  1. Use FlowKey for lookups - O(1) access instead of iterating

  2. Check complete for HTTP - Incomplete exchanges have nil response

  3. Use streams for HTTP/2 - Matches HTTP/2 spec terminology

  4. Use timeline for playback - Maintains chronological order across protocols

  5. Apply hosts_map early - Makes logs and debugging more readable

  6. Understand flow_seq vs seq_num - flow_seq is the index within a flow's event list; seq_num is only in TimelineEvent for timeline position

Known Limitations

HTTP/1 Timestamp Coarseness

HTTP/1 request/response timestamps use the first TCP segment timestamp for each direction. This means:

  • Multiple pipelined requests share the same start_timestamp
  • response_delay_ms may not reflect true per-request latency for pipelined traffic

Workaround: For precise timing, analyze flows with single request/response exchanges.

HTTP/2 response_delay_ms

HTTP2.Stream.response_delay_ms is the full exchange duration (request start → response complete), not time-to-first-byte (TTFB). For large response bodies, this over-estimates actual response latency.

Workaround: For TTFB approximations, consider using the underlying exchange.start_timestamp and exchange.end_timestamp along with response body size.

FlowKey Host Independence

FlowKey lookups ignore the host field in endpoints. This means you can look up flows using keys built with or without hosts_map applied - both will find the same flow.