Streaming
View Source
This guide covers working with Python generators from Erlang.
Overview
Python generators allow processing large datasets or infinite sequences efficiently by yielding values one at a time. erlang_python supports streaming these values back to Erlang.
Generator Expressions
The simplest way to stream is with generator expressions:
%% Stream squares of numbers 0-9
{ok, Squares} = py:stream_eval(<<"(x**2 for x in range(10))">>).
%% Squares = [0,1,4,9,16,25,36,49,64,81]
%% Stream uppercase characters
{ok, Upper} = py:stream_eval(<<"(c.upper() for c in 'hello')">>).
%% Upper = [<<"H">>,<<"E">>,<<"L">>,<<"L">>,<<"O">>]
%% Stream filtered values
{ok, Evens} = py:stream_eval(<<"(x for x in range(20) if x % 2 == 0)">>).
%% Evens = [0,2,4,6,8,10,12,14,16,18]
Iterator Objects
Any Python iterator can be streamed:
%% Stream from range
{ok, Numbers} = py:stream_eval(<<"iter(range(5))">>).
%% Numbers = [0,1,2,3,4]
%% Stream dictionary items
{ok, Items} = py:stream_eval(<<"iter({'a': 1, 'b': 2}.items())">>).
%% Items = [{<<"a">>, 1}, {<<"b">>, 2}]
Generator Functions
Define generator functions with yield:
%% Define a generator function
ok = py:exec(<<"
def fibonacci(n):
a, b = 0, 1
for _ in range(n):
yield a
a, b = b, a + b
">>).
%% Stream from it
%% Note: Functions defined with exec are local to the worker that executes them.
%% Subsequent calls may go to different workers in the pool.
{ok, Fib} = py:stream('__main__', fibonacci, [10]).
%% Fib = [0,1,1,2,3,5,8,13,21,34]
For reliable inline generators, use a lambda with the walrus operator (Python 3.8+):
%% Fibonacci using inline lambda - works reliably across workers
{ok, Fib} = py:stream_eval(<<"(lambda: ((fib := [0, 1]), [fib.append(fib[-1] + fib[-2]) for _ in range(8)], iter(fib))[-1])()">>).
%% Fib = [0,1,1,2,3,5,8,13,21,34]
Streaming Protocol
Internally, streaming uses these messages:
{py_chunk, Ref, Value} %% Each yielded value
{py_end, Ref} %% Generator exhausted
{py_error, Ref, Error} %% Exception occurred
You can build custom streaming consumers:
start_stream(Code) ->
Ref = make_ref(),
py_pool:request({stream_eval, Ref, self(), Code, #{}}),
process_stream(Ref).
process_stream(Ref) ->
receive
{py_chunk, Ref, Value} ->
io:format("Got: ~p~n", [Value]),
process_stream(Ref);
{py_end, Ref} ->
io:format("Done~n");
{py_error, Ref, Error} ->
io:format("Error: ~p~n", [Error])
after 30000 ->
io:format("Timeout~n")
end.
Memory Considerations
- Values are collected into a list by stream_eval/1,2
- For large datasets, consider processing chunks as they arrive
- Generators are garbage collected after exhaustion
Use Cases
Data Processing Pipelines
%% Process file lines (if defined in Python)
{ok, Lines} = py:stream(mymodule, read_lines, [<<"data.txt">>]).
%% Transform each line
Results = [process_line(L) || L <- Lines].
Infinite Sequences
%% Define infinite counter
ok = py:exec(<<"
def counter():
n = 0
while True:
yield n
n += 1
">>).
%% Take first 100 (use your own take function)
%% Can't use stream/3 directly for an infinite generator - custom handling is needed
Batch Processing
%% Process in batches
ok = py:exec(<<"
def batches(data, size):
for i in range(0, len(data), size):
yield data[i:i+size]
">>).