Pulsar.Consumer.ChunkedMessageContext (Pulsar v2.8.6)

Copy Markdown View Source

Manages chunked message assembly for Pulsar consumers.

This module handles the buffering and reassembly of message chunks that arrive from the broker. It tracks multiple concurrent chunked messages and accepts chunks in any order, assembling the complete payload once all chunks are received.

Summary

Functions

Returns the age of the context in milliseconds.

Returns a list of all message IDs for chunks received so far.

Assembles all chunks into a complete payload.

Checks if all chunks have been received.

Checks if a context has expired based on the given threshold.

Creates a new chunked message context from any chunk.

Pops all expired contexts from a map.

Pops the oldest context from a map.

Types

t()

@type t() :: %Pulsar.Consumer.ChunkedMessageContext{
  broker_metadatas: [term()],
  chunk_message_ids: %{required(non_neg_integer()) => term()},
  chunks: %{required(non_neg_integer()) => binary()},
  commands: [term()],
  created_at: integer(),
  first_chunk_message_id: term(),
  last_chunk_message_id: term(),
  metadatas: [term()],
  num_chunks_from_msg: non_neg_integer(),
  received_chunks: non_neg_integer(),
  total_chunk_msg_size: non_neg_integer(),
  uuid: String.t()
}

Functions

add_chunk(ctx, command, metadata, payload, broker_metadata)

@spec add_chunk(
  t(),
  command :: term(),
  metadata :: term(),
  payload :: binary(),
  broker_metadata :: term()
) :: {:ok, t()}

Adds a chunk to an existing context.

Accepts chunks in any order. If a chunk with the same chunk_id already exists, it is replaced (idempotent). Returns {:ok, updated_context}.

age_ms(ctx)

@spec age_ms(t()) :: non_neg_integer()

Returns the age of the context in milliseconds.

all_message_ids(ctx)

@spec all_message_ids(t()) :: [term()]

Returns a list of all message IDs for chunks received so far.

assemble_payload(ctx)

@spec assemble_payload(t()) :: binary()

Assembles all chunks into a complete payload.

Returns the binary payload with all chunks concatenated in order.

complete?(ctx)

@spec complete?(t()) :: boolean()

Checks if all chunks have been received.

expired?(ctx, expiration_threshold_ms)

@spec expired?(t(), non_neg_integer()) :: boolean()

Checks if a context has expired based on the given threshold.

Parameters

  • ctx - The chunked message context
  • expiration_threshold_ms - Maximum age in milliseconds

Returns true if the context is older than the threshold.

new(command, metadata, payload, broker_metadata)

@spec new(
  command :: term(),
  metadata :: term(),
  payload :: binary(),
  broker_metadata :: term()
) ::
  {:ok, t()}

Creates a new chunked message context from any chunk.

Accepts the first chunk received for a chunked message, regardless of its chunk_id. Returns {:ok, context} with the new context.

pop_expired(contexts, expiration_threshold_ms)

@spec pop_expired(%{optional(String.t()) => t()}, non_neg_integer()) ::
  {[{String.t(), t()}], %{optional(String.t()) => t()}}

Pops all expired contexts from a map.

Returns a tuple {expired_list, remaining_map} where:

  • expired_list is a list of {uuid, context} tuples that have expired
  • remaining_map is a map of contexts that have not expired

pop_oldest(contexts)

@spec pop_oldest(%{optional(String.t()) => t()}) ::
  {{String.t(), t()}, %{optional(String.t()) => t()}}
  | {nil, %{optional(String.t()) => t()}}

Pops the oldest context from a map.

Returns a tuple {{uuid, context}, remaining_map} with the oldest context and remaining contexts, or {nil, contexts} if the map is empty.