KafkaEx.Protocol.Kayrock.Fetch.ResponseHelpers (kafka_ex v1.0.0-rc.1)
Shared utility functions for parsing Fetch responses.
These are low-level utilities used by the version-specific response implementations. They handle both the MessageSet (V0-V3) and RecordBatch (V4+) message formats.
Summary
Functions
Builds a Fetch struct from parsed response data.
Checks if the partition response has an error and returns the appropriate result.
Computes the last offset from a list of records.
Converts Kayrock RecordHeader structs to Header structs.
Converts record_set (MessageSet or RecordBatch) to list of Record structs.
Builds an error response for empty responses.
Extracts the first topic and partition response from a Fetch response.
Field extractor for V5+ responses (V5, V6, V7, V8, V9, V10).
Field extractor for V11 responses.
Common parsing logic for fetch responses.
Functions
@spec build_fetch(Keyword.t()) :: KafkaEx.Messages.Fetch.t()
Builds a Fetch struct from parsed response data.
@spec check_error(String.t(), map()) :: {:ok, map()} | {:error, KafkaEx.Client.Error.t()}
Checks if the partition response has an error and returns the appropriate result.
Returns {:ok, partition_response} if no error, {:error, Error.t()} otherwise.
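As a rough illustration of this contract, here is a minimal sketch. It assumes the error code sits under `partition_header.error_code` with `0` meaning "no error", and it uses a plain map as a stand-in for `KafkaEx.Client.Error.t()`. The module name `CheckErrorSketch` and the shape of the error map are hypothetical, not the library's implementation.

```elixir
defmodule CheckErrorSketch do
  # Assumption: error_code lives under :partition_header and 0 means success.
  def check_error(_topic, %{partition_header: %{error_code: 0}} = partition_resp) do
    {:ok, partition_resp}
  end

  # Any other code becomes an error. The plain map here is a hypothetical
  # stand-in for KafkaEx.Client.Error.t(); the real struct's fields may differ.
  def check_error(topic, %{partition_header: %{error_code: code, partition: partition}}) do
    {:error, %{code: code, topic: topic, partition: partition}}
  end
end
```

Carrying the topic and partition in the error value keeps enough context for the caller to report which partition failed.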
@spec compute_last_offset([KafkaEx.Messages.Fetch.Record.t()]) :: non_neg_integer() | nil
Computes the last offset from a list of records.
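The `non_neg_integer() | nil` return type suggests that an empty list yields `nil`. A minimal sketch under that assumption follows. Taking the maximum offset is also an assumption, since the real helper may simply read the final record. `LastOffsetSketch` is a hypothetical name, and plain maps with an `:offset` key stand in for `Record` structs.

```elixir
defmodule LastOffsetSketch do
  # Assumption: an empty record list has no last offset, hence nil.
  def compute_last_offset([]), do: nil

  # Assumption: each record exposes an integer :offset; the highest one wins.
  def compute_last_offset(records) do
    records
    |> Enum.map(& &1.offset)
    |> Enum.max()
  end
end
```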
@spec convert_headers(list() | nil) :: [KafkaEx.Messages.Header.t()] | nil
Converts Kayrock RecordHeader structs to Header structs.
@spec convert_records(term(), String.t(), non_neg_integer()) :: [KafkaEx.Messages.Fetch.Record.t()]
Converts record_set (MessageSet or RecordBatch) to list of Record structs.
@spec empty_response_error() :: {:error, KafkaEx.Client.Error.t()}
Builds an error response for empty responses.
@spec extract_first_partition_response(map()) :: {:ok, String.t(), map()} | {:error, :empty_response}
Extracts the first topic and partition response from a Fetch response.
Returns {:ok, topic, partition_response} or {:error, :empty_response}.
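One way to picture this is a single pattern match on the nested response. The sketch below assumes topics are listed under `:responses` and partitions under `:partition_responses`; those field names and the module name `FirstPartitionSketch` are assumptions for illustration, not confirmed by this page.

```elixir
defmodule FirstPartitionSketch do
  # Assumed shape: %{responses: [%{topic: _, partition_responses: [_ | _]} | _]}.
  # Only the first topic and its first partition are considered.
  def extract_first_partition_response(%{
        responses: [%{topic: topic, partition_responses: [partition_resp | _]} | _]
      }) do
    {:ok, topic, partition_resp}
  end

  # Anything else (no topics, or a topic without partitions) counts as empty.
  def extract_first_partition_response(_response), do: {:error, :empty_response}
end
```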
Field extractor for V5+ responses (V5, V6, V7, V8, V9, V10).
These versions share the same response fields:
- throttle_time_ms
- last_stable_offset
- log_start_offset
- aborted_transactions
Field extractor for V11 responses.
V11 adds preferred_read_replica in the partition header (KIP-392).
This tells the consumer which replica it should prefer for reading,
enabling rack-aware consumption.
All other fields remain the same as V5+.
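To make the relationship between the two extractors concrete, here is a sketch in which the V11 variant reuses the V5+ fields and appends `preferred_read_replica`. The function names `v5_plus_fields/2` and `v11_fields/2` are hypothetical, and reading `last_stable_offset`, `log_start_offset`, and `aborted_transactions` from `:partition_header` is an assumption.

```elixir
defmodule FieldExtractorSketch do
  # Hypothetical V5+ extractor: throttle_time_ms comes from the top-level
  # response; the remaining fields are assumed to sit in the partition header.
  def v5_plus_fields(response, partition_resp) do
    header = partition_resp.partition_header

    [
      throttle_time_ms: response.throttle_time_ms,
      last_stable_offset: header.last_stable_offset,
      log_start_offset: header.log_start_offset,
      aborted_transactions: header.aborted_transactions
    ]
  end

  # Hypothetical V11 extractor: the same fields plus preferred_read_replica.
  def v11_fields(response, partition_resp) do
    v5_plus_fields(response, partition_resp) ++
      [preferred_read_replica: partition_resp.partition_header.preferred_read_replica]
  end
end
```

Both functions take two arguments and return a keyword list, which matches the extractor type in the `parse_response` spec.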
@spec parse_response(map(), (map(), map() -> Keyword.t())) :: {:ok, KafkaEx.Messages.Fetch.t()} | {:error, KafkaEx.Client.Error.t()}
Common parsing logic for fetch responses.
Extracts the first partition response, checks for errors, converts records, and builds the Fetch struct using the provided field extractor function.
The field_extractor function receives two arguments, response and partition_resp, and should return keyword options for additional fields (throttle_time_ms, etc.).
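The flow described above can be sketched with a `with` chain. This is a simplified illustration, not the library's code: it returns a plain map instead of a `Fetch` struct, passes `record_set` through without converting it, returns bare error terms rather than `KafkaEx.Client.Error` structs, and assumes the same response field names (`:responses`, `:partition_responses`, `:partition_header`). `ParseResponseSketch` is a hypothetical module.

```elixir
defmodule ParseResponseSketch do
  # The extractor is a 2-arity function: (response, partition_resp) -> keyword list.
  def parse_response(response, field_extractor) when is_function(field_extractor, 2) do
    with {:ok, topic, partition_resp} <- first_partition(response),
         {:ok, partition_resp} <- check_error(partition_resp) do
      header = partition_resp.partition_header
      # Record conversion is omitted; a nil record_set becomes an empty list.
      records = List.wrap(partition_resp.record_set)

      fields =
        [topic: topic, partition: header.partition, records: records] ++
          field_extractor.(response, partition_resp)

      # A plain map stands in for the Fetch struct.
      {:ok, Map.new(fields)}
    end
  end

  defp first_partition(%{responses: [%{topic: t, partition_responses: [p | _]} | _]}),
    do: {:ok, t, p}

  defp first_partition(_response), do: {:error, :empty_response}

  defp check_error(%{partition_header: %{error_code: 0}} = p), do: {:ok, p}
  defp check_error(%{partition_header: %{error_code: code}}), do: {:error, code}
end

# Usage: a version-specific module supplies only its own extractor.
response = %{
  throttle_time_ms: 0,
  responses: [
    %{
      topic: "events",
      partition_responses: [
        %{partition_header: %{partition: 0, error_code: 0}, record_set: nil}
      ]
    }
  ]
}

extractor = fn resp, _partition_resp -> [throttle_time_ms: resp.throttle_time_ms] end
{:ok, fetch} = ParseResponseSketch.parse_response(response, extractor)
```

Because any non-matching step falls straight through `with`, the first failure (empty response or partition error) becomes the return value without further handling.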