Cookbook: RAG Pipeline

Copy Markdown View Source

Retrieval-Augmented Generation (RAG) fetches relevant context before generating a response. This recipe implements a RAG pattern using PhoenixAI.Pipeline.

Pattern Overview

User Query ──▶ [Search Step] ──▶ [Generate Step] ──▶ Response
                     │
                     │ error
                     ▼
               Pipeline Halts

Implementation

defmodule MyApp.RAGPipeline do
  @moduledoc """
  Two-step RAG pipeline: retrieve documents matching the query, then ask
  the model to answer using only the retrieved context.
  """

  use PhoenixAI.Pipeline

  alias PhoenixAI.Message

  step :search do
    fn query ->
      # Retrieve up to five matching documents from the vector store.
      case MyApp.VectorStore.search(query, limit: 5) do
        {:ok, []} ->
          # No grounding material — halt the pipeline here.
          {:error, {:no_results, query}}

        {:ok, docs} ->
          joined = Enum.map_join(docs, "\n\n---\n\n", & &1.content)
          {:ok, %{query: query, context: joined}}

        {:error, reason} ->
          {:error, {:search_failed, reason}}
      end
    end
  end

  step :generate do
    fn %{query: query, context: context} ->
      # System prompt constrains the model to the retrieved context.
      system_message = %Message{
        role: :system,
        content: """
        You are a helpful assistant. Answer the user's question using ONLY
        the provided context. If the context doesn't contain the answer,
        say so clearly.
        """
      }

      user_message = %Message{
        role: :user,
        content: """
        Context:
        #{context}

        Question: #{query}
        """
      }

      AI.chat([system_message, user_message], provider: :openai, model: "gpt-4o")
    end
  end
end

Usage

case MyApp.RAGPipeline.run("What is the refund policy?") do
  {:ok, %PhoenixAI.Response{content: answer}} ->
    IO.puts(answer)

  {:error, {:no_results, query}} ->
    IO.puts("No documents found for: #{query}")

  {:error, {:search_failed, reason}} ->
    IO.inspect(reason, label: "Search error")

  # The generate step can also fail (e.g. a provider/API error from
  # AI.chat); without a catch-all an unmatched error tuple would raise
  # a CaseClauseError.
  {:error, reason} ->
    IO.inspect(reason, label: "Pipeline error")
end

Ad-hoc Version

For simpler use cases, you can run a pipeline without defining a module:

alias PhoenixAI.{Message, Pipeline}

# Step 1: retrieve up to three documents; halt on empty or failed search.
search_step = fn query ->
  case MyApp.VectorStore.search(query, limit: 3) do
    {:ok, []} -> {:error, :no_results}
    {:ok, docs} -> {:ok, {query, Enum.map_join(docs, "\n", & &1.content)}}
    {:error, _} = err -> err
  end
end

# Step 2: answer the query grounded in the retrieved context.
generate_step = fn {query, context} ->
  messages = [
    %Message{role: :system, content: "Answer using only this context: #{context}"},
    %Message{role: :user, content: query}
  ]

  AI.chat(messages, provider: :openai)
end

{:ok, response} = Pipeline.run([search_step, generate_step], "Tell me about the product")

Streaming RAG

Combine RAG with streaming for real-time output:

defmodule MyApp.StreamingRAG do
  @moduledoc """
  RAG with streamed output: retrieves context for a query, then streams
  the model's answer to the given process.
  """

  alias PhoenixAI.Message

  @doc """
  Searches the vector store for `query` and streams a grounded answer
  to `socket_pid`.

  Returns the result of `AI.stream/2` on success, or `{:error, reason}`
  when the vector-store search fails.
  """
  def run(query, socket_pid) do
    # Original version bound `context <-` inside `with` (a plain binding
    # that always matches) and carried an `else` that re-wrapped the error
    # unchanged. A single `with` clause expresses the same flow: a failed
    # search falls through and is returned as-is.
    with {:ok, docs} <- MyApp.VectorStore.search(query, limit: 5) do
      context = Enum.map_join(docs, "\n\n", & &1.content)

      messages = [
        %Message{role: :system, content: "Use this context to answer: #{context}"},
        %Message{role: :user, content: query}
      ]

      AI.stream(messages,
        provider: :openai,
        to: socket_pid
      )
    end
  end
end

Testing

defmodule MyApp.RAGPipelineTest do
  use ExUnit.Case, async: true
  use PhoenixAI.Test

  alias PhoenixAI.Response

  setup do
    # Default stub: every search finds one relevant document, so tests
    # only override it when they need the empty-result path.
    # NOTE(review): `Mox.stub/3` under `async: true` depends on the mock's
    # ownership mode (private mode needs per-process allowances; global
    # mode forbids async) — confirm the Mox setup in the test helper.
    Mox.stub(MyApp.VectorStoreMock, :search, fn _query, _opts ->
      {:ok, [%{content: "Refunds are accepted within 30 days."}]}
    end)

    :ok
  end

  test "returns AI answer when documents are found" do
    # Queue a canned model reply; `set_responses/1` is presumably provided
    # by `use PhoenixAI.Test` — not visible from this file.
    set_responses([
      {:ok, %Response{content: "You can get a refund within 30 days."}}
    ])

    assert {:ok, %Response{content: "You can get a refund within 30 days."}} =
             MyApp.RAGPipeline.run("refund policy")
  end

  test "halts when no documents found" do
    # Override the default stub: an empty result makes the :search step
    # halt the pipeline before the model is called.
    Mox.stub(MyApp.VectorStoreMock, :search, fn _query, _opts -> {:ok, []} end)

    assert {:error, {:no_results, "refund policy"}} =
             MyApp.RAGPipeline.run("refund policy")
  end
end