Msgpack.StreamDecoder (msgpack_elixir v2.0.0)
Decodes a stream of MessagePack binaries into a stream of Elixir terms.
This module is designed to handle large sequences of MessagePack objects that arrive in chunks, such as from a network socket or a large file.
It incrementally parses the incoming binaries and emits complete Elixir terms as they are decoded.
Capabilities
- Buffering: The module internally buffers data, allowing a single MessagePack object to be split across multiple chunks in the input stream.
- Error Handling: If the stream finishes while an object is only partially decoded, the last element emitted by the stream will be the tuple {:error, :unexpected_eof}.
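The buffering behaviour can be illustrated with one string split across two chunks. This is a minimal sketch, not taken from the library's documentation: it assumes the package is published on Hex as msgpack_elixir (inferred from the page title), that Mix.install/1 has network access, and it writes the MessagePack bytes by hand.

```elixir
# Assumption: the Hex package name is :msgpack_elixir (taken from the page title).
Mix.install([{:msgpack_elixir, "~> 2.0"}])

# "elixir" as a MessagePack fixstr: header byte 0xA6 (0xA0 + length 6), then 6 bytes.
whole = <<0xA6, "elixir">>

# Split the object mid-payload, as a socket or file reader might deliver it.
chunks = [binary_part(whole, 0, 3), binary_part(whole, 3, 4)]

decoded =
  chunks
  |> Msgpack.StreamDecoder.decode()
  |> Enum.to_list()

IO.inspect(decoded)
```

If the decoder buffers as described, neither chunk is decodable on its own, and the single string "elixir" is emitted only once the second chunk arrives.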
This module can be used together with Msgpack.StreamEncoder to create a lazy serialization and deserialization pipeline.
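A rough sketch of such a pipeline follows. Because the functions of Msgpack.StreamEncoder are not shown on this page, Stream.map/2 with Msgpack.encode!/1 stands in for the encoding side here; that substitution, the Hex package name msgpack_elixir, and network access for Mix.install/1 are all assumptions.

```elixir
# Assumption: the Hex package name is :msgpack_elixir (taken from the page title).
Mix.install([{:msgpack_elixir, "~> 2.0"}])

first_three =
  1..1_000
  # Stand-in for the encoder: lazily encode each term to its own binary.
  |> Stream.map(&Msgpack.encode!/1)
  # Lazily decode the binaries back into terms.
  |> Msgpack.StreamDecoder.decode()
  # Only as many elements as needed should be pulled through the pipeline.
  |> Enum.take(3)

IO.inspect(first_three)
```

Nothing is encoded or decoded until Enum.take/2 starts consuming the stream, which is the point of composing the two stages lazily.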
Types
@type opts_t() :: keyword()
Options passed to the decoder for each object.
@type t() :: Stream.t(term() | {:error, :unexpected_eof})
A stream that yields decoded Elixir terms or a final error tuple.
The stream will produce any term() that can be decoded from the input.
If the input enumerable finishes while a term is only partially decoded, the last element in the stream will be {:error, :unexpected_eof}.
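Since the error tuple arrives as an ordinary stream element, callers may want to separate it from the decoded terms. The sketch below uses a hypothetical helper module, DecodedStream, which is not part of msgpack_elixir; it operates on plain lists shaped like the decoder's output so that it runs without the library.

```elixir
defmodule DecodedStream do
  @moduledoc """
  Hypothetical helper (not part of msgpack_elixir) that drains a stream of
  decoded items and separates the terms from a trailing error tuple.
  """

  @spec collect(Enumerable.t()) ::
          {:ok, [term()]} | {:error, :unexpected_eof, [term()]}
  def collect(items) do
    items
    # Accumulate in reverse; stop as soon as the error tuple shows up.
    |> Enum.reduce_while({:ok, []}, fn
      {:error, :unexpected_eof}, {:ok, acc} -> {:halt, {:error, :unexpected_eof, acc}}
      term, {:ok, acc} -> {:cont, {:ok, [term | acc]}}
    end)
    |> case do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      {:error, reason, acc} -> {:error, reason, Enum.reverse(acc)}
    end
  end
end

# Stand-in lists shaped like the decoder's output, so the sketch runs on its own.
IO.inspect(DecodedStream.collect([1, "elixir", true]))
# prints {:ok, [1, "elixir", true]}
IO.inspect(DecodedStream.collect([1, {:error, :unexpected_eof}]))
# prints {:error, :unexpected_eof, [1]}
```

Returning the terms decoded before the error lets the caller decide whether a truncated input is fatal or whether the partial result is still useful.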
Functions
@spec decode(Enumerable.t(binary()), opts_t()) :: t()
Lazily decodes an enumerable of MessagePack binaries into a stream of Elixir terms.
Parameters
- enumerable: An Enumerable that yields chunks of a MessagePack binary stream (e.g., File.stream!/3 or a list of binaries).
- opts: A keyword list of options passed to the underlying decoder.
Return Value
Returns a lazy Stream that emits Elixir terms as they are decoded.
If the input stream ends with incomplete data, the last item emitted will be the error tuple {:error, :unexpected_eof}.
Options
This function accepts the same options as Msgpack.decode/2, which are applied to the decoding of each object in the stream:
- :max_depth: Sets a limit on the nesting level of arrays and maps. Defaults to 100.
- :max_byte_size: Sets a limit on the declared byte size of any single string, binary, array, or map. Defaults to 10_000_000 (10 MB).
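As an illustration of passing these options while reading from a file, the following sketch writes three encoded objects to a temporary file and decodes them from small fixed-size chunks. The Hex package name msgpack_elixir, the temporary file name, the chunk size, and the limit values are all assumptions chosen for the example.

```elixir
# Assumption: the Hex package name is :msgpack_elixir (taken from the page title).
Mix.install([{:msgpack_elixir, "~> 2.0"}])

# Write three encoded objects back to back into a temporary file.
path = Path.join(System.tmp_dir!(), "stream_decoder_example.msgpack")
File.write!(path, Enum.map([1, "elixir", true], &Msgpack.encode!/1))

decoded =
  path
  # Read raw 4-byte chunks, so objects may straddle chunk boundaries.
  |> File.stream!([], 4)
  # Tighter limits than the defaults; the values are arbitrary for illustration.
  |> Msgpack.StreamDecoder.decode(max_depth: 16, max_byte_size: 1_000_000)
  |> Enum.to_list()

IO.inspect(decoded)
File.rm!(path)
```

The limits apply to each object individually, so a long file made of many small objects stays within :max_byte_size.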
Examples
Standard Usage
iex> objects = [1, "elixir", true]
iex> stream = Enum.map(objects, &Msgpack.encode!/1)
iex> Msgpack.StreamDecoder.decode(stream) |> Enum.to_list()
[1, "elixir", true]
Handling Incomplete Streams
iex> incomplete_stream = [<<0x91>>] # Array header declaring one element, with no element following
iex> Msgpack.StreamDecoder.decode(incomplete_stream) |> Enum.to_list()
[{:error, :unexpected_eof}]