KlifeProtocol.Messages.OffsetFetch (Klife Protocol v0.7.0)

Kafka protocol OffsetFetch message

Request versions summary:

  • In version 0, the request read offsets from ZooKeeper (ZK).
  • Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  • Starting in version 2, the request can contain a null topics array to indicate that offsets for all topics should be fetched. It also returns a top level error code for group or coordinator level errors.
  • Versions 3, 4, and 5 are the same as version 2.
  • Version 6 is the first flexible version.
  • Version 7 adds the require_stable flag.
  • Version 8 adds support for fetching offsets for multiple groups at a time.
  • Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used.
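
The request version summary above can be restated as data. The following is a minimal sketch, assuming a hypothetical helper module `OffsetFetchRequestFeatures` that is not part of KlifeProtocol; it only encodes the version thresholds listed above so that calling code can check what a given request version allows.

```elixir
defmodule OffsetFetchRequestFeatures do
  # Hypothetical helper, not part of KlifeProtocol. It only restates the
  # request version summary as a map of booleans.

  # Returns the request features available at `version` (0..9).
  def features(version) when is_integer(version) and version in 0..9 do
    %{
      # v1+: offsets can be fetched from the internal __consumer_offsets topic
      offsets_from_internal_topic: version >= 1,
      # v2+: `topics` may be null to fetch offsets for all topics
      null_topics_allowed: version >= 2,
      # v6+: flexible version
      flexible: version >= 6,
      # v7+: the require_stable flag exists
      require_stable: version >= 7,
      # v8+: several groups can be fetched in one request
      multiple_groups: version >= 8,
      # v9+: member_id and member_epoch fields exist (KIP-848)
      member_id_and_epoch: version >= 9
    }
  end
end

features = OffsetFetchRequestFeatures.features(7)
IO.inspect(features.require_stable)
# prints: true
```

A lookup like this keeps version checks in one place instead of scattering numeric comparisons through the code that builds requests.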

Response versions summary:

  • Version 1 is the same as version 0.
  • Version 2 adds a top-level error code.
  • Version 3 adds the throttle time.
  • Starting in version 4, on quota violation, brokers send out responses before throttling.
  • Version 5 adds the leader epoch to the committed offset.
  • Version 6 is the first flexible version.
  • Version 7 adds pending offset commit as a new error response at the partition level.
  • Version 8 adds support for fetching offsets for multiple groups.
  • Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group protocol is used.

Summary

Functions

Returns the message api key number.

Receives a binary in the Kafka wire format and deserializes it into a map.

Returns the current max supported version of this message.

Returns the current min supported version of this message.

Receives a map and serializes it to the Kafka wire format of the given version.

Functions

Returns the message api key number.

deserialize_response(data, version, with_header? \\ true)

Receives a binary in the Kafka wire format and deserializes it into a map.

Response content fields:

  • throttle_time_ms: The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. (int32 | versions 3+)

  • topics: The responses per topic. ([]OffsetFetchResponseTopic | versions 0-7)

    • name: The topic name. (string | versions 0-7)

    • partitions: The responses per partition. ([]OffsetFetchResponsePartition | versions 0-7)

      • partition_index: The partition index. (int32 | versions 0-7)

      • committed_offset: The committed message offset. (int64 | versions 0-7)

      • committed_leader_epoch: The leader epoch. (int32 | versions 5-7)

      • metadata: The partition metadata. (string | versions 0-7)

      • error_code: The error code, or 0 if there was no error. (int16 | versions 0-7)

  • error_code: The top-level error code, or 0 if there was no error. (int16 | versions 2-7)

  • groups: The responses per group id. ([]OffsetFetchResponseGroup | versions 8+)

    • group_id: The group ID. (string | versions 8+)

    • topics: The responses per topic. ([]OffsetFetchResponseTopics | versions 8+)

      • name: The topic name. (string | versions 8+)

      • partitions: The responses per partition. ([]OffsetFetchResponsePartitions | versions 8+)

        • partition_index: The partition index. (int32 | versions 8+)

        • committed_offset: The committed message offset. (int64 | versions 8+)

        • committed_leader_epoch: The leader epoch. (int32 | versions 8+)

        • metadata: The partition metadata. (string | versions 8+)

        • error_code: The partition-level error code, or 0 if there was no error. (int16 | versions 8+)

    • error_code: The group-level error code, or 0 if there was no error. (int16 | versions 8+)
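
Because versions 0-7 return a flat `topics` list while versions 8+ nest topics under `groups`, callers often want one shape for both. The following is a minimal sketch, assuming the response content is a map with atom keys named exactly like the fields listed above (the source does not show the concrete map, so this is an assumption). `OffsetFetchResponseSketch` and `flatten/2` are hypothetical names, not part of KlifeProtocol.

```elixir
defmodule OffsetFetchResponseSketch do
  # Hypothetical helper, not part of KlifeProtocol. It assumes the response
  # content is a map with atom keys matching the documented field names.

  # Versions 8+: offsets are nested under `groups`, each carrying its group_id.
  def flatten(%{groups: groups}, _group_id) when is_list(groups) do
    for group <- groups,
        topic <- group.topics,
        partition <- topic.partitions do
      row(group.group_id, topic.name, partition)
    end
  end

  # Versions 0-7: the response has no group id, so the caller passes the one
  # it used in the request.
  def flatten(%{topics: topics}, group_id) when is_list(topics) do
    for topic <- topics, partition <- topic.partitions do
      row(group_id, topic.name, partition)
    end
  end

  # Builds one flat row per partition.
  defp row(group_id, topic_name, partition) do
    %{
      group_id: group_id,
      topic: topic_name,
      partition_index: partition.partition_index,
      committed_offset: partition.committed_offset,
      error_code: partition.error_code
    }
  end
end
```

Each resulting row still carries the partition-level `error_code`. The top-level (versions 2-7) or group-level (versions 8+) `error_code` is not included in the rows and would need to be checked separately.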

Returns the current max supported version of this message.

Returns the current min supported version of this message.

serialize_request(map, version)

Receives a map and serializes it to the Kafka wire format of the given version.

Input content fields:

  • group_id: The group to fetch offsets for. (string | versions 0-7)

  • topics: Each topic we would like to fetch offsets for, or null to fetch offsets for all topics. ([]OffsetFetchRequestTopic | versions 0-7)

    • name: The topic name. (string | versions 0-7)

    • partition_indexes: The partition indexes we would like to fetch offsets for. ([]int32 | versions 0-7)

  • groups: Each group we would like to fetch offsets for. ([]OffsetFetchRequestGroup | versions 8+)

    • group_id: The group ID. (string | versions 8+)

    • member_id: The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848). (string | versions 9+)

    • member_epoch: The member epoch if using the new consumer protocol (KIP-848). (int32 | versions 9+)

    • topics: Each topic we would like to fetch offsets for, or null to fetch offsets for all topics. ([]OffsetFetchRequestTopics | versions 8+)

      • name: The topic name. (string | versions 8+)

      • partition_indexes: The partition indexes we would like to fetch offsets for. ([]int32 | versions 8+)

  • require_stable: Whether the broker should hold off on returning unstable offsets and instead set a retriable error code for the affected partitions. (bool | versions 7+)
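
The input content fields change shape at version 8, where the single `group_id` and `topics` pair moves into a `groups` list. The following is a minimal sketch of building those fields for a given version, assuming atom keys named like the fields above. `OffsetFetchRequestSketch`, `content/4`, and its option names are hypothetical and not part of KlifeProtocol. The `nil` and `-1` fallbacks for `member_id` and `member_epoch` are assumed placeholders for the case where the new consumer protocol is not used.

```elixir
defmodule OffsetFetchRequestSketch do
  # Hypothetical helper, not part of KlifeProtocol. It builds only the
  # documented content fields for the requested version.
  #
  # `topics` is a list such as [%{name: "t", partition_indexes: [0, 1]}],
  # or nil (version 2+) to fetch offsets for all topics.
  def content(version, group_id, topics, opts \\ [])

  # Versions 0-7: one group, with group_id and topics at the top level.
  def content(version, group_id, topics, opts) when version in 0..7 do
    base = %{group_id: group_id, topics: topics}

    if version >= 7 do
      Map.put(base, :require_stable, Keyword.get(opts, :require_stable, false))
    else
      base
    end
  end

  # Versions 8-9: groups list; member_id and member_epoch exist from version 9.
  def content(version, group_id, topics, opts) when version in 8..9 do
    group = %{group_id: group_id, topics: topics}

    group =
      if version >= 9 do
        Map.merge(group, %{
          # Assumed placeholders when the new consumer protocol is not in use.
          member_id: Keyword.get(opts, :member_id),
          member_epoch: Keyword.get(opts, :member_epoch, -1)
        })
      else
        group
      end

    %{groups: [group], require_stable: Keyword.get(opts, :require_stable, false)}
  end
end
```

For example, `OffsetFetchRequestSketch.content(7, "my-group", nil, require_stable: true)` yields a map with `group_id`, `topics`, and `require_stable`. The sketch covers only these content fields; how they are wrapped in the map passed to `serialize_request/2` is not described in this section.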