Streaming Guide

This guide covers how to use EncodingRs.Decoder for processing large files and streaming data in multibyte encodings like Shift_JIS, GBK, Big5, and EUC-JP.

The Problem

When processing files in chunks, multibyte characters can be split across chunk boundaries:

File content: "Hello世界" (in Shift_JIS)
Bytes: <<72, 101, 108, 108, 111, 144, 162, 138, 69>>
                                     ^
                            Chunk boundary here

Chunk 1: <<72, 101, 108, 108, 111, 144>>   # "Hello" + first byte of "世"
Chunk 2: <<162, 138, 69>>                  # second byte of "世" + "界"

Using one-shot EncodingRs.decode/2 on each chunk independently produces corrupted output: the split character is replaced with replacement characters (U+FFFD, �) on both sides of the boundary.
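The same effect can be observed without any library, because Elixir strings are UTF-8 and UTF-8 is also a multibyte encoding. The snippet below is only an analogy for the Shift_JIS case above: it splits the binary in the middle of "世" and checks each half with String.valid?/1.

```elixir
# Elixir source strings are UTF-8, where "世" is the three bytes <<228, 184, 150>>.
text = "Hello世界"

# Split after 6 bytes: "Hello" plus only the first byte of "世".
<<chunk1::binary-size(6), chunk2::binary>> = text

String.valid?(chunk1)            # false: ends in a dangling lead byte
String.valid?(chunk2)            # false: starts with continuation bytes
String.valid?(chunk1 <> chunk2)  # true: the bytes are fine once rejoined
```

Neither half is decodable on its own, although no byte was lost. A chunk-by-chunk decoder therefore has to remember the dangling bytes.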

The Solution

EncodingRs.Decoder maintains state between chunks, buffering incomplete byte sequences until they can be completed.
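The decoder's internals are not shown in this guide. As a rough illustration of the buffering idea only, the following pure-Elixir sketch re-chunks Shift_JIS bytes so that every emitted chunk ends on a character boundary. The module name SjisRechunk is hypothetical, and the scanner is simplified: it assumes every lead byte in 0x81..0x9F or 0xE0..0xFC starts a two-byte character and does not validate trail bytes.

```elixir
defmodule SjisRechunk do
  # Simplified assumption: these bytes start a two-byte Shift_JIS character.
  defguardp is_lead(b) when b in 0x81..0x9F or b in 0xE0..0xFC

  @doc """
  Re-chunks a stream of binaries so each emitted chunk ends on a character
  boundary. A dangling lead byte is held back and prepended to the next chunk.
  """
  def rechunk(chunks) do
    chunks
    |> Stream.concat([:eof])
    |> Stream.transform(<<>>, fn
      :eof, <<>> ->
        {[], <<>>}

      # End of input: flush whatever is still pending, even if incomplete.
      :eof, pending ->
        {[pending], <<>>}

      chunk, pending ->
        data = pending <> chunk
        keep = complete_prefix_size(data, 0)
        <<complete::binary-size(keep), rest::binary>> = data
        {if(complete == <<>>, do: [], else: [complete]), rest}
    end)
  end

  # Walks the binary one character at a time and returns how many leading
  # bytes form complete characters.
  defp complete_prefix_size(<<lead, _trail, rest::binary>>, n) when is_lead(lead),
    do: complete_prefix_size(rest, n + 2)

  defp complete_prefix_size(<<lead>>, n) when is_lead(lead), do: n

  defp complete_prefix_size(<<_single, rest::binary>>, n),
    do: complete_prefix_size(rest, n + 1)

  defp complete_prefix_size(<<>>, n), do: n
end
```

With the two chunks from the example above, the first emitted chunk is "Hello" and the second is <<144, 162, 138, 69>>, the complete bytes of "世界". EncodingRs.Decoder spares you this bookkeeping and works for every supported encoding, not just Shift_JIS.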

Real-World Examples

Processing a Large CSV File

defmodule CsvProcessor do
  def process_shift_jis_csv(path) do
    path
    |> File.stream!([], 64 * 1024)  # 64KB chunks
    |> EncodingRs.Decoder.stream("shift_jis")
    |> Enum.join()
    |> String.split("\n")
    |> Enum.map(&String.split(&1, ","))
  end
end
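The example above joins the whole decoded file into one string before splitting it, so memory use grows with file size. If that matters, the decoded chunks can be turned into lines lazily. The helper below is a sketch, not part of EncodingRs; LineSplitter is a hypothetical name, and it only handles "\n" line endings.

```elixir
defmodule LineSplitter do
  @doc """
  Turns a stream of decoded string chunks into a stream of complete lines,
  carrying a trailing partial line over to the next chunk.
  """
  def lines(decoded_chunks) do
    decoded_chunks
    |> Stream.concat([:eof])
    |> Stream.transform("", fn
      :eof, "" ->
        {[], ""}

      # End of input: the last line had no trailing newline.
      :eof, partial ->
        {[partial], ""}

      chunk, partial ->
        # Everything before the final "\n" is complete; the tail may continue
        # in the next chunk.
        {complete, [rest]} =
          (partial <> chunk) |> String.split("\n") |> Enum.split(-1)

        {complete, rest}
    end)
  end
end

# Hypothetical usage with the pipeline from the example above:
#
#   path
#   |> File.stream!([], 64 * 1024)
#   |> EncodingRs.Decoder.stream("shift_jis")
#   |> LineSplitter.lines()
#   |> Stream.map(&String.split(&1, ","))
```

Splitting on "," remains a naive way to parse CSV (quoted fields are not handled), as in the original example.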

Streaming HTTP Response

defmodule HttpClient do
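  def fetch_and_decode(url, encoding) do
    # Using Req or a similar HTTP client with streaming.
    # Req passes {request, response} as the accumulator of an `into:` function,
    # so the chunks are collected in the response's private map.
    resp =
      Req.get!(url,
        into: fn {:data, chunk}, {req, resp} ->
          {:cont, {req, Req.Response.update_private(resp, :chunks, [chunk], &[chunk | &1])}}
        end
      )

    resp
    |> Req.Response.get_private(:chunks, [])
    |> Enum.reverse()
    |> EncodingRs.Decoder.stream(encoding)
    |> Enum.join()
  end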
end

Processing with Error Tracking

defmodule DataImporter do
  require Logger

  def import_with_validation(path, encoding) do
    {content, had_errors} =
      path
      |> File.stream!([], 8192)
      |> EncodingRs.Decoder.stream_with_errors(encoding)
      |> Enum.reduce({"", false}, fn {chunk, errors}, {acc, had_any} ->
        if errors do
          Logger.warning("Invalid bytes detected in chunk")
        end
        {acc <> chunk, had_any or errors}
      end)

    if had_errors do
      Logger.warning("File contained invalid byte sequences")
    end

    content
  end
end

GenServer for Continuous Stream Processing

defmodule StreamProcessor do
  use GenServer

  def start_link(encoding) do
    GenServer.start_link(__MODULE__, encoding)
  end

  def process_chunk(pid, chunk, is_last \\ false) do
    GenServer.call(pid, {:process, chunk, is_last})
  end

  @impl true
  def init(encoding) do
    {:ok, decoder} = EncodingRs.Decoder.new(encoding)
    {:ok, %{decoder: decoder, buffer: ""}}
  end

  @impl true
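  def handle_call({:process, chunk, is_last}, _from, state) do
    {:ok, output, had_errors} =
      EncodingRs.Decoder.decode_chunk(state.decoder, chunk, is_last)

    # The decoder keeps its own mutable state between calls,
    # so the GenServer state is returned unchanged.
    {:reply, {:ok, output, had_errors}, state}
  end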
end

# Usage
{:ok, pid} = StreamProcessor.start_link("gbk")
{:ok, out1, _} = StreamProcessor.process_chunk(pid, chunk1)
{:ok, out2, _} = StreamProcessor.process_chunk(pid, chunk2)
{:ok, out3, _} = StreamProcessor.process_chunk(pid, chunk3, true)

Converting File Encoding

defmodule EncodingConverter do
  @doc """
  Convert a file from one encoding to another.
  """
  def convert(input_path, output_path, from_encoding, to_encoding) do
    input_path
    |> File.stream!([], 64 * 1024)
    |> EncodingRs.Decoder.stream(from_encoding)
    |> Stream.map(fn chunk ->
      {:ok, encoded} = EncodingRs.encode(chunk, to_encoding)
      encoded
    end)
    |> Stream.into(File.stream!(output_path))
    |> Stream.run()
  end
end

# Convert Shift_JIS to UTF-8
EncodingConverter.convert("input.txt", "output.txt", "shift_jis", "utf-8")

Choosing Chunk Size

The chunk size affects memory usage and performance:

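| Chunk Size | Memory | Performance   | Use Case                  |
|------------|--------|---------------|---------------------------|
| 4KB        | Low    | More overhead | Memory-constrained        |
| 64KB       | Medium | Good balance  | General purpose           |
| 256KB+     | Higher | Less overhead | Large files, fast storage |
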
# Memory-constrained environment
File.stream!(path, [], 4096)

# General purpose (recommended)
File.stream!(path, [], 64 * 1024)

# High-throughput processing
File.stream!(path, [], 256 * 1024)

When to Use Streaming vs One-Shot

| Scenario           | Approach                                               |
|--------------------|--------------------------------------------------------|
| Small files (<1MB) | EncodingRs.decode/2                                    |
| Large files        | EncodingRs.Decoder.stream/2                            |
| Files > 100MB      | EncodingRs.Decoder.stream/2 (avoids input size limit)  |
| Network streams    | EncodingRs.Decoder                                     |
| Unknown size       | EncodingRs.Decoder.stream/2                            |
| Memory-constrained | EncodingRs.Decoder.stream/2                            |
| Untrusted input    | EncodingRs.Decoder.stream/2 (bounded chunk sizes)      |
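
The size-based rows of this table can be expressed as a small decision function. This is a sketch: DecodeStrategy is a hypothetical module, the 1MB threshold is simply the "small file" row from the table, and anything of unknown size falls through to streaming.

```elixir
defmodule DecodeStrategy do
  # "Small file" threshold taken from the table above (1MB).
  @small_file_bytes 1024 * 1024

  @doc "Returns :one_shot for known small sizes and :stream for everything else."
  def for_size(size)
      when is_integer(size) and size >= 0 and size < @small_file_bytes,
      do: :one_shot

  # Large inputs and inputs of unknown size (for example :unknown) are streamed.
  def for_size(_size_or_unknown), do: :stream
end
```

For a file on disk, the size could come from File.stat!(path).size; for a network stream it is usually unknown, which selects :stream.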

Input Size Limit

One-shot operations (EncodingRs.decode/2, EncodingRs.encode/2) enforce a configurable maximum input size (default 100MB) to prevent excessive memory allocation. Inputs exceeding this limit return {:error, :input_too_large}.

The streaming decoder is not affected by this limit at the file level because each chunk is validated independently. As long as your chunk size is below the limit (and it should be — 64KB to 256KB is typical), the streaming API can process files of any size.
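
File.stream!/3 already produces bounded chunks for files. When the bytes come from somewhere else and are already in memory, a helper along these lines can split them into bounded chunks before they reach the streaming decoder. BinaryChunks is a hypothetical name used for illustration, not an EncodingRs function.

```elixir
defmodule BinaryChunks do
  @doc "Lazily splits an in-memory binary into chunks of at most `size` bytes."
  def stream(binary, size)
      when is_binary(binary) and is_integer(size) and size > 0 do
    Stream.unfold(binary, fn
      # Nothing left: end the stream.
      <<>> -> nil
      # At least `size` bytes left: emit a full chunk.
      <<chunk::binary-size(size), rest::binary>> -> {chunk, rest}
      # Fewer than `size` bytes left: emit the tail as the final chunk.
      tail -> {tail, <<>>}
    end)
  end
end

# Hypothetical usage:
#
#   data
#   |> BinaryChunks.stream(64 * 1024)
#   |> EncodingRs.Decoder.stream("gbk")
#   |> Enum.join()
```

The split points are arbitrary byte offsets, which is fine here because the streaming decoder handles characters that straddle chunk boundaries.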

If you need to one-shot decode inputs larger than 100MB, you can adjust the limit at runtime:

# In config/runtime.exs
import Config

config :encoding_rs, max_input_size: 500 * 1024 * 1024

# Or disable entirely for trusted inputs
config :encoding_rs, max_input_size: :infinity

See EncodingRs.max_input_size/0 for details.

Common Encodings

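| Region                | Common Encodings               |
|-----------------------|--------------------------------|
| Japanese              | shift_jis, euc-jp, iso-2022-jp |
| Chinese (Simplified)  | gbk, gb18030                   |
| Chinese (Traditional) | big5                           |
| Korean                | euc-kr                         |
| Western European      | windows-1252, iso-8859-1       |
| Cyrillic              | windows-1251, koi8-r           |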

Tips

  1. Always flush: When calling decode_chunk/3 directly, pass true as the is_last argument for the final chunk to flush any buffered bytes.

  2. Don't share decoders: Each decoder maintains mutable state. Don't share across processes.

  3. Check for errors: Use stream_with_errors/2 if you need to know about invalid byte sequences.

  4. BOM detection: For files with BOMs, detect and strip first:

    content = File.read!(path)
    {encoding, data} = case EncodingRs.detect_and_strip_bom(content) do
      {:ok, enc, rest} -> {enc, rest}
      {:error, :no_bom} -> {"utf-8", content}  # default
    end
    {:ok, decoded} = EncodingRs.decode(data, encoding)
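
For tip 1, the awkward part when driving decode_chunk/3 by hand is knowing which chunk is the last one without loading the whole stream. One possible approach is a one-element lookahead, sketched below. LastFlag is a hypothetical helper, not part of EncodingRs.

```elixir
defmodule LastFlag do
  @doc """
  Lazily pairs each chunk with a boolean that is true only for the final chunk.
  """
  def tag(chunks) do
    chunks
    |> Stream.concat([:eof])
    |> Stream.transform(:none, fn
      # Empty input: there is no chunk to mark as last.
      :eof, :none -> {[], :none}
      # End of input: the held-back chunk was the last one.
      :eof, {:some, prev} -> {[{prev, true}], :none}
      # First chunk: hold it back until we know whether more follow.
      chunk, :none -> {[], {:some, chunk}}
      # Another chunk arrived, so the previous one was not the last.
      chunk, {:some, prev} -> {[{prev, false}], {:some, chunk}}
    end)
  end
end

# Hypothetical usage with the decoder API shown earlier in this guide:
#
#   {:ok, decoder} = EncodingRs.Decoder.new("shift_jis")
#
#   for {chunk, is_last} <- LastFlag.tag(chunks) do
#     {:ok, text, _had_errors} =
#       EncodingRs.Decoder.decode_chunk(decoder, chunk, is_last)
#
#     text
#   end
```

EncodingRs.Decoder.stream/2 is still the simpler choice when it fits; this kind of helper is only relevant when you manage the decoder yourself.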