AgentObs.ReqLLM (agent_obs v0.1.4)

View Source

High-level helpers for instrumenting ReqLLM streaming operations with AgentObs.

This module provides automatic instrumentation wrappers around ReqLLM's streaming API, eliminating the need for manual telemetry instrumentation when using ReqLLM.

Installation

Add :req_llm as a dependency in your mix.exs:

def deps do
  [
    {:agent_obs, "~> 0.1"},
    {:req_llm, "~> 1.0.0-rc.7"}
  ]
end

Features

Usage

Non-Streaming Text Generation

{:ok, response} =
  AgentObs.ReqLLM.trace_generate_text(
    "anthropic:claude-3-5-sonnet",
    [%{role: "user", content: "Hello!"}]
  )

text = ReqLLM.Response.text(response)
usage = ReqLLM.Response.usage(response)

Non-Streaming Structured Data Generation

schema = [
  name: [type: :string, required: true],
  age: [type: :pos_integer, required: true]
]

{:ok, response} =
  AgentObs.ReqLLM.trace_generate_object(
    "anthropic:claude-3-5-sonnet",
    [%{role: "user", content: "Generate a person"}],
    schema
  )

object = ReqLLM.Response.object(response)
#=> %{name: "John Doe", age: 30}

Basic Streaming with Instrumentation

{:ok, stream_response} =
  AgentObs.ReqLLM.trace_stream_text(
    "anthropic:claude-3-5-sonnet",
    [%{role: "user", content: "Hello!"}]
  )

# Stream output in real-time
stream_response.stream
|> Stream.filter(&(&1.type == :content))
|> Stream.each(&IO.write(&1.text))
|> Stream.run()

# Get metadata (automatically instrumented)
tokens = ReqLLM.StreamResponse.usage(stream_response)

With Tool Calls

tools = [
  ReqLLM.Tool.new!(
    name: "calculator",
    description: "Perform calculations",
    parameter_schema: [expression: [type: :string, required: true]],
    callback: &calculator/1
  )
]

{:ok, stream_response} =
  AgentObs.ReqLLM.trace_stream_text(
    "anthropic:claude-3-5-sonnet",
    [%{role: "user", content: "What is 2 + 2?"}],
    tools: tools
  )

# Extract and execute tool calls with instrumentation
tool_calls = ReqLLM.StreamResponse.extract_tool_calls(stream_response)

Enum.each(tool_calls, fn tool_call ->
  tool = Enum.find(tools, & &1.name == tool_call.name)
  {:ok, result} = AgentObs.ReqLLM.trace_tool_execution(tool, tool_call)
end)

Complete Agent Loop Example

defmodule MyAgent do
  def chat(model, message, tools) do
    AgentObs.trace_agent("my_agent", %{input: message}, fn ->
      # First LLM call with instrumentation
      {:ok, stream_response} =
        AgentObs.ReqLLM.trace_stream_text(model,
          [%{role: "user", content: message}],
          tools: tools
        )

      # Collect text and tool calls
      text = ReqLLM.StreamResponse.text(stream_response)
      tool_calls = ReqLLM.StreamResponse.extract_tool_calls(stream_response)

      # Execute tools if any
      tool_results = Enum.map(tool_calls, fn tc ->
        tool = Enum.find(tools, & &1.name == tc.name)
        AgentObs.ReqLLM.trace_tool_execution(tool, tc)
      end)

      {:ok, text, %{
        tools_used: Enum.map(tool_calls, & &1.name),
        iterations: if(tool_calls == [], do: 1, else: 2)
      }}
    end)
  end
end

Comparison with Manual Instrumentation

Without AgentObs.ReqLLM (manual):

AgentObs.trace_llm(model, %{input_messages: messages}, fn ->
  case ReqLLM.stream_text(model, messages) do
    {:ok, stream_response} ->
      # Manually extract metadata
      chunks = Enum.to_list(stream_response.stream)
      text = chunks |> Enum.filter(&(&1.type == :content)) |> Enum.map_join("", & &1.text)
      tokens = ReqLLM.StreamResponse.usage(stream_response)
      tool_calls = ReqLLM.StreamResponse.extract_tool_calls(stream_response)

      {:ok, text, %{
        output_messages: [%{role: "assistant", content: text}],
        tokens: tokens,
        tool_calls: tool_calls
      }}
  end
end)

With AgentObs.ReqLLM (automatic):

{:ok, stream_response} =
  AgentObs.ReqLLM.trace_stream_text(model, messages)

# All metadata automatically captured in telemetry!

Important Notes

  • This module requires :req_llm to be available at runtime
  • Instrumentation happens automatically - spans are created for each LLM/tool call
  • Token usage is extracted after stream completion (non-blocking during streaming)
  • Compatible with all ReqLLM providers (Anthropic, OpenAI, Google, etc.)

See Also

Summary

Functions

Collects the complete stream text with automatic instrumentation metadata.

Collects the complete stream object with automatic instrumentation metadata.

Wraps ReqLLM.generate_text/3 with automatic AgentObs instrumentation.

Wraps ReqLLM.generate_text!/3 with automatic AgentObs instrumentation.

Wraps ReqLLM.stream_text/3 with automatic AgentObs instrumentation.

Wraps tool execution with automatic AgentObs instrumentation.

Functions

collect_stream(stream_response)

@spec collect_stream(struct()) :: map()

Collects the complete stream text with automatic instrumentation metadata.

This is a convenience function that consumes the entire stream and returns both the text and the full instrumentation metadata.

Parameters

  • stream_response - ReqLLM.StreamResponse struct

Returns

A map containing:

  • :text - Complete text content
  • :tokens - Token usage map
  • :tool_calls - List of tool calls (if any)
  • :finish_reason - Completion reason

Examples

{:ok, stream_response} = AgentObs.ReqLLM.trace_stream_text(model, messages)

%{text: text, tokens: tokens, tool_calls: tool_calls} =
  AgentObs.ReqLLM.collect_stream(stream_response)

collect_stream_object(stream_response)

@spec collect_stream_object(struct()) :: map()

Collects the complete stream object with automatic instrumentation metadata.

This is a convenience function for structured data streaming that consumes the entire stream and returns both the object and the full instrumentation metadata.

Parameters

Returns

A map containing:

  • :object - Complete structured object matching the schema
  • :tokens - Token usage map
  • :finish_reason - Completion reason

Examples

{:ok, stream_response} = AgentObs.ReqLLM.trace_stream_object(model, messages, schema)

%{object: object, tokens: tokens} =
  AgentObs.ReqLLM.collect_stream_object(stream_response)

trace_generate_object(model, messages, schema, opts \\ [])

@spec trace_generate_object(term(), list() | struct(), keyword() | map(), keyword()) ::
  {:ok, struct()} | {:error, term()}

Wraps ReqLLM.generate_object/4 with automatic AgentObs instrumentation.

Instruments structured data generation with schema validation and automatically extracts token usage, output object, and other metadata for observability.

Parameters

  • model - Model specification (string like "anthropic:claude-3-5-sonnet" or Model struct)
  • messages - List of message maps or Context
  • schema - Schema definition for structured output (keyword list or map)
  • opts - Options passed to ReqLLM.generate_object/4 (output, mode, etc.)

Returns

  • {:ok, response} - ReqLLM.Response with full metadata and structured object
  • {:error, reason} - Error from ReqLLM

Examples

# Basic usage
schema = [
  name: [type: :string, required: true],
  age: [type: :pos_integer, required: true]
]

{:ok, response} = AgentObs.ReqLLM.trace_generate_object(
  "anthropic:claude-3-5-sonnet",
  [%{role: "user", content: "Generate a person named Alice, age 30"}],
  schema
)

# Extract object from response
object = ReqLLM.Response.object(response)
#=> %{name: "Alice", age: 30}

Telemetry

This function emits standard AgentObs LLM events:

  • [:agent_obs, :llm, :start] - When generation begins
  • [:agent_obs, :llm, :stop] - When generation completes (with tokens and object)
  • [:agent_obs, :llm, :exception] - If an error occurs

trace_generate_object!(model, messages, schema, opts \\ [])

@spec trace_generate_object!(term(), list() | struct(), keyword() | map(), keyword()) ::
  map()

Wraps ReqLLM.generate_object!/4 with automatic AgentObs instrumentation.

Like trace_generate_object/4 but raises on error and returns only the object. This is a convenience function for simple use cases where you only need the object.

Parameters

  • model - Model specification (string like "anthropic:claude-3-5-sonnet" or Model struct)
  • messages - List of message maps or Context
  • schema - Schema definition for structured output (keyword list or map)
  • opts - Options passed to ReqLLM.generate_object!/4 (output, mode, etc.)

Returns

  • Structured object matching the schema
  • Raises on error

Examples

schema = [
  name: [type: :string, required: true],
  age: [type: :pos_integer, required: true]
]

object = AgentObs.ReqLLM.trace_generate_object!(
  "anthropic:claude-3-5-sonnet",
  [%{role: "user", content: "Generate a person"}],
  schema
)
#=> %{name: "John Doe", age: 30}

Telemetry

This function emits standard AgentObs LLM events:

  • [:agent_obs, :llm, :start] - When generation begins
  • [:agent_obs, :llm, :stop] - When generation completes (with tokens and object)
  • [:agent_obs, :llm, :exception] - If an error occurs

trace_generate_text(model, messages, opts \\ [])

@spec trace_generate_text(term(), list() | struct(), keyword()) ::
  {:ok, struct()} | {:error, term()}

Wraps ReqLLM.generate_text/3 with automatic AgentObs instrumentation.

Instruments the LLM text generation call and automatically extracts token usage, output messages, and other metadata for observability. This is the non-streaming version that returns a complete response.

Parameters

  • model - Model specification (string like "anthropic:claude-3-5-sonnet" or Model struct)
  • messages - List of message maps or Context
  • opts - Options passed to ReqLLM.generate_text/3 (tools, temperature, etc.)

Returns

  • {:ok, response} - ReqLLM.Response with full metadata
  • {:error, reason} - Error from ReqLLM

Examples

# Basic usage
{:ok, response} = AgentObs.ReqLLM.trace_generate_text(
  "anthropic:claude-3-5-sonnet",
  [%{role: "user", content: "Hello!"}]
)

# Extract text from response
text = ReqLLM.Response.text(response)

# Access usage metadata
usage = ReqLLM.Response.usage(response)

Telemetry

This function emits standard AgentObs LLM events:

  • [:agent_obs, :llm, :start] - When generation begins
  • [:agent_obs, :llm, :stop] - When generation completes (with tokens)
  • [:agent_obs, :llm, :exception] - If an error occurs

trace_generate_text!(model, messages, opts \\ [])

@spec trace_generate_text!(term(), list() | struct(), keyword()) :: String.t()

Wraps ReqLLM.generate_text!/3 with automatic AgentObs instrumentation.

Like trace_generate_text/3 but raises on error and returns only the text content. This is a convenience function for simple use cases where you only need the text.

Parameters

  • model - Model specification (string like "anthropic:claude-3-5-sonnet" or Model struct)
  • messages - List of message maps or Context
  • opts - Options passed to ReqLLM.generate_text!/3 (tools, temperature, etc.)

Returns

  • Text string from the LLM response
  • Raises on error

Examples

text = AgentObs.ReqLLM.trace_generate_text!(
  "anthropic:claude-3-5-sonnet",
  [%{role: "user", content: "Hello!"}]
)
#=> "Hello! How can I assist you today?"

Telemetry

This function emits standard AgentObs LLM events:

  • [:agent_obs, :llm, :start] - When generation begins
  • [:agent_obs, :llm, :stop] - When generation completes (with tokens)
  • [:agent_obs, :llm, :exception] - If an error occurs

trace_stream_object(model, messages, schema, opts \\ [])

@spec trace_stream_object(term(), list() | struct(), keyword() | map(), keyword()) ::
  {:ok, struct()} | {:error, term()}

Wraps ReqLLM.stream_object/4 with automatic AgentObs instrumentation.

Instruments structured data streaming with schema validation and automatically extracts token usage, output object, and other metadata for observability. Similar to trace_stream_text/3 but for structured data generation.

Parameters

  • model - Model specification (string like "anthropic:claude-3-5-sonnet" or Model struct)
  • messages - List of message maps or Context
  • schema - Schema definition for structured output (keyword list or map)
  • opts - Options passed to ReqLLM.stream_object/4 (output, mode, etc.)

Returns

  • {:ok, stream_response} - ReqLLM.StreamResponse with instrumentation
  • {:error, reason} - Error from ReqLLM

Examples

# Basic usage
schema = [
  name: [type: :string, required: true],
  age: [type: :pos_integer, required: true]
]

{:ok, response} = AgentObs.ReqLLM.trace_stream_object(
  "anthropic:claude-3-5-sonnet",
  [%{role: "user", content: "Generate a person"}],
  schema
)

# Stream the response
response.stream
|> Stream.each(&IO.inspect/1)
|> Stream.run()

# Collect the final object
result = AgentObs.ReqLLM.collect_stream_object(response)
#=> %{object: %{name: "John", age: 30}, tokens: %{...}, ...}

Telemetry

This function emits standard AgentObs LLM events:

  • [:agent_obs, :llm, :start] - When streaming begins
  • [:agent_obs, :llm, :stop] - When streaming completes (with tokens and object)
  • [:agent_obs, :llm, :exception] - If an error occurs

trace_stream_text(model, messages, opts \\ [])

@spec trace_stream_text(term(), list() | struct(), keyword()) ::
  {:ok, struct()} | {:error, term()}

Wraps ReqLLM.stream_text/3 with automatic AgentObs instrumentation.

Instruments the LLM streaming call and automatically extracts token usage, tool calls, and other metadata for observability.

Parameters

  • model - Model specification (string like "anthropic:claude-3-5-sonnet" or Model struct)
  • messages - List of message maps or Context
  • opts - Options passed to ReqLLM.stream_text/3 (tools, temperature, etc.)

Returns

  • {:ok, stream_response} - ReqLLM.StreamResponse with instrumentation
  • {:error, reason} - Error from ReqLLM

Examples

# Basic usage
{:ok, response} = AgentObs.ReqLLM.trace_stream_text(
  "anthropic:claude-3-5-sonnet",
  [%{role: "user", content: "Hello!"}]
)

# With tools
{:ok, response} = AgentObs.ReqLLM.trace_stream_text(
  "anthropic:claude-3-5-sonnet",
  messages,
  tools: [calculator_tool, search_tool]
)

# Stream the response
response.stream
|> Stream.filter(&(&1.type == :content))
|> Stream.each(&IO.write(&1.text))
|> Stream.run()

Telemetry

This function emits standard AgentObs LLM events:

  • [:agent_obs, :llm, :start] - When streaming begins
  • [:agent_obs, :llm, :stop] - When streaming completes (with tokens)
  • [:agent_obs, :llm, :exception] - If an error occurs

trace_tool_execution(tool, tool_call, opts \\ [])

@spec trace_tool_execution(struct(), map(), keyword()) ::
  {:ok, term()} | {:error, term()}

Wraps tool execution with automatic AgentObs instrumentation.

Instruments tool/function execution and captures results for observability.

Parameters

  • tool - ReqLLM.Tool struct
  • tool_call - Tool call map with :name and :arguments
  • opts - Additional options (currently unused)

Returns

  • {:ok, result} - Tool execution result
  • {:error, reason} - Tool execution error

Examples

tool = ReqLLM.Tool.new!(
  name: "calculator",
  description: "Perform calculations",
  parameter_schema: [expression: [type: :string, required: true]],
  callback: &calculator/1
)

tool_call = %{
  name: "calculator",
  arguments: %{"expression" => "2 + 2"}
}

{:ok, result} = AgentObs.ReqLLM.trace_tool_execution(tool, tool_call)
#=> {:ok, 4}

Telemetry

Emits standard AgentObs tool events:

  • [:agent_obs, :tool, :start] - When tool execution begins
  • [:agent_obs, :tool, :stop] - When execution completes
  • [:agent_obs, :tool, :exception] - If tool execution fails