KafkaEx.Protocol.Kayrock.Produce.RequestHelpers (kafka_ex v1.0.0-rc.1)

Shared helper functions for building Produce requests across all versions.

Handles the complexity of different message formats:

  • V0-V2: MessageSet format (legacy)
  • V3+: RecordBatch format (modern with headers support)
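
The version split above can be pictured as a small dispatch on the Produce API version. This is only a sketch: `FormatSketch` and `format_for/1` are hypothetical names that do not exist in kafka_ex, and the returned atoms are labels chosen for illustration.

```elixir
defmodule FormatSketch do
  # Hypothetical helper, not part of kafka_ex.
  # Produce V0-V2 use the legacy MessageSet format.
  def format_for(version) when version in 0..2, do: :message_set

  # Produce V3 and later use the RecordBatch format.
  def format_for(version) when is_integer(version) and version >= 3, do: :record_batch
end

FormatSketch.format_for(2)
# => :message_set
FormatSketch.format_for(3)
# => :record_batch
```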

Summary

Functions

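build_message_set(messages, compression)
Builds a MessageSet from a list of messages (V0-V2 format).

build_record_batch(messages, compression)
Builds a RecordBatch from a list of messages (V3+ format).

build_record_headers(headers)
Builds RecordHeader structs from a list of {key, value} tuples.

build_request_v0_v2(request_template, opts)
Builds a Produce request for V0-V2 using MessageSet format.

build_request_v3_plus(request_template, opts)
Builds a Produce request for V3+ using RecordBatch format.

compression_to_attributes(atom)
Converts compression type atom to RecordBatch attributes byte.

extract_common_fields(opts)
Extracts common fields from request options.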

Functions

build_message_set(messages, compression)

@spec build_message_set([map()], atom()) :: Kayrock.MessageSet.t()

Builds a MessageSet from a list of messages (V0-V2 format).

Each message in the list should be a map with:

  • :value (required) - The message value
  • :key (optional) - The message key

Note: Timestamps and headers are NOT supported in MessageSet format.
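
Because the MessageSet format carries only a key and a value, a caller that shares message maps between protocol versions might trim them before building a V0-V2 request. The sketch below assumes that approach; `MessageSetSketch` is a hypothetical module, not part of kafka_ex, and how the real function treats extra keys is not stated here.

```elixir
defmodule MessageSetSketch do
  # Hypothetical helper, not part of kafka_ex: keeps only the keys that the
  # MessageSet format can carry (:key and :value).
  def to_message_set_shape(messages) do
    Enum.map(messages, &Map.take(&1, [:key, :value]))
  end
end

messages = [
  %{key: "k1", value: "v1", timestamp: 1_700_000_000_000, headers: [{"h", "x"}]},
  %{value: "v2"}
]

trimmed = MessageSetSketch.to_message_set_shape(messages)
# => [%{key: "k1", value: "v1"}, %{value: "v2"}]
```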

build_record_batch(messages, compression)

@spec build_record_batch([map()], atom()) :: Kayrock.RecordBatch.t()

Builds a RecordBatch from a list of messages (V3+ format).

Each message in the list should be a map with:

  • :value (required) - The message value
  • :key (optional) - The message key
  • :timestamp (optional) - Message timestamp in milliseconds since the Unix epoch
  • :headers (optional) - List of {key, value} header tuples
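
As an illustration of the message shape described above, the following builds a list of maps that fits the documented keys. The values are invented for this sketch, and the final check is only a local sanity test, not something the library is known to perform.

```elixir
# Illustrative message maps only; the keys follow the list above and the
# values are made up for this sketch.
messages = [
  # Only :value is required.
  %{value: "order created"},
  # All optional keys supplied.
  %{
    key: "order-42",
    value: "order shipped",
    timestamp: System.system_time(:millisecond),
    headers: [{"content-type", "text/plain"}, {"trace-id", "abc123"}]
  }
]

# Local sanity check: every message carries the required :value key.
all_have_value = Enum.all?(messages, &Map.has_key?(&1, :value))
```

A list of this shape would be the first argument to build_record_batch/2, alongside a compression atom.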

build_record_headers(headers)

@spec build_record_headers(nil | [{String.t(), binary()}]) :: [
  Kayrock.RecordBatch.RecordHeader.t()
]

Builds RecordHeader structs from a list of {key, value} tuples. Per the spec, the argument may also be nil, and the result is always a list.
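
A rough sketch of the conversion, under stated assumptions: `HeaderSketch` is a hypothetical stand-in, plain maps with :key and :value replace the Kayrock.RecordBatch.RecordHeader structs so the code runs without the library, and returning an empty list for nil is an assumption based only on the return type in the spec.

```elixir
defmodule HeaderSketch do
  # Hypothetical stand-in for build_record_headers/1, not the library code.
  # Assumption: nil means "no headers" and yields an empty list.
  def build(nil), do: []

  # Each {key, value} tuple becomes one header entry. Plain maps are used
  # here in place of RecordHeader structs.
  def build(headers) when is_list(headers) do
    Enum.map(headers, fn {key, value} when is_binary(key) and is_binary(value) ->
      %{key: key, value: value}
    end)
  end
end

HeaderSketch.build([{"content-type", "application/json"}])
# => [%{key: "content-type", value: "application/json"}]
```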

build_request_v0_v2(request_template, opts)

@spec build_request_v0_v2(map(), Keyword.t()) :: map()

Builds a Produce request for V0-V2 using MessageSet format.

MessageSet format is used for Kafka < 0.11 and doesn't support headers.

build_request_v3_plus(request_template, opts)

@spec build_request_v3_plus(map(), Keyword.t()) :: map()

Builds a Produce request for V3+ using RecordBatch format.

RecordBatch format supports headers, per-record timestamps, and transactional producers.

compression_to_attributes(atom)

@spec compression_to_attributes(atom()) :: non_neg_integer()

Converts a compression type atom to the RecordBatch attributes byte.
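
In the Kafka protocol, the low three bits of the RecordBatch attributes field hold the compression codec id (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd). The sketch below maps atoms to those ids. `CompressionSketch` is hypothetical, and the atom names, as well as which codecs kafka_ex actually accepts, are assumptions.

```elixir
defmodule CompressionSketch do
  # Hypothetical mapping, not the library code. The numeric ids are the
  # codec ids defined by the Kafka protocol; the atom names are assumed.
  def to_attributes(:none), do: 0
  def to_attributes(:gzip), do: 1
  def to_attributes(:snappy), do: 2
  def to_attributes(:lz4), do: 3
  def to_attributes(:zstd), do: 4
end

CompressionSketch.to_attributes(:gzip)
# => 1
```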

extract_common_fields(opts)

@spec extract_common_fields(Keyword.t()) :: %{
  topic: String.t(),
  partition: non_neg_integer(),
  messages: [map()],
  acks: integer(),
  timeout: non_neg_integer(),
  compression: atom()
}

Extracts the common fields (topic, partition, messages, acks, timeout, and compression) from the request options and returns them as a map.
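
One way such an extraction could look is sketched below. `CommonFieldsSketch` is hypothetical: the option key names, which keys are required, and every default value are assumptions made for illustration, not kafka_ex's documented behavior.

```elixir
defmodule CommonFieldsSketch do
  # Hypothetical re-implementation for illustration only.
  # Assumptions: option keys match the map keys in the spec; :topic,
  # :partition and :messages are required; the defaults are invented.
  def extract(opts) do
    %{
      topic: Keyword.fetch!(opts, :topic),
      partition: Keyword.fetch!(opts, :partition),
      messages: Keyword.fetch!(opts, :messages),
      acks: Keyword.get(opts, :acks, 1),
      timeout: Keyword.get(opts, :timeout, 5_000),
      compression: Keyword.get(opts, :compression, :none)
    }
  end
end

fields =
  CommonFieldsSketch.extract(
    topic: "events",
    partition: 0,
    messages: [%{value: "hello"}],
    compression: :gzip
  )
```

Here `fields.compression` is `:gzip` because it was supplied, while `fields.acks` and `fields.timeout` fall back to the sketch's assumed defaults.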