SnakeBridge bindings for dspy.
Version
- Requested: 3.1.3
- Observed at generation: 3.1.3
Runtime Options
All functions accept a __runtime__ option for controlling execution behavior:
Dspy.some_function(args, __runtime__: [timeout: 120_000])
Supported runtime options
- :timeout - Call timeout in milliseconds (default: 120,000 ms / 2 minutes)
- :timeout_profile - Use a named profile (:default, :ml_inference, :batch_job, :streaming)
- :stream_timeout - Timeout for streaming operations (default: 1,800,000 ms / 30 minutes)
- :session_id - Override the session ID for this call
- :pool_name - Target a specific Snakepit pool (multi-pool setups)
- :affinity - Override session affinity (:hint, :strict_queue, :strict_fail_fast)
Timeout Profiles
- :default - 2 minute timeout for regular calls
- :ml_inference - 10 minute timeout for ML/LLM workloads
- :batch_job - Unlimited timeout for long-running jobs
- :streaming - 2 minute timeout, 30 minute stream_timeout
Example with timeout override
# For a long-running ML inference call
Dspy.predict(data, __runtime__: [timeout_profile: :ml_inference])
# Or explicit timeout
Dspy.predict(data, __runtime__: [timeout: 600_000])
# Route to a pool and enforce strict affinity
Dspy.predict(data, __runtime__: [pool_name: :strict_pool, affinity: :strict_queue])
See SnakeBridge.Defaults for global timeout configuration.
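The timeout profiles listed above can be modelled as a small lookup table. The following is a minimal sketch only: the module name RuntimeProfileSketch and the function resolve/1 are hypothetical, the use of :infinity for the "unlimited" :batch_job profile is an assumption, and so is the rule that an explicit :timeout wins over the profile value. It does not show how SnakeBridge resolves these options internally.

```elixir
defmodule RuntimeProfileSketch do
  # Millisecond values copied from the profile list in this document.
  # :infinity for :batch_job is an assumption standing in for "unlimited".
  @profiles %{
    default: [timeout: 120_000],
    ml_inference: [timeout: 600_000],
    batch_job: [timeout: :infinity],
    streaming: [timeout: 120_000, stream_timeout: 1_800_000]
  }

  # Start from the named profile (or :default) and let explicit options
  # override it. The override order is an assumption of this sketch.
  def resolve(opts) when is_list(opts) do
    profile = Keyword.get(opts, :timeout_profile, :default)
    base = Map.fetch!(@profiles, profile)
    Keyword.merge(base, Keyword.delete(opts, :timeout_profile))
  end
end
```

For example, RuntimeProfileSketch.resolve(timeout_profile: :streaming, timeout: 5_000) keeps the 30 minute stream_timeout from the profile while replacing the call timeout.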
Summary
Functions
asyncify - Wraps a DSPy program so that it can be called asynchronously.
bootstrap_trace_data - Python binding for dspy.bootstrap_trace_data.
cache - Python module attribute dspy.cache.
configure - Python binding for dspy.configure.
configure_cache - Configure the cache for DSPy.
configure_dspy_loggers - Python binding for dspy.configure_dspy_loggers.
context - Context manager for temporary configuration changes at the thread level.
disable_litellm_logging - Python binding for dspy.disable_litellm_logging.
disable_logging - Disables the DSPyLoggingStream used by event logging APIs throughout DSPy.
dspy_cache - Python module attribute dspy.DSPY_CACHE.
enable_litellm_logging - Python binding for dspy.enable_litellm_logging.
enable_logging - Enables the DSPyLoggingStream used by event logging APIs throughout DSPy.
ensure_signature - Python binding for dspy.ensure_signature.
infer_prefix - Infer a prefix from an attribute name by converting it to a human-readable format.
input_field - Python binding for dspy.InputField.
inspect_history - The global history shared across all LMs.
load - Load saved DSPy model.
load_settings - Load the settings from a file using cloudpickle.
majority - Returns the most common completion for the target field (or the last field) in the signature.
make_signature - Create a new Signature subclass with the specified fields and instructions.
output_field - Python binding for dspy.OutputField.
settings - Python module attribute dspy.settings.
streamify - Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them all at once.
syncify - Convert an async DSPy module to a sync program.
track_usage - Context manager for tracking LM usage.
Functions
@spec asyncify( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Wraps a DSPy program so that it can be called asynchronously. This is useful for running a
program in parallel with another task (e.g., another DSPy program).
This implementation propagates the current thread's configuration context to the worker thread.
Parameters
- program - The DSPy program to be wrapped for asynchronous execution.
Returns
term()
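Every spec in this module returns either {:ok, value} or {:error, error}, so calls compose naturally with `with`. The sketch below illustrates that pattern using hypothetical stand-in functions (fake_load/1 and fake_wrap/1) instead of the real bindings, because the real calls need a running Python worker. The error map shape is also an assumption and is not the Snakepit.Error struct.

```elixir
defmodule ResultSketch do
  # Hypothetical stand-ins for binding calls. Each returns
  # {:ok, value} or {:error, reason}, like the specs in this module.
  def fake_load("missing"), do: {:error, %{message: "file not found"}}
  def fake_load(path), do: {:ok, {:program, path}}

  def fake_wrap({:program, _} = program), do: {:ok, {:async, program}}

  # `with` stops at the first non-matching result and returns it
  # unchanged, so an {:error, _} tuple propagates to the caller.
  def load_and_wrap(path) do
    with {:ok, program} <- fake_load(path),
         {:ok, wrapped} <- fake_wrap(program) do
      {:ok, wrapped}
    end
  end
end
```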
@spec bootstrap_trace_data(term(), [term()]) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
Python binding for dspy.bootstrap_trace_data.
Parameters
- program (term())
- dataset (list(term()))
- metric (term() | nil, default: None)
- num_threads (term(), default: None)
- raise_on_error (term(), default: True)
- capture_failed_parses (term(), default: False)
- failure_score (float(), default: 0)
- format_failure_score (float(), default: -1)
- log_format_failures (boolean(), default: False)
- callback_metadata (term(), default: None)
Returns
list(term())
@spec bootstrap_trace_data(term(), [term()], keyword()) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(term(), [term()], term() | nil) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data( term(), [term()], term() | nil, term(), term(), term(), float(), keyword() ) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data( term(), [term()], term() | nil, term(), term(), term(), float(), float() ) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data( term(), [term()], term() | nil, term(), term(), term(), float(), float(), keyword() ) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data( term(), [term()], term() | nil, term(), term(), term(), float(), float(), boolean() ) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data( term(), [term()], term() | nil, term(), term(), term(), float(), float(), boolean(), keyword() ) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data( term(), [term()], term() | nil, term(), term(), term(), float(), float(), boolean(), term() ) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec cache() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python module attribute dspy.cache.
Returns
term()
@spec configure(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for dspy.configure.
Parameters
- kwargs (term())
Returns
term()
@spec configure_cache() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Configure the cache for DSPy.
Parameters
- enable_disk_cache - Whether to enable on-disk cache.
- enable_memory_cache - Whether to enable in-memory cache.
- disk_cache_dir - The directory to store the on-disk cache.
- disk_size_limit_bytes - The size limit of the on-disk cache.
- memory_max_entries - The maximum number of entries in the in-memory cache. To allow the cache to grow without bounds, set this parameter to math.inf or a similar value.
Returns
term()
@spec configure_cache(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term(), term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_dspy_loggers( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for dspy.configure_dspy_loggers.
Parameters
- root_module_name (term())
Returns
term()
@spec context(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Context manager for temporary configuration changes at the thread level.
Does not affect global configuration. Changes only apply to the current thread. If threads are spawned inside this block using ParallelExecutor, they will inherit these overrides.
Parameters
- kwargs (term())
Returns
term()
@spec disable_litellm_logging(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for dspy.disable_litellm_logging.
Returns
term()
@spec disable_logging(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Disables the DSPyLoggingStream used by event logging APIs throughout DSPy
(eprint(), logger.info(), etc), silencing all subsequent event logs.
Returns
term()
@spec dspy_cache() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python module attribute dspy.DSPY_CACHE.
Returns
term()
@spec enable_litellm_logging(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for dspy.enable_litellm_logging.
Returns
term()
@spec enable_logging(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Enables the DSPyLoggingStream used by event logging APIs throughout DSPy
(eprint(), logger.info(), etc), emitting all subsequent event logs. This
reverses the effects of disable_logging().
Returns
term()
@spec ensure_signature(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for dspy.ensure_signature.
Parameters
- signature (term())
- instructions (term(), default: None)
Returns
term()
@spec ensure_signature( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec ensure_signature(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec ensure_signature(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec infer_prefix( String.t(), keyword() ) :: {:ok, String.t()} | {:error, Snakepit.Error.t()}
Infer a prefix from an attribute name by converting it to a human-readable format.
Examples
- "camelCaseText" -> "Camel Case Text"
- "snake_case_text" -> "Snake Case Text"
- "text2number" -> "Text 2 Number"
- "HTMLParser" -> "HTML Parser"
Parameters
- attribute_name (String.t())
Returns
String.t()
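To make the conversion rules concrete, here is a pure-Elixir sketch that reproduces the four documented examples. It is an illustration only: the module InferPrefixSketch is hypothetical, the binding itself delegates to Python, and the regular expressions are an assumption about how such a conversion can be done, so results may differ from DSPy on inputs other than those listed above.

```elixir
defmodule InferPrefixSketch do
  # Insert underscores at case and digit boundaries, split on
  # underscores, then title-case each word (keeping acronyms as-is).
  def infer_prefix(attribute_name) when is_binary(attribute_name) do
    attribute_name
    |> sub(~r/(.)([A-Z][a-z]+)/, "\\1_\\2")
    |> sub(~r/([a-z0-9])([A-Z])/, "\\1_\\2")
    |> sub(~r/([a-zA-Z])(\d)/, "\\1_\\2")
    |> sub(~r/(\d)([a-zA-Z])/, "\\1_\\2")
    |> String.split("_")
    |> Enum.map(&title_word/1)
    |> Enum.join(" ")
  end

  defp sub(string, regex, replacement), do: Regex.replace(regex, string, replacement)

  # A word with at least one uppercase letter and no lowercase letters
  # (e.g. "HTML") is kept unchanged; everything else is capitalized.
  defp title_word(word) do
    if word == String.upcase(word) and word != String.downcase(word) do
      word
    else
      String.capitalize(word)
    end
  end
end
```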
@spec input_field(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for dspy.InputField.
Parameters
- kwargs (term())
Returns
term()
@spec inspect_history() :: {:ok, term()} | {:error, Snakepit.Error.t()}
The global history shared across all LMs.
Parameters
- n (integer(), default: 1)
Returns
term()
@spec inspect_history(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec inspect_history(integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec inspect_history( integer(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec load(String.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Load saved DSPy model.
This method is used to load a saved DSPy model with save_program=True, i.e., the model is saved with cloudpickle.
Parameters
- path - Path to the saved model. (type: String.t())
- allow_pickle - Whether to allow loading the model with pickle. This is dangerous and should only be used if you are sure you trust the source of the model. (type: boolean())
Returns
term()
@spec load( String.t(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec load(String.t(), boolean()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec load(String.t(), boolean(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec load_settings( String.t(), keyword() ) :: {:ok, %{optional(String.t()) => term()}} | {:error, Snakepit.Error.t()}
Load the settings from a file using cloudpickle.
Parameters
- path - The file path to load the settings from.
Returns
%{optional(String.t()) => term()}
@spec majority(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Returns the most common completion for the target field (or the last field) in the signature.
When normalize returns None, that completion is ignored. In case of a tie, earlier completions are prioritized.
Parameters
- prediction_or_completions (term())
- normalize (term(), default: the default_normalize function)
- field (term(), default: None)
Returns
term()
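The voting rule described above (ignore completions whose normalized form is empty, pick the most frequent, break ties in favour of the earlier completion) can be sketched in plain Elixir. This is not the binding's implementation: MajoritySketch and its default_normalize/1 are hypothetical, the normalizer here only trims and downcases (an assumption, DSPy's own normalizer may do more), and the sketch works on a list of strings and returns the normalized winner rather than a DSPy prediction object.

```elixir
defmodule MajoritySketch do
  # Hypothetical normalizer: trim and downcase, with nil meaning
  # "ignore this completion" (mirroring normalize returning None).
  def default_normalize(text) when is_binary(text) do
    case text |> String.trim() |> String.downcase() do
      "" -> nil
      normalized -> normalized
    end
  end

  def majority(completions, normalize \\ &__MODULE__.default_normalize/1) do
    normalized =
      completions
      |> Enum.map(normalize)
      |> Enum.reject(&is_nil/1)

    counts = Enum.frequencies(normalized)

    case normalized do
      [] ->
        nil

      _ ->
        # Enum.max_by/2 returns the first maximal element, so scanning in
        # the original order lets earlier completions win ties.
        Enum.max_by(normalized, &Map.fetch!(counts, &1))
    end
  end
end
```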
@spec majority( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec majority(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec majority(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec majority(term(), term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec make_signature(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Create a new Signature subclass with the specified fields and instructions.
Parameters
- signature - Either a string in the format "input1, input2 -> output1, output2" or a dictionary mapping field names to tuples of (type, FieldInfo).
- instructions - Optional string containing instructions/prompt for the signature. If not provided, defaults to a basic description of inputs and outputs.
- signature_name - Optional string to name the generated Signature subclass. Defaults to "StringSignature".
- custom_types - Optional dictionary mapping type names to their actual type objects. Useful for resolving custom types that aren't built-ins or in the typing module.
Returns
term()
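The string form of the signature parameter, "input1, input2 -> output1, output2", splits into input and output field names around the arrow. The sketch below shows that reading of the format in plain Elixir. SignatureStringSketch is a hypothetical helper, not part of the bindings, and it deliberately ignores any type annotations or other syntax that DSPy may accept in signature strings.

```elixir
defmodule SignatureStringSketch do
  # Split "a, b -> c" into input and output field names.
  # Anything without exactly one "->" is rejected.
  def parse(signature) when is_binary(signature) do
    case String.split(signature, "->") do
      [inputs, outputs] ->
        {:ok, %{inputs: fields(inputs), outputs: fields(outputs)}}

      _ ->
        {:error, :invalid_signature}
    end
  end

  # Comma-separated names, trimmed, with empty entries dropped.
  defp fields(part) do
    part
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
  end
end
```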
@spec make_signature( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec make_signature(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec make_signature(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec make_signature(term(), term(), String.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec output_field(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python binding for dspy.OutputField.
Parameters
- kwargs (term())
Returns
term()
@spec settings() :: {:ok, term()} | {:error, Snakepit.Error.t()}
Python module attribute dspy.settings.
Returns
term()
@spec streamify(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them
all at once. It also provides status messages to the user to indicate the progress of the program, and users can implement their own status message provider to customize the status messages and what module to generate status messages for.
Parameters
- program - The DSPy program to wrap with streaming functionality.
- status_message_provider - A custom status message generator to use instead of the default one. Users can implement their own status message generator to customize the status messages and which modules generate them.
- stream_listeners - A list of stream listeners to capture the streaming output of specific fields of sub predicts in the program. When provided, only the target fields in the target predict will be streamed to the user.
- include_final_prediction_in_output_stream - Whether to include the final prediction in the output stream. Only useful when stream_listeners is provided. If False, the final prediction will not be included in the output stream. When the program hits the cache, or no listeners captured anything, the final prediction will still be included in the output stream even if this is False.
- is_async_program - Whether the program is async. If False, the program will be wrapped with asyncify, otherwise the program will be called with acall.
- async_streaming - Whether to return an async generator or a sync generator. If False, the streaming will be converted to a sync generator.
Returns
term()
@spec streamify( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec syncify(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Convert an async DSPy module to a sync program.
There are two modes of this function:
- in_place=True (recommended): Modify the module in place. But this may not work if you already have a forward method which does different things from aforward.
- in_place=False: Return a wrapper module. This changes the module's architecture, but it's more robust.
Parameters
- program - The async program to convert, must have an aforward method implemented.
- in_place - If True, modify the module in place. Otherwise, return a wrapper module.
Returns
term()
@spec syncify( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec syncify(term(), boolean()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec syncify(term(), boolean(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec track_usage(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Context manager for tracking LM usage.
Returns
term()