SnakeBridge.Runtime (SnakeBridge v0.15.0)

Thin payload helper for SnakeBridge that delegates execution to Snakepit.

This module is compile-time agnostic and focuses on building payloads that match the Snakepit Prime runtime contract.

Summary

Functions

  • call_dynamic/4 - Calls any Python function dynamically without requiring generated bindings.
  • clear_auto_session/0 - Clears the auto-session for the current process.
  • current_session/0 - Returns the current session ID (explicit or auto-generated).
  • get_module_attr/3 - Retrieves a module-level attribute (constant, class, etc.).
  • release_auto_session/0 - Releases and clears the auto-session for the current process.
  • stream/5 - Streams results from a Python generator/iterator.
  • stream_dynamic/5 - Streams results from a Python generator using dynamic dispatch.
  • stream_len/2 - Gets the length of a Python iterable (if supported).
  • stream_next/2 - Gets the next item from a Python iterator or generator.

Types

args()

@type args() :: list()

error_reason()

@type error_reason() :: Snakepit.Error.t() | Exception.t()

function_name()

@type function_name() :: atom() | String.t()

module_ref()

@type module_ref() :: module()

opts()

@type opts() :: keyword()

Functions

call(module, function, args \\ [], opts \\ [])

@spec call(module_ref() | String.t(), function_name() | String.t(), args(), opts()) ::
  {:ok, term()} | {:error, error_reason()}

Calls a Python function and returns {:ok, result} or {:error, reason}.

Parameters

  • module - Either a generated SnakeBridge module atom OR a Python module path string
  • function - Function name (atom or string)
  • args - Positional arguments (list)
  • opts - Options including kwargs, :idempotent, :runtime (e.g., :pool_name, :affinity)

Examples

# With generated module
{:ok, result} = SnakeBridge.Runtime.call(Numpy, :mean, [[1, 2, 3]])

# With string module path (dynamic)
{:ok, result} = SnakeBridge.Runtime.call("numpy", "mean", [[1, 2, 3]])
{:ok, result} = SnakeBridge.Runtime.call("math", :sqrt, [16])
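
Since call/4 returns tagged tuples, several calls can be chained with `with` so that the first error short-circuits. The following is a minimal sketch, not part of SnakeBridge: CallSketch and hypotenuse/3 are hypothetical names, and the call function is passed in as an argument (assumed to follow the same {:ok, term} | {:error, reason} contract) so the logic can be exercised without a Python runtime.

```elixir
defmodule CallSketch do
  # Hypothetical helper, not part of SnakeBridge.
  # `call_fun` is a 3-arity function with the same return contract as
  # SnakeBridge.Runtime.call/4: {:ok, term} | {:error, reason}.
  def hypotenuse(a, b, call_fun) when is_function(call_fun, 3) do
    # Each step runs only if the previous one returned {:ok, _};
    # any {:error, _} tuple is returned to the caller unchanged.
    with {:ok, sum_sq} <- call_fun.("math", :fsum, [[a * a, b * b]]),
         {:ok, root} <- call_fun.("math", :sqrt, [sum_sq]) do
      {:ok, root}
    end
  end
end

# Intended use (requires SnakeBridge and a running Python side):
#   CallSketch.hypotenuse(3, 4, &SnakeBridge.Runtime.call/3)
```

Passing the call function explicitly is only a convenience for testing; in application code the two calls could be written inline against SnakeBridge.Runtime.call/4.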

call_class(module, function, args \\ [], opts \\ [])

@spec call_class(module_ref(), function_name(), args(), opts()) ::
  {:ok, term()} | {:error, error_reason()}

call_dynamic(module_path, function, args \\ [], opts \\ [])

@spec call_dynamic(String.t(), function_name(), args(), opts()) ::
  {:ok, term()} | {:error, error_reason()}

Calls any Python function dynamically without requiring generated bindings.

This is the no-codegen escape hatch for calling functions that were not scanned during compilation.

call_helper(helper, args \\ [], opts \\ [])

@spec call_helper(String.t(), args(), opts() | map()) ::
  {:ok, term()} | {:error, term()}

call_method(ref, function, args \\ [], opts \\ [])

@spec call_method(SnakeBridge.Ref.t() | map(), function_name(), args(), opts()) ::
  {:ok, term()} | {:error, error_reason()}

clear_auto_session()

@spec clear_auto_session() :: :ok

Clears the auto-session for the current process.

Useful for testing or when you want to force a new session. This does NOT release the session on the Python side; use release_auto_session/0 for that.

current_session()

@spec current_session() :: String.t()

Returns the current session ID (explicit or auto-generated).

This is useful for debugging or when you need to know which session is active.

get_attr(ref, attr, opts \\ [])

@spec get_attr(SnakeBridge.Ref.t(), atom() | String.t(), opts()) ::
  {:ok, term()} | {:error, error_reason()}

get_module_attr(module, attr, opts \\ [])

@spec get_module_attr(module_ref() | String.t(), atom() | String.t(), opts()) ::
  {:ok, term()} | {:error, error_reason()}

Retrieves a module-level attribute (constant, class, etc.).

Parameters

  • module - Either a generated SnakeBridge module atom OR a Python module path string
  • attr - Attribute name (atom or string)
  • opts - Runtime options

Examples

# Get math.pi
{:ok, pi} = SnakeBridge.Runtime.get_module_attr("math", "pi")
{:ok, pi} = SnakeBridge.Runtime.get_module_attr("math", :pi)

release_auto_session()

@spec release_auto_session() :: :ok

Releases and clears the auto-session for the current process.

This releases all refs associated with the session on both Elixir and Python sides.
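
Because releasing is what frees refs on the Python side, it may be worth running the release even when the surrounding work raises. A minimal sketch, assuming a hypothetical SessionSketch.with_cleanup/2 helper (not part of SnakeBridge) that receives the release function as an argument so it can be tried without a Python runtime:

```elixir
defmodule SessionSketch do
  # Hypothetical helper, not part of SnakeBridge.
  # Runs `work_fun`, then always runs `release_fun`, even if `work_fun` raises.
  def with_cleanup(work_fun, release_fun)
      when is_function(work_fun, 0) and is_function(release_fun, 0) do
    try do
      work_fun.()
    after
      release_fun.()
    end
  end
end

# Intended use (requires SnakeBridge and a running Python side):
#   SessionSketch.with_cleanup(
#     fn -> SnakeBridge.Runtime.call("math", :sqrt, [16]) end,
#     &SnakeBridge.Runtime.release_auto_session/0
#   )
```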

release_ref(ref, opts \\ [])

@spec release_ref(SnakeBridge.Ref.t(), opts()) :: :ok | {:error, error_reason()}

release_session(session_id, opts \\ [])

@spec release_session(String.t(), opts()) :: :ok | {:error, error_reason()}

set_attr(ref, attr, value, opts \\ [])

@spec set_attr(SnakeBridge.Ref.t(), atom() | String.t(), term(), opts()) ::
  {:ok, term()} | {:error, error_reason()}

stream(module, function, args \\ [], opts \\ [], callback)

@spec stream(
  module_ref() | String.t(),
  function_name() | String.t(),
  args(),
  opts(),
  (term() -> any())
) ::
  :ok | {:ok, :done} | {:error, error_reason()}

Streams results from a Python generator/iterator.

Parameters

  • module - Either a generated SnakeBridge module atom OR a Python module path string
  • function - Function name (atom or string)
  • args - Positional arguments (list)
  • opts - Options including kwargs
  • callback - Function called for each streamed item

Performance

When called with a generated module atom, this function can use Snakepit's native gRPC streaming for efficient data transfer.

When called with a string module path, this delegates to stream_dynamic/5, which uses RPC-per-item iteration. See the stream_dynamic/5 docs for performance guidance on large streams.

Examples

# With string module path (dynamic, RPC-per-item)
SnakeBridge.Runtime.stream("pandas", "read_csv", ["file.csv"], [chunksize: 100], fn chunk ->
  process(chunk)
end)

# With generated module (native streaming when available)
SnakeBridge.Runtime.stream(MyApp.Pandas, :read_csv, ["file.csv"], [chunksize: 100], fn chunk ->
  process(chunk)
end)

stream_dynamic(module_path, function, args, opts, callback)

@spec stream_dynamic(String.t(), String.t(), args(), opts(), (term() -> any())) ::
  {:ok, :done} | {:error, term()}

Streams results from a Python generator using dynamic dispatch.

Creates a stream reference and iterates via stream_next/2 until the iterator is exhausted.

Performance Note

Dynamic streaming uses an RPC-per-item approach: each item from the Python iterator triggers a separate stream_next gRPC call. This is correct and safe but may be slow for large streams (thousands of items).

For high-throughput streaming workloads, consider:

  • Generated streaming wrappers: Use SnakeBridge.stream/5 with compiled modules, which can leverage Snakepit's server-side streaming for better throughput.
  • Batched iteration: Have Python yield batches of items rather than individual items.
  • Dedicated data transfer: For very large datasets, consider writing Python results to files/databases and loading from Elixir.

Dynamic streaming is ideal for convenience and moderate-sized iterables.
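
The batched-iteration suggestion can be sketched on the Elixir side as a small adapter that turns a per-item callback into a per-batch callback, so each RPC carries many items. This is an illustration only: BatchSketch and per_item/1 are hypothetical names, and it assumes each Python batch arrives in Elixir as a list.

```elixir
defmodule BatchSketch do
  # Hypothetical helper, not part of SnakeBridge.
  # Wraps a per-item callback so it can be passed where a per-batch
  # callback is expected. Assumes each batch is decoded as an Elixir list.
  def per_item(item_callback) when is_function(item_callback, 1) do
    fn batch when is_list(batch) ->
      Enum.each(batch, item_callback)
    end
  end
end

# Intended use (requires SnakeBridge; "my_module" and "iter_batches" are
# hypothetical Python names for a generator that yields lists of rows):
#   SnakeBridge.Runtime.stream_dynamic(
#     "my_module", "iter_batches", [1000], [],
#     BatchSketch.per_item(fn row -> IO.inspect(row) end)
#   )
```

With a batch size of 1000, a stream of one million items would need roughly a thousand stream_next round trips instead of a million.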

stream_len(stream_ref, opts \\ [])

@spec stream_len(SnakeBridge.StreamRef.t(), opts()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Gets the length of a Python iterable (if supported).

stream_next(stream_ref, opts \\ [])

@spec stream_next(SnakeBridge.StreamRef.t(), opts()) ::
  {:ok, term()} | {:error, :stop_iteration} | {:error, error_reason()}

Gets the next item from a Python iterator or generator.

Each call makes a separate RPC to Python. For high-throughput streaming, see the performance note on stream_dynamic/5.
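
The return contract in the spec above suggests a simple manual loop: keep requesting items until {:error, :stop_iteration} appears, and stop early on any other error. A minimal sketch, assuming a hypothetical DrainSketch.drain/1 helper (not part of SnakeBridge) that receives the "next" function as an argument so it can be tried without a Python runtime:

```elixir
defmodule DrainSketch do
  # Hypothetical helper, not part of SnakeBridge.
  # `next_fun` is a zero-arity function honoring the stream_next/2 contract:
  # {:ok, item} | {:error, :stop_iteration} | {:error, reason}.
  def drain(next_fun, acc \\ []) when is_function(next_fun, 0) do
    case next_fun.() do
      # Got an item: accumulate it and ask for the next one.
      {:ok, item} -> drain(next_fun, [item | acc])
      # Iterator exhausted: return the items in their original order.
      {:error, :stop_iteration} -> {:ok, Enum.reverse(acc)}
      # Any other error aborts the loop and is returned unchanged.
      {:error, reason} -> {:error, reason}
    end
  end
end

# Intended use (requires SnakeBridge and an existing stream_ref):
#   DrainSketch.drain(fn -> SnakeBridge.Runtime.stream_next(stream_ref) end)
```

Collecting everything into a list is only sensible for small iterators; for larger ones a callback per item avoids holding the full result in memory.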