Dspy.Predict.RLM (DSPex v0.11.0)


Experimental: This class may change or be removed in a future release without warning.

Recursive Language Model module.

Uses a sandboxed REPL to let the LLM programmatically explore large contexts
through code execution. The LLM writes Python code to examine data, call
sub-LLMs for semantic analysis, and build up answers iteratively.

The default interpreter is PythonInterpreter (Deno/Pyodide/WASM), but you
can provide any CodeInterpreter implementation (e.g., MockInterpreter, or write a custom one using E2B or Modal).

Note: RLM instances are not thread-safe when using a custom interpreter.
Create separate RLM instances for concurrent use, or use the default
PythonInterpreter which creates a fresh instance per forward() call.

Examples

```python
# Basic usage
rlm = dspy.RLM("context, query -> output", max_iterations=10)
result = rlm(context="...very long text...", query="What is the magic number?")
print(result.output)
```
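The equivalent flow through the DSPex bindings is sketched below. This is a hedged illustration, not a verified snippet: the positional argument order follows new/9, but passing the input fields to forward/2 as keywords is an assumption about how the bridge encodes options, and the snippet requires a configured Snakepit/Python bridge to run.

```elixir
# Hypothetical DSPex usage; requires a running Snakepit/Python bridge.
{:ok, rlm} =
  Dspy.Predict.RLM.new(
    "context, query -> output", # signature
    10,                         # max_iterations
    50,                         # max_llm_calls
    5_000,                      # max_output_chars
    false,                      # verbose
    nil,                        # tools
    nil,                        # sub_lm (defaults to dspy.settings.lm)
    nil                         # interpreter (defaults to PythonInterpreter)
  )

{:ok, result} =
  Dspy.Predict.RLM.forward(rlm,
    context: "...very long text...",
    query: "What is the magic number?"
  )
```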

Types

t()

@opaque t()

Functions

_aexecute_iteration(ref, repl, variables, history, iteration, input_args, output_field_names, opts \\ [])

@spec _aexecute_iteration(
  SnakeBridge.Ref.t(),
  term(),
  [Dspy.Primitives.ReplTypes.REPLVariable.t()],
  Dspy.Primitives.ReplTypes.REPLHistory.t(),
  integer(),
  %{optional(String.t()) => term()},
  [String.t()],
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Async version: Execute one iteration.

Parameters

  • repl (term())
  • variables (list(Dspy.Primitives.ReplTypes.REPLVariable.t()))
  • history (Dspy.Primitives.ReplTypes.REPLHistory.t())
  • iteration (integer())
  • input_args (%{optional(String.t()) => term()})
  • output_field_names (list(String.t()))

Returns

  • term()

_aextract_fallback(ref, variables, history, output_field_names, opts \\ [])

Async version: Use extract module when max iterations reached.

Parameters

  • variables (list(Dspy.Primitives.ReplTypes.REPLVariable.t()))
  • history (Dspy.Primitives.ReplTypes.REPLHistory.t())
  • output_field_names (list(String.t()))

Returns

  • term()

_base_init(ref, opts \\ [])

@spec _base_init(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM._base_init.

Returns

  • term()

_build_signatures(ref, opts \\ [])

@spec _build_signatures(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Build the action and extract signatures from templates.

Returns

  • term()

_build_variables(ref, opts \\ [])

@spec _build_variables(
  SnakeBridge.Ref.t(),
  keyword()
) ::
  {:ok, [Dspy.Primitives.ReplTypes.REPLVariable.t()]}
  | {:error, Snakepit.Error.t()}

Build REPLVariable list from input arguments with field metadata.

Parameters

  • input_args (term())

Returns

  • list(Dspy.Primitives.ReplTypes.REPLVariable.t())

_execute_iteration(ref, repl, variables, history, iteration, input_args, output_field_names, opts \\ [])

@spec _execute_iteration(
  SnakeBridge.Ref.t(),
  term(),
  [Dspy.Primitives.ReplTypes.REPLVariable.t()],
  Dspy.Primitives.ReplTypes.REPLHistory.t(),
  integer(),
  %{optional(String.t()) => term()},
  [String.t()],
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Execute one iteration. Returns Prediction if done, else updated REPLHistory.

Parameters

  • repl (term())
  • variables (list(Dspy.Primitives.ReplTypes.REPLVariable.t()))
  • history (Dspy.Primitives.ReplTypes.REPLHistory.t())
  • iteration (integer())
  • input_args (%{optional(String.t()) => term()})
  • output_field_names (list(String.t()))

Returns

  • term()

_extract_fallback(ref, variables, history, output_field_names, opts \\ [])

Use extract module to get final output when max iterations reached.

Parameters

  • variables (list(Dspy.Primitives.ReplTypes.REPLVariable.t()))
  • history (Dspy.Primitives.ReplTypes.REPLHistory.t())
  • output_field_names (list(String.t()))

Returns

  • term()

_format_output(ref, output, opts \\ [])

@spec _format_output(SnakeBridge.Ref.t(), String.t(), keyword()) ::
  {:ok, String.t()} | {:error, Snakepit.Error.t()}

Format and truncate REPL output.

Parameters

  • output (String.t())

Returns

  • String.t()

_format_tool_docs(ref, tools, opts \\ [])

@spec _format_tool_docs(
  SnakeBridge.Ref.t(),
  %{optional(String.t()) => term()},
  keyword()
) ::
  {:ok, String.t()} | {:error, Snakepit.Error.t()}

Format user-provided tools for inclusion in instructions.

Parameters

  • tools (%{optional(String.t()) => term()})

Returns

  • String.t()

_get_output_fields_info(ref, opts \\ [])

@spec _get_output_fields_info(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, [%{optional(term()) => term()}]} | {:error, Snakepit.Error.t()}

Get output field info for sandbox registration.

Returns

  • list(%{optional(term()) => term()})

_inject_execution_context(ref, interpreter, execution_tools, opts \\ [])

@spec _inject_execution_context(
  SnakeBridge.Ref.t(),
  term(),
  %{optional(String.t()) => term()},
  keyword()
) :: {:ok, nil} | {:error, Snakepit.Error.t()}

Inject execution tools and output fields into an interpreter.

This ensures llm_query, llm_query_batched, and typed FINAL signatures are available, even for user-provided interpreters. Each forward() call gets fresh tools with a fresh call counter, so we must inject on every execution.

Parameters

  • interpreter (term())
  • execution_tools (%{optional(String.t()) => term()})

Returns

  • nil

_interpreter_context(ref, execution_tools, opts \\ [])

@spec _interpreter_context(
  SnakeBridge.Ref.t(),
  %{optional(String.t()) => term()},
  keyword()
) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Yield interpreter, creating PythonInterpreter if none provided at init.

Parameters

  • execution_tools (%{optional(String.t()) => term()})

Returns

  • term()

_make_llm_tools(ref, args, opts \\ [])

@spec _make_llm_tools(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, %{optional(String.t()) => term()}} | {:error, Snakepit.Error.t()}

Create llm_query and llm_query_batched tools with a fresh call counter.

Parameters

  • max_workers (integer() default: 8)

Returns

  • %{optional(String.t()) => term()}

_normalize_tools(ref, tools, opts \\ [])

@spec _normalize_tools(SnakeBridge.Ref.t(), term(), keyword()) ::
  {:ok, %{optional(String.t()) => term()}} | {:error, Snakepit.Error.t()}

Normalize tools list to a dict of Tool objects keyed by name.

Parameters

  • tools (term())

Returns

  • %{optional(String.t()) => term()}

_prepare_execution_tools(ref, opts \\ [])

@spec _prepare_execution_tools(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, %{optional(String.t()) => term()}} | {:error, Snakepit.Error.t()}

Create fresh LLM tools and merge with user-provided tools.

Returns

  • %{optional(String.t()) => term()}

_process_execution_result(ref, pred, result, history, output_field_names, opts \\ [])

@spec _process_execution_result(
  SnakeBridge.Ref.t(),
  term(),
  term(),
  Dspy.Primitives.ReplTypes.REPLHistory.t(),
  [String.t()],
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Process interpreter result, returning Prediction if final, else updated history.

This shared helper reduces duplication between sync and async execution paths.

Parameters

  • pred - The prediction containing reasoning and code attributes
  • result - Result from interpreter.execute() - FinalOutput, list, str, or error string
  • history - Current REPL history
  • output_field_names - List of expected output field names

Returns

  • term()

_process_final_output(ref, result, output_field_names, opts \\ [])

@spec _process_final_output(SnakeBridge.Ref.t(), term(), [String.t()], keyword()) ::
  {:ok, {term(), term()}} | {:error, Snakepit.Error.t()}

Validate and parse FinalOutput. Returns (parsed_outputs, None) or (None, error).

Parameters

  • result (term())
  • output_field_names (list(String.t()))

Returns

  • {term(), term()}

_set_lm_usage(ref, tokens, output, opts \\ [])

@spec _set_lm_usage(
  SnakeBridge.Ref.t(),
  %{optional(String.t()) => term()},
  term(),
  keyword()
) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM._set_lm_usage.

Parameters

  • tokens (%{optional(String.t()) => term()})
  • output (term())

Returns

  • term()

_validate_inputs(ref, input_args, opts \\ [])

@spec _validate_inputs(
  SnakeBridge.Ref.t(),
  %{optional(String.t()) => term()},
  keyword()
) ::
  {:ok, nil} | {:error, Snakepit.Error.t()}

Raise ValueError if required input fields are missing.

Parameters

  • input_args (%{optional(String.t()) => term()})

Returns

  • nil

_validate_tools(ref, tools, opts \\ [])

@spec _validate_tools(
  SnakeBridge.Ref.t(),
  %{optional(String.t()) => term()},
  keyword()
) ::
  {:ok, nil} | {:error, Snakepit.Error.t()}

Validate user-provided tools have valid names.

Parameters

  • tools (%{optional(String.t()) => term()})

Returns

  • nil

acall(ref, args, opts \\ [])

@spec acall(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.acall.

Parameters

  • args (term())
  • kwargs (term())

Returns

  • term()

aforward(ref, opts \\ [])

@spec aforward(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Async version of forward(). Execute RLM to produce outputs.

Raises

  • ValueError - if required input fields are missing.

Parameters

  • input_args (term())

Returns

  • term()

batch(ref, examples, args, opts \\ [])

@spec batch(SnakeBridge.Ref.t(), [term()], [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Processes a list of dspy.Example instances in parallel using the Parallel module.

Parameters

  • examples - List of dspy.Example instances to process.
  • num_threads - Number of threads to use for parallel processing.
  • max_errors - Maximum number of errors allowed before stopping execution. If None, inherits from dspy.settings.max_errors.
  • return_failed_examples - Whether to return failed examples and exceptions.
  • provide_traceback - Whether to include traceback information in error logs.
  • disable_progress_bar - Whether to display the progress bar.
  • timeout - Seconds before a straggler task is resubmitted. Set to 0 to disable.
  • straggler_limit - Only check for stragglers when this many or fewer tasks remain.

Returns

  • term()
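A hedged sketch of invoking batch/4 through the bindings. How dspy.Example instances are represented over the bridge, and the exact option names accepted in opts, are assumptions based on the parameters listed above:

```elixir
# Hypothetical: `examples` is a list of dspy.Example refs created via the bridge.
{:ok, results} =
  Dspy.Predict.RLM.batch(rlm, examples, [],
    num_threads: 4,
    max_errors: 10,
    return_failed_examples: false
  )
```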

deepcopy(ref, opts \\ [])

@spec deepcopy(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Deep copy the module.

This is a tweak to the default Python deepcopy: only self.parameters() is deep-copied, while all other attributes are shallow-copied.

Returns

  • term()

dump_state(ref, args, opts \\ [])

@spec dump_state(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.dump_state.

Parameters

  • json_mode (term() default: True)

Returns

  • term()

forward(ref, opts \\ [])

@spec forward(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Execute RLM to produce outputs from the given inputs.

Raises

  • ValueError - if required input fields are missing.

Parameters

  • input_args (term())

Returns

  • term()

get_lm(ref, opts \\ [])

@spec get_lm(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.get_lm.

Returns

  • term()

inspect_history(ref, args, opts \\ [])

@spec inspect_history(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.inspect_history.

Parameters

  • n (integer() default: 1)

Returns

  • term()

load(ref, path, args, opts \\ [])

@spec load(SnakeBridge.Ref.t(), term(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Load the saved module. You may also want to check out dspy.load if you want to load an entire program, not just the state for an existing program.

Parameters

  • path - Path to the saved state file, which should be a .json or a .pkl file (type: String.t())
  • allow_pickle - If True, allow loading .pkl files, which can run arbitrary code. This is dangerous and should only be used if you are sure about the source of the file and in a trusted environment. (type: boolean())

Returns

  • term()

load_state(ref, state, opts \\ [])

@spec load_state(SnakeBridge.Ref.t(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.load_state.

Parameters

  • state (term())

Returns

  • term()

map_named_predictors(ref, func, opts \\ [])

@spec map_named_predictors(SnakeBridge.Ref.t(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Applies a function to all named predictors.

Parameters

  • func (term())

Returns

  • term()

named_parameters(ref, opts \\ [])

@spec named_parameters(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Unlike PyTorch, handles (non-recursive) lists of parameters too.

Returns

  • term()

named_predictors(ref, opts \\ [])

@spec named_predictors(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.named_predictors.

Returns

  • term()

named_sub_modules(ref, args, opts \\ [])

@spec named_sub_modules(SnakeBridge.Ref.t(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Find all sub-modules in the module, as well as their names.

For example, if self.children[4]['key'].sub_module is a sub-module, its name will be children[4]['key'].sub_module. If a sub-module is reachable via multiple paths, only one of those paths is returned.

Parameters

  • type_ (term() default: None)
  • skip_compiled (term() default: False)

Returns

  • term()

new(signature, max_iterations, max_llm_calls, max_output_chars, verbose, tools, sub_lm, interpreter, opts \\ [])

@spec new(term(), term(), term(), term(), term(), term(), term(), term(), keyword()) ::
  {:ok, SnakeBridge.Ref.t()} | {:error, Snakepit.Error.t()}

Parameters

  • signature - Defines inputs and outputs. Either a string like "context, query -> answer" or a Signature class. (term())
  • max_iterations - Maximum REPL interaction iterations. (term())
  • max_llm_calls - Maximum sub-LLM calls (llm_query/llm_query_batched) per execution. (term())
  • max_output_chars - Maximum characters to include from REPL output. (term())
  • verbose - Whether to log detailed execution info. (term())
  • tools - List of tool functions or dspy.Tool objects callable from interpreter code. The built-in tools llm_query(prompt) and llm_query_batched(prompts) are always available. (term())
  • sub_lm - LM used for llm_query/llm_query_batched; defaults to dspy.settings.lm. Allows using a different (e.g., cheaper) model for sub-queries. (term())
  • interpreter - CodeInterpreter implementation to use. Defaults to PythonInterpreter. (term())
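As a hedged sketch, a construction that overrides the sub-LM and registers an extra tool might look like this. Here cheap_lm and word_count stand for Python-side refs obtained through the bridge; they are illustrative names, not part of the API:

```elixir
{:ok, rlm} =
  Dspy.Predict.RLM.new(
    "context, query -> answer",
    15,           # max_iterations
    30,           # max_llm_calls
    10_000,       # max_output_chars
    true,         # verbose: log detailed execution info
    [word_count], # tools callable from interpreter code (hypothetical ref)
    cheap_lm,     # sub_lm used for llm_query/llm_query_batched (hypothetical ref)
    nil           # interpreter: default PythonInterpreter
  )
```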

parameters(ref, opts \\ [])

@spec parameters(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.parameters.

Returns

  • term()

predictors(ref, opts \\ [])

@spec predictors(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.predictors.

Returns

  • term()

reset_copy(ref, opts \\ [])

@spec reset_copy(
  SnakeBridge.Ref.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Deep copy the module and reset all parameters.

Returns

  • term()

save(ref, path, args, opts \\ [])

@spec save(SnakeBridge.Ref.t(), term(), [term()], keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Save the module.

Save the module to a directory or a file. There are two modes:

  • save_program=False: Save only the state of the module to a json or pickle file, based on the value of the file extension.
  • save_program=True: Save the whole module to a directory via cloudpickle, which contains both the state and architecture of the model.

If save_program=True and modules_to_serialize are provided, it will register those modules for serialization with cloudpickle's register_pickle_by_value. This causes cloudpickle to serialize the module by value rather than by reference, ensuring the module is fully preserved along with the saved program. This is useful when you have custom modules that need to be serialized alongside your program. If None, then no modules will be registered for serialization.

We also save the dependency versions, so that the loaded model can check if there is a version mismatch on critical dependencies or DSPy version.

Parameters

  • path - Path to the saved state file, which should be a .json or .pkl file when save_program=False, and a directory when save_program=True. (type: String.t())
  • save_program - If True, save the whole module to a directory via cloudpickle, otherwise only save the state. (type: boolean())
  • modules_to_serialize - A list of modules to serialize with cloudpickle's register_pickle_by_value. If None, then no modules will be registered for serialization. (type: list())

Returns

  • term()
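A hedged round-trip sketch: save only the module state to JSON, then restore it into an existing instance with load/4. The option name mirrors the save_program parameter above; its encoding in opts is an assumption:

```elixir
# State-only save: the path extension selects JSON vs. pickle.
{:ok, _} = Dspy.Predict.RLM.save(rlm, "rlm_state.json", [], save_program: false)

# Later, load the state back into a compatible RLM instance.
{:ok, _} = Dspy.Predict.RLM.load(rlm, "rlm_state.json", [])
```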

set_lm(ref, lm, opts \\ [])

@spec set_lm(SnakeBridge.Ref.t(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python method RLM.set_lm.

Parameters

  • lm (term())

Returns

  • term()

tools(ref)

@spec tools(SnakeBridge.Ref.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}