RAG Systems

0 of 13 lessons completed

Hybrid RAG: Combining Multiple Approaches

Hybrid RAG architectures combine multiple retrieval strategies, knowledge sources, and generation techniques to create more robust and capable systems. This lesson covers ensemble RAG, Agentic RAG, and advanced architectural patterns.

Why Hybrid Approaches?

No single retrieval method works best for all queries. Hybrid systems combine complementary approaches:

  • Lexical + Semantic - Cover both exact matches and conceptual similarity
  • Multiple Knowledge Sources - Internal docs + web + databases
  • Different Chunk Sizes - Fine-grained for facts, coarse for context
  • Static + Dynamic - Pre-indexed docs + real-time web search

Ensemble Retrieval

Combine results from multiple retrievers using rank fusion or score fusion:

Reciprocal Rank Fusion (RRF)

from dataclasses import dataclass
from collections import defaultdict

@dataclass
class RetrievalResult:
    """A single retrieved document together with its score and origin."""

    doc_id: str   # identifier used to match the same document across retrievers
    text: str     # document content
    score: float  # retriever-specific score, or the fused RRF score
    source: str   # producer of the result; fused results use "ensemble"


class EnsembleRetriever:
    """Fuse the rankings of several retrievers with Reciprocal Rank Fusion."""

    def __init__(self, retrievers: list, weights: "list[float] | None" = None):
        """
        Args:
            retrievers: objects exposing ``search(query, k=...)`` that return
                a ranked list of RetrievalResult.
            weights: one weight per retriever; defaults to 1.0 for each.

        Raises:
            ValueError: if the number of weights differs from the number of
                retrievers (zip() would otherwise silently drop retrievers).
        """
        self.retrievers = retrievers
        self.weights = weights or [1.0] * len(retrievers)
        if len(self.weights) != len(self.retrievers):
            raise ValueError(
                f"Expected {len(self.retrievers)} weights, got {len(self.weights)}"
            )

    def reciprocal_rank_fusion(
        self,
        ranked_lists: list[list[RetrievalResult]],
        k: int = 60
    ) -> list[RetrievalResult]:
        """
        Combine ranked lists using RRF.
        RRF_score(d) = Σ weight_i / (k + rank_i(d))

        Each document contributes at most once per ranked list (its best,
        i.e. first, position). The text of the first occurrence across all
        lists is kept. Results are sorted by fused score, highest first.
        """
        doc_scores = defaultdict(float)
        doc_info = {}

        for weight, ranked_list in zip(self.weights, ranked_lists):
            seen_in_list = set()
            for rank, result in enumerate(ranked_list, 1):
                # rank_i(d) is a single rank per list: ignore repeated ids so
                # a duplicated document is not counted twice.
                if result.doc_id in seen_in_list:
                    continue
                seen_in_list.add(result.doc_id)
                doc_scores[result.doc_id] += weight / (k + rank)
                doc_info.setdefault(result.doc_id, result)

        # Sort by combined score (stable, so ties keep first-seen order)
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)

        return [
            RetrievalResult(
                doc_id=doc_id,
                text=doc_info[doc_id].text,
                score=score,
                source="ensemble"
            )
            for doc_id, score in sorted_docs
        ]

    def retrieve(self, query: str, top_k: int = 10) -> list[RetrievalResult]:
        """Query every retriever and return the top_k fused results."""
        # Over-fetch (2x) from each retriever so fusion has enough candidates
        all_results = [
            retriever.search(query, k=top_k * 2)
            for retriever in self.retrievers
        ]

        # Fuse with RRF
        fused = self.reciprocal_rank_fusion(all_results)
        return fused[:top_k]

Linear Score Fusion

def linear_fusion(
    results_list: list[list[dict]],
    weights: list[float],
    normalize: bool = True
) -> list[dict]:
    """
    Combine results using weighted linear combination.

    Args:
        results_list: one result list per retriever; every result is a dict
            with at least "id" and "score".
        weights: one weight per result list.
        normalize: when True, min-max normalize scores within each result
            list before weighting; when False, use the raw scores.

    Returns:
        One dict per distinct "id" (a copy of the first result seen for that
        id plus "final_score"), sorted by final score, highest first.
        The input dicts are not modified.

    Raises:
        ValueError: if the number of weights differs from the number of
            result lists.
    """
    if len(weights) != len(results_list):
        raise ValueError(
            f"Expected {len(results_list)} weights, got {len(weights)}"
        )

    combined = {}  # id -> {"score": fused score, "doc": first result seen}

    for weight, results in zip(weights, results_list):
        if not results:
            continue

        scores = [r["score"] for r in results]
        if normalize:
            min_s = min(scores)
            # A list whose scores are all equal has zero range; divide by 1
            # so every entry normalizes to 0 instead of raising.
            range_s = (max(scores) - min_s) or 1
            norm_scores = [(s - min_s) / range_s for s in scores]
        else:
            norm_scores = scores

        for r, norm_score in zip(results, norm_scores):
            entry = combined.setdefault(r["id"], {"score": 0, "doc": r})
            entry["score"] += weight * norm_score

    # Sort and return
    sorted_docs = sorted(combined.values(), key=lambda x: x["score"], reverse=True)
    return [{**d["doc"], "final_score": d["score"]} for d in sorted_docs]

Multi-Index RAG

Use different indices for different types of content:

class MultiIndexRetriever:
    """Query multiple specialized indices and merge their results."""

    def __init__(self):
        self.indices = {
            "documents": DocumentIndex(),      # General docs
            "code": CodeIndex(),               # Code snippets
            "faq": FAQIndex(),                 # Q&A pairs
            "api_docs": APIDocIndex(),         # API documentation
        }

        self.router = QueryRouter()

    def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
        """Search the indices selected by the router and return top_k results.

        Each returned result dict is tagged in place with "source_index",
        the name of the index it came from.
        """
        # Route query to relevant indices
        relevant_indices = self.router.route(query)

        all_results = []
        for index_name in relevant_indices:
            index = self.indices[index_name]
            results = index.search(query, k=top_k)

            # Tag results with source
            for r in results:
                r["source_index"] = index_name

            all_results.extend(results)

        # Deduplicate and rank
        return self._merge_and_rank(all_results, top_k)

    def _merge_and_rank(self, results: list[dict], top_k: int) -> list[dict]:
        """Drop duplicate results and return the top_k by score.

        Two results are treated as duplicates when the first 100 characters
        of their "text" are identical. Results are ranked BEFORE
        deduplication so that the highest-scoring copy of a duplicate is the
        one that survives. A missing "score" counts as 0.
        """
        ranked = sorted(results, key=lambda x: x.get("score", 0), reverse=True)

        seen_prefixes = set()
        unique_results = []
        for r in ranked:
            # The prefix itself is the set key; hashing it manually would only
            # add a (tiny) risk of collisions.
            prefix = r["text"][:100]
            if prefix in seen_prefixes:
                continue
            seen_prefixes.add(prefix)
            unique_results.append(r)

        return unique_results[:top_k]


class QueryRouter:
    """Pick the indices a query should be sent to."""

    def __init__(self, llm=None):
        # Stored for callers; the keyword router below does not consult it.
        self.llm = llm

    def route(self, query: str) -> list[str]:
        """Return index names for *query* using case-insensitive keyword matching.

        "documents" is always included first; "code", "faq" and "api_docs"
        follow (in that order) when any of their keywords occurs as a
        substring of the lower-cased query.
        """
        keyword_routes = (
            ("code", ("code", "function", "class", "error", "bug")),
            ("faq", ("how do i", "how to", "what is")),
            ("api_docs", ("api", "endpoint", "request", "response")),
        )

        lowered = query.lower()
        selected = ["documents"]  # general docs are searched for every query
        selected.extend(
            index_name
            for index_name, keywords in keyword_routes
            if any(keyword in lowered for keyword in keywords)
        )
        return selected

Agentic RAG

Agentic RAG uses an LLM as an intelligent agent that decides when and how to retrieve, enabling adaptive multi-step retrieval.

Key Characteristics

  • Query Analysis - Agent decides if retrieval is needed
  • Source Selection - Chooses which knowledge sources to query
  • Iterative Retrieval - Can retrieve multiple times based on findings
  • Tool Integration - Combines retrieval with other tools (calculators, APIs)

Implementation with Function Calling

from openai import OpenAI
import json

class AgenticRAG:
    """LLM agent that decides which search tools to call before answering."""

    def __init__(self, retrievers: dict, llm_client: "OpenAI", model: str = "gpt-4o"):
        """
        Args:
            retrievers: mapping with the keys "documents", "web" and "code"
                (whichever tools should be usable) to objects exposing
                ``search(query, ...)``.
            llm_client: OpenAI-style client providing
                ``chat.completions.create``.
            model: chat model used for every completion call.
        """
        self.retrievers = retrievers
        self.client = llm_client
        self.model = model

        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "search_documents",
                    "description": "Search internal documentation",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"},
                            "top_k": {"type": "integer", "default": 5}
                        },
                        "required": ["query"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "search_web",
                    "description": "Search the web for recent information",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"}
                        },
                        "required": ["query"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "search_code",
                    "description": "Search code repositories",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Code search query"},
                            "language": {"type": "string", "description": "Programming language"}
                        },
                        "required": ["query"]
                    }
                }
            }
        ]

    def _execute_tool(self, name: str, arguments: dict) -> str:
        """Run the named tool and return its results as a JSON string.

        Problems the model can recover from (unknown tool, unconfigured
        retriever, missing "query") are returned as plain error strings so
        they can be fed back to the model instead of aborting the loop.
        """
        tool_to_retriever = {
            "search_documents": "documents",
            "search_web": "web",
            "search_code": "code",
        }
        retriever_key = tool_to_retriever.get(name)
        if retriever_key is None:
            return f"Unknown tool: {name}"

        retriever = self.retrievers.get(retriever_key)
        if retriever is None:
            return f"Error: no '{retriever_key}' retriever is configured for tool {name}"

        # Tool arguments are model-generated, so "query" may be absent even
        # though the schema marks it as required.
        query = arguments.get("query") if isinstance(arguments, dict) else None
        if not query:
            return f"Error: tool {name} requires a 'query' argument"

        if name == "search_documents":
            results = retriever.search(query, k=arguments.get("top_k", 5))
        elif name == "search_code":
            results = retriever.search(query, language=arguments.get("language"))
        else:
            results = retriever.search(query)

        return json.dumps([{"text": r["text"], "source": r.get("source", "")} for r in results])

    def query(self, user_query: str, max_iterations: int = 3) -> str:
        """Answer *user_query*, letting the model call tools up to max_iterations times.

        Returns the model's first reply that contains no tool calls. If the
        iteration budget is exhausted, one final completion is requested
        without tools so the model has to answer from what it has gathered.
        """
        messages = [
            {
                "role": "system",
                "content": """You are a helpful assistant with access to search tools.
Use the tools to find relevant information before answering.
You can search multiple times if needed to gather complete information.
Always cite your sources."""
            },
            {"role": "user", "content": user_query}
        ]

        for _ in range(max_iterations):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=self.tools,
                tool_choice="auto"
            )

            message = response.choices[0].message

            if not message.tool_calls:
                # No more tool calls, return response
                return message.content

            # Execute tool calls
            messages.append(message)

            for tool_call in message.tool_calls:
                try:
                    arguments = json.loads(tool_call.function.arguments)
                except (json.JSONDecodeError, TypeError) as exc:
                    # The model is not guaranteed to emit valid JSON; report
                    # the problem back so it can retry with a corrected call.
                    result = f"Error: could not parse tool arguments as JSON ({exc})"
                else:
                    result = self._execute_tool(tool_call.function.name, arguments)

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result
                })

        # Max iterations reached, generate final response
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages
        )
        return response.choices[0].message.content

Multi-Agent RAG

Use specialized agents for different tasks:

class MultiAgentRAG:
    """Multi-agent system for complex RAG tasks.

    Pipeline: planner -> researcher (once per task) -> synthesizer ->
    fact checker.
    """

    def __init__(self, llm, retrievers: "dict | None" = None):
        """
        Args:
            llm: language model handed to every agent.
            retrievers: mapping of source name to retriever, forwarded to the
                researcher agent (whose constructor requires it). Defaults to
                an empty mapping.
        """
        self.llm = llm
        self.retrievers = retrievers if retrievers is not None else {}

        # Specialized agents
        self.agents = {
            "planner": PlannerAgent(llm),
            "researcher": ResearcherAgent(llm, self.retrievers),
            "synthesizer": SynthesizerAgent(llm),
            "fact_checker": FactCheckerAgent(llm)
        }

    def query(self, user_query: str) -> dict:
        """Run the full pipeline and return the verified answer.

        Returns:
            dict with "response", "confidence" and "sources" taken from the
            fact checker's result, plus "unverified_claims" (its "unverified"
            entry, or an empty list when absent).
        """
        # Step 1: Planner breaks down the query
        plan = self.agents["planner"].create_plan(user_query)

        # Step 2: Researcher gathers information for each sub-task
        researcher = self.agents["researcher"]
        research_results = [researcher.research(task) for task in plan["tasks"]]

        # Step 3: Synthesizer combines findings
        synthesis = self.agents["synthesizer"].synthesize(
            query=user_query,
            research=research_results
        )

        # Step 4: Fact checker validates claims
        verified = self.agents["fact_checker"].verify(
            response=synthesis,
            sources=research_results
        )

        return {
            "response": verified["response"],
            "confidence": verified["confidence"],
            "sources": verified["sources"],
            "unverified_claims": verified.get("unverified", [])
        }


class PlannerAgent:
    """Agent that breaks a complex query into research tasks."""

    def __init__(self, llm):
        # create_plan() reads self.llm, so the model has to be stored here.
        self.llm = llm

    def create_plan(self, query: str) -> dict:
        """Ask the LLM for a research plan and return it as a dict.

        The prompt requests JSON with "tasks" and "dependencies"; the reply's
        ``content`` is parsed with json.loads, so a non-JSON reply raises
        json.JSONDecodeError.
        """
        prompt = f"""Break down this complex query into research tasks.
        
Query: {query}

Return as JSON with:
- tasks: list of specific research questions
- dependencies: which tasks depend on others
"""
        response = self.llm.invoke(prompt)
        return json.loads(response.content)


class ResearcherAgent:
    """Agent that answers one research task from several retrieval sources."""

    def __init__(self, llm, retrievers: dict):
        self.retrievers = retrievers
        self.llm = llm

    def research(self, task: str) -> dict:
        """Search every retriever for *task* and have the LLM answer from the hits.

        Each hit is tagged in place with "source_type" (the retriever's key).
        Only the first ten hits, in retriever order, are used as context and
        reported as sources.
        """
        # Collect up to five hits per source, remembering where each came from
        gathered = []
        for source_name in self.retrievers:
            hits = self.retrievers[source_name].search(task, k=5)
            for hit in hits:
                hit["source_type"] = source_name
            gathered += hits

        # Build the context block from the leading hits
        context = "\n".join(hit["text"] for hit in gathered[:10])

        prompt = f"""Based on the following information, answer: {task}

Information:
{context}

Provide a focused answer with source citations."""

        answer = self.llm.invoke(prompt).content

        return {"task": task, "answer": answer, "sources": gathered[:10]}

Self-RAG and Corrective RAG

Self-RAG

The model reflects on whether retrieval is needed and evaluates retrieved document relevance:

class SelfRAG:
    """RAG pipeline that asks the LLM whether to retrieve and which hits to keep."""

    def __init__(self, llm, retriever):
        self.llm = llm
        self.retriever = retriever

    def should_retrieve(self, query: str) -> bool:
        """Decide if retrieval is needed (True when the reply contains "YES")."""
        prompt = f"""Does this query require external information to answer accurately?
Query: {query}
Answer with YES or NO only."""

        response = self.llm.invoke(prompt)
        return "YES" in response.content.upper()

    def evaluate_relevance(self, query: str, document: str) -> float:
        """Score document relevance to query, as a value in [0, 1].

        The LLM is asked for a 0-10 rating. A reply that is not a plain
        number falls back to a neutral 0.5; ratings outside 0-10 are clamped.
        """
        prompt = f"""Rate the relevance of this document to the query on a scale of 0-10.
Query: {query}
Document: {document}
Answer with a number only."""

        response = self.llm.invoke(prompt)
        try:
            rating = float(response.content.strip())
        except (AttributeError, TypeError, ValueError):
            # Only parsing problems are treated as "unknown relevance";
            # anything else (e.g. KeyboardInterrupt) must propagate.
            return 0.5
        return min(max(rating / 10, 0.0), 1.0)

    def query(self, user_query: str) -> str:
        """Answer *user_query*, retrieving context only when the LLM asks for it.

        Retrieved documents with relevance <= 0.5 are discarded; kept
        documents are annotated in place with "relevance" and the first five
        are used as context.
        """
        # Check if retrieval is needed
        if not self.should_retrieve(user_query):
            return self.llm.invoke(user_query).content

        # Retrieve and filter by relevance
        results = self.retriever.search(user_query, k=10)

        relevant_docs = []
        for doc in results:
            relevance = self.evaluate_relevance(user_query, doc["text"])
            if relevance > 0.5:
                doc["relevance"] = relevance
                relevant_docs.append(doc)

        # Generate with relevant context
        context = "\n".join([d["text"] for d in relevant_docs[:5]])

        prompt = f"""Answer based on the provided context.

Context:
{context}

Question: {user_query}
"""
        return self.llm.invoke(prompt).content

Corrective RAG (CRAG)

class CorrectiveRAG:
    """CRAG: Corrective Retrieval Augmented Generation."""

    def __init__(self, llm, retriever, web_search):
        self.llm = llm
        self.retriever = retriever
        self.web_search = web_search

    def evaluate_retrieval(self, query: str, documents: list[dict]) -> str:
        """Evaluate retrieval quality: CORRECT, INCORRECT, or AMBIGUOUS.

        Returns the LLM's reply stripped and upper-cased; callers should test
        for the label as a substring, since the reply may contain more text.
        """

        doc_summaries = "\n".join([
            f"Doc {i+1}: {d['text'][:200]}..."
            for i, d in enumerate(documents[:5])
        ])

        prompt = f"""Evaluate if these documents can answer the query.

Query: {query}

Documents:
{doc_summaries}

Answer with one of:
- CORRECT: Documents contain relevant information
- INCORRECT: Documents are not relevant  
- AMBIGUOUS: Partially relevant, may need more info"""

        response = self.llm.invoke(prompt)
        return response.content.strip().upper()

    @staticmethod
    def _interleave(first: list, second: list) -> list:
        """Alternate items of two lists, appending the leftover of the longer one."""
        mixed = []
        for i in range(max(len(first), len(second))):
            if i < len(first):
                mixed.append(first[i])
            if i < len(second):
                mixed.append(second[i])
        return mixed

    def query(self, user_query: str) -> str:
        """Answer *user_query*, correcting weak retrieval with web search.

        INCORRECT retrieval is replaced by web results; AMBIGUOUS retrieval
        is mixed with web results; otherwise the retrieved documents are used
        as they are. The first five resulting documents form the context.
        """
        # Initial retrieval
        results = self.retriever.search(user_query, k=10)

        # Evaluate retrieval quality
        # "INCORRECT" is tested first because it contains "CORRECT".
        evaluation = self.evaluate_retrieval(user_query, results)

        if "INCORRECT" in evaluation:
            # Fall back to web search
            results = self.web_search.search(user_query)
        elif "AMBIGUOUS" in evaluation:
            # Augment with web search. Interleave rather than append: only
            # the first five documents reach the prompt, so appended web
            # results would never be seen when retrieval returned >= 5 hits.
            web_results = self.web_search.search(user_query)
            results = self._interleave(results, web_results)

        # Generate response
        context = "\n".join([r["text"] for r in results[:5]])

        prompt = f"""Answer based on context. If unsure, say so.

Context:
{context}

Question: {user_query}
"""
        return self.llm.invoke(prompt).content

RAPTOR: Hierarchical RAG

RAPTOR builds a tree of summarized content for multi-level retrieval:

class RAPTOR:
    """Recursive Abstractive Processing for Tree-Organized Retrieval."""

    def __init__(self, llm, embedding_model, cluster_size: int = 5):
        """
        Args:
            llm: model exposing ``invoke(prompt)`` whose result has ``.content``.
            embedding_model: model exposing ``encode(text_or_texts)``.
            cluster_size: target number of documents per cluster.

        Raises:
            ValueError: if cluster_size is smaller than 1.
        """
        if cluster_size < 1:
            raise ValueError("cluster_size must be at least 1")
        self.llm = llm
        self.embedding_model = embedding_model
        self.cluster_size = cluster_size
        self.tree = {}

    def build_tree(self, documents: list[dict]):
        """Build hierarchical summary tree.

        Level 0 holds the original documents; each higher level holds one
        summary per cluster of the level below. Any previously built tree is
        discarded.
        """
        # Level 0: Original chunks (start from a fresh tree so levels from an
        # earlier, deeper build cannot linger)
        self.tree = {0: documents}
        current_level = documents
        level = 0

        while len(current_level) > 1:
            # Cluster documents
            clusters = self._cluster_documents(current_level)

            # If clustering did not shrink the level (e.g. cluster_size == 1),
            # another round would never converge: stop instead of looping.
            if len(clusters) >= len(current_level):
                break

            level += 1

            # Summarize each cluster
            summaries = [
                {"text": self._summarize_cluster(cluster), "children": cluster}
                for cluster in clusters
            ]

            self.tree[level] = summaries
            current_level = summaries

    def _cluster_documents(self, documents: list[dict]) -> list[list[dict]]:
        """Cluster documents by embedding similarity (KMeans)."""
        from sklearn.cluster import KMeans

        embeddings = self.embedding_model.encode([d["text"] for d in documents])

        n_clusters = max(1, len(documents) // self.cluster_size)
        kmeans = KMeans(n_clusters=n_clusters)
        labels = kmeans.fit_predict(embeddings)

        clusters = [[] for _ in range(n_clusters)]
        for doc, label in zip(documents, labels):
            clusters[label].append(doc)

        return [c for c in clusters if c]  # Remove empty clusters

    def _summarize_cluster(self, cluster: list[dict]) -> str:
        """Generate summary for a cluster of documents."""

        texts = "\n---\n".join([d["text"] for d in cluster])

        prompt = f"""Summarize the key information from these documents into a 
coherent paragraph that captures all important details:

{texts}

Summary:"""

        response = self.llm.invoke(prompt)
        return response.content

    def retrieve(self, query: str, top_k: int = 5, per_level: int = 3) -> list[dict]:
        """Retrieve from multiple levels of the tree.

        Args:
            query: search text.
            top_k: number of results returned overall.
            per_level: maximum number of candidates taken from each level.

        Returns:
            Dicts with "text", "level" and "score" (cosine similarity as a
            plain float), sorted by score, highest first.

        Raises:
            ValueError: if per_level is smaller than 1.
        """
        if per_level < 1:
            raise ValueError("per_level must be at least 1")
        if not self.tree:
            return []

        from sklearn.metrics.pairwise import cosine_similarity

        # The query embedding is the same for every level: compute it once.
        query_emb = self.embedding_model.encode(query)

        all_results = []

        # Search each level
        for level, documents in self.tree.items():
            if not documents:
                continue  # nothing to compare against on an empty level

            embeddings = self.embedding_model.encode([d["text"] for d in documents])

            # Find most similar
            similarities = cosine_similarity([query_emb], embeddings)[0]
            top_indices = similarities.argsort()[-per_level:][::-1]

            for idx in top_indices:
                all_results.append({
                    "text": documents[idx]["text"],
                    "level": level,
                    "score": float(similarities[idx])
                })

        # Sort by score and return top k
        all_results.sort(key=lambda x: x["score"], reverse=True)
        return all_results[:top_k]

Agentic RAG vs Vanilla RAG

Aspect | Vanilla RAG | Agentic RAG
Retrieval Decision | Always retrieve | Decide dynamically
Source Selection | Fixed sources | Agent chooses sources
Iteration | Single retrieval | Multiple retrieval rounds
Reasoning | Direct answer | Multi-step reasoning
Complexity | Simple pipeline | Complex orchestration
Latency | Lower | Higher (multiple LLM calls)
Cost | Lower | Higher (more tokens)

Response Generation and Synthesis

The final stage of RAG is synthesizing retrieved context into a coherent response. This is often overlooked but critically affects quality.

Context Formatting Strategies

def format_context_for_generation(
    documents: list[dict],
    strategy: str = "numbered"
) -> str:
    """Format retrieved documents for LLM context.

    Args:
        documents: dicts with a "text" key; "source" and "score" are optional.
        strategy: one of
            "numbered"          - "[n] text" blocks, for easy citation
            "xml_tagged"        - each document wrapped in a <document> tag
            "relevance_ordered" - texts sorted by "score", highest first
            "interleaved"       - alternate first-half and second-half documents

    Returns:
        The formatted context string (empty for an empty document list).

    Raises:
        ValueError: if strategy is not one of the names above.
    """

    if strategy == "numbered":
        # Number each source for easy citation
        return "\n\n".join([
            f"[{i+1}] {doc['text']}"
            for i, doc in enumerate(documents)
        ])

    if strategy == "xml_tagged":
        # XML tags help models distinguish sources
        return "\n".join([
            f"<document id=\"{i+1}\" source=\"{doc.get('source', 'unknown')}\">\n{doc['text']}\n</document>"
            for i, doc in enumerate(documents)
        ])

    if strategy == "relevance_ordered":
        # Most relevant first (handles "lost in middle" problem)
        sorted_docs = sorted(documents, key=lambda x: x.get('score', 0), reverse=True)
        return "\n\n---\n\n".join([doc['text'] for doc in sorted_docs])

    if strategy == "interleaved":
        # Alternate high and medium relevance to spread attention
        half = len(documents) // 2
        high = documents[:half]
        low = documents[half:]
        interleaved = []
        for h, l in zip(high, low):
            interleaved.extend([h, l])
        # With an odd count `low` is one longer than `high`; zip() stops at
        # the shorter list, so add the leftover instead of dropping it.
        interleaved.extend(low[len(high):])
        return "\n\n".join([doc['text'] for doc in interleaved])

    raise ValueError(f"Unknown context formatting strategy: {strategy!r}")

Generation Prompts for RAG

# System prompts keyed by style name; generate_response() selects one through
# its prompt_style argument.
RAG_SYSTEM_PROMPTS = {
    "strict_grounding": """You are a helpful assistant that answers questions 
based ONLY on the provided context. If the answer is not in the context, 
say "I don't have enough information to answer that."

NEVER make up information. ALWAYS cite your sources using [1], [2], etc.""",

    "balanced": """You are a helpful assistant. Answer the question using 
the provided context. You may supplement with general knowledge, but clearly 
distinguish between information from the context (cite with [1], [2]) and 
your own knowledge (prefix with "Generally speaking...").""",

    "synthesis": """You are an expert analyst. Synthesize information from 
the provided sources to give a comprehensive answer. Identify agreements 
and conflicts between sources. Cite each claim with the source number.""",

    "qa_focused": """Answer the question concisely and directly. Use only 
the provided context. Format your response as:
- Direct answer (1-2 sentences)
- Supporting details
- Sources used: [list numbers]"""
}

def generate_response(
    query: str,
    context: str,
    llm,
    prompt_style: str = "strict_grounding",
    model: str = "gpt-4o"
) -> str:
    """Generate an answer to *query* grounded in *context*.

    Args:
        query: the user's question.
        context: already formatted retrieved context.
        llm: OpenAI-style client providing ``chat.completions.create``.
        prompt_style: key of RAG_SYSTEM_PROMPTS selecting the system prompt.
        model: chat model name passed to the client.

    Returns:
        The content of the first completion choice.

    Raises:
        KeyError: if prompt_style is not a key of RAG_SYSTEM_PROMPTS.
    """
    system_prompt = RAG_SYSTEM_PROMPTS[prompt_style]

    user_prompt = f"""Context:
{context}

Question: {query}

Answer:"""

    response = llm.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ]
    )

    return response.choices[0].message.content

Handling the "Lost in the Middle" Problem

Research shows LLMs struggle with information in the middle of long contexts. Strategies to mitigate:

def mitigate_lost_in_middle(documents: list[dict]) -> list[dict]:
    """Reorder documents to place most relevant at beginning and end.

    Documents are ranked by "score" (missing scores count as 0) and dealt
    alternately to the front and the back of the result, so the best document
    comes first, the second best last, and the least relevant ones end up in
    the middle. Lists of up to two documents are returned unchanged.
    """

    if len(documents) <= 2:
        return documents

    # Sort by relevance
    sorted_docs = sorted(documents, key=lambda x: x.get('score', 0), reverse=True)

    # Ranks 1, 3, 5, ... fill the front in order; ranks 2, 4, 6, ... fill the
    # back in reverse, so relevance falls towards the middle from both ends.
    front = sorted_docs[0::2]
    back = sorted_docs[1::2]
    return front + back[::-1]

# Alternative: Use explicit position markers
def add_position_markers(documents: list[dict]) -> str:
    """Add markers to help model track position.

    Each document is rendered as "[DOCUMENT n - POSITION]" followed by its
    text, where POSITION is BEGINNING, MIDDLE or END depending on which third
    of the list the document falls in. The first document is always labelled
    BEGINNING, including for lists shorter than three.
    """

    n = len(documents)
    # For n < 3 integer division gives n // 3 == 0, which would leave no
    # document in the BEGINNING band; keep at least one slot for it.
    begin_cut = max(1, n // 3)
    end_cut = max(begin_cut, 2 * n // 3)

    formatted = []
    for i, doc in enumerate(documents):
        if i < begin_cut:
            position = "BEGINNING"
        elif i < end_cut:
            position = "MIDDLE"
        else:
            position = "END"
        formatted.append(f"[DOCUMENT {i+1} - {position}]\n{doc['text']}")

    return "\n\n".join(formatted)

Response Validation and Post-Processing

class ResponseValidator:
    """Validate and post-process RAG responses."""

    def __init__(self, llm):
        self.llm = llm

    def validate_citations(
        self,
        response: str,
        num_sources: int
    ) -> dict:
        """Check that citations reference valid sources.

        Citations are bracketed integers such as [1]; valid numbers run from
        1 to num_sources.

        Returns:
            dict with "valid" (no invalid citations), "cited_sources",
            "invalid_citations" and "uncited_sources"; every list is sorted
            ascending so the output is deterministic.
        """
        import re

        # Find all citations [1], [2], etc.
        cited_sources = {int(c) for c in re.findall(r'\[(\d+)\]', response)}

        # Check for invalid citations
        invalid = sorted(c for c in cited_sources if c < 1 or c > num_sources)

        return {
            "valid": not invalid,
            "cited_sources": sorted(cited_sources),
            "invalid_citations": invalid,
            "uncited_sources": [i for i in range(1, num_sources+1) if i not in cited_sources]
        }

    def check_hallucination(
        self,
        response: str,
        context: str
    ) -> dict:
        """Use LLM to check for potential hallucinations.

        Returns:
            dict with "has_potential_hallucinations" (False only when the
            LLM's reply contains "none found", case-insensitively) and
            "unsupported_claims" (the raw reply).
        """

        prompt = f"""Compare this response to the context and identify any claims 
that are NOT supported by the context.

Context:
{context}

Response:
{response}

List any unsupported claims (or say "None found"):"""

        result = self.llm.invoke(prompt).content

        return {
            "has_potential_hallucinations": "none found" not in result.lower(),
            "unsupported_claims": result
        }

    def add_disclaimers(
        self,
        response: str,
        validation: dict
    ) -> str:
        """Add appropriate disclaimers based on validation.

        Reads "has_potential_hallucinations" and "uncited_sources" from
        *validation*; a missing or empty entry adds nothing.
        """

        if validation.get("has_potential_hallucinations"):
            response += "\n\n*Note: Some information may require verification.*"

        if validation.get("uncited_sources"):
            response += f"\n\n*Additional relevant sources: {validation['uncited_sources']}*"

        return response

Streaming RAG Responses

async def stream_rag_response(
    query: str,
    documents: list[dict],
    llm_client
):
    """Stream RAG response for better UX.

    Async generator: yields the answer piece by piece as the model produces
    it and, once the stream is finished, a disclaimer line when validation
    of the complete answer reports issues.

    Args:
        query: the user's question.
        documents: retrieved documents, formatted with the default
            format_context_for_generation strategy.
        llm_client: async OpenAI-style client providing
            ``chat.completions.create(..., stream=True)``.
    """

    context = format_context_for_generation(documents)

    # Stream response
    stream = await llm_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": RAG_SYSTEM_PROMPTS["strict_grounding"]},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
        ],
        stream=True
    )

    pieces = []
    async for chunk in stream:
        # Some stream chunks carry no choices (e.g. usage-only chunks);
        # indexing choices[0] on them would raise IndexError.
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        if content:
            pieces.append(content)
            yield content

    full_response = "".join(pieces)

    # Post-process after streaming complete
    # NOTE(review): validate_response is not defined in the visible part of
    # this file; it is expected to return a dict with "issues" and
    # "disclaimer" - confirm it exists where this function is used.
    validation = validate_response(full_response, documents)
    if validation["issues"]:
        yield f"\n\n---\n*{validation['disclaimer']}*"

Key Takeaways

  • Ensemble retrieval combines multiple retrievers using RRF or linear fusion
  • Multi-index RAG uses specialized indices for different content types
  • Agentic RAG enables dynamic, multi-step retrieval with tool use
  • Self-RAG adds self-reflection on retrieval necessity and relevance
  • CRAG takes corrective actions when initial retrieval fails
  • RAPTOR builds hierarchical summaries for multi-level retrieval
  • Response synthesis requires careful context formatting and validation
  • Mitigate the "lost in the middle" problem by placing key info at the start and end
  • Validate responses for citations and potential hallucinations
  • Choose complexity wisely - vanilla RAG is often sufficient

In the next lesson, we'll explore Multimodal RAG for handling images, tables, and complex documents.