Dspy (DSPex v0.11.0)


SnakeBridge bindings for dspy.

Version

  • Requested: 3.1.3
  • Observed at generation: 3.1.3

Runtime Options

All functions accept a __runtime__ option for controlling execution behavior:

Dspy.some_function(args, __runtime__: [timeout: 120_000])

Supported runtime options

  • :timeout - Call timeout in milliseconds (default: 120,000ms / 2 minutes)
  • :timeout_profile - Use a named profile (:default, :ml_inference, :batch_job, :streaming)
  • :stream_timeout - Timeout for streaming operations (default: 1,800,000ms / 30 minutes)
  • :session_id - Override the session ID for this call
  • :pool_name - Target a specific Snakepit pool (multi-pool setups)
  • :affinity - Override session affinity (:hint, :strict_queue, :strict_fail_fast)
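
As a rough illustration of how a reserved `__runtime__` key can travel alongside ordinary options, the sketch below separates the two with `Keyword.pop/3`. The module name `RuntimeOptsSketch` is hypothetical, and whether SnakeBridge splits options this way internally is an assumption; only the shape of the option list comes from this page.

```elixir
defmodule RuntimeOptsSketch do
  # Hypothetical helper: returns {runtime_opts, remaining_opts}.
  # Assumption: everything outside :__runtime__ is meant for the Python call.
  def split(opts) when is_list(opts) do
    Keyword.pop(opts, :__runtime__, [])
  end
end

{runtime, rest} = RuntimeOptsSketch.split(n: 3, __runtime__: [timeout: 600_000])
# runtime => [timeout: 600000]
# rest    => [n: 3]
```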

Timeout Profiles

  • :default - 2 minute timeout for regular calls
  • :ml_inference - 10 minute timeout for ML/LLM workloads
  • :batch_job - Unlimited timeout for long-running jobs
  • :streaming - 2 minute timeout, 30 minute stream_timeout
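
The profile list above can be read as a lookup table. The following is a minimal sketch of that table, not the library's implementation: `TimeoutProfileSketch` is a hypothetical module, and representing "unlimited" as `:infinity` is an assumption.

```elixir
defmodule TimeoutProfileSketch do
  # Hypothetical lookup table; the values restate the documented profiles.
  # Assumption: an unlimited timeout is expressed as :infinity.
  @profiles %{
    default: [timeout: 120_000],
    ml_inference: [timeout: 600_000],
    batch_job: [timeout: :infinity],
    streaming: [timeout: 120_000, stream_timeout: 1_800_000]
  }

  # Returns the timeout settings that a named profile stands for.
  # Raises KeyError for a profile that is not in the table.
  def expand(profile) when is_atom(profile), do: Map.fetch!(@profiles, profile)
end

TimeoutProfileSketch.expand(:ml_inference)
# => [timeout: 600000]
```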

Example with timeout override

# For a long-running ML inference call
Dspy.predict(data, __runtime__: [timeout_profile: :ml_inference])

# Or explicit timeout
Dspy.predict(data, __runtime__: [timeout: 600_000])

# Route to a pool and enforce strict affinity
Dspy.predict(data, __runtime__: [pool_name: :strict_pool, affinity: :strict_queue])

See SnakeBridge.Defaults for global timeout configuration.

Summary

Functions

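  • asyncify(program, opts \\ []) - Wraps a DSPy program so that it can be called asynchronously.
  • bootstrap_trace_data(program, dataset) - Python binding for dspy.bootstrap_trace_data.
  • cache() - Python module attribute dspy.cache.
  • configure(opts \\ []) - Python binding for dspy.configure.
  • configure_cache() - Configure the cache for DSPy.
  • configure_dspy_loggers(root_module_name, opts \\ []) - Python binding for dspy.configure_dspy_loggers.
  • context(opts \\ []) - Context manager for temporary configuration changes at the thread level.
  • disable_litellm_logging(opts \\ []) - Python binding for dspy.disable_litellm_logging.
  • disable_logging(opts \\ []) - Disables the DSPyLoggingStream used by event logging APIs throughout DSPy.
  • dspy_cache() - Python module attribute dspy.DSPY_CACHE.
  • enable_litellm_logging(opts \\ []) - Python binding for dspy.enable_litellm_logging.
  • enable_logging(opts \\ []) - Enables the DSPyLoggingStream used by event logging APIs throughout DSPy.
  • ensure_signature(signature) - Python binding for dspy.ensure_signature.
  • infer_prefix(attribute_name, opts \\ []) - Infer a prefix from an attribute name by converting it to a human-readable format.
  • input_field(opts \\ []) - Python binding for dspy.InputField.
  • inspect_history() - The global history shared across all LMs.
  • load(path) - Load saved DSPy model.
  • load_settings(path, opts \\ []) - Load the settings from a file using cloudpickle.
  • majority(prediction_or_completions) - Returns the most common completion for the target field (or the last field) in the signature.
  • make_signature(signature) - Create a new Signature subclass with the specified fields and instructions.
  • output_field(opts \\ []) - Python binding for dspy.OutputField.
  • settings() - Python module attribute dspy.settings.
  • streamify(program) - Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them all at once.
  • syncify(program) - Convert an async DSPy module to a sync program.
  • track_usage(opts \\ []) - Context manager for tracking LM usage.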

Functions

asyncify(program, opts \\ [])

@spec asyncify(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Wraps a DSPy program so that it can be called asynchronously. This is useful for running a program in parallel with another task (e.g., another DSPy program).

This implementation propagates the current thread's configuration context to the worker thread.

Parameters

  • program - The DSPy program to be wrapped for asynchronous execution.

Returns

  • term()
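
Every spec on this page returns either `{:ok, value}` or `{:error, Snakepit.Error.t()}`, so calls compose naturally with `with`. The sketch below shows that pattern under stated assumptions: `ResultChainSketch` is hypothetical, and `wrap` and `call` are stand-ins passed in as functions so the example runs without DSPex. In real code they would be bindings such as `Dspy.asyncify/2`.

```elixir
defmodule ResultChainSketch do
  # Runs two steps in order and stops at the first {:error, _}.
  # `wrap` and `call` are placeholder functions, not part of DSPex.
  def run(program, wrap, call) do
    with {:ok, wrapped} <- wrap.(program),
         {:ok, result} <- call.(wrapped) do
      {:ok, result}
    end
  end
end

ok =
  ResultChainSketch.run(
    :prog,
    fn p -> {:ok, {:async, p}} end,
    fn w -> {:ok, {:done, w}} end
  )
# ok => {:ok, {:done, {:async, :prog}}}

err =
  ResultChainSketch.run(
    :prog,
    fn _ -> {:error, :timeout} end,
    fn w -> {:ok, w} end
  )
# err => {:error, :timeout}
```

A non-matching clause in `with` is returned unchanged, which is why the error tuple from the first step reaches the caller without extra handling.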

bootstrap_trace_data(program, dataset)

@spec bootstrap_trace_data(term(), [term()]) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}

Python binding for dspy.bootstrap_trace_data.

Parameters

  • program (term())
  • dataset (list(term()))
  • metric (term() | nil default: None)
  • num_threads (term() default: None)
  • raise_on_error (term() default: True)
  • capture_failed_parses (term() default: False)
  • failure_score (float() default: 0)
  • format_failure_score (float() default: -1)
  • log_format_failures (boolean() default: False)
  • callback_metadata (term() default: None)

Returns

  • list(term())

bootstrap_trace_data(program, dataset, opts)

@spec bootstrap_trace_data(term(), [term()], keyword()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(term(), [term()], term() | nil) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, opts)

@spec bootstrap_trace_data(term(), [term()], term() | nil, keyword()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(term(), [term()], term() | nil, term()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, num_threads, opts)

@spec bootstrap_trace_data(term(), [term()], term() | nil, term(), keyword()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(term(), [term()], term() | nil, term(), term()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, num_threads, raise_on_error, opts)

@spec bootstrap_trace_data(term(), [term()], term() | nil, term(), term(), keyword()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(term(), [term()], term() | nil, term(), term(), term()) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, num_threads, raise_on_error, capture_failed_parses, opts)

@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  keyword()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float()
) ::
  {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, num_threads, raise_on_error, capture_failed_parses, failure_score, opts)

@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float(),
  keyword()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float(),
  float()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, num_threads, raise_on_error, capture_failed_parses, failure_score, format_failure_score, opts)

@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float(),
  float(),
  keyword()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float(),
  float(),
  boolean()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, num_threads, raise_on_error, capture_failed_parses, failure_score, format_failure_score, log_format_failures, opts)

@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float(),
  float(),
  boolean(),
  keyword()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}
@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float(),
  float(),
  boolean(),
  term()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}

bootstrap_trace_data(program, dataset, metric, num_threads, raise_on_error, capture_failed_parses, failure_score, format_failure_score, log_format_failures, callback_metadata, opts)

@spec bootstrap_trace_data(
  term(),
  [term()],
  term() | nil,
  term(),
  term(),
  term(),
  float(),
  float(),
  boolean(),
  term(),
  keyword()
) :: {:ok, [term()]} | {:error, Snakepit.Error.t()}

cache()

@spec cache() :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python module attribute dspy.cache.

Returns

  • term()

configure(opts \\ [])

@spec configure(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python binding for dspy.configure.

Parameters

  • kwargs (term())

Returns

  • term()

configure_cache()

@spec configure_cache() :: {:ok, term()} | {:error, Snakepit.Error.t()}

Configure the cache for DSPy.

Parameters

  • enable_disk_cache - Whether to enable on-disk cache.
  • enable_memory_cache - Whether to enable in-memory cache.
  • disk_cache_dir - The directory to store the on-disk cache.
  • disk_size_limit_bytes - The size limit of the on-disk cache.
  • memory_max_entries - The maximum number of entries in the in-memory cache. To allow the cache to grow without bounds, set this parameter to math.inf or a similar value.

Returns

  • term()

configure_cache(opts)

@spec configure_cache(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

configure_cache(enable_disk_cache, opts)

@spec configure_cache(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

configure_cache(enable_disk_cache, enable_memory_cache, opts)

@spec configure_cache(term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term(), term(), term()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

configure_cache(enable_disk_cache, enable_memory_cache, disk_cache_dir, opts)

@spec configure_cache(term(), term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term(), term(), term(), term()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

configure_cache(enable_disk_cache, enable_memory_cache, disk_cache_dir, disk_size_limit_bytes, opts)

@spec configure_cache(term(), term(), term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec configure_cache(term(), term(), term(), term(), integer()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

configure_cache(enable_disk_cache, enable_memory_cache, disk_cache_dir, disk_size_limit_bytes, memory_max_entries, opts)

@spec configure_cache(term(), term(), term(), term(), integer(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

configure_dspy_loggers(root_module_name, opts \\ [])

@spec configure_dspy_loggers(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python binding for dspy.configure_dspy_loggers.

Parameters

  • root_module_name (term())

Returns

  • term()

context(opts \\ [])

@spec context(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Context manager for temporary configuration changes at the thread level.

Does not affect global configuration. Changes only apply to the current thread. If threads are spawned inside this block using ParallelExecutor, they will inherit these overrides.

Parameters

  • kwargs (term())

Returns

  • term()

disable_litellm_logging(opts \\ [])

@spec disable_litellm_logging(keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python binding for dspy.disable_litellm_logging.

Returns

  • term()

disable_logging(opts \\ [])

@spec disable_logging(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Disables the DSPyLoggingStream used by event logging APIs throughout DSPy (eprint(), logger.info(), etc.), silencing all subsequent event logs.

Returns

  • term()

dspy_cache()

@spec dspy_cache() :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python module attribute dspy.DSPY_CACHE.

Returns

  • term()

enable_litellm_logging(opts \\ [])

@spec enable_litellm_logging(keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

Python binding for dspy.enable_litellm_logging.

Returns

  • term()

enable_logging(opts \\ [])

@spec enable_logging(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Enables the DSPyLoggingStream used by event logging APIs throughout DSPy (eprint(), logger.info(), etc.), emitting all subsequent event logs. This reverses the effects of disable_logging().

Returns

  • term()

ensure_signature(signature)

@spec ensure_signature(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python binding for dspy.ensure_signature.

Parameters

  • signature (term())
  • instructions (term() default: None)

Returns

  • term()

ensure_signature(signature, opts)

@spec ensure_signature(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec ensure_signature(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

ensure_signature(signature, instructions, opts)

@spec ensure_signature(term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

infer_prefix(attribute_name, opts \\ [])

@spec infer_prefix(
  String.t(),
  keyword()
) :: {:ok, String.t()} | {:error, Snakepit.Error.t()}

Infer a prefix from an attribute name by converting it to a human-readable format.

Examples

"camelCaseText" -> "Camel Case Text"
"snake_case_text" -> "Snake Case Text"
"text2number" -> "Text 2 Number"
"HTMLParser" -> "HTML Parser"

Parameters

  • attribute_name (String.t())

Returns

  • String.t()
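
To make the conversion concrete, here is a plain-Elixir approximation that reproduces the four documented examples. It is a sketch only: `InferPrefixSketch` is a hypothetical module, the regular expressions are an assumption about how such a conversion can be done, and the real function runs in Python and may treat edge cases differently.

```elixir
defmodule InferPrefixSketch do
  # Approximates the documented conversion without calling Python.
  def infer_prefix(attribute_name) when is_binary(attribute_name) do
    attribute_name
    # Break before a capitalized word: "HTMLParser" -> "HTML_Parser".
    |> underscore(~r/(.)([A-Z][a-z]+)/)
    # Break between a lowercase letter or digit and a capital letter.
    |> underscore(~r/([a-z0-9])([A-Z])/)
    # Break between letters and digits, in both directions.
    |> underscore(~r/([a-zA-Z])(\d)/)
    |> underscore(~r/(\d)([a-zA-Z])/)
    |> String.split("_", trim: true)
    |> Enum.map_join(" ", &title_word/1)
  end

  # Inserts "_" between the two capture groups of every match.
  defp underscore(string, regex), do: Regex.replace(regex, string, "\\1_\\2")

  # Keeps all-caps words (acronyms) as they are and capitalizes the rest.
  defp title_word(word) do
    if word == String.upcase(word), do: word, else: String.capitalize(word)
  end
end

InferPrefixSketch.infer_prefix("camelCaseText")
# => "Camel Case Text"
InferPrefixSketch.infer_prefix("HTMLParser")
# => "HTML Parser"
```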

input_field(opts \\ [])

@spec input_field(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python binding for dspy.InputField.

Parameters

  • kwargs (term())

Returns

  • term()

inspect_history()

@spec inspect_history() :: {:ok, term()} | {:error, Snakepit.Error.t()}

The global history shared across all LMs.

Parameters

  • n (integer() default: 1)

Returns

  • term()

inspect_history(opts)

@spec inspect_history(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec inspect_history(integer()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

inspect_history(n, opts)

@spec inspect_history(
  integer(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

load(path)

@spec load(String.t()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Load saved DSPy model.

Loads a DSPy model that was saved with save_program=True, i.e., a model serialized with cloudpickle.

Parameters

  • path - Path to the saved model. (type: String.t())
  • allow_pickle - Whether to allow loading the model with pickle. This is dangerous and should only be used if you are sure you trust the source of the model. (type: boolean())

Returns

  • term()

load(path, opts)

@spec load(
  String.t(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec load(String.t(), boolean()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

load(path, allow_pickle, opts)

@spec load(String.t(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

load_settings(path, opts \\ [])

@spec load_settings(
  String.t(),
  keyword()
) :: {:ok, %{optional(String.t()) => term()}} | {:error, Snakepit.Error.t()}

Load the settings from a file using cloudpickle.

Parameters

  • path - The file path to load the settings from.

Returns

  • %{optional(String.t()) => term()}

majority(prediction_or_completions)

@spec majority(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Returns the most common completion for the target field (or the last field) in the signature.

When normalize returns None, that completion is ignored. In case of a tie, earlier completions are prioritized.

Parameters

  • prediction_or_completions (term())
  • normalize (term() default: default_normalize)
  • field (term() default: None)

Returns

  • term()
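
The voting rule described above (normalize, drop ignored values, prefer the earlier completion on a tie) can be sketched in plain Elixir. This is an illustration under assumptions, not the binding's behavior: `MajoritySketch` is hypothetical, it works on a plain list of strings rather than a DSPy prediction, it returns the normalized value, and its `default_normalize/1` (trim and downcase) is a simplified stand-in for DSPy's normalizer.

```elixir
defmodule MajoritySketch do
  # Hypothetical normalizer: trims and downcases, returns nil for blanks.
  def default_normalize(text) when is_binary(text) do
    case text |> String.trim() |> String.downcase() do
      "" -> nil
      normalized -> normalized
    end
  end

  # Returns the most common normalized value, or nil if none remain.
  def majority(completions, normalize \\ &default_normalize/1) do
    normalized =
      completions
      |> Enum.map(normalize)
      # Completions whose normalized form is nil are ignored.
      |> Enum.reject(&is_nil/1)

    counts = Enum.frequencies(normalized)

    # Enum.uniq/1 keeps first-seen order and Enum.max_by/3 returns the
    # first maximal element, so earlier completions win ties.
    normalized
    |> Enum.uniq()
    |> Enum.max_by(&Map.fetch!(counts, &1), fn -> nil end)
  end
end

MajoritySketch.majority(["Paris", " paris", "Lyon", "lyon", ""])
# => "paris"
```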

majority(prediction_or_completions, opts)

@spec majority(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec majority(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

majority(prediction_or_completions, normalize, opts)

@spec majority(term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec majority(term(), term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

majority(prediction_or_completions, normalize, field, opts)

@spec majority(term(), term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

make_signature(signature)

@spec make_signature(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Create a new Signature subclass with the specified fields and instructions.

Parameters

  • signature - Either a string in the format "input1, input2 -> output1, output2" or a dictionary mapping field names to tuples of (type, FieldInfo).
  • instructions - Optional string containing instructions/prompt for the signature. If not provided, defaults to a basic description of inputs and outputs.
  • signature_name - Optional string to name the generated Signature subclass. Defaults to "StringSignature".
  • custom_types - Optional dictionary mapping type names to their actual type objects. Useful for resolving custom types that aren't built-ins or in the typing module.

Returns

  • term()
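
The string form "input1, input2 -> output1, output2" splits into input and output field names at the arrow. The sketch below shows only that split, under stated assumptions: `SignatureStringSketch` is a hypothetical module, it does not create a Signature, and it ignores anything beyond bare field names (such as type annotations).

```elixir
defmodule SignatureStringSketch do
  # Splits "in1, in2 -> out1, out2" into input and output field names.
  def parse(signature) when is_binary(signature) do
    case String.split(signature, "->") do
      [inputs, outputs] ->
        {:ok, %{inputs: names(inputs), outputs: names(outputs)}}

      _ ->
        # Zero arrows or more than one arrow.
        {:error, :invalid_signature}
    end
  end

  # Turns "a, b" into ["a", "b"], dropping blank entries.
  defp names(part) do
    part
    |> String.split(",")
    |> Enum.map(&String.trim/1)
    |> Enum.reject(&(&1 == ""))
  end
end

SignatureStringSketch.parse("question, context -> answer")
# => {:ok, %{inputs: ["question", "context"], outputs: ["answer"]}}
```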

make_signature(signature, opts)

@spec make_signature(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec make_signature(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

make_signature(signature, instructions, opts)

@spec make_signature(term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec make_signature(term(), term(), String.t()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

make_signature(signature, instructions, signature_name, opts)

@spec make_signature(term(), term(), String.t(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec make_signature(term(), term(), String.t(), term()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

make_signature(signature, instructions, signature_name, custom_types, opts)

@spec make_signature(term(), term(), String.t(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

output_field(opts \\ [])

@spec output_field(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python binding for dspy.OutputField.

Parameters

  • kwargs (term())

Returns

  • term()

settings()

@spec settings() :: {:ok, term()} | {:error, Snakepit.Error.t()}

Python module attribute dspy.settings.

Returns

  • term()

streamify(program)

@spec streamify(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them all at once. It also provides status messages to the user to indicate the progress of the program, and users can implement their own status message provider to customize the status messages and what module to generate status messages for.

Parameters

  • program - The DSPy program to wrap with streaming functionality.
  • status_message_provider - A custom status message generator to use instead of the default one. Users can implement their own status message generator to customize the status messages and what module to generate status messages for.
  • stream_listeners - A list of stream listeners to capture the streaming output of specific fields of sub predicts in the program. When provided, only the target fields in the target predict will be streamed to the user.
  • include_final_prediction_in_output_stream - Whether to include the final prediction in the output stream. Only useful when stream_listeners is provided. If False, the final prediction is not included in the output stream. When the program hits the cache, or no listener captures anything, the final prediction is still included in the output stream even if this is False.
  • is_async_program - Whether the program is async. If False, the program will be wrapped with asyncify, otherwise the program will be called with acall.
  • async_streaming - Whether to return an async generator or a sync generator. If False, the streaming will be converted to a sync generator.

Returns

  • term()

streamify(program, opts)

@spec streamify(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, opts)

@spec streamify(term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, opts)

@spec streamify(term(), term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, include_final_prediction_in_output_stream, opts)

@spec streamify(term(), term(), term(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, include_final_prediction_in_output_stream, is_async_program, opts)

@spec streamify(term(), term(), term(), boolean(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean(), boolean(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, include_final_prediction_in_output_stream, is_async_program, async_streaming, opts)

@spec streamify(term(), term(), term(), boolean(), boolean(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

syncify(program)

@spec syncify(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Convert an async DSPy module to a sync program.

There are two modes of this function:

  • in_place=True (recommended): Modifies the module in place. This may not work if the module already has a forward method that behaves differently from aforward.
  • in_place=False: Returns a wrapper module. This changes the module's architecture, but it is more robust.

Parameters

  • program - The async program to convert, must have an aforward method implemented.
  • in_place - If True, modify the module in place. Otherwise, return a wrapper module.

Returns

  • term()

syncify(program, opts)

@spec syncify(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec syncify(term(), boolean()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

syncify(program, in_place, opts)

@spec syncify(term(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

track_usage(opts \\ [])

@spec track_usage(keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Context manager for tracking LM usage.

Returns

  • term()