Traffic Flows Analysis Guide
Overview
The PcapFileEx.Flows module provides a unified API to analyze PCAP files and identify traffic flows by protocol (HTTP/1, HTTP/2, UDP).
Quick Start
# Analyze a PCAP file
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng")
# Access flows by protocol
IO.puts("HTTP/1 flows: #{length(result.http1)}")
IO.puts("HTTP/2 flows: #{length(result.http2)}")
IO.puts("UDP flows: #{length(result.udp)}")
Key Concepts
AnalysisResult
The main result structure containing all flows:
%PcapFileEx.Flows.AnalysisResult{
  flows: %{FlowKey.t() => flow_ref()},  # O(1) lookup map
  http1: [HTTP1.Flow.t()],              # Sorted by first exchange timestamp
  http2: [HTTP2.Flow.t()],              # Sorted by first stream timestamp
  udp: [UDP.Flow.t()],                  # Sorted by first datagram timestamp
  timeline: [TimelineEvent.t()],        # Unified timeline
  stats: Stats.t()                      # Aggregate statistics
}
FlowKey
Stable identity for O(1) flow lookups:
key = PcapFileEx.FlowKey.new(:http2, client_endpoint, server_endpoint)
flow = PcapFileEx.Flows.AnalysisResult.get_flow(result, key)
Flow
Base flow identity with display and authoritative fields:
%PcapFileEx.Flow{
  protocol: :http2,
  from: "web-client",             # Display: hostname (no port)
  server: "api-gateway:8080",     # Display: host:port
  client: "web-client:54321",     # Display: host:port
  server_endpoint: %Endpoint{},   # Authoritative
  client_endpoint: %Endpoint{}    # Authoritative
}
TimelineEvent
For unified playback across protocols:
Enum.each(result.timeline, fn event ->
  data = PcapFileEx.Flows.AnalysisResult.get_event(result, event)
  case data do
    %HTTP1.Exchange{} -> handle_http1(data)
    %HTTP2.Stream{} -> handle_http2(data)
    %UDP.Datagram{} -> handle_udp(data)
  end
end)
Protocol-Specific Flows
HTTP/1 Flows
Enum.each(result.http1, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")
  Enum.each(flow.exchanges, fn exchange ->
    IO.puts("  #{exchange.request.method} #{exchange.request.path}")
    if exchange.complete do
      IO.puts("    -> #{exchange.response.status} (#{exchange.response_delay_ms}ms)")
    end
  end)
end)
HTTP/2 Flows
HTTP/2 uses "streams" to match HTTP/2 spec terminology:
Enum.each(result.http2, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")
  # Complete streams
  Enum.each(flow.streams, fn stream ->
    ex = stream.exchange
    IO.puts("  #{ex.request.method} #{ex.request.path} -> #{ex.response.status}")
    IO.puts("  Response delay: #{stream.response_delay_ms}ms")
  end)
  # Incomplete streams (RST_STREAM, GOAWAY, truncated)
  Enum.each(flow.incomplete, fn inc ->
    IO.puts("  Incomplete stream #{inc.stream_id}: #{inc.reason}")
  end)
end)
UDP Flows
UDP flows are grouped by server (destination) only:
Enum.each(result.udp, fn flow ->
  # UDP flows have from: :any since sources can vary
  IO.puts("UDP to #{flow.flow.server}: #{length(flow.datagrams)} datagrams")
  Enum.each(flow.datagrams, fn dg ->
    IO.puts("  #{dg.from} -> #{dg.to}: #{dg.size} bytes @ +#{dg.relative_offset_ms}ms")
  end)
end)
Playback Timing
HTTP Response Delay
# HTTP/1
exchange.response_delay_ms # Time from request to response
# HTTP/2
stream.response_delay_ms # Time from request start to response completion
# Example playback
def playback_http1(exchange) do
  send_request(exchange.request)
  Process.sleep(exchange.response_delay_ms)
  send_response(exchange.response)
end
UDP Relative Offset
# First datagram in flow has relative_offset_ms = 0
datagram.relative_offset_ms # Offset from flow start
# Example playback
def playback_udp(flow) do
  start_time = System.monotonic_time(:millisecond)
  Enum.each(flow.datagrams, fn dg ->
    elapsed = System.monotonic_time(:millisecond) - start_time
    remaining = dg.relative_offset_ms - elapsed
    if remaining > 0, do: Process.sleep(remaining)
    send_udp(dg.to, dg.payload)
  end)
end
Hosts Mapping
Resolve IP addresses to human-readable hostnames:
hosts = %{
  "192.168.1.10" => "api-gateway",
  "192.168.1.20" => "metrics-collector",
  "192.168.1.30" => "web-client"
}
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)
# Now flows show friendly names
result.http2
|> Enum.map(fn f -> {f.flow.from, f.flow.server} end)
# => [{"web-client", "api-gateway:8080"}, ...]
Protocol Detection
TCP flows are classified by content inspection:
- HTTP/2: Connection preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
- HTTP/1: Request methods (GET, POST, etc.) or an HTTP/ response line
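The rules above can be sketched with plain prefix checks. This is a minimal illustration under stated assumptions, not the library's implementation: the module name DetectSketch and the list of recognized methods are hypothetical.

```elixir
defmodule DetectSketch do
  # Hypothetical module for illustration; not part of PcapFileEx.

  # The fixed 24-byte client connection preface from the HTTP/2 specification.
  @http2_preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

  # Assumed method list; the real detector may recognize a different set.
  @http1_methods ~w(GET POST PUT DELETE HEAD OPTIONS PATCH)

  # Classify the first bytes of a TCP payload by inspecting its prefix.
  def detect(data) when is_binary(data) do
    cond do
      # HTTP/2: the payload starts with the connection preface.
      String.starts_with?(data, @http2_preface) -> :http2
      # HTTP/1 response: the status line starts with "HTTP/".
      String.starts_with?(data, "HTTP/") -> :http1
      # HTTP/1 request: a known method followed by a space.
      Enum.any?(@http1_methods, &String.starts_with?(data, &1 <> " ")) -> :http1
      true -> :unknown
    end
  end
end
```

The preface check runs first because the preface itself begins with a method-like token (PRI), so the order keeps the two protocols from being confused if the method list is ever extended.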
alias PcapFileEx.Flows.ProtocolDetector
ProtocolDetector.detect("GET / HTTP/1.1\r\n") # => :http1
ProtocolDetector.detect("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") # => :http2
ProtocolDetector.detect(<<0, 1, 2, 3>>) # => :unknown
Options
PcapFileEx.Flows.analyze("capture.pcapng",
  hosts_map: %{...},     # IP to hostname mapping
  decode_content: true,  # Decode HTTP bodies (default: true)
  tcp_port: 8080,        # Filter TCP to specific port
  udp_port: 5005         # Filter UDP to specific port
)
Common Patterns
Filter by Client
result.http2
|> Enum.filter(fn f -> f.flow.from == "web-client" end)
|> Enum.flat_map(& &1.streams)
Get All Requests
all_requests =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.map(& &1.request)
http2_requests =
  result.http2
  |> Enum.flat_map(& &1.streams)
  |> Enum.map(& &1.exchange.request)
Find Errors
# HTTP/1 errors (complete exchanges with a 4xx or 5xx status)
errors =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.filter(fn ex -> ex.complete and ex.response.status >= 400 end)
# Incomplete HTTP/2 streams
incomplete =
  result.http2
  |> Enum.flat_map(& &1.incomplete)
Calculate Statistics
# Total bytes across all HTTP/1 flows
total_bytes =
  result.http1
  |> Enum.map(& &1.stats.byte_count)
  |> Enum.sum()
# Duration of a flow
flow = hd(result.http2)
IO.puts("Duration: #{flow.stats.duration_ms}ms")
Data Structures
HTTP1.Exchange
%HTTP1.Exchange{
  flow_seq: 0,  # Index within flow's exchange list
  request: %{
    method: "GET",
    path: "/api/users",
    version: "1.1",
    headers: %{"host" => "api.example.com"},
    body: "",
    decoded_body: nil,
    timestamp: %Timestamp{}
  },
  response: %{
    status: 200,
    reason: "OK",
    version: "1.1",
    headers: %{"content-type" => "application/json"},
    body: "{...}",
    decoded_body: {:json, %{...}},
    timestamp: %Timestamp{}
  },
  start_timestamp: %Timestamp{},
  end_timestamp: %Timestamp{},
  response_delay_ms: 150,
  complete: true
}
HTTP2.Stream
%HTTP2.Stream{
  flow_seq: 0,                     # Index within flow's stream list
  exchange: %HTTP2.Exchange{},     # Full HTTP/2 exchange
  start_timestamp: %Timestamp{},   # Converted from DateTime
  response_delay_ms: 75            # Exchange duration (see Known Limitations)
}
UDP.Datagram
%UDP.Datagram{
  flow_seq: 0,             # Index within flow's datagram list
  from: %Endpoint{},
  to: %Endpoint{},
  payload: <<...>>,
  timestamp: %Timestamp{},
  relative_offset_ms: 0,   # Offset from flow start
  size: 1024
}
Best Practices
- Use FlowKey for lookups: O(1) access instead of iterating
- Check complete for HTTP: Incomplete exchanges have a nil response
- Use streams for HTTP/2: Matches HTTP/2 spec terminology
- Use timeline for playback: Maintains chronological order across protocols
- Apply hosts_map early: Makes logs and debugging more readable
- Understand flow_seq vs seq_num: flow_seq is the index within a flow's event list; seq_num exists only in TimelineEvent and gives the timeline position
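The advice to check complete before reading a response can be sketched with pattern matching. The ExchangeSummary module below is a hypothetical helper, not part of PcapFileEx, and the sample data uses plain maps that only mimic the complete, request, and response fields shown in this guide.

```elixir
defmodule ExchangeSummary do
  # Hypothetical helper for illustration; not part of PcapFileEx.
  # Accepts any map (or struct) with `complete`, `request`, and `response` fields.

  # Complete exchange: the response is present, so its status is safe to read.
  def describe(%{complete: true, request: req, response: %{status: status}}) do
    "#{req.method} #{req.path} -> #{status}"
  end

  # Incomplete exchange: the response is nil, so report the request only.
  def describe(%{complete: false, request: req}) do
    "#{req.method} #{req.path} -> (no response captured)"
  end
end

# Sample data standing in for real exchanges.
exchanges = [
  %{complete: true, request: %{method: "GET", path: "/api/users"}, response: %{status: 200}},
  %{complete: false, request: %{method: "POST", path: "/api/orders"}, response: nil}
]

Enum.each(exchanges, &IO.puts(ExchangeSummary.describe(&1)))
```

Matching on complete in the function head keeps the nil case out of the happy path, so no clause ever calls .status on a missing response.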
Known Limitations
HTTP/1 Timestamp Coarseness
HTTP/1 request/response timestamps use the first TCP segment timestamp for each direction. This means:
- Multiple pipelined requests share the same start_timestamp
- response_delay_ms may not reflect true per-request latency for pipelined traffic
Workaround: For precise timing, analyze flows with single request/response exchanges.
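One way to apply this workaround is to keep only flows that contain exactly one complete exchange before reading their delays. The SingleExchange module is a hypothetical helper, and the sample flows are plain maps that mimic the exchanges field of an HTTP/1 flow.

```elixir
defmodule SingleExchange do
  # Hypothetical helper for illustration; not part of PcapFileEx.
  # Returns the response delays of flows holding exactly one complete exchange.
  def delays(http1_flows) do
    # The generator pattern silently skips flows whose exchange list
    # does not have exactly one element, or whose exchange is incomplete.
    for %{exchanges: [%{complete: true} = ex]} <- http1_flows do
      ex.response_delay_ms
    end
  end
end

# Sample data standing in for result.http1.
flows = [
  %{exchanges: [%{complete: true, response_delay_ms: 150}]},
  %{exchanges: [
    %{complete: true, response_delay_ms: 10},
    %{complete: true, response_delay_ms: 20}
  ]},
  %{exchanges: [%{complete: false, response_delay_ms: nil}]}
]

SingleExchange.delays(flows)  # => [150]
```

This filter is conservative: it also drops keep-alive connections that carried several sequential, non-pipelined requests, since the flow data alone does not distinguish them from pipelined traffic in this sketch.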
HTTP/2 response_delay_ms
HTTP2.Stream.response_delay_ms is the full exchange duration (request start → response complete), not time-to-first-byte (TTFB). For large response bodies, this over-estimates actual response latency.
Workaround: For TTFB approximations, consider using the underlying exchange.start_timestamp and exchange.end_timestamp along with response body size.
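A rough way to act on this is to subtract an estimated body transfer time from the full exchange duration. The sketch below is only an approximation under a strong assumption: bytes_per_ms is a throughput figure the caller has to supply, since nothing in the analysis result provides it. TtfbSketch is a hypothetical module, not part of PcapFileEx.

```elixir
defmodule TtfbSketch do
  # Hypothetical helper for illustration; not part of PcapFileEx.
  #
  # duration_ms:  full exchange duration (for example stream.response_delay_ms)
  # body_bytes:   size of the response body in bytes
  # bytes_per_ms: ASSUMED link throughput, chosen by the caller
  def approx_ttfb_ms(duration_ms, body_bytes, bytes_per_ms) when bytes_per_ms > 0 do
    # Estimated time spent transferring the body at the assumed throughput.
    transfer_ms = body_bytes / bytes_per_ms
    # Clamp at zero in case the assumed throughput is too pessimistic.
    max(duration_ms - transfer_ms, 0.0)
  end
end

# 75 ms total, 50_000-byte body, assumed 1_000 bytes/ms (about 8 Mbit/s).
TtfbSketch.approx_ttfb_ms(75, 50_000, 1_000)  # => 25.0
```

The result is only as good as the throughput guess, so it is best treated as a bound for comparing streams within one capture rather than as a measured latency.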
FlowKey Host Independence
FlowKey lookups ignore the host field in endpoints. This means you can look up flows using keys built with or without hosts_map applied - both will find the same flow.
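The effect can be illustrated with a stand-in key that is built from protocol, IP, and port only. This is not the FlowKey implementation: KeySketch and the ip, port, and host fields of the endpoint maps are assumptions made for the example.

```elixir
defmodule KeySketch do
  # Hypothetical stand-in for a host-independent key; not the FlowKey implementation.
  # Endpoints are plain maps with ASSUMED `ip`, `port`, and `host` fields.
  def new(protocol, client, server) do
    # The host field is deliberately left out of the key.
    {protocol, {client.ip, client.port}, {server.ip, server.port}}
  end
end

# Endpoints as they might look without a hosts map applied.
server = %{ip: "192.168.1.10", port: 8080, host: nil}
client = %{ip: "192.168.1.30", port: 54321, host: nil}

flows = %{KeySketch.new(:http2, client, server) => :flow_a}

# The same endpoints with hostnames resolved still produce the same key.
named_server = %{server | host: "api-gateway"}
named_client = %{client | host: "web-client"}

Map.fetch!(flows, KeySketch.new(:http2, named_client, named_server))  # => :flow_a
```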