KafkaEx.Protocol.Kayrock.Fetch.ResponseHelpers (kafka_ex v1.0.0-rc.1)

Shared utility functions for parsing Fetch responses.

These are low-level utilities used by the version-specific response implementations. They handle both the MessageSet (V0-V3) and RecordBatch (V4+) message formats.


Functions

@spec build_fetch(Keyword.t()) :: KafkaEx.Messages.Fetch.t()

Builds a Fetch struct from parsed response data.

check_error(topic, partition_response)

@spec check_error(String.t(), map()) ::
  {:ok, map()} | {:error, KafkaEx.Client.Error.t()}

Checks if the partition response has an error and returns the appropriate result.

Returns {:ok, partition_response} if no error, {:error, Error.t()} otherwise.
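
The contract above can be sketched as follows. This is a minimal illustration, not the library's implementation: it assumes the partition response exposes an integer `error_code` key (0 meaning "no error", as in the Kafka protocol), and it returns a plain tagged tuple instead of a KafkaEx.Client.Error struct. The module name CheckErrorSketch is hypothetical.

```elixir
defmodule CheckErrorSketch do
  # Hypothetical stand-in for check_error/2. Assumes the error code lives
  # under :error_code and that 0 means "no error".
  def check_error(_topic, %{error_code: 0} = partition_response) do
    {:ok, partition_response}
  end

  def check_error(topic, %{error_code: code}) do
    # The documented function returns a KafkaEx.Client.Error struct; a tagged
    # tuple is used here only to keep the sketch self-contained.
    {:error, {:fetch_failed, topic, code}}
  end
end
```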

compute_last_offset(records)

@spec compute_last_offset([KafkaEx.Messages.Fetch.Record.t()]) ::
  non_neg_integer() | nil

Computes the last offset from a list of records.
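
As a rough sketch of what the spec implies: the `nil` in the return type presumably covers an empty record list, and a non-empty list yields an offset. Both the empty-list behaviour and the choice of "highest offset" (rather than "offset of the final element") are assumptions here, as is the hypothetical module name LastOffsetSketch.

```elixir
defmodule LastOffsetSketch do
  # Assumed behaviour: nil for an empty list, otherwise the highest offset.
  # Each record is assumed to be a map or struct with an :offset key.
  def compute_last_offset([]), do: nil

  def compute_last_offset(records) do
    records
    |> Enum.map(& &1.offset)
    |> Enum.max()
  end
end
```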

convert_headers(headers)

@spec convert_headers(list() | nil) :: [KafkaEx.Messages.Header.t()] | nil

Converts Kayrock RecordHeader structs to Header structs.

convert_records(record_batches, topic, partition)

@spec convert_records(term(), String.t(), non_neg_integer()) :: [
  KafkaEx.Messages.Fetch.Record.t()
]

Converts a record_set (MessageSet or RecordBatch) to a list of Record structs.

empty_response_error()

@spec empty_response_error() :: {:error, KafkaEx.Client.Error.t()}

Builds an error response for empty responses.

extract_first_partition_response(arg1)

@spec extract_first_partition_response(map()) ::
  {:ok, String.t(), map()} | {:error, :empty_response}

Extracts the first topic and partition response from a Fetch response.

Returns {:ok, topic, partition_response} or {:error, :empty_response}.
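
One way to picture this extraction is with pattern matching on the response map. The shape used below (a `responses` list whose entries hold `topic` and `partition_responses`) is an assumption about the decoded response, and FirstPartitionSketch is a hypothetical module, so treat this as a sketch only.

```elixir
defmodule FirstPartitionSketch do
  # Assumed shape: %{responses: [%{topic: ..., partition_responses: [...]}]}
  def extract_first_partition_response(%{
        responses: [%{topic: topic, partition_responses: [partition_response | _]} | _]
      }) do
    {:ok, topic, partition_response}
  end

  # Anything else (no topics, or a first topic with no partitions) is empty.
  def extract_first_partition_response(_response), do: {:error, :empty_response}
end
```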

extract_v5_plus_fields(response, partition_resp)

@spec extract_v5_plus_fields(map(), map()) :: Keyword.t()

Field extractor for V5+ responses, covering V5 through V10.

These versions share the same response fields:

  • throttle_time_ms
  • last_stable_offset
  • log_start_offset
  • aborted_transactions
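
A field extractor of this kind might look like the sketch below. Where each field lives is an assumption: `throttle_time_ms` is read from the top-level response and the other three from a `partition_header` map inside the partition response. V5FieldsSketch is a hypothetical module, not part of kafka_ex.

```elixir
defmodule V5FieldsSketch do
  # Assumes throttle_time_ms sits on the top-level response and the other
  # three fields sit under partition_resp.partition_header.
  def extract_v5_plus_fields(response, partition_resp) do
    header = Map.get(partition_resp, :partition_header, %{})

    [
      throttle_time_ms: Map.get(response, :throttle_time_ms),
      last_stable_offset: Map.get(header, :last_stable_offset),
      log_start_offset: Map.get(header, :log_start_offset),
      aborted_transactions: Map.get(header, :aborted_transactions)
    ]
  end
end
```

Returning a keyword list matches the documented `Keyword.t()` return type and lets the caller merge these fields into whatever it builds next.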

extract_v11_fields(response, partition_resp)

@spec extract_v11_fields(map(), map()) :: Keyword.t()

Field extractor for V11 responses.

V11 adds preferred_read_replica in the partition header (KIP-392). This tells the consumer which replica it should prefer for reading, enabling rack-aware consumption.

All other fields remain the same as V5+.

parse_response(response, field_extractor)

@spec parse_response(map(), (map(), map() -> Keyword.t())) ::
  {:ok, KafkaEx.Messages.Fetch.t()} | {:error, KafkaEx.Client.Error.t()}

Common parsing logic for fetch responses.

Extracts the first partition response, checks for errors, converts records, and builds the Fetch struct using the provided field extractor function.

The field_extractor function receives response and partition_resp as its two arguments and should return a keyword list of additional fields (throttle_time_ms, etc.).
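
The overall flow can be sketched as a `with` pipeline that takes the extractor as a two-argument function. This is a simplified illustration under several assumptions: the response shape, the `error_code` and `record_set` keys, a plain map in place of the Fetch struct, and tagged tuples in place of KafkaEx.Client.Error. Record conversion is omitted, and ParseSketch is a hypothetical module.

```elixir
defmodule ParseSketch do
  # Simplified stand-in for parse_response/2: a plain map replaces the Fetch
  # struct and tagged tuples replace KafkaEx.Client.Error.
  def parse_response(response, field_extractor) when is_function(field_extractor, 2) do
    with {:ok, topic, partition_resp} <- first_partition(response),
         :ok <- check_error(partition_resp) do
      # The extractor is called with two arguments and returns a keyword list.
      extra = field_extractor.(response, partition_resp)

      base = [topic: topic, records: Map.get(partition_resp, :record_set) || []]
      {:ok, Map.new(base ++ extra)}
    end
  end

  defp first_partition(%{responses: [%{topic: t, partition_responses: [p | _]} | _]}),
    do: {:ok, t, p}

  defp first_partition(_), do: {:error, :empty_response}

  defp check_error(%{error_code: 0}), do: :ok
  defp check_error(%{error_code: code}), do: {:error, {:fetch_failed, code}}
end

response = %{
  throttle_time_ms: 0,
  responses: [%{topic: "events", partition_responses: [%{error_code: 0, record_set: []}]}]
}

# A version-specific extractor only has to supply its extra fields.
extractor = fn resp, _partition_resp -> [throttle_time_ms: resp.throttle_time_ms] end

{:ok, fetch} = ParseSketch.parse_response(response, extractor)
IO.inspect(fetch)
```

Passing the extractor as a function keeps the shared steps in one place, so each protocol version contributes only the fields that differ.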