Custom Decoders for Flows

View Source

Overview

The decoders option in PcapFileEx.Flows.analyze/2 allows you to decode domain-specific binary payloads. Custom decoders transform raw binary data into structured terms based on matching criteria.

When to use custom decoders:

  • UDP datagrams with application-specific protocols (telemetry, gaming, IoT)
  • HTTP bodies with binary content-types (protobuf, custom formats)
  • Multipart parts with 5G SBI protocols (NGAP, NAS, etc.)

Key principle: Custom decoders only apply to binary content. Built-in JSON/text decoding runs first.

Quick Start

# Decode UDP telemetry on port 5005
decoder = %{
  protocol: :udp,
  match: %{port: 5005},
  decoder: &MyTelemetry.decode/1
}

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng",
  decoders: [decoder]
)

# Access decoded payload - decoder results returned directly (v0.5.7+)
datagram = hd(hd(result.udp).datagrams)
case datagram.payload do
  {:my_telemetry, data} -> IO.inspect(data)  # Decoder result directly
  {:decode_error, reason} -> IO.puts("Error: #{inspect(reason)}")
  raw when is_binary(raw) -> IO.puts("No decoder matched")
end

# To get wrapped results like {:custom, value}, use unwrap_custom: false
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng",
  decoders: [decoder],
  unwrap_custom: false
)
# datagram.payload is now {:custom, {:my_telemetry, data}}

Decoder Specification

A decoder spec is a map with three required keys:

%{
  protocol: :udp | :http1 | :http2,  # Filter by protocol
  match: matcher(),                   # Criteria to match
  decoder: decoder_fn() | module()    # Decoding function or module
}

Protocol

Determines which traffic the decoder applies to:

  • :udp - UDP datagrams
  • :http1 - HTTP/1.x bodies and multipart parts
  • :http2 - HTTP/2 bodies and multipart parts

Match Criteria

Match can be a map or function:

# Map matcher - all specified criteria must match
%{
  port: 5005,                              # UDP destination port
  scope: :body | :multipart_part,          # HTTP body vs multipart part
  content_type: "application/x-protobuf",  # Content-Type header
  content_id: "part1",                     # Multipart Content-ID
  method: "POST",                          # HTTP method
  path: "/api/v1"                          # Request path
}

# Function matcher - full control
fn ctx ->
  ctx.port in 5000..6000 and ctx.direction == :datagram
end

Supported match values: | Field | Type | Example | |-------|------|---------| | port | integer, Range, list | 5005, 5000..5100, [5005, 5006] | | content_type | string, Regex, list | "application/json", ~r/vnd\.3gpp\..*/ | | content_id | string, Regex | "ngap-part", ~r/part-\d+/ | | method | string, list | "POST", ["POST", "PUT"] | | path | string, Regex | "/api/users", ~r/\/api\/v\d+\/.*/ | | scope | atom | :body, :multipart_part |

Decoder Types

Arity-1 (Simple)

Receives only payload. Return value is stored directly (unwrapped by default).

decoder = %{
  protocol: :udp,
  match: %{port: 5005},
  decoder: fn payload ->
    # Return any term - stored directly as the payload
    MyParser.parse(payload)
  end
}

# Or use a module function reference
decoder = %{
  protocol: :udp,
  match: %{port: 5005},
  decoder: &MyParser.parse/1
}

Return values:

  • Any term → stored directly (or as {:custom, term} if unwrap_custom: false)
  • {:error, reason} → stored as {:decode_error, reason} (always wrapped)

Arity-2 (Context-Aware)

Receives context and payload. Must return {:ok, term}, {:error, reason}, or :skip.

decoder = %{
  protocol: :http1,
  match: %{scope: :multipart_part, content_type: ~r/vnd\.3gpp\..*/},
  decoder: fn %{content_id: id, path: path}, payload ->
    case MyDecoder.parse(payload) do
      {:ok, data} -> {:ok, %{id: id, path: path, data: data}}
      {:error, reason} -> {:error, {:parse_failed, reason}}
    end
  end
}

Return values:

  • {:ok, term} → stored directly (or as {:custom, term} if unwrap_custom: false)
  • {:error, reason} → stored as {:decode_error, reason} (always wrapped)
  • :skip → try next decoder, or fall back to binary

Module-Based

Implement the PcapFileEx.Flows.Decoder behaviour:

defmodule MyNGAPDecoder do
  @behaviour PcapFileEx.Flows.Decoder

  @impl true
  def decode(%{content_id: id}, payload) do
    case NGAP.parse(payload) do
      {:ok, message} -> {:ok, {:ngap, id, message}}
      {:error, reason} -> {:error, {:ngap_error, reason}}
    end
  end

  # Optional: define fields for display filters
  @impl true
  def fields do
    [
      %{id: "ngap.procedure_code", type: :integer,
        extractor: fn {:ngap, _, msg} -> msg.procedure_code end}
    ]
  end
end

decoder = %{
  protocol: :http1,
  match: %{scope: :multipart_part, content_type: "application/vnd.3gpp.ngap"},
  decoder: MyNGAPDecoder
}

Context Fields

The context passed to arity-2 decoders varies by protocol:

UDP Context

%{
  protocol: :udp,
  direction: :datagram,
  port: 5005,                    # Destination port
  from: %Endpoint{...},          # Source endpoint
  to: %Endpoint{...}             # Destination endpoint
}

HTTP Body Context

%{
  protocol: :http1 | :http2,
  direction: :request | :response,
  scope: :body,
  content_type: "application/x-protobuf",
  headers: %{"content-length" => "1024", ...},
  method: "POST",
  path: "/api/users",
  status: 200                    # Only for responses
}

Multipart Part Context

%{
  protocol: :http1 | :http2,
  direction: :request | :response,
  scope: :multipart_part,
  content_type: "application/vnd.3gpp.ngap",
  content_id: "ngap-part",       # May be nil
  headers: %{...},               # Part's own headers
  method: "POST",                # Parent request method
  path: "/sbi/v1"                # Parent request path
}

Note: For HTTP/2, headers excludes pseudo-headers (:method, :path, :status). Use the dedicated context fields instead.

Decoding Pipeline

HTTP Bodies

  1. Built-in JSON decoder (application/json{:json, term})
  2. Built-in text decoder (text/*{:text, string})
  3. Built-in multipart parser (multipart/*{:multipart, parts})
  4. Custom decoders (binary content only)
  5. Binary fallback ({:binary, payload})

Multipart Parts

Each part follows the same pipeline:

  1. Built-in JSON/text decoders
  2. Custom decoders (binary parts only)
  3. Binary fallback

UDP Datagrams

  1. Custom decoders (first match)
  2. No fallback (payload remains raw binary)

Result Wrapping

By default (unwrap_custom: true), custom decoder results are stored directly without wrapping. Use unwrap_custom: false to get wrapped {:custom, term} results.

Decoder ReturnDefault (unwrap_custom: true)With unwrap_custom: false
{:ok, term} (arity-2)term{:custom, term}
term (arity-1)term{:custom, term}
{:error, reason}{:decode_error, reason}{:decode_error, reason}
Exception raised{:decode_error, %{exception: e, stacktrace: st}}same
:skipFalls through to next decodersame

Pattern match on results (default behavior):

case exchange.response.decoded_body do
  {:my_protocol, data} -> handle_custom(data)     # Decoder result directly
  {:decode_error, reason} -> handle_error(reason) # Errors always wrapped
  {:json, json} -> handle_json(json)              # Built-in decoders unchanged
  {:text, text} -> handle_text(text)
  {:multipart, parts} -> handle_multipart(parts)
  {:binary, raw} -> handle_binary(raw)
end

# With unwrap_custom: false, custom results are wrapped:
case exchange.response.decoded_body do
  {:custom, data} -> handle_custom(data)          # Wrapped in {:custom, ...}
  {:decode_error, reason} -> handle_error(reason)
  # ... same for built-in decoders
end

Examples

UDP Telemetry Decoder

# Decode custom telemetry protocol
telemetry_decoder = %{
  protocol: :udp,
  match: %{port: 5005..5010},  # Port range
  decoder: fn payload ->
    <<sensor_id::16, temperature::float-32, humidity::float-32>> = payload
    %{sensor_id: sensor_id, temp: temperature, humidity: humidity}
  end
}

HTTP Protobuf Decoder

# Decode protobuf bodies
protobuf_decoder = %{
  protocol: :http1,
  match: %{scope: :body, content_type: "application/x-protobuf"},
  decoder: fn %{path: path}, payload ->
    message_type = infer_message_type(path)
    {:ok, Protobuf.decode(message_type, payload)}
  end
}

5G SBI Multipart Decoder

# Decode 3GPP binary parts in SBI multipart
sbi_decoder = %{
  protocol: :http2,
  match: %{scope: :multipart_part, content_type: ~r/application\/vnd\.3gpp\..*/},
  decoder: fn %{content_type: ct, content_id: id}, payload ->
    type = String.replace(ct, "application/vnd.3gpp.", "")
    {:ok, %{type: type, id: id, size: byte_size(payload), data: payload}}
  end
}

Conditional Decoder with :skip

# Try decoding, skip if not our format
conditional_decoder = %{
  protocol: :udp,
  match: %{port: 5005},
  decoder: fn _ctx, payload ->
    case payload do
      <<0xCA, 0xFE, rest::binary>> ->
        {:ok, {:magic_protocol, rest}}
      _ ->
        :skip  # Not our format, try next decoder
    end
  end
}

Error Handling

Decoder errors are stored as {:decode_error, reason}, not raised:

# Check for decode errors in UDP (default unwrap_custom: true)
Enum.each(result.udp, fn flow ->
  Enum.each(flow.datagrams, fn dg ->
    case dg.payload do
      {:decode_error, %{exception: e}} ->
        Logger.error("Decoder crashed: #{inspect(e)}")
      {:decode_error, reason} ->
        Logger.warning("Decode failed: #{inspect(reason)}")
      raw when is_binary(raw) ->
        :ok  # No decoder matched
      decoded ->
        :ok  # Successfully decoded (unwrapped result)
    end
  end)
end)

# Check for decode errors in HTTP multipart
case exchange.response.decoded_body do
  {:multipart, parts} ->
    Enum.each(parts, fn part ->
      case part.body do
        {:decode_error, reason} ->
          Logger.warning("Part #{part.content_id} failed: #{inspect(reason)}")
        _ ->
          :ok
      end
    end)
  _ ->
    :ok
end

Best Practices

  1. Use arity-1 for simple decoders - No context needed, cleaner code

  2. Return :skip for conditional decoders - Let other decoders try

  3. Return {:error, reason} for failures - Don't crash, store the error

  4. Match specifically - Use scope: :multipart_part to avoid matching body

  5. Use Regex for content-type families - ~r/vnd\.3gpp\..*/ matches all 3GPP types

  6. Register for both HTTP/1 and HTTP/2 - If your protocol appears in both:

    http1_decoder = %{protocol: :http1, match: matcher, decoder: decoder}
    http2_decoder = %{protocol: :http2, match: matcher, decoder: decoder}
    decoders: [http1_decoder, http2_decoder]
  7. Implement fields/0 for display filters - Makes decoded data filterable

  8. Normalize headers are lowercase - Match with "content-type", not "Content-Type"

Decoder Priority

Decoders are evaluated in the order they appear in the list. First match wins:

decoders: [
  specific_decoder,   # Checked first
  fallback_decoder    # Only if specific doesn't match
]

For :skip returns, evaluation continues to the next matching decoder.

Binary Preservation

When using custom decoders, you may need both the decoded data (for analysis) and the original binary (for playback/replay). The keep_binary option preserves the original binary alongside the decoded content.

Usage

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng",
  decoders: [my_decoder],
  keep_binary: true  # Preserve original binary
)

UDP Datagrams

When a custom decoder transforms a UDP datagram (default unwrap_custom: true):

datagram = hd(hd(result.udp).datagrams)

case datagram.payload do
  {:decode_error, reason} ->
    # Decoder failed, but binary preserved for debugging
    Logger.error("Decode failed: #{inspect(reason)}")
    if datagram.payload_binary do
      debug_binary(datagram.payload_binary)
    end

  raw when is_binary(raw) ->
    # No decoder matched - raw binary in payload, no payload_binary
    replay(raw)

  decoded_data ->
    # Decoded content for analysis (unwrapped by default)
    IO.inspect(decoded_data)

    # Original binary for playback (only when keep_binary: true)
    if datagram.payload_binary do
      replay(datagram.payload_binary)
    end
end

Key invariants:

  • payload_binary is ONLY set when a custom decoder was invoked AND keep_binary: true
  • :skip returns don't set payload_binary (equivalent to "no decoder matched")
  • When no decoder matches, payload is raw binary, payload_binary is nil

Multipart Parts

For HTTP multipart responses with custom decoders (default unwrap_custom: true):

case exchange.response.decoded_body do
  {:multipart, parts} ->
    Enum.each(parts, fn part ->
      case part.body do
        {:decode_error, reason} ->
          Logger.error("Part decode failed: #{inspect(reason)}")
          # Binary preserved for debugging
          if part.body_binary, do: debug(part.body_binary)

        {:json, _} = json ->
          # Built-in JSON decoder (no body_binary)
          IO.inspect(json)

        {:text, _} = text ->
          # Built-in text decoder (no body_binary)
          IO.inspect(text)

        {:binary, _} = binary ->
          # Built-in binary fallback (no body_binary)
          IO.inspect(binary)

        decoded_data ->
          # Custom decoded content (unwrapped by default)
          IO.inspect(decoded_data)
          # Original binary (only when keep_binary: true)
          if part.body_binary, do: replay(part.body_binary)
      end
    end)
  _ ->
    :ok
end

Playback Helper

@doc "Get raw binary for playback. Returns nil if not preserved."
def get_raw_payload(datagram) do
  case datagram.payload do
    raw when is_binary(raw) -> raw              # No decoder matched
    {:decode_error, _} -> datagram.payload_binary  # Decoder failed
    _decoded -> datagram.payload_binary         # Custom decoded (unwrapped)
  end
end

# Usage
case get_raw_payload(datagram) do
  nil -> raise "Binary not preserved. Use keep_binary: true"
  raw -> send_to_server(raw)
end

Memory Warning

keep_binary: true doubles memory for decoded content:

  • UDP datagram with 1KB payload → ~2KB memory
  • Multipart part with 10KB binary → ~20KB memory

Recommendations:

  • Only use keep_binary: true when playback/replay is needed
  • Default keep_binary: false avoids this overhead
  • For large captures, consider streaming with selective processing