KafkaEx.Protocol.Kayrock.ListOffsets.ResponseHelpers (kafka_ex v1.0.0-rc.1)
Shared helper functions for parsing ListOffsets responses across all versions.
Version differences:
- V0: Uses offsets array (returns first offset or 0)
- V1: Uses single offset field
- V2-V3: Uses single offset field with timestamp
- V4-V5: Same as V2 but adds leader_epoch in partition responses
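The per-version field differences can be illustrated with plain maps. This is a minimal sketch, not kafka_ex's implementation: the module name OffsetShapeSketch, the function pick_offset/1, and the exact map keys are assumptions chosen to mirror the field names listed above.

```elixir
defmodule OffsetShapeSketch do
  # Hypothetical helper, not part of kafka_ex.

  # V0-style shape: an offsets list. Take the first entry, or 0 when empty.
  def pick_offset(%{offsets: [first | _]}), do: %{offset: first}
  def pick_offset(%{offsets: []}), do: %{offset: 0}

  # V4/V5-style shape: matched before the V2 clause because it carries a
  # superset of the V2 keys.
  def pick_offset(%{offset: offset, timestamp: ts, leader_epoch: epoch}),
    do: %{offset: offset, timestamp: ts, leader_epoch: epoch}

  # V2/V3-style shape: single offset plus timestamp.
  def pick_offset(%{offset: offset, timestamp: ts}),
    do: %{offset: offset, timestamp: ts}

  # V1-style shape: single offset only.
  def pick_offset(%{offset: offset}), do: %{offset: offset}
end
```

Clause order matters here: a map pattern matches any map that contains the listed keys, so the most specific shape has to come first.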
Types
@type error_tuple() :: {atom(), String.t(), non_neg_integer()}
Functions
@spec extract_v0_offset(String.t(), map()) :: {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}
Extracts offset from V0 response (offsets array).
@spec extract_v1_offset(String.t(), map()) :: {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}
Extracts offset from V1 response (single offset field, no timestamp).
@spec extract_v2_offset(String.t(), map()) :: {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}
Extracts offset from V2 response (single offset field with timestamp).
@spec extract_v4_offset(String.t(), map()) :: {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}
Extracts offset from V4+ response (single offset field with timestamp and leader_epoch).
V4 adds leader_epoch to the partition response, indicating the epoch of the leader
at the time the offset was committed. A value of -1 means the leader epoch is unknown.
This extractor is also used for V5 which has the same response schema.
@spec parse_response(map(), (String.t(), map() -> {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()})) :: {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, any()}
Parses a ListOffsets response using the provided offset extractor function.
The offset_extractor takes a partition response map and returns either:
- {:ok, data} where data is a map with :partition, :offset, :error_code, and optionally :timestamp
- {:error, {error_code, topic, partition}}
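The extractor contract above can be sketched with plain maps. This is a hedged illustration rather than the library's code: the module ParseSketch, the response layout (a topics list whose entries have name and partitions), and the choice to stop at the first error are all assumptions. The extractor is called with two arguments, following the @spec.

```elixir
defmodule ParseSketch do
  # Hypothetical stand-in for parse_response/2; the response layout is assumed.
  def parse(%{topics: topics}, extractor) when is_function(extractor, 2) do
    # Flatten the response into {topic, partition_response} pairs.
    pairs =
      for %{name: topic, partitions: parts} <- topics, part <- parts, do: {topic, part}

    # Assumption: halt on the first error, otherwise accumulate results.
    result =
      Enum.reduce_while(pairs, {:ok, []}, fn {topic, part}, {:ok, acc} ->
        case extractor.(topic, part) do
          {:ok, data} -> {:cont, {:ok, [data | acc]}}
          {:error, _reason} = err -> {:halt, err}
        end
      end)

    case result do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      error -> error
    end
  end
end

# Hypothetical extractor: error_code 0 is treated as success. The real
# error_tuple carries an atom; this sketch passes the raw code through.
extractor = fn
  _topic, %{partition: p, offset: o, error_code: 0} ->
    {:ok, %{partition: p, offset: o, error_code: 0}}

  topic, %{partition: p, error_code: code} ->
    {:error, {code, topic, p}}
end
```

Passing the extractor as a function keeps the traversal identical across versions, so only the per-partition field handling varies.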