KafkaEx.Protocol.Kayrock.Fetch.RequestHelpers (kafka_ex v1.0.0-rc.1)

Shared utility functions for building Fetch requests.

These are low-level utilities used by the version-specific request implementations.

Functions

add_current_leader_epoch(request, opts, api_version)

@spec add_current_leader_epoch(struct(), Keyword.t(), integer()) :: struct()

Adds current_leader_epoch to partition requests for V9+.

V9 introduces current_leader_epoch in each partition request, allowing brokers to detect stale fetch requests from consumers that have not yet learned about a leader change.

The default value of -1 means "no epoch specified" (unknown).
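
The version gate described above can be illustrated with a minimal sketch. It is not the library's implementation: the module name `LeaderEpochSketch`, the `:current_leader_epoch` option key, and the use of plain maps in place of Kayrock request structs are all assumptions made for illustration.

```elixir
defmodule LeaderEpochSketch do
  # Hypothetical stand-in for the real helper; plain maps replace Kayrock structs.
  # -1 means "no epoch specified", as described in the text.
  @no_epoch -1

  # V9+: stamp every partition entry with the epoch taken from opts.
  # The :current_leader_epoch option key is an assumption.
  def add_current_leader_epoch(request, opts, api_version) when api_version >= 9 do
    epoch = Keyword.get(opts, :current_leader_epoch, @no_epoch)

    topics =
      Enum.map(request.topics, fn topic ->
        partitions = Enum.map(topic.partitions, &Map.put(&1, :current_leader_epoch, epoch))
        %{topic | partitions: partitions}
      end)

    %{request | topics: topics}
  end

  # Older versions: return the request unchanged.
  def add_current_leader_epoch(request, _opts, _api_version), do: request
end
```

With this shape, a V8 request passes through untouched, while a V9 request built without the option carries -1 in each partition entry.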

add_isolation_level(request, opts, api_version)

@spec add_isolation_level(struct(), Keyword.t(), integer()) :: struct()

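Adds the isolation_level field for V4+ requests.

In the Kafka protocol, an isolation_level of 0 is READ_UNCOMMITTED and 1 is READ_COMMITTED (only committed transactional records are returned).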

add_max_bytes(request, fields, api_version)

@spec add_max_bytes(struct(), map(), integer()) :: struct()

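Adds the top-level max_bytes field for V3+ requests.

This field limits the size of the whole fetch response and is separate from the per-partition max_bytes in the topics structure.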

add_rack_id(request, opts, api_version)

@spec add_rack_id(struct(), Keyword.t(), integer()) :: struct()

Adds rack_id for V11+ requests.

V11 introduces rack_id at the top level, enabling rack-aware fetch so brokers can prefer replicas closer to the consumer.

The default value is an empty string (no rack specified).

add_session_fields(request, opts, api_version)

@spec add_session_fields(struct(), Keyword.t(), integer()) :: struct()

Adds the session fields (session_id, session_epoch, forgotten_topics_data) that V7+ requests use for incremental fetch.

build_request_v7_plus(request, opts, api_version)

@spec build_request_v7_plus(struct(), Keyword.t(), integer()) :: struct()

Builds a Fetch request for V7+ versions with all applicable fields.

Handles the fields that V7+ requests have in common:

  • V3+ max_bytes
  • V4+ isolation_level
  • V5+ log_start_offset in partition data
  • V7+ session fields (session_id, session_epoch, forgotten_topics_data)
  • V9+ current_leader_epoch in partition data
  • V11+ rack_id

This helper reduces duplication across the V7-V11 request implementations, which all share the same base logic.
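
As a rough illustration of how the top-level fields in the list above could be layered by version, here is a minimal sketch. It is not the library's code: the module `FetchV7PlusSketch`, the helper `put_if/4`, the option keys, and every default except the documented rack_id default of "" are assumptions (0 / -1 for the session fields follow the Kafka convention for a full fetch without a session). The per-partition fields (log_start_offset, current_leader_epoch) are left out for brevity.

```elixir
defmodule FetchV7PlusSketch do
  # Hypothetical composition of the version-gated top-level fields.
  # A plain map stands in for the Kayrock request struct.
  def build(request, opts, api_version) do
    request
    |> put_if(api_version >= 3, :max_bytes, Keyword.get(opts, :max_bytes, 1_000_000))
    |> put_if(api_version >= 4, :isolation_level, Keyword.get(opts, :isolation_level, 0))
    |> put_if(api_version >= 7, :session_id, Keyword.get(opts, :session_id, 0))
    |> put_if(api_version >= 7, :session_epoch, Keyword.get(opts, :session_epoch, -1))
    |> put_if(api_version >= 7, :forgotten_topics_data, Keyword.get(opts, :forgotten_topics_data, []))
    |> put_if(api_version >= 11, :rack_id, Keyword.get(opts, :rack_id, ""))
  end

  # Set the field only when the version gate is open.
  defp put_if(request, true, key, value), do: Map.put(request, key, value)
  defp put_if(request, false, _key, _value), do: request
end
```

Because each gate is independent, the same pipeline serves V7 through V11: a V7 call simply skips the rack_id step.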

build_topics(fields, opts)

@spec build_topics(map(), Keyword.t()) :: [map()]

Builds the topics structure for Fetch request.

For V0-V4:

[%{topic: "topic", partitions: [%{partition: 0, fetch_offset: 0, max_bytes: 1000000}]}]

For V5+:

[%{topic: "topic", partitions: [%{partition: 0, fetch_offset: 0, log_start_offset: 0, max_bytes: 1000000}]}]
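
The two shapes above differ only in log_start_offset. A minimal sketch of producing either one follows. It is an illustration under assumptions, not the library's implementation: the module `BuildTopicsSketch` is hypothetical, and reading the version from an `:api_version` key and the offset from a `:log_start_offset` key in opts is a guess at how the two-argument signature might carry that information.

```elixir
defmodule BuildTopicsSketch do
  # Hypothetical: fields is a map with :topic, :partition, :offset and :max_bytes.
  def build_topics(fields, opts) do
    # Assumed option key; the real helper may learn the version differently.
    api_version = Keyword.get(opts, :api_version, 0)

    partition =
      %{partition: fields.partition, fetch_offset: fields.offset, max_bytes: fields.max_bytes}
      |> maybe_put_log_start_offset(opts, api_version)

    [%{topic: fields.topic, partitions: [partition]}]
  end

  # V5+ adds log_start_offset to each partition entry.
  defp maybe_put_log_start_offset(partition, opts, api_version) when api_version >= 5 do
    Map.put(partition, :log_start_offset, Keyword.get(opts, :log_start_offset, 0))
  end

  defp maybe_put_log_start_offset(partition, _opts, _api_version), do: partition
end
```

Calling `BuildTopicsSketch.build_topics(fields, api_version: 4)` yields the V0-V4 shape, and `api_version: 5` yields the V5+ shape.
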

extract_common_fields(opts)

@spec extract_common_fields(Keyword.t()) :: map()

Extracts common fields from request options.

Returns a map with:

  • :topic - The topic to fetch from
  • :partition - The partition to fetch from
  • :offset - The offset to start fetching from
  • :max_bytes - Maximum bytes to fetch per partition
  • :max_wait_time - Maximum time to wait for messages in ms
  • :min_bytes - Minimum bytes to accumulate before returning
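
A minimal sketch of an extraction that returns the map described above. The module name `CommonFieldsSketch`, the choice of which options are required, and all default values are assumptions for illustration; the library's actual defaults may differ.

```elixir
defmodule CommonFieldsSketch do
  # Hypothetical: :topic and :partition are treated as required,
  # everything else falls back to an illustrative default.
  def extract_common_fields(opts) do
    %{
      topic: Keyword.fetch!(opts, :topic),
      partition: Keyword.fetch!(opts, :partition),
      offset: Keyword.get(opts, :offset, 0),
      max_bytes: Keyword.get(opts, :max_bytes, 1_000_000),
      max_wait_time: Keyword.get(opts, :max_wait_time, 10_000),
      min_bytes: Keyword.get(opts, :min_bytes, 1)
    }
  end
end
```

Collecting the options into one map up front lets the later helpers take `fields` rather than re-reading the keyword list.
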

populate_request(request, fields, topics)

@spec populate_request(struct(), map(), [map()]) :: struct()

Populates a request struct with common fields.
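
A minimal sketch of what populating the common fields might look like. The struct `PopulateSketch.Request` and its field list are hypothetical stand-ins for a Kayrock request struct, and the set of fields written here is an assumption; a replica_id of -1 is the Kafka convention for an ordinary consumer.

```elixir
defmodule PopulateSketch do
  defmodule Request do
    # Hypothetical stand-in for a version-specific Kayrock Fetch request.
    defstruct replica_id: nil, max_wait_time: nil, min_bytes: nil, topics: []
  end

  # Copy the version-independent values onto the request struct.
  def populate_request(request, fields, topics) do
    %{
      request
      | replica_id: -1,
        max_wait_time: fields.max_wait_time,
        min_bytes: fields.min_bytes,
        topics: topics
    }
  end
end
```

The update syntax `%{request | ...}` raises if a key is missing from the struct, which catches field-name mistakes early.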