What is Chunking?
Chunking is a feature in Apache Pulsar that splits large messages into smaller pieces (chunks) for transmission. It is useful when messages exceed the broker's maximum message size (5MB by default) and you need to send large payloads without hitting broker or network limits.
When a producer sends a large message with chunking enabled, it automatically splits the message into multiple chunks. The consumer then reassembles these chunks back into the original message before delivering it to the application layer.
How Chunking Works
Producer Side
When a producer is configured with chunking enabled:
{:ok, producer} = Pulsar.start_producer(
  "my-topic",
  chunking_enabled: true,
  max_message_size: 1024 # Split messages larger than 1KB
)
Large messages are automatically:
- Split into chunks of max_message_size bytes
- Each chunk is assigned a unique UUID and sequence number
- Chunks are sent to the broker individually
- Each chunk consumes one flow control permit
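The splitting step above can be pictured with a small, self-contained sketch. This is not the client's implementation: the `ChunkSketch` module, the map keys (`uuid`, `sequence`, `total`, `data`), and the integer-based identifier (not a real UUID) are all assumptions made for illustration.

```elixir
defmodule ChunkSketch do
  @moduledoc "Illustrative only; not part of the Pulsar client."

  # Split a binary into pieces of at most `max_size` bytes. Every piece is
  # tagged with a shared identifier, its zero-based position, and the total.
  def split(payload, max_size)
      when is_binary(payload) and is_integer(max_size) and max_size > 0 do
    pieces = slice(payload, max_size, [])
    # Hypothetical stand-in for a UUID: unique within this VM only.
    id = Integer.to_string(System.unique_integer([:positive]))
    total = length(pieces)

    pieces
    |> Enum.with_index()
    |> Enum.map(fn {data, sequence} ->
      %{uuid: id, sequence: sequence, total: total, data: data}
    end)
  end

  # Rebuild the original payload, tolerating out-of-order chunks.
  def reassemble(chunks) do
    chunks
    |> Enum.sort_by(& &1.sequence)
    |> Enum.map(& &1.data)
    |> IO.iodata_to_binary()
  end

  # The last piece may be shorter than `max_size`.
  defp slice(payload, max_size, acc) when byte_size(payload) <= max_size do
    Enum.reverse([payload | acc])
  end

  defp slice(payload, max_size, acc) do
    <<head::binary-size(max_size), rest::binary>> = payload
    slice(rest, max_size, [head | acc])
  end
end
```

With these assumptions, a 2500-byte payload and a 1024-byte limit yield three pieces of 1024, 1024, and 452 bytes, and `ChunkSketch.reassemble/1` returns the original binary regardless of the order in which the pieces are passed.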
Consumer Side
The consumer automatically handles chunk assembly:
- Chunk Reception: Each chunk arrives as a separate broker message
- Buffering: Chunks are buffered in memory until all chunks for a message arrive
- Assembly: Once complete, chunks are reassembled into the original payload
- Delivery: The complete message is delivered to your handle_message/2 callback
def handle_message(%Pulsar.Message{} = message, state) do
  # message.payload contains the complete, reassembled payload
  # message.chunk_metadata indicates if this was a chunked message
  case message.chunk_metadata do
    %{chunked: true, complete: true, num_chunks: n} ->
      IO.puts("Received complete chunked message with #{n} chunks")
    %{chunked: true, complete: false, error: reason} ->
      IO.puts("Received incomplete chunked message: #{reason}")
    nil ->
      IO.puts("Received non-chunked message")
  end
  {:ok, state}
end
Chunked Message Metadata
The Pulsar.Message struct provides information about chunked messages:
- chunk_metadata - Contains chunking information:
  - chunked: true - Indicates this was a chunked message
  - complete: true|false - Whether all chunks were received
  - uuid - Unique identifier for the chunked message
  - num_chunks - Total number of chunks (for complete messages)
  - received_chunks - Number of chunks received (for incomplete messages)
  - error - Reason for incompleteness (if incomplete)
For chunked messages, some fields contain data from all chunks:
- command - List of commands from all chunks
- metadata - List of metadata from all chunks
- broker_metadata - List of broker metadata from all chunks
- message_id_to_ack - List of all chunk message IDs
For non-chunked messages, these fields contain single values.
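Because these fields hold a list for chunked messages and a single value otherwise, code that handles both kinds of message may want to normalize them first. A minimal sketch using `List.wrap/1` from the standard library follows; the `AckIdsSketch` module is hypothetical, and plain maps with tuple IDs stand in for real `Pulsar.Message` structs and message IDs.

```elixir
defmodule AckIdsSketch do
  # Always return a list of message IDs, whether the field holds a single
  # ID (non-chunked) or a list of IDs (chunked).
  def ack_ids(%{message_id_to_ack: ids}), do: List.wrap(ids)
end

# Stand-in data: the tuple shape of the IDs is an assumption.
non_chunked = %{message_id_to_ack: {1, 0}}
chunked = %{message_id_to_ack: [{1, 1}, {1, 2}, {1, 3}]}

IO.inspect(AckIdsSketch.ack_ids(non_chunked))
IO.inspect(AckIdsSketch.ack_ids(chunked))
```

Note that `List.wrap/1` leaves existing lists untouched, so it only suits fields whose single value is not itself a list.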
Configuration Options
Warning
Batching and chunking cannot be enabled simultaneously on a producer. When chunking_enabled: true is set, batching is automatically disabled. These features are mutually exclusive because they represent different strategies for message transmission.
Producer Configuration
producers: [
  my_producer: [
    topic: "my-topic",
    chunking_enabled: true, # Enable chunking (default: false)
    max_message_size: 1024 * 1024 # Split messages larger than 1MB (default: 5MB)
  ]
]
Consumer Configuration
consumers: [
  my_consumer: [
    topic: "my-topic",
    subscription_name: "my-sub",
    callback_module: MyConsumer,
    # Chunking-related options:
    max_pending_chunked_messages: 10, # Max concurrent chunked messages (default: 10)
    expire_incomplete_chunked_message_after: 60_000, # Timeout in ms (default: 60s)
    chunk_cleanup_interval: 30_000 # Cleanup check interval in ms (default: 30s)
  ]
]
Configuration Details
- max_pending_chunked_messages: Maximum number of incomplete chunked messages to buffer simultaneously. If this limit is reached and a new chunked message arrives, the oldest incomplete message is evicted and delivered as incomplete with error: :queue_full.
- expire_incomplete_chunked_message_after: How long to wait for all chunks before timing out. Expired messages are delivered as incomplete with error: :expired.
- chunk_cleanup_interval: How often to check for and clean up expired chunked messages. Set to nil to disable automatic cleanup (not recommended for production).
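The eviction and expiry rules described above can be modelled with a toy buffer. This is a sketch under stated assumptions, not the consumer's actual code: the `PendingChunksSketch` module, the chunk map shape (`uuid`, `index`, `total`, `data`), and the event tuples are invented for illustration, and timestamps are passed in explicitly so the behaviour is deterministic.

```elixir
defmodule PendingChunksSketch do
  @moduledoc "Toy model of the buffering rules; not the library's implementation."

  # Buffer shape: %{uuid => %{first_seen: ms, total: n, chunks: %{index => data}}}

  # Add one chunk. Returns {events, buffer}; events lists messages that were
  # completed or evicted as a result of this chunk arriving.
  def add(buffer, %{uuid: uuid, index: index, total: total, data: data}, now_ms, max_pending)
      when max_pending > 0 do
    # A chunk for a new message evicts the oldest pending one if the buffer is full.
    {evicted, buffer} =
      if not Map.has_key?(buffer, uuid) and map_size(buffer) >= max_pending do
        evict_oldest(buffer)
      else
        {[], buffer}
      end

    entry = Map.get(buffer, uuid, %{first_seen: now_ms, total: total, chunks: %{}})
    entry = %{entry | chunks: Map.put(entry.chunks, index, data)}

    if map_size(entry.chunks) == entry.total do
      payload =
        entry.chunks
        |> Enum.sort_by(fn {chunk_index, _data} -> chunk_index end)
        |> Enum.map(fn {_chunk_index, chunk_data} -> chunk_data end)
        |> IO.iodata_to_binary()

      {evicted ++ [{:complete, uuid, payload}], Map.delete(buffer, uuid)}
    else
      {evicted, Map.put(buffer, uuid, entry)}
    end
  end

  # Drop every pending message older than `ttl_ms`, reporting each as expired.
  def expire(buffer, now_ms, ttl_ms) do
    {expired, live} =
      Enum.split_with(buffer, fn {_uuid, entry} -> now_ms - entry.first_seen >= ttl_ms end)

    events =
      for {uuid, entry} <- expired do
        {:incomplete, uuid, :expired, map_size(entry.chunks)}
      end

    {events, Map.new(live)}
  end

  defp evict_oldest(buffer) do
    {uuid, entry} = Enum.min_by(buffer, fn {_uuid, entry} -> entry.first_seen end)
    {[{:incomplete, uuid, :queue_full, map_size(entry.chunks)}], Map.delete(buffer, uuid)}
  end
end
```

In this model, the fourth element of an `:incomplete` event is the number of chunks received so far, mirroring the `received_chunks` field described earlier. A periodic call to `expire/3` plays the role that `chunk_cleanup_interval` configures.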
Handling Incomplete Chunks
Chunks may not complete for several reasons:
- Expiration: Not all chunks arrived within the timeout period
- Queue overflow: Too many concurrent chunked messages
Incomplete chunks are delivered to your callback with complete: false:
# Logger.warning/1 is a macro, so the enclosing module needs `require Logger`
def handle_message(%Pulsar.Message{chunk_metadata: %{complete: false, error: reason, received_chunks: n}}, state) do
  Logger.warning("Incomplete chunk: #{reason}, received #{n} chunks")
  # Return error to trigger redelivery
  {:error, :incomplete_chunk, state}
end
Flow Control and Permits
Flow control permits are only decremented when messages are assembled and delivered to your callback:
- Individual chunks arriving: No permits are decremented yet
- Chunked message completed: Decrements N permits (where N = number of chunks)
- Chunked message expired/evicted: Decrements M permits (where M = number of chunks received)
- Non-chunked message: Decrements 1 permit
The Pulsar.Message.num_broker_messages/1 helper returns the correct permit count:
# Non-chunked message
Pulsar.Message.num_broker_messages(message) # => 1
# Complete chunked message with 3 chunks
Pulsar.Message.num_broker_messages(message) # => 3
# Incomplete chunked message with 2 out of 3 chunks received
Pulsar.Message.num_broker_messages(message) # => 2
This ensures that flow control accurately reflects the number of broker messages consumed, regardless of whether messages are chunked or not.
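The permit rules listed above reduce to a pattern match on the chunk metadata. The sketch below restates them as a pure function; it is not the source of `Pulsar.Message.num_broker_messages/1`, the `PermitSketch` module is hypothetical, and plain maps stand in for `Pulsar.Message` structs.

```elixir
defmodule PermitSketch do
  # Non-chunked message: no chunk metadata, one broker message.
  def permits(%{chunk_metadata: nil}), do: 1

  # Complete chunked message: one permit per chunk.
  def permits(%{chunk_metadata: %{complete: true, num_chunks: n}}), do: n

  # Expired or evicted chunked message: one permit per chunk actually received.
  def permits(%{chunk_metadata: %{complete: false, received_chunks: m}}), do: m
end

IO.inspect(PermitSketch.permits(%{chunk_metadata: nil}))
# => 1
IO.inspect(PermitSketch.permits(%{chunk_metadata: %{complete: true, num_chunks: 3}}))
# => 3
IO.inspect(PermitSketch.permits(%{chunk_metadata: %{complete: false, received_chunks: 2}}))
# => 2
```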
Helper Functions
The Pulsar.Message module provides helpers for working with chunked messages:
# Check if message is chunked
Pulsar.Message.chunked?(message) # => true for chunked, false otherwise
# Check if chunked message is complete
Pulsar.Message.complete?(message) # => true if complete, false if incomplete
# Get maximum redelivery count (max across all chunks for chunked messages)
redelivery_count = Pulsar.Message.redelivery_count(message)
# Get number of broker messages consumed (for flow control)
num_permits = Pulsar.Message.num_broker_messages(message)
Example: Complete Chunked Message Flow
# Producer sends large message
{:ok, producer} = Pulsar.start_producer(
  "large-files",
  chunking_enabled: true,
  max_message_size: 1024 * 1024 # 1MB chunks
)
# Send 5MB file
large_payload = File.read!("large_file.dat") # 5MB
{:ok, _msg_id} = Pulsar.send(producer, large_payload)
# Producer automatically splits into 5 chunks
# Consumer receives and assembles
defmodule MyConsumer do
  use Pulsar.Consumer.Callback
  def handle_message(%Pulsar.Message{} = message, state) do
    if Pulsar.Message.chunked?(message) and Pulsar.Message.complete?(message) do
      # message.payload contains complete 5MB file
      num_chunks = message.chunk_metadata.num_chunks
      IO.puts("Received complete file in #{num_chunks} chunks")
      # process_file/1 is an application-defined function (not shown)
      process_file(message.payload)
      {:ok, state}
    else
      # Regular non-chunked message or incomplete chunked message
      {:ok, state}
    end
  end
end
Telemetry Events
The consumer emits telemetry events for chunk lifecycle:
- [:pulsar, :consumer, :chunk, :received] - When a chunk is received
- [:pulsar, :consumer, :chunk, :complete] - When all chunks are assembled
- [:pulsar, :consumer, :chunk, :discarded] - When a chunked message is discarded
- [:pulsar, :consumer, :chunk, :expired] - When a chunked message expires
See the Telemetry documentation for more details on monitoring chunked messages.
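As a rough sketch of how these events might be observed, the module below logs each one using `:telemetry.attach_many/4` from the `:telemetry` package. The `ChunkTelemetrySketch` module and the handler ID are hypothetical, and because the measurements and metadata attached to each event are not described here, the handler simply inspects whatever it receives.

```elixir
defmodule ChunkTelemetrySketch do
  require Logger

  @events [
    [:pulsar, :consumer, :chunk, :received],
    [:pulsar, :consumer, :chunk, :complete],
    [:pulsar, :consumer, :chunk, :discarded],
    [:pulsar, :consumer, :chunk, :expired]
  ]

  def events, do: @events

  # Call once, for example at application start. Requires the :telemetry
  # package to be available at runtime.
  def attach do
    :telemetry.attach_many("chunk-logger-sketch", @events, &__MODULE__.handle_event/4, nil)
  end

  # Contents of measurements and metadata are not assumed; they are only inspected.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(
      "#{inspect(event)} measurements=#{inspect(measurements)} metadata=#{inspect(metadata)}"
    )
  end
end
```

Passing the handler as a remote capture (`&__MODULE__.handle_event/4`) rather than an anonymous function follows the recommendation in the `:telemetry` documentation.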