Msgpack.StreamDecoder (msgpack_elixir v2.0.0)

Decodes a stream of MessagePack binaries into a stream of Elixir terms.

This module is designed to handle large sequences of MessagePack objects that arrive in chunks, such as from a network socket or a large file.

It incrementally parses the incoming binaries and emits complete Elixir terms as they are decoded.

Capabilities

  • Buffering: The module internally buffers data, allowing a single MessagePack object to be split across multiple chunks in the input stream.
  • Error Handling: If the stream finishes while an object is only partially decoded, the last element emitted by the stream will be the tuple {:error, :unexpected_eof}.

This module can be used together with Msgpack.StreamEncoder to create a lazy serialization and deserialization pipeline.

Summary

Types

Options passed to the decoder for each object.

t()

A stream that yields decoded Elixir terms or a final error tuple.

Functions

Lazily decodes an enumerable of MessagePack binaries into a stream of Elixir terms.

Types

opts_t()

@type opts_t() :: keyword()

Options passed to the decoder for each object.

t()

@type t() :: Stream.t(term() | {:error, :unexpected_eof})

A stream that yields decoded Elixir terms or a final error tuple.

The stream will produce any t:term/0 that can be decoded from the input.

If the input enumerable finishes while a term is only partially decoded, the last element in the stream will be {:error, :unexpected_eof}.

Functions

decode(enumerable, opts \\ [])

@spec decode(Enumerable.t(binary()), opts_t()) :: t()

Lazily decodes an enumerable of MessagePack binaries into a stream of Elixir terms.

Parameters

  • enumerable: An Enumerable that yields chunks of a MessagePack binary stream (e.g., f:File.stream/3 or a list of binaries).
  • opts: A keyword list of options passed to the underlying decoder.

Return Value

Returns a lazy Stream that emits Elixir terms as they are decoded.

If the input stream ends with incomplete data, the last item emitted will be an error tuple {:error, :unexpected_eof}.

Options

This function accepts the same options as Msgpack.decode/2, which are applied to the decoding of each object in the stream:

  • :max_depth: Sets a limit on the nesting level of arrays and maps. Defaults to 100.
  • :max_byte_size: Sets a limit on the declared byte size of any single string, binary, array, or map. Defaults to 10_000_000 (10MB).

Examples

Standard Usage

iex> objects = [1, "elixir", true]
iex> stream = Enum.map(objects, &Msgpack.encode!/1)
iex> Msgpack.StreamDecoder.decode(stream) |> Enum.to_list()
[1, "elixir", true]

Handling Incomplete Streams

iex> incomplete_stream = [<<0x91>>] # Array header + no elements
iex> Msgpack.StreamDecoder.decode(incomplete_stream) |> Enum.to_list()
[{:error, :unexpected_eof}]