View Source KafkaEx.Protocol.Kayrock.OffsetCommit.RequestHelpers (kafka_ex v1.0.0-rc.1)

Shared helper functions for building OffsetCommit requests across all versions.

This module reduces code duplication while maintaining type safety through protocol dispatch for each version-specific struct.

Summary

Functions

Builds partition data for a topic.

Builds partition data with committed_leader_epoch field.

Builds the topics structure for OffsetCommit requests.

Builds the topics structure with committed_leader_epoch for V6+ requests.

Builds an OffsetCommit v2/v3/v4 request with all fields including retention_time_ms.

Builds an OffsetCommit v5 request (retention_time_ms removed).

Builds an OffsetCommit v6 request (adds committed_leader_epoch per partition).

Builds an OffsetCommit v7+ request (adds group_instance_id + committed_leader_epoch).

Extracts common fields from request options.

Extracts consumer group coordination fields (generation_id, member_id). Used in v1+ versions.

Extracts group_instance_id field for v7+ versions. Returns nil when not provided (nullable field).

Extracts retention_time field for v2+ versions.

Types

@type partition_data() :: %{
  :partition_num => non_neg_integer(),
  :offset => integer(),
  optional(:metadata) => String.t(),
  optional(:timestamp) => integer()
}
@type partition_output() :: %{
  :partition => non_neg_integer(),
  :offset => integer(),
  :metadata => String.t(),
  optional(:timestamp) => integer()
}
@type topic_data() :: {String.t(), [partition_data()]}
@type topic_output() :: %{topic: String.t(), partitions: [partition_output()]}

Functions

Link to this function

build_partitions(partitions, include_timestamp)

View Source
@spec build_partitions([partition_data()], boolean()) :: [partition_output()]

Builds partition data for a topic.

Parameters

  • partitions - List of partition data maps
  • include_timestamp - Whether to include timestamp field

Returns

List of partition maps with offset, metadata, and optionally timestamp

Link to this function

build_partitions_with_leader_epoch(partitions)

View Source
@spec build_partitions_with_leader_epoch([partition_data()]) :: [map()]

Builds partition data with committed_leader_epoch field.

Link to this function

build_topics(opts, include_timestamp \\ false)

View Source
@spec build_topics(Keyword.t(), boolean()) :: [topic_output()]

Builds the topics structure for OffsetCommit requests.

Parameters

  • opts - Keyword list containing :topics key with topic/partition data
  • include_timestamp - Whether to include timestamp field (v0: false, v1: true, v2+: false)

Returns

List of topic maps with partition data

Link to this function

build_topics_with_leader_epoch(opts)

View Source
@spec build_topics_with_leader_epoch(Keyword.t()) :: [map()]

Builds the topics structure with committed_leader_epoch for V6+ requests.

Link to this function

build_v2_v3_request(request_template, opts)

View Source
@spec build_v2_v3_request(
  struct(),
  Keyword.t()
) :: struct()

Builds an OffsetCommit v2/v3/v4 request with all fields including retention_time_ms.

Link to this function

build_v5_request(request_template, opts)

View Source
@spec build_v5_request(
  struct(),
  Keyword.t()
) :: struct()

Builds an OffsetCommit v5 request (retention_time_ms removed).

Link to this function

build_v6_request(request_template, opts)

View Source
@spec build_v6_request(
  struct(),
  Keyword.t()
) :: struct()

Builds an OffsetCommit v6 request (adds committed_leader_epoch per partition).

Link to this function

build_v7_plus_request(request_template, opts)

View Source
@spec build_v7_plus_request(
  struct(),
  Keyword.t()
) :: struct()

Builds an OffsetCommit v7+ request (adds group_instance_id + committed_leader_epoch).

Link to this function

extract_common_fields(opts)

View Source
@spec extract_common_fields(Keyword.t()) :: %{group_id: String.t()}

Extracts common fields from request options.

Link to this function

extract_coordination_fields(opts)

View Source
@spec extract_coordination_fields(Keyword.t()) :: %{
  generation_id: integer(),
  member_id: String.t()
}

Extracts consumer group coordination fields (generation_id, member_id). Used in v1+ versions.

Link to this function

extract_group_instance_id(opts)

View Source
@spec extract_group_instance_id(Keyword.t()) :: String.t() | nil

Extracts group_instance_id field for v7+ versions. Returns nil when not provided (nullable field).

Link to this function

extract_retention_time(opts)

View Source
@spec extract_retention_time(Keyword.t()) :: %{retention_time: integer()}

Extracts retention_time field for v2+ versions.