# Streaming Guide
This guide covers how to use `EncodingRs.Decoder` for processing large files and streaming data in multibyte encodings such as Shift_JIS, GBK, Big5, and EUC-JP.
## The Problem
When processing files in chunks, multibyte characters can be split across chunk boundaries:
```
File content: "Hello世界" (in Shift_JIS)
Bytes: <<72, 101, 108, 108, 111, 144, 162, 138, 69>>
                                    ↑
                           Chunk boundary here

Chunk 1: <<72, 101, 108, 108, 111, 144>> → "Hello" + incomplete byte
Chunk 2: <<162, 138, 69>>                → incomplete byte + "界"
```

Using one-shot `EncodingRs.decode/2` on each chunk independently produces corrupted output with replacement characters (�).
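For example, decoding the two chunks independently loses the split character. This is a quick sketch; the exact garbled output depends on how the invalid bytes are replaced:

```elixir
# Each chunk decoded on its own: chunk 1 ends with the lead byte of 世
# and chunk 2 starts with its trail byte, so the character cannot be recovered.
{:ok, part1} = EncodingRs.decode(<<72, 101, 108, 108, 111, 144>>, "shift_jis")
{:ok, part2} = EncodingRs.decode(<<162, 138, 69>>, "shift_jis")

part1 <> part2
# => "Hello" followed by replacement/incorrect characters instead of "Hello世界"
```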
## The Solution

`EncodingRs.Decoder` maintains state between chunks, buffering incomplete byte sequences until they can be completed.
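As a quick illustration using the lower-level decoder API shown later in this guide (`EncodingRs.Decoder.new/1` and `decode_chunk/3`), the two chunks from the example above now decode cleanly:

```elixir
{:ok, decoder} = EncodingRs.Decoder.new("shift_jis")

# Chunk 1: the trailing lead byte of 世 is buffered, not emitted
{:ok, out1, _errors} =
  EncodingRs.Decoder.decode_chunk(decoder, <<72, 101, 108, 108, 111, 144>>, false)

# Chunk 2 (final): the buffered byte is combined with the incoming bytes
{:ok, out2, _errors} =
  EncodingRs.Decoder.decode_chunk(decoder, <<162, 138, 69>>, true)

out1 <> out2
# => "Hello世界"
```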
## Real-World Examples
### Processing a Large CSV File
```elixir
defmodule CsvProcessor do
  def process_shift_jis_csv(path) do
    path
    |> File.stream!([], 64 * 1024)   # 64KB chunks
    |> EncodingRs.Decoder.stream("shift_jis")
    |> Enum.join()
    |> String.split("\n")
    |> Enum.map(&String.split(&1, ","))
  end
end
```

### Streaming HTTP Response
```elixir
defmodule HttpClient do
  def fetch_and_decode(url, encoding) do
    # Using Req (or a similar HTTP client) with a streaming body:
    # `into: []` collects the raw body chunks into a list, in order.
    response = Req.get!(url, into: [])

    response.body
    |> EncodingRs.Decoder.stream(encoding)
    |> Enum.join()
  end
end
```

### Processing with Error Tracking
```elixir
defmodule DataImporter do
  require Logger

  def import_with_validation(path, encoding) do
    {content, had_errors} =
      path
      |> File.stream!([], 8192)
      |> EncodingRs.Decoder.stream_with_errors(encoding)
      |> Enum.reduce({"", false}, fn {chunk, errors}, {acc, had_any} ->
        if errors do
          Logger.warning("Invalid bytes detected in chunk")
        end

        {acc <> chunk, had_any or errors}
      end)

    if had_errors do
      Logger.warning("File contained invalid byte sequences")
    end

    content
  end
end
```

### GenServer for Continuous Stream Processing
```elixir
defmodule StreamProcessor do
  use GenServer

  def start_link(encoding) do
    GenServer.start_link(__MODULE__, encoding)
  end

  def process_chunk(pid, chunk, is_last \\ false) do
    GenServer.call(pid, {:process, chunk, is_last})
  end

  @impl true
  def init(encoding) do
    # The decoder lives in the GenServer state, so chunks can arrive over time.
    {:ok, decoder} = EncodingRs.Decoder.new(encoding)
    {:ok, %{decoder: decoder}}
  end

  @impl true
  def handle_call({:process, chunk, is_last}, _from, state) do
    {:ok, output, had_errors} =
      EncodingRs.Decoder.decode_chunk(state.decoder, chunk, is_last)

    {:reply, {:ok, output, had_errors}, state}
  end
end

# Usage
{:ok, pid} = StreamProcessor.start_link("gbk")
{:ok, out1, _} = StreamProcessor.process_chunk(pid, chunk1)
{:ok, out2, _} = StreamProcessor.process_chunk(pid, chunk2)
{:ok, out3, _} = StreamProcessor.process_chunk(pid, chunk3, true)
```

### Converting File Encoding
```elixir
defmodule EncodingConverter do
  @doc """
  Convert a file from one encoding to another.
  """
  def convert(input_path, output_path, from_encoding, to_encoding) do
    input_path
    |> File.stream!([], 64 * 1024)
    |> EncodingRs.Decoder.stream(from_encoding)
    |> Stream.map(fn chunk ->
      {:ok, encoded} = EncodingRs.encode(chunk, to_encoding)
      encoded
    end)
    |> Stream.into(File.stream!(output_path))
    |> Stream.run()
  end
end

# Convert Shift_JIS to UTF-8
EncodingConverter.convert("input.txt", "output.txt", "shift_jis", "utf-8")
```

## Choosing Chunk Size
The chunk size affects memory usage and performance:
| Chunk Size | Memory | Performance | Use Case |
|---|---|---|---|
| 4KB | Low | More overhead | Memory-constrained |
| 64KB | Medium | Good balance | General purpose |
| 256KB+ | Higher | Less overhead | Large files, fast storage |
```elixir
# Memory-constrained environment
File.stream!(path, [], 4096)

# General purpose (recommended)
File.stream!(path, [], 64 * 1024)

# High-throughput processing
File.stream!(path, [], 256 * 1024)
```

## When to Use Streaming vs One-Shot
| Scenario | Approach |
|---|---|
| Small files (<1MB) | EncodingRs.decode/2 |
| Large files | EncodingRs.Decoder.stream/2 |
| Files > 100MB | EncodingRs.Decoder.stream/2 (avoids input size limit) |
| Network streams | EncodingRs.Decoder |
| Unknown size | EncodingRs.Decoder.stream/2 |
| Memory-constrained | EncodingRs.Decoder.stream/2 |
| Untrusted input | EncodingRs.Decoder.stream/2 (bounded chunk sizes) |
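A small helper can apply this table automatically. This is a sketch in which the module name and the 1MB threshold are illustrative:

```elixir
defmodule DecodeHelper do
  @one_shot_threshold 1_000_000  # ~1MB, per the table above

  # One-shot decode for small files, streaming decode for everything else.
  def decode_file(path, encoding) do
    if File.stat!(path).size < @one_shot_threshold do
      {:ok, text} = EncodingRs.decode(File.read!(path), encoding)
      text
    else
      path
      |> File.stream!([], 64 * 1024)
      |> EncodingRs.Decoder.stream(encoding)
      |> Enum.join()
    end
  end
end
```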
## Input Size Limit
One-shot operations (`EncodingRs.decode/2`, `EncodingRs.encode/2`) enforce a configurable maximum input size (default: 100MB) to prevent excessive memory allocation. Inputs exceeding this limit return `{:error, :input_too_large}`.
The streaming decoder is not affected by this limit at the file level because each chunk is validated independently. As long as your chunk size is below the limit (and it should be — 64KB to 256KB is typical), the streaming API can process files of any size.
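If a one-shot call does hit the limit, the error is an ordinary tuple you can match on. A minimal sketch, in which `data`, `encoding`, and the fallback behavior are illustrative:

```elixir
case EncodingRs.decode(data, encoding) do
  {:ok, text} ->
    text

  {:error, :input_too_large} ->
    # Either raise the configured limit (shown below) or switch to the
    # streaming decoder examples above for inputs this large.
    raise ArgumentError, "input exceeds max_input_size; use EncodingRs.Decoder"
end
```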
If you need to one-shot decode inputs larger than 100MB, you can adjust the limit at runtime:
```elixir
# In config/runtime.exs
config :encoding_rs, max_input_size: 500 * 1024 * 1024

# Or disable the limit entirely for trusted inputs
config :encoding_rs, max_input_size: :infinity
```

See `EncodingRs.max_input_size/0` for details.
## Common Encodings
| Region | Common Encodings |
|---|---|
| Japanese | shift_jis, euc-jp, iso-2022-jp |
| Chinese (Simplified) | gbk, gb18030 |
| Chinese (Traditional) | big5 |
| Korean | euc-kr |
| Western European | windows-1252, iso-8859-1 |
| Cyrillic | windows-1251, koi8-r |
## Tips

- **Always flush**: Pass `is_last: true` for the final chunk to flush any buffered bytes.
- **Don't share decoders**: Each decoder maintains mutable state; don't share one across processes (see the sketch after this list).
- **Check for errors**: Use `stream_with_errors/2` if you need to know about invalid byte sequences.
- **BOM detection**: For files with BOMs, detect and strip the BOM first:

  ```elixir
  content = File.read!(path)

  {encoding, data} =
    case EncodingRs.detect_and_strip_bom(content) do
      {:ok, enc, rest} -> {enc, rest}
      {:error, :no_bom} -> {"utf-8", content}  # default
    end

  {:ok, decoded} = EncodingRs.decode(data, encoding)
  ```
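For example, when decoding several files concurrently, let each task build its own streaming pipeline so that every process owns its own decoder. A minimal sketch; the file paths and encoding are illustrative:

```elixir
# Each task creates and owns its own decoder through the streaming pipeline;
# no decoder resource is passed between processes.
decode_all = fn paths, encoding ->
  paths
  |> Task.async_stream(
    fn path ->
      path
      |> File.stream!([], 64 * 1024)
      |> EncodingRs.Decoder.stream(encoding)
      |> Enum.join()
    end,
    timeout: :infinity
  )
  |> Enum.map(fn {:ok, text} -> text end)
end

decode_all.(["a.csv", "b.csv"], "shift_jis")
```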