KlifeProtocol.Messages.Fetch (Klife Protocol v0.5.0)

Kafka protocol Fetch message

Request versions summary:

  • Version 1 is the same as version 0.
  • Starting in Version 2, the requester must be able to handle Kafka Log Message format version 1.
  • Version 3 adds MaxBytes. Starting in version 3, the partition ordering in the request is now relevant. Partitions will be processed in the order they appear in the request.
  • Version 4 adds IsolationLevel. Starting in version 4, the requester must be able to handle Kafka log message format version 2.
  • Version 5 adds LogStartOffset to indicate the earliest available offset of partition data that can be consumed.
  • Version 6 is the same as version 5.
  • Version 7 adds incremental fetch request support.
  • Version 8 is the same as version 7.
  • Version 9 adds CurrentLeaderEpoch, as described in KIP-320.
  • Version 10 indicates that we can use the ZStd compression algorithm, as described in KIP-110.
  • Version 11 adds RackId.
  • Version 12 adds flexible versions support as well as epoch validation through the LastFetchedEpoch field.
  • Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
  • Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405).
  • Version 15 adds ReplicaState, which includes the new ReplicaEpoch field and the ReplicaId. It also deprecates the old ReplicaId field and sets its default value to -1 (KIP-903).
  • Version 16 is the same as version 15 (KIP-951).

Response versions summary:

  • Version 1 adds throttle time.
  • Version 2 and 3 are the same as version 1.
  • Version 4 adds features for transactional consumption.
  • Version 5 adds LogStartOffset to indicate the earliest available offset of partition data that can be consumed.
  • Starting in version 6, we may return KAFKA_STORAGE_ERROR as an error code.
  • Version 7 adds incremental fetch request support.
  • Starting in version 8, on quota violation, brokers send out responses before throttling.
  • Version 9 is the same as version 8.
  • Version 10 indicates that the response data can use the ZStd compression algorithm, as described in KIP-110.
  • Version 11 adds PreferredReadReplica.
  • Version 12 adds support for flexible versions, epoch detection through the TruncationOffset field, and leader discovery through the CurrentLeader field.
  • Version 13 replaces the topic name field with topic ID (KIP-516).
  • Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
  • Version 15 is the same as version 14 (KIP-903).
  • Version 16 adds the 'NodeEndpoints' field (KIP-951).

Functions

Returns the message api key number.

deserialize_response(data, version, with_header? \\ true)

Receives a binary in the Kafka wire format and deserializes it into a map.

Response content fields:

  • throttle_time_ms: The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. (int32 | versions 1+)

  • error_code: The top level response error code. (int16 | versions 7+)

  • session_id: The fetch session ID, or 0 if this is not part of a fetch session. (int32 | versions 7+)

  • responses: The response topics. ([]FetchableTopicResponse | versions 0+)

    • topic: The topic name. (string | versions 0-12)

    • topic_id: The unique topic ID (uuid | versions 13+)

    • partitions: The topic partitions. ([]PartitionData | versions 0+)

      • partition_index: The partition index. (int32 | versions 0+)

      • error_code: The error code, or 0 if there was no fetch error. (int16 | versions 0+)

      • high_watermark: The current high water mark. (int64 | versions 0+)

      • last_stable_offset: The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset has been decided (ABORTED or COMMITTED). (int64 | versions 4+)

      • log_start_offset: The current log start offset. (int64 | versions 5+)

      • diverging_epoch: In case divergence is detected based on the LastFetchedEpoch and FetchOffset in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge (EpochEndOffset | versions 12+)

        • epoch: (int32 | versions 12+)

        • end_offset: (int64 | versions 12+)

      • current_leader: (LeaderIdAndEpoch | versions 12+)

        • leader_id: The ID of the current leader or -1 if the leader is unknown. (int32 | versions 12+)

        • leader_epoch: The latest known leader epoch (int32 | versions 12+)

      • snapshot_id: In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request. (SnapshotId | versions 12+)

        • end_offset: (int64 | versions 0+)

        • epoch: (int32 | versions 0+)

      • aborted_transactions: The aborted transactions. ([]AbortedTransaction | versions 4+)

        • producer_id: The producer id associated with the aborted transaction. (int64 | versions 4+)

        • first_offset: The first offset in the aborted transaction. (int64 | versions 4+)

      • preferred_read_replica: The preferred read replica for the consumer to use on its next fetch request (int32 | versions 11+)

      • records: The record data. (records | versions 0+)

  • node_endpoints: Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH. ([]NodeEndpoint | versions 16+)

    • node_id: The ID of the associated node. (int32 | versions 16+)

    • host: The node's hostname. (string | versions 16+)

    • port: The node's port. (int32 | versions 16+)

    • rack: The rack of the node, or null if it has not been assigned to a rack. (string | versions 16+)
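
The nesting of responses, partitions, and error_code described above can be walked with a comprehension. The following is only a sketch: the module FetchResponseSketch and the sample data are hypothetical, and it assumes the deserialized content is a map with atom keys named exactly as in the field list, which this page does not state explicitly.

```elixir
defmodule FetchResponseSketch do
  # Hypothetical helper, not part of Klife Protocol.
  # Assumes `content` is a map with atom keys shaped like the field list above.

  # Returns {topic_key, partition_index, error_code} for every partition
  # whose error_code is not 0.
  def partition_errors(content) do
    for topic <- Map.get(content, :responses, []),
        partition <- Map.get(topic, :partitions, []),
        partition.error_code != 0 do
      # Versions 0-12 identify the topic by name, versions 13+ by topic_id.
      topic_key = Map.get(topic, :topic) || Map.get(topic, :topic_id)
      {topic_key, partition.partition_index, partition.error_code}
    end
  end
end

# Illustrative data only; a real response carries more fields.
content = %{
  throttle_time_ms: 0,
  error_code: 0,
  session_id: 0,
  responses: [
    %{
      topic: "orders",
      partitions: [
        %{partition_index: 0, error_code: 0, high_watermark: 42},
        %{partition_index: 1, error_code: 6, high_watermark: 17}
      ]
    }
  ]
}

IO.inspect(FetchResponseSketch.partition_errors(content))
# prints [{"orders", 1, 6}]
```

Checking the top-level error_code (versions 7+) separately from the per-partition codes keeps session-level failures distinct from failures of a single partition.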

Returns the current max supported version of this message.

Returns the current min supported version of this message.
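
One possible use of the min and max supported versions is to pick a version that both this library and a broker can handle. The sketch below is an assumption-laden illustration: VersionSketch is a hypothetical module, the ranges are passed in as plain tuples rather than read from the library, and how the broker's range is obtained is outside the scope of this page.

```elixir
defmodule VersionSketch do
  # Hypothetical helper, not part of Klife Protocol.
  # Each argument is a {min_version, max_version} tuple.
  # Picks the highest version inside both ranges, if the ranges overlap.
  def negotiate({lib_min, lib_max}, {broker_min, broker_max}) do
    low = max(lib_min, broker_min)
    high = min(lib_max, broker_max)

    if low <= high do
      {:ok, high}
    else
      {:error, :no_common_version}
    end
  end
end

# Illustrative ranges only.
IO.inspect(VersionSketch.negotiate({0, 16}, {4, 13}))
# prints {:ok, 13}
IO.inspect(VersionSketch.negotiate({12, 16}, {0, 11}))
# prints {:error, :no_common_version}
```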

serialize_request(map, version)

Receives a map and serializes it to the Kafka wire format of the given version.

Input content fields:

  • cluster_id: The clusterId if known. This is used to validate metadata fetches prior to broker registration. (string | versions 12+)

  • replica_id: The broker ID of the follower, or -1 if this request is from a consumer. (int32 | versions 0-14)

  • replica_state: (ReplicaState | versions 15+)

    • replica_id: The replica ID of the follower, or -1 if this request is from a consumer. (int32 | versions 15+)

    • replica_epoch: The epoch of this follower, or -1 if not available. (int64 | versions 15+)

  • max_wait_ms: The maximum time in milliseconds to wait for the response. (int32 | versions 0+)

  • min_bytes: The minimum bytes to accumulate in the response. (int32 | versions 0+)

  • max_bytes: The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored. (int32 | versions 3+)

  • isolation_level: This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records (int8 | versions 4+)

  • session_id: The fetch session ID. (int32 | versions 7+)

  • session_epoch: The fetch session epoch, which is used for ordering requests in a session. (int32 | versions 7+)

  • topics: The topics to fetch. ([]FetchTopic | versions 0+)

    • topic: The name of the topic to fetch. (string | versions 0-12)

    • topic_id: The unique topic ID (uuid | versions 13+)

    • partitions: The partitions to fetch. ([]FetchPartition | versions 0+)

      • partition: The partition index. (int32 | versions 0+)

      • current_leader_epoch: The current leader epoch of the partition. (int32 | versions 9+)

      • fetch_offset: The message offset. (int64 | versions 0+)

      • last_fetched_epoch: The epoch of the last fetched record or -1 if there is none (int32 | versions 12+)

      • log_start_offset: The earliest available offset of the follower replica. The field is only used when the request is sent by the follower. (int64 | versions 5+)

      • partition_max_bytes: The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored. (int32 | versions 0+)

  • forgotten_topics_data: In an incremental fetch request, the partitions to remove. ([]ForgottenTopic | versions 7+)

    • topic: The topic name. (string | versions 7-12)

    • topic_id: The unique topic ID (uuid | versions 13+)

    • partitions: The partition indexes to forget. ([]int32 | versions 7+)

  • rack_id: Rack ID of the consumer making this request (string | versions 11+)
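
The version ranges in the field list above determine which top-level keys are meaningful for a given request version. As a rough sketch, they can be encoded as a lookup table. FetchRequestSketch is a hypothetical module, the table is transcribed from the list above, and the sample values are illustrative. Whether serialize_request(map, version) ignores keys outside the version's range, and whether it expects the content wrapped together with headers, is not stated in this section, so the sketch stops at building the content map.

```elixir
defmodule FetchRequestSketch do
  # Hypothetical helper, not part of Klife Protocol.
  # {first_version, last_version} per top-level field, copied from the
  # field list above; :infinity stands for an open-ended "N+" range.
  @top_level_fields [
    cluster_id: {12, :infinity},
    replica_id: {0, 14},
    replica_state: {15, :infinity},
    max_wait_ms: {0, :infinity},
    min_bytes: {0, :infinity},
    max_bytes: {3, :infinity},
    isolation_level: {4, :infinity},
    session_id: {7, :infinity},
    session_epoch: {7, :infinity},
    topics: {0, :infinity},
    forgotten_topics_data: {7, :infinity},
    rack_id: {11, :infinity}
  ]

  # Lists the top-level fields that exist in the given request version.
  def fields_for(version) do
    for {field, {from, to}} <- @top_level_fields,
        version >= from and (to == :infinity or version <= to),
        do: field
  end

  # Drops keys that do not exist in the given request version.
  def trim(content, version), do: Map.take(content, fields_for(version))
end

# A consumer-style request body (replica_id -1) with illustrative values.
content = %{
  replica_id: -1,
  max_wait_ms: 500,
  min_bytes: 1,
  max_bytes: 1_048_576,
  isolation_level: 1,
  topics: [
    %{
      topic: "orders",
      partitions: [%{partition: 0, fetch_offset: 0, partition_max_bytes: 1_048_576}]
    }
  ]
}

IO.inspect(FetchRequestSketch.fields_for(3))
# prints [:replica_id, :max_wait_ms, :min_bytes, :max_bytes, :topics]

# isolation_level only exists from version 4, so it is dropped for version 3.
IO.inspect(Map.has_key?(FetchRequestSketch.trim(content, 3), :isolation_level))
# prints false
```

The same table approach would extend to the nested topic and partition fields, for example topic (versions 0-12) versus topic_id (versions 13+).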