Manages chunked message assembly for Pulsar consumers.
This module handles the buffering and reassembly of message chunks that arrive from the broker. It tracks multiple concurrent chunked messages and accepts chunks in any order, assembling the complete payload once all chunks are received.
Summary
Functions
Adds a chunk to an existing context.
Returns the age of the context in milliseconds.
Returns a list of all message IDs for chunks received so far.
Assembles all chunks into a complete payload.
Checks if all chunks have been received.
Checks if a context has expired based on the given threshold.
Creates a new chunked message context from any chunk.
Pops all expired contexts from a map.
Pops the oldest context from a map.
Types
@type t() :: %Pulsar.Consumer.ChunkedMessageContext{ broker_metadatas: [term()], chunk_message_ids: %{required(non_neg_integer()) => term()}, chunks: %{required(non_neg_integer()) => binary()}, commands: [term()], created_at: integer(), first_chunk_message_id: term(), last_chunk_message_id: term(), metadatas: [term()], num_chunks_from_msg: non_neg_integer(), received_chunks: non_neg_integer(), total_chunk_msg_size: non_neg_integer(), uuid: String.t() }
Functions
@spec add_chunk( t(), command :: term(), metadata :: term(), payload :: binary(), broker_metadata :: term() ) :: {:ok, t()}
Adds a chunk to an existing context.
Accepts chunks in any order. If a chunk with the same chunk_id already exists,
it is replaced (idempotent). Returns {:ok, updated_context}.
@spec age_ms(t()) :: non_neg_integer()
Returns the age of the context in milliseconds.
Returns a list of all message IDs for chunks received so far.
Assembles all chunks into a complete payload.
Returns the binary payload with all chunks concatenated in order.
Checks if all chunks have been received.
@spec expired?(t(), non_neg_integer()) :: boolean()
Checks if a context has expired based on the given threshold.
Parameters
ctx- The chunked message contextexpiration_threshold_ms- Maximum age in milliseconds
Returns true if the context is older than the threshold.
@spec new( command :: term(), metadata :: term(), payload :: binary(), broker_metadata :: term() ) :: {:ok, t()}
Creates a new chunked message context from any chunk.
Accepts the first chunk received for a chunked message, regardless of its chunk_id.
Returns {:ok, context} with the new context.
@spec pop_expired(%{optional(String.t()) => t()}, non_neg_integer()) :: {[{String.t(), t()}], %{optional(String.t()) => t()}}
Pops all expired contexts from a map.
Returns a tuple {expired_list, remaining_map} where:
expired_listis a list of{uuid, context}tuples that have expiredremaining_mapis a map of contexts that have not expired
@spec pop_oldest(%{optional(String.t()) => t()}) :: {{String.t(), t()}, %{optional(String.t()) => t()}} | {nil, %{optional(String.t()) => t()}}
Pops the oldest context from a map.
Returns a tuple {{uuid, context}, remaining_map} with the oldest context and
remaining contexts, or {nil, contexts} if the map is empty.