Nous.StreamNormalizer behaviour (nous v0.13.3)

View Source

Behaviour for stream chunk normalization.

Implementations transform provider-specific stream chunks into normalized Nous stream events:

  • {:text_delta, text} - Incremental text content
  • {:thinking_delta, text} - Incremental reasoning/thinking content
  • {:tool_call_delta, tool_calls} - Tool call information
  • {:finish, reason} - Stream completion signal
  • {:unknown, chunk} - Unrecognized chunk (filtered by default)

Default Implementation

Most OpenAI-compatible providers work with the default implementation:

Nous.StreamNormalizer.OpenAI

Custom Implementation

For providers with unique formats, implement this behaviour:

defmodule MyApp.CustomNormalizer do
  @behaviour Nous.StreamNormalizer

  @impl true
  def normalize_chunk(chunk) do
    # Your custom logic
    [{:text_delta, chunk["custom_field"]}]
  end

  @impl true
  def complete_response?(chunk), do: false

  @impl true
  def convert_complete_response(_chunk), do: []
end

Then configure it on the model:

Nous.new("openai_compatible:custom-model",
  base_url: "http://custom-server/v1",
  stream_normalizer: MyApp.CustomNormalizer
)

Summary

Callbacks

Check if the chunk is a complete (non-streaming) response.

Convert a complete response into stream events.

Normalize a single chunk into a list of stream events.

Functions

Normalize a stream using the specified normalizer module.

Callbacks

complete_response?(chunk)

@callback complete_response?(chunk :: map()) :: boolean()

Check if the chunk is a complete (non-streaming) response.

Some servers return full responses instead of deltas when streaming fails or is disabled.

convert_complete_response(chunk)

@callback convert_complete_response(chunk :: map()) :: [Nous.Types.stream_event()]

Convert a complete response into stream events.

Called when complete_response?/1 returns true. Should return events like: [{:text_delta, content}, {:finish, "stop"}]

normalize_chunk(chunk)

@callback normalize_chunk(chunk :: map()) :: [Nous.Types.stream_event()]

Normalize a single chunk into a list of stream events.

Returns a list because some chunks (like complete responses) may need to emit multiple events.

Functions

normalize(stream, normalizer_mod \\ __MODULE__.OpenAI)

Normalize a stream using the specified normalizer module.

Applies normalization and filters out {:unknown, _} events.