Dspy.Streaming (DSPex v0.11.0)


Submodule bindings for dspy.streaming.

Version

  • Requested: 3.1.3
  • Observed at generation: 3.1.3

Runtime Options

All functions accept a __runtime__ option for controlling execution behavior:

Dspy.Streaming.some_function(args, __runtime__: [timeout: 120_000])

Supported runtime options

  • :timeout - Call timeout in milliseconds (default: 120,000ms / 2 minutes)
  • :timeout_profile - Use a named profile (:default, :ml_inference, :batch_job, :streaming)
  • :stream_timeout - Timeout for streaming operations (default: 1,800,000ms / 30 minutes)
  • :session_id - Override the session ID for this call
  • :pool_name - Target a specific Snakepit pool (multi-pool setups)
  • :affinity - Override session affinity (:hint, :strict_queue, :strict_fail_fast)

Timeout Profiles

  • :default - 2 minute timeout for regular calls
  • :ml_inference - 10 minute timeout for ML/LLM workloads
  • :batch_job - Unlimited timeout for long-running jobs
  • :streaming - 2 minute timeout, 30 minute stream_timeout

Example with timeout override

# For a long-running ML inference call
Dspy.Streaming.predict(data, __runtime__: [timeout_profile: :ml_inference])

# Or explicit timeout
Dspy.Streaming.predict(data, __runtime__: [timeout: 600_000])

# Route to a pool and enforce strict affinity
Dspy.Streaming.predict(data, __runtime__: [pool_name: :strict_pool, affinity: :strict_queue])

See SnakeBridge.Defaults for global timeout configuration.

Summary

Functions

apply_sync_streaming(async_generator, opts \\ [])

@spec apply_sync_streaming(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Converts an async streaming generator into a sync generator.

Parameters

  • async_generator (term())

Returns

  • term()
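A minimal usage sketch, assuming `async_gen` is a generator handle obtained from a prior `streamify` call with `async_streaming: true` (the variable name is illustrative):

```elixir
# Convert the async generator handle to a sync one, using the
# :streaming timeout profile for the longer-lived stream.
{:ok, sync_gen} =
  Dspy.Streaming.apply_sync_streaming(async_gen,
    __runtime__: [timeout_profile: :streaming]
  )
```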

streamify(program)

@spec streamify(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Wraps a DSPy program so that it streams its outputs incrementally rather than returning them all at once. It also emits status messages indicating the program's progress; users can supply their own status message provider to customize the messages and choose which modules generate them.

Parameters

  • program - The DSPy program to wrap with streaming functionality.
  • status_message_provider - A custom status message generator to use instead of the default one, allowing users to customize the status messages and which modules generate them.
  • stream_listeners - A list of stream listeners to capture the streaming output of specific fields of sub predicts in the program. When provided, only the target fields in the target predict will be streamed to the user.
  • include_final_prediction_in_output_stream - Whether to include the final prediction in the output stream. Only relevant when stream_listeners is provided. If false, the final prediction is omitted from the output stream, except when the program hits the cache or no listener captured anything, in which case it is still included.
  • is_async_program - Whether the program is async. If false, the program is wrapped with asyncify; otherwise it is invoked via acall.
  • async_streaming - Whether to return an async generator or a sync generator. If false, the stream is converted to a sync generator.

Returns

  • term()
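A hedged sketch of the `streamify(program, opts)` arity, assuming `program` is a handle returned by another DSPex binding (e.g. a `dspy.Predict` wrapper) and that keyword opts map onto the Python parameters listed above:

```elixir
# Wrap a program handle so its outputs stream back incrementally.
# `program` is assumed to come from another DSPex binding call.
{:ok, streaming_program} =
  Dspy.Streaming.streamify(program,
    is_async_program: false,
    async_streaming: false,
    __runtime__: [timeout_profile: :streaming]
  )
```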

streamify(program, opts)

@spec streamify(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, opts)

@spec streamify(term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, opts)

@spec streamify(term(), term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, include_final_prediction_in_output_stream, opts)

@spec streamify(term(), term(), term(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, include_final_prediction_in_output_stream, is_async_program, opts)

@spec streamify(term(), term(), term(), boolean(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean(), boolean(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streamify(program, status_message_provider, stream_listeners, include_final_prediction_in_output_stream, is_async_program, async_streaming, opts)

@spec streamify(term(), term(), term(), boolean(), boolean(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}

streaming_response(streamer, opts \\ [])

@spec streaming_response(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}

Converts a DSPy program output stream into an OpenAI-compatible output stream that a service can return as the API response to a streaming request.

Parameters

  • streamer - An async generator that yields values from a DSPy program output stream.

Returns

  • term()
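A usage sketch, assuming `streamer` is an async-generator handle produced by a streaming program (the `stream_timeout` value mirrors the documented 30-minute default):

```elixir
# Turn a DSPy output stream into an OpenAI-compatible stream,
# suitable for returning from a streaming API endpoint.
{:ok, api_stream} =
  Dspy.Streaming.streaming_response(streamer,
    __runtime__: [stream_timeout: 1_800_000]
  )
```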