Thin payload helper for SnakeBridge that delegates execution to Snakepit.
This module is compile-time agnostic and focuses on building payloads that match the Snakepit Prime runtime contract.
Summary
Functions
call/4 - Call a Python function.
call_dynamic/4 - Calls any Python function dynamically without requiring generated bindings.
clear_auto_session/0 - Clears the auto-session for the current process.
current_session/0 - Returns the current session ID (explicit or auto-generated).
get_module_attr/3 - Retrieves a module-level attribute (constant, class, etc.).
release_auto_session/0 - Releases and clears the auto-session for the current process.
stream/5 - Stream results from a Python generator/iterator.
stream_dynamic/5 - Stream results from a Python generator using dynamic dispatch.
stream_len/2 - Gets the length of a Python iterable (if supported).
stream_next/2 - Gets the next item from a Python iterator or generator.
Types
@type args() :: list()
@type error_reason() :: Snakepit.Error.t() | Exception.t()
@type module_ref() :: module()
@type opts() :: keyword()
Functions
@spec call(module_ref() | String.t(), function_name() | String.t(), args(), opts()) :: {:ok, term()} | {:error, error_reason()}
Call a Python function.
Parameters
- module - Either a generated SnakeBridge module atom OR a Python module path string
- function - Function name (atom or string)
- args - Positional arguments (list)
- opts - Options including kwargs, :idempotent, :runtime (e.g., :pool_name, :affinity)
Examples
# With generated module
{:ok, result} = SnakeBridge.Runtime.call(Numpy, :mean, [[1,2,3]])
# With string module path (dynamic)
{:ok, result} = SnakeBridge.Runtime.call("numpy", "mean", [[1,2,3]])
{:ok, result} = SnakeBridge.Runtime.call("math", :sqrt, [16])
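Per the spec above, call/4 returns either {:ok, term()} or {:error, error_reason()}, so callers usually branch on both shapes. The following is a minimal sketch of that pattern, not part of SnakeBridge itself: `CallSketch.mean_or_default/3` is a hypothetical helper, and the runtime call is injected as a function so the example runs without a Python worker. In real code the injected function would presumably be `&SnakeBridge.Runtime.call/4`.

```elixir
defmodule CallSketch do
  # Hypothetical helper (not part of SnakeBridge): computes a mean through an
  # injected `call_fun` shaped like SnakeBridge.Runtime.call/4 and falls back
  # to `default` when the call returns an error tuple.
  def mean_or_default(call_fun, numbers, default) when is_function(call_fun, 4) do
    case call_fun.("numpy", "mean", [numbers], []) do
      {:ok, value} -> value
      {:error, _reason} -> default
    end
  end
end

# Stand-in for the real runtime so the sketch runs without Python.
fake_call = fn "numpy", "mean", [numbers], _opts ->
  {:ok, Enum.sum(numbers) / length(numbers)}
end

# Stand-in that always fails; :unavailable is a placeholder reason, whereas
# the real runtime returns a Snakepit.Error or an exception struct.
failing_call = fn _module, _function, _args, _opts -> {:error, :unavailable} end

mean = CallSketch.mean_or_default(fake_call, [1, 2, 3], nil)
fallback = CallSketch.mean_or_default(failing_call, [1, 2, 3], nil)
```

Injecting the call function keeps the result handling testable in isolation; whether a default value or a propagated error is the better choice depends on the caller.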
@spec call_class(module_ref(), function_name(), args(), opts()) :: {:ok, term()} | {:error, error_reason()}
@spec call_dynamic(String.t(), function_name(), args(), opts()) :: {:ok, term()} | {:error, error_reason()}
Calls any Python function dynamically without requiring generated bindings.
This is the no-codegen escape hatch for calling functions that were not scanned during compilation.
@spec call_method(SnakeBridge.Ref.t() | map(), function_name(), args(), opts()) :: {:ok, term()} | {:error, error_reason()}
@spec clear_auto_session() :: :ok
Clears the auto-session for the current process.
Useful for testing or when you want to force a new session.
Does NOT release the session on the Python side; use release_auto_session/0 for that.
@spec current_session() :: String.t()
Returns the current session ID (explicit or auto-generated).
This is useful for debugging or when you need to know which session is active.
@spec get_attr(SnakeBridge.Ref.t(), atom() | String.t(), opts()) :: {:ok, term()} | {:error, error_reason()}
@spec get_module_attr(module_ref() | String.t(), atom() | String.t(), opts()) :: {:ok, term()} | {:error, error_reason()}
Retrieves a module-level attribute (constant, class, etc.).
Parameters
- module - Either a generated SnakeBridge module atom OR a Python module path string
- attr - Attribute name (atom or string)
- opts - Runtime options
Examples
# Get math.pi
{:ok, pi} = SnakeBridge.Runtime.get_module_attr("math", "pi")
{:ok, pi} = SnakeBridge.Runtime.get_module_attr("math", :pi)
@spec release_auto_session() :: :ok
Releases and clears the auto-session for the current process.
This releases all refs associated with the session on both Elixir and Python sides.
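Since clear_auto_session/0 only forgets the session locally while release_auto_session/0 also frees the refs on the Python side, a short-lived process may want to release in an `after` block so cleanup happens even when the work raises. The sketch below assumes that usage pattern; `SessionSketch.with_auto_session/2` is a hypothetical wrapper, and the release function is injected so the example runs standalone.

```elixir
defmodule SessionSketch do
  # Hypothetical wrapper (not part of SnakeBridge): runs `work_fun`, then
  # always calls `release_fun`, even if `work_fun` raises. In real code
  # `release_fun` would presumably be
  # &SnakeBridge.Runtime.release_auto_session/0.
  def with_auto_session(work_fun, release_fun)
      when is_function(work_fun, 0) and is_function(release_fun, 0) do
    try do
      work_fun.()
    after
      release_fun.()
    end
  end
end

# Stand-in release function that records that it was invoked.
parent = self()

fake_release = fn ->
  send(parent, :released)
  :ok
end

result = SessionSketch.with_auto_session(fn -> {:ok, 42} end, fake_release)
```

The value of the `try` body is returned unchanged, so the wrapper is transparent to the caller.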
@spec release_ref(SnakeBridge.Ref.t(), opts()) :: :ok | {:error, error_reason()}
@spec release_session(String.t(), opts()) :: :ok | {:error, error_reason()}
@spec set_attr(SnakeBridge.Ref.t(), atom() | String.t(), term(), opts()) :: {:ok, term()} | {:error, error_reason()}
@spec stream(module_ref() | String.t(), function_name() | String.t(), args(), opts(), (term() -> any())) :: :ok | {:ok, :done} | {:error, error_reason()}
Stream results from a Python generator/iterator.
Parameters
- module - Either a generated SnakeBridge module atom OR a Python module path string
- function - Function name (atom or string)
- args - Positional arguments (list)
- opts - Options including kwargs
- callback - Function called for each streamed item
Performance
When called with a generated module atom, this function can use Snakepit's native gRPC streaming for efficient data transfer.
When called with a string module path, it delegates to stream_dynamic/5,
which uses RPC-per-item iteration. See the stream_dynamic/5 docs for performance
guidance on large streams.
Examples
# With string module path (dynamic, RPC-per-item)
SnakeBridge.Runtime.stream("pandas", "read_csv", ["file.csv"], [chunksize: 100], fn chunk ->
process(chunk)
end)
# With generated module (native streaming when available)
SnakeBridge.Runtime.stream(MyApp.Pandas, :read_csv, ["file.csv"], [chunksize: 100], fn chunk ->
process(chunk)
end)
@spec stream_dynamic(String.t(), String.t(), args(), opts(), (term() -> any())) :: {:ok, :done} | {:error, term()}
Stream results from a Python generator using dynamic dispatch.
Creates a stream reference and iterates via stream_next until exhausted.
Performance Note
Dynamic streaming uses an RPC-per-item approach: each item from the Python
iterator triggers a separate stream_next gRPC call. This is correct and
safe but may be slow for large streams (thousands of items).
For high-throughput streaming workloads, consider:
- Generated streaming wrappers: Use SnakeBridge.stream/5 with compiled modules, which can leverage Snakepit's server-side streaming for better throughput.
- Batched iteration: Have Python yield batches of items rather than individual items.
- Dedicated data transfer: For very large datasets, consider writing Python results to files/databases and loading from Elixir.
Dynamic streaming is ideal for convenience and moderate-sized iterables.
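The batched-iteration suggestion above can be sketched from the Elixir side. Assuming the Python generator yields lists of items (an assumption about your Python code, not something SnakeBridge enforces), the stream callback receives one batch per RPC and can fan it out to a per-item handler. `BatchSketch.per_item/1` is a hypothetical adapter, and `fake_stream` stands in for the real streaming call so the example runs without Python.

```elixir
defmodule BatchSketch do
  # Hypothetical adapter (not part of SnakeBridge): wraps a per-item handler
  # so it can serve as the stream callback when each streamed element is a
  # batch, i.e. a list of items.
  def per_item(handler) when is_function(handler, 1) do
    fn batch when is_list(batch) -> Enum.each(batch, handler) end
  end
end

# Stand-in for a streaming call: invokes the callback once per batch,
# the way one RPC would deliver one batch.
fake_stream = fn batches, callback ->
  Enum.each(batches, callback)
  {:ok, :done}
end

parent = self()
callback = BatchSketch.per_item(fn item -> send(parent, {:item, item}) end)

# Two callback invocations deliver four items.
status = fake_stream.([[1, 2], [3, 4]], callback)
```

With a batch size of N, the number of round trips drops by roughly a factor of N, at the cost of holding one batch in memory at a time.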
@spec stream_len(SnakeBridge.StreamRef.t(), opts()) :: {:ok, non_neg_integer()} | {:error, term()}
Gets the length of a Python iterable (if supported).
@spec stream_next(SnakeBridge.StreamRef.t(), opts()) :: {:ok, term()} | {:error, :stop_iteration} | {:error, error_reason()}
Gets the next item from a Python iterator or generator.
Each call makes a separate RPC to Python. For high-throughput streaming,
see the performance note on stream_dynamic/5.
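According to the spec, stream_next/2 returns {:ok, item} until the iterator is exhausted and then {:error, :stop_iteration}. One way to consume that contract is to wrap it in a lazy Elixir stream with Stream.resource/3. This is a sketch under that assumption: `NextSketch.to_stream/3` is a hypothetical helper, and an Agent-backed function stands in for the real stream_next/2 so the example runs without Python.

```elixir
defmodule NextSketch do
  # Hypothetical helper (not part of SnakeBridge): builds a lazy Elixir stream
  # from a stream ref and a `next_fun` shaped like
  # SnakeBridge.Runtime.stream_next/2.
  def to_stream(ref, next_fun, opts \\ []) when is_function(next_fun, 2) do
    Stream.resource(
      fn -> ref end,
      fn ref ->
        case next_fun.(ref, opts) do
          {:ok, item} -> {[item], ref}
          {:error, :stop_iteration} -> {:halt, ref}
          # Any other error aborts the stream; a real caller may prefer to
          # surface it as a value instead of raising.
          {:error, reason} -> raise "stream_next failed: #{inspect(reason)}"
        end
      end,
      fn _ref -> :ok end
    )
  end
end

# Stand-in iterator: an Agent holding the remaining items plays the role of
# the Python-side iterator, and `fake_next` plays the role of stream_next/2.
{:ok, pid} = Agent.start_link(fn -> [10, 20, 30] end)

fake_next = fn iterator, _opts ->
  Agent.get_and_update(iterator, fn
    [head | tail] -> {{:ok, head}, tail}
    [] -> {{:error, :stop_iteration}, []}
  end)
end

items = NextSketch.to_stream(pid, fake_next) |> Enum.to_list()
```

Because the stream is lazy, each element still costs one call to `next_fun`, so the RPC-per-item caveat applies unchanged.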