ETL pipeline: prepare -> validate -> transform -> route.
Demonstrates a full data ETL flow with hot-reloadable schemas and per-worker audit logs, showing how process isolation avoids the shared-state hazards of free-threaded Python:
- prepare (beam) -- reads the current validation schema from ETS and packages each row for Python validation, including the schema version
- validate (python) -- validates rows against the schema using csv_transformer.validate_batch; records each decision in a per-worker audit log
- transform (beam) -- applies rename, cast, and default rules to valid rows; passes invalid rows through unchanged
- route (router) -- sends invalid rows to :invalid and all remaining valid rows to :default
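The validate stage above can be sketched as follows. This is an illustrative outline only, not the actual csv_transformer.validate_batch API: the per-field rule format and the plain-list audit log are assumptions for the example.

```python
# Sketch of a schema-driven batch validator; the real csv_transformer
# may use a different rule format and audit-log structure.

def validate_row(row, schema):
    """Return (is_valid, errors) for one row against a schema of field rules."""
    errors = []
    for field, rules in schema["fields"].items():
        value = row.get(field)
        if value is None or value == "":
            if rules.get("required"):
                errors.append(f"{field}: missing required field")
            continue
        if rules.get("type") == "int":
            try:
                value = int(value)
            except ValueError:
                errors.append(f"{field}: not an integer")
                continue
            if "min" in rules and value < rules["min"]:
                errors.append(f"{field}: below minimum {rules['min']}")
    return (not errors, errors)

def validate_batch(rows, schema, audit_log):
    """Validate each row, appending one audit entry per decision."""
    results = []
    for row in rows:
        ok, errors = validate_row(row, schema)
        audit_log.append({"schema_version": schema["version"],
                          "valid": ok, "errors": errors})
        results.append({"row": row, "valid": ok, "errors": errors})
    return results
```

Each audit entry carries the schema version from the prepared payload, which is what lets the demo attribute a decision to v1 or v2 after the hot-reload.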
After both batches, the demo queries per-worker audit stats from Python to prove that each worker maintained a consistent, uncorrupted audit log in its own process -- something free-threaded Python cannot guarantee when multiple threads share the same list and counter.
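The consistency being checked can be sketched as an invariant on a per-worker log. The AuditLog class below is hypothetical (the demo's internal structure may differ); the point is that a log owned by exactly one process never needs locks, whereas free-threaded Python threads sharing one list and counter can interleave the append and the increment and break the invariant.

```python
class AuditLog:
    """Per-worker audit log: owned by exactly one process, so no locks needed."""

    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.entries = []
        self.valid_count = 0
        self.invalid_count = 0

    def record(self, schema_version, valid):
        # In a single-owner process these two mutations can never interleave
        # with another writer; under shared-memory threads they could.
        self.entries.append({"schema_version": schema_version, "valid": valid})
        if valid:
            self.valid_count += 1
        else:
            self.invalid_count += 1

    def stats(self):
        # Consistency invariant: counters always match the entry list.
        assert self.valid_count + self.invalid_count == len(self.entries)
        return {"worker": self.worker_id,
                "total": len(self.entries),
                "valid": self.valid_count,
                "invalid": self.invalid_count}
```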
The demo shows schema hot-reload: batch 1 runs against v1 (lenient), then the schema is swapped to v2 (stricter) via a serialized store write, and batch 2 demonstrates that previously-valid rows now fail.
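A minimal sketch of the hot-reload effect, with made-up v1/v2 schemas (the demo's actual schemas and field names are not shown here): the same row passes the lenient v1 rules and fails v2's stricter constraint.

```python
# Hypothetical schemas: v1 treats age as optional, v2 requires age >= 18.
SCHEMA_V1 = {"version": 1, "fields": {"age": {"type": "int"}}}
SCHEMA_V2 = {"version": 2, "fields": {"age": {"type": "int",
                                              "required": True, "min": 18}}}

def check(row, schema):
    """Validate just the 'age' field of a row against one schema."""
    rules = schema["fields"]["age"]
    value = row.get("age")
    if value in (None, ""):
        return not rules.get("required")
    try:
        value = int(value)
    except ValueError:
        return False
    return value >= rules.get("min", 0)

row = {"name": "kim", "age": "16"}
assert check(row, SCHEMA_V1)        # valid under lenient v1
assert not check(row, SCHEMA_V2)    # fails v2's min-age constraint
```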
Summary
Functions
Apply transformation rules to valid rows; pass invalid rows through.
Package a raw row with the current schema for Python validation.
Run the data ETL demo, printing formatted results to stdout.
Functions
Apply transformation rules to valid rows; pass invalid rows through.
Valid rows get rename, cast, and default rules applied in Elixir.
Invalid rows retain their validation result payload unchanged so
the router can send them to the :invalid output.
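The rename/cast/default rules can be illustrated with the sketch below. The demo applies these rules in Elixir; this Python version, with an invented rules format, just shows the intended semantics.

```python
def transform(row, rules):
    """Apply rename, then cast, then default rules to one row (illustrative)."""
    out = dict(row)
    # rename: move values to their new field names
    for old, new in rules.get("rename", {}).items():
        if old in out:
            out[new] = out.pop(old)
    # cast: coerce field values with the given callables
    for field, caster in rules.get("cast", {}).items():
        if field in out:
            out[field] = caster(out[field])
    # default: fill in missing fields only
    for field, default in rules.get("default", {}).items():
        out.setdefault(field, default)
    return out

rules = {"rename": {"fullname": "name"},
         "cast": {"age": int},
         "default": {"country": "unknown"}}
transform({"fullname": "Ada", "age": "36"}, rules)
# equals {"name": "Ada", "age": 36, "country": "unknown"}
```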
Package a raw row with the current schema for Python validation.
Reads the active schema from the ETS-backed store (lock-free) and
returns the map expected by csv_transformer.validate_batch.
Includes the schema version so the Python audit log can track which
version each row was validated against.
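The shape of the prepared payload might look like the sketch below; the field names are illustrative, not the demo's actual wire format.

```python
def prepare(row, schema):
    """Package one raw row with the active schema for Python validation (sketch)."""
    return {
        "schema_version": schema["version"],  # lets the audit log track versions
        "schema": schema,                     # active schema as read from ETS
        "row": row,
    }
```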
Run the data ETL demo, printing formatted results to stdout.
Demonstrates:
- Batch validation against v1 schema (lenient) with 16 rows
- Hot-reload of the schema to v2 (stricter constraints)
- Re-validation of the same 16 rows, showing rows that now fail under v2
- Per-worker audit stats proving process isolation keeps logs consistent
- Why this matters vs. free-threaded Python