KafkaEx.Protocol.Kayrock.Produce.RequestHelpers (kafka_ex v1.0.0-rc.1)
Shared helper functions for building Produce requests across all versions.
Handles the complexity of different message formats:
- V0-V2: MessageSet format (legacy)
- V3+: RecordBatch format (modern with headers support)
Summary
Functions
Builds a MessageSet from a list of messages (V0-V2 format).
Builds a RecordBatch from a list of messages (V3+ format).
Builds RecordHeader structs from a list of {key, value} tuples.
Builds a Produce request for V0-V2 using MessageSet format.
Builds a Produce request for V3+ using RecordBatch format.
Converts compression type atom to RecordBatch attributes byte.
Extracts common fields from request options.
Functions
@spec build_message_set([map()], atom()) :: Kayrock.MessageSet.t()
Builds a MessageSet from a list of messages (V0-V2 format).
Each message in the list should be a map with:
- :value (required) - The message value
- :key (optional) - The message key
Note: Timestamps and headers are NOT supported in MessageSet format.
@spec build_record_batch([map()], atom()) :: Kayrock.RecordBatch.t()
Builds a RecordBatch from a list of messages (V3+ format).
Each message in the list should be a map with:
- :value (required) - The message value
- :key (optional) - The message key
- :timestamp (optional) - Message timestamp in milliseconds
- :headers (optional) - List of {key, value} header tuples
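To make the two message shapes concrete, here is a small sketch of input data for each format. The topic-like strings and values are made up for illustration; only the map keys come from the lists above.

```elixir
# Messages suitable for the MessageSet format (V0-V2): only :value and :key.
legacy_messages = [
  %{value: "order-created"},
  %{key: "user-42", value: "order-created"}
]

# Messages suitable for the RecordBatch format (V3+): may also carry
# a millisecond timestamp and a list of {key, value} header tuples.
record_messages = [
  %{
    key: "user-42",
    value: "order-created",
    timestamp: System.system_time(:millisecond),
    headers: [{"content-type", "application/json"}]
  }
]
```

Lists like these would be passed as the first argument to build_message_set/2 or build_record_batch/2. The second argument is typed only as atom() in the specs; it is presumably the compression type, but that is an assumption.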
@spec build_record_headers(nil | [{String.t(), binary()}]) :: [Kayrock.RecordBatch.RecordHeader.t()]
Builds RecordHeader structs from a list of {key, value} tuples.
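The conversion described by the spec can be sketched as follows. This is not the library's implementation: HeaderSketch and its nested Header struct are hypothetical stand-ins for Kayrock.RecordBatch.RecordHeader (assumed here to hold a key and a value), used so the example runs without kafka_ex or Kayrock installed.

```elixir
defmodule HeaderSketch do
  # Hypothetical stand-in for Kayrock.RecordBatch.RecordHeader.
  defmodule Header do
    defstruct [:key, :value]
  end

  # The spec allows nil, which is treated here as "no headers".
  def build(nil), do: []

  # Each {key, value} tuple becomes one header struct, preserving order.
  def build(headers) when is_list(headers) do
    Enum.map(headers, fn {key, value} -> %Header{key: key, value: value} end)
  end
end

headers = HeaderSketch.build([{"trace-id", "abc123"}, {"source", "billing"}])
```

Returning an empty list for nil is an assumption based on the return type in the spec, which is always a list.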
Builds a Produce request for V0-V2 using MessageSet format.
MessageSet format is used for Kafka < 0.11 and doesn't support headers.
Builds a Produce request for V3+ using RecordBatch format.
RecordBatch format supports headers, per-record timestamps, and transactional producers.
@spec compression_to_attributes(atom()) :: non_neg_integer()
Converts compression type atom to RecordBatch attributes byte.
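In the Kafka RecordBatch format, the compression codec occupies the low three bits of the attributes field (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd). A minimal sketch of such a mapping is shown below. The module name CompressionSketch and the accepted atoms are assumptions; the set of codecs kafka_ex actually supports, and how it handles unknown atoms, is not stated in this page.

```elixir
defmodule CompressionSketch do
  import Bitwise

  # Codec ids as defined by the Kafka RecordBatch format.
  # The atom names are assumed, not taken from kafka_ex.
  @codecs %{none: 0, gzip: 1, snappy: 2, lz4: 3, zstd: 4}

  # Raises KeyError for an atom that is not in the table.
  def to_attributes(compression) when is_atom(compression) do
    Map.fetch!(@codecs, compression)
  end

  # Recovers the codec id from a full attributes value by masking
  # the low three bits; higher bits carry other flags.
  def codec_id(attributes) when is_integer(attributes) do
    attributes &&& 0b111
  end
end
```

For example, `CompressionSketch.to_attributes(:gzip)` returns 1, and `CompressionSketch.codec_id(0b0001_0010)` returns 2 because the set bit above the codec bits is masked off.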
@spec extract_common_fields(Keyword.t()) :: %{
  topic: String.t(),
  partition: non_neg_integer(),
  messages: [map()],
  acks: integer(),
  timeout: non_neg_integer(),
  compression: atom()
}
Extracts common fields from request options.
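One way such an extraction could work is sketched below. Everything beyond the field names in the spec is an assumption: the module name CommonFieldsSketch is hypothetical, the option keys are assumed to match the returned map keys, and the default values are placeholders rather than kafka_ex's documented defaults.

```elixir
defmodule CommonFieldsSketch do
  # Placeholder defaults for illustration only.
  @defaults [acks: -1, timeout: 5_000, compression: :none]

  def extract(opts) when is_list(opts) do
    # Caller-supplied options override the defaults.
    opts = Keyword.merge(@defaults, opts)

    %{
      # Assumed to be required: fetch!/2 raises KeyError when missing.
      topic: Keyword.fetch!(opts, :topic),
      partition: Keyword.fetch!(opts, :partition),
      messages: Keyword.fetch!(opts, :messages),
      # Always present after the merge with @defaults.
      acks: Keyword.fetch!(opts, :acks),
      timeout: Keyword.fetch!(opts, :timeout),
      compression: Keyword.fetch!(opts, :compression)
    }
  end
end

fields =
  CommonFieldsSketch.extract(
    topic: "events",
    partition: 0,
    messages: [%{value: "hello"}],
    compression: :gzip
  )
```

Collecting the options into a single map up front lets the version-specific request builders read the same fields without repeating keyword lookups.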