Submodule bindings for dspy.streaming.
Version
- Requested: 3.1.3
- Observed at generation: 3.1.3
Runtime Options
All functions accept a __runtime__ option for controlling execution behavior:
Dspy.Streaming.some_function(args, __runtime__: [timeout: 120_000])Supported runtime options
:timeout- Call timeout in milliseconds (default: 120,000ms / 2 minutes):timeout_profile- Use a named profile (:default,:ml_inference,:batch_job,:streaming):stream_timeout- Timeout for streaming operations (default: 1,800,000ms / 30 minutes):session_id- Override the session ID for this call:pool_name- Target a specific Snakepit pool (multi-pool setups):affinity- Override session affinity (:hint,:strict_queue,:strict_fail_fast)
Timeout Profiles
:default- 2 minute timeout for regular calls:ml_inference- 10 minute timeout for ML/LLM workloads:batch_job- Unlimited timeout for long-running jobs:streaming- 2 minute timeout, 30 minute stream_timeout
Example with timeout override
# For a long-running ML inference call
Dspy.Streaming.predict(data, __runtime__: [timeout_profile: :ml_inference])
# Or explicit timeout
Dspy.Streaming.predict(data, __runtime__: [timeout: 600_000])
# Route to a pool and enforce strict affinity
Dspy.Streaming.predict(data, __runtime__: [pool_name: :strict_pool, affinity: :strict_queue])See SnakeBridge.Defaults for global timeout configuration.
Summary
Functions
Convert the async streaming generator to a sync generator.
Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them
Convert a DSPy program output stream to an OpenAI-compatible output stream that can be
Functions
@spec apply_sync_streaming( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Convert the async streaming generator to a sync generator.
Parameters
async_generator(term())
Returns
term()
@spec streamify(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them
all at once. It also provides status messages to the user to indicate the progress of the program, and users can implement their own status message provider to customize the status messages and what module to generate status messages for.
Parameters
program- The DSPy program to wrap with streaming functionality.status_message_provider- A custom status message generator to use instead of the default one. Users can implement their own status message generator to customize the status messages and what module to generate status messages for.stream_listeners- A list of stream listeners to capture the streaming output of specific fields of sub predicts in the program. When provided, only the target fields in the target predict will be streamed to the user.include_final_prediction_in_output_stream- Whether to include the final prediction in the output stream, only useful whenstream_listenersis provided. IfFalse, the final prediction will not be included in the output stream. When the program hit cache, or no listeners captured anything, the final prediction will still be included in the output stream even if this isFalse.is_async_program- Whether the program is async. IfFalse, the program will be wrapped withasyncify, otherwise the program will be called withacall.async_streaming- Whether to return an async generator or a sync generator. IfFalse, the streaming will be converted to a sync generator.
Returns
term()
@spec streamify( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), keyword()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streaming_response( term(), keyword() ) :: {:ok, term()} | {:error, Snakepit.Error.t()}
Convert a DSPy program output stream to an OpenAI-compatible output stream that can be
used by a service as an API response to a streaming request.
Parameters
streamer- An async generator that yields values from a DSPy program output stream.
Returns
term()