Nous.StreamNormalizer behaviour (nous v0.13.3)
View SourceBehaviour for stream chunk normalization.
Implementations transform provider-specific stream chunks into normalized Nous stream events:
{:text_delta, text}- Incremental text content{:thinking_delta, text}- Incremental reasoning/thinking content{:tool_call_delta, tool_calls}- Tool call information{:finish, reason}- Stream completion signal{:unknown, chunk}- Unrecognized chunk (filtered by default)
Default Implementation
Most OpenAI-compatible providers work with the default implementation:
Nous.StreamNormalizer.OpenAICustom Implementation
For providers with unique formats, implement this behaviour:
defmodule MyApp.CustomNormalizer do
@behaviour Nous.StreamNormalizer
@impl true
def normalize_chunk(chunk) do
# Your custom logic
[{:text_delta, chunk["custom_field"]}]
end
@impl true
def complete_response?(chunk), do: false
@impl true
def convert_complete_response(_chunk), do: []
endThen configure it on the model:
Nous.new("openai_compatible:custom-model",
base_url: "http://custom-server/v1",
stream_normalizer: MyApp.CustomNormalizer
)
Summary
Callbacks
Check if the chunk is a complete (non-streaming) response.
Convert a complete response into stream events.
Normalize a single chunk into a list of stream events.
Functions
Normalize a stream using the specified normalizer module.
Callbacks
Check if the chunk is a complete (non-streaming) response.
Some servers return full responses instead of deltas when streaming fails or is disabled.
@callback convert_complete_response(chunk :: map()) :: [Nous.Types.stream_event()]
Convert a complete response into stream events.
Called when complete_response?/1 returns true.
Should return events like:
[{:text_delta, content}, {:finish, "stop"}]
@callback normalize_chunk(chunk :: map()) :: [Nous.Types.stream_event()]
Normalize a single chunk into a list of stream events.
Returns a list because some chunks (like complete responses) may need to emit multiple events.