# `Dspy.Streaming`
[🔗](https://github.com/nshkrdotcom/dspex/blob/v0.11.0/lib/snakebridge_generated/dspy/streaming/__init__.ex#L6)

Submodule bindings for `dspy.streaming`.

## Version

- Requested: 3.1.3
- Observed at generation: 3.1.3

## Runtime Options

All functions accept a `__runtime__` option for controlling execution behavior:

    Dspy.Streaming.some_function(args, __runtime__: [timeout: 120_000])

### Supported runtime options

- `:timeout` - Call timeout in milliseconds (default: 120,000ms / 2 minutes)
- `:timeout_profile` - Use a named profile (`:default`, `:ml_inference`, `:batch_job`, `:streaming`)
- `:stream_timeout` - Timeout for streaming operations (default: 1,800,000ms / 30 minutes)
- `:session_id` - Override the session ID for this call
- `:pool_name` - Target a specific Snakepit pool (multi-pool setups)
- `:affinity` - Override session affinity (`:hint`, `:strict_queue`, `:strict_fail_fast`)

### Timeout Profiles

- `:default` - 2 minute timeout for regular calls
- `:ml_inference` - 10 minute timeout for ML/LLM workloads
- `:batch_job` - Unlimited timeout for long-running jobs
- `:streaming` - 2 minute timeout, 30 minute stream_timeout

### Example with timeout override

    # For a long-running ML inference call
    Dspy.Streaming.predict(data, __runtime__: [timeout_profile: :ml_inference])

    # Or explicit timeout
    Dspy.Streaming.predict(data, __runtime__: [timeout: 600_000])

    # Route to a pool and enforce strict affinity
    Dspy.Streaming.predict(data, __runtime__: [pool_name: :strict_pool, affinity: :strict_queue])

See `SnakeBridge.Defaults` for global timeout configuration.

# `apply_sync_streaming`

```elixir
@spec apply_sync_streaming(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
```

Convert the async streaming generator to a sync generator.

## Parameters

- `async_generator` (term())

## Returns

- `term()`

# `streamify`

```elixir
@spec streamify(term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
```

Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them

all at once. It also provides status messages to the user to indicate the progress of the program, and users
can implement their own status message provider to customize the status messages and what module to generate
status messages for.

## Parameters

- `program` - The DSPy program to wrap with streaming functionality.
- `status_message_provider` - A custom status message generator to use instead of the default one. Users can implement their own status message generator to customize the status messages and what module to generate status messages for.
- `stream_listeners` - A list of stream listeners to capture the streaming output of specific fields of sub predicts in the program. When provided, only the target fields in the target predict will be streamed to the user.
- `include_final_prediction_in_output_stream` - Whether to include the final prediction in the output stream, only useful when `stream_listeners` is provided. If `False`, the final prediction will not be included in the output stream. When the program hit cache, or no listeners captured anything, the final prediction will still be included in the output stream even if this is `False`.
- `is_async_program` - Whether the program is async. If `False`, the program will be wrapped with `asyncify`, otherwise the program will be called with `acall`.
- `async_streaming` - Whether to return an async generator or a sync generator. If `False`, the streaming will be converted to a sync generator.

## Returns

- `term()`

# `streamify`

```elixir
@spec streamify(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term()) :: {:ok, term()} | {:error, Snakepit.Error.t()}
```

# `streamify`

```elixir
@spec streamify(term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
```

# `streamify`

```elixir
@spec streamify(term(), term(), term(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
```

# `streamify`

```elixir
@spec streamify(term(), term(), term(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
```

# `streamify`

```elixir
@spec streamify(term(), term(), term(), boolean(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
@spec streamify(term(), term(), term(), boolean(), boolean(), boolean()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
```

# `streamify`

```elixir
@spec streamify(term(), term(), term(), boolean(), boolean(), boolean(), keyword()) ::
  {:ok, term()} | {:error, Snakepit.Error.t()}
```

# `streaming_response`

```elixir
@spec streaming_response(
  term(),
  keyword()
) :: {:ok, term()} | {:error, Snakepit.Error.t()}
```

Convert a DSPy program output stream to an OpenAI-compatible output stream that can be

used by a service as an API response to a streaming request.

## Parameters

- `streamer` - An async generator that yields values from a DSPy program output stream.

## Returns

- `term()`

---

*Consult [api-reference.md](api-reference.md) for complete listing*
