View Source KafkaEx.Protocol.Kayrock.ListOffsets.ResponseHelpers (kafka_ex v1.0.0-rc.1)

Shared helper functions for parsing ListOffsets responses across all versions.

Version differences:

  • V0: Uses offsets array (returns first offset or 0)
  • V1: Uses single offset field
  • V2-V3: Uses single offset field with timestamp
  • V4-V5: Same as V2 but adds leader_epoch in partition responses

Summary

Functions

Extracts offset from V0 response (offsets array).

Extracts offset from V1 response (single offset field, no timestamp).

Extracts offset from V2 response (single offset field with timestamp).

Extracts offset from V4+ response (single offset field with timestamp and leader_epoch).

Parses a ListOffsets response using the provided offset extractor function.

Types

@type error_tuple() :: {atom(), String.t(), non_neg_integer()}

Functions

Link to this function

extract_v0_offset(topic, map)

View Source
@spec extract_v0_offset(String.t(), map()) ::
  {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}

Extracts offset from V0 response (offsets array).

Link to this function

extract_v1_offset(topic, map)

View Source
@spec extract_v1_offset(String.t(), map()) ::
  {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}

Extracts offset from V1 response (single offset field, no timestamp).

Link to this function

extract_v2_offset(topic, map)

View Source
@spec extract_v2_offset(String.t(), map()) ::
  {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}

Extracts offset from V2 response (single offset field with timestamp).

Link to this function

extract_v4_offset(topic, resp)

View Source
@spec extract_v4_offset(String.t(), map()) ::
  {:ok, KafkaEx.Messages.Offset.t()} | {:error, error_tuple()}

Extracts offset from V4+ response (single offset field with timestamp and leader_epoch).

V4 adds leader_epoch to the partition response, indicating the epoch of the leader at the time the offset was committed. A value of -1 means the leader epoch is unknown.

This extractor is also used for V5 which has the same response schema.

Link to this function

parse_response(map, offset_extractor)

View Source
@spec parse_response(map(), (String.t(), map() ->
                         {:ok, KafkaEx.Messages.Offset.t()}
                         | {:error, error_tuple()})) ::
  {:ok, [KafkaEx.Messages.Offset.t()]} | {:error, any()}

Parses a ListOffsets response using the provided offset extractor function.

The offset_extractor takes a partition response map and returns either:

  • {:ok, data} where data is a map with :partition, :offset, :error_code, and optionally :timestamp
  • {:error, {error_code, topic, partition}}