Hybrid RAG architectures combine multiple retrieval strategies, knowledge sources, and generation techniques to create more robust and capable systems. This lesson covers ensemble RAG, Agentic RAG, and advanced architectural patterns.
No single retrieval method works best for all queries. Hybrid systems combine complementary approaches:
Combine results from multiple retrievers using score fusion:
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class RetrievalResult:
    """One retrieved document together with its provenance and score."""

    doc_id: str   # unique identifier of the document
    text: str     # raw document text
    score: float  # retrieval score (higher means more relevant)
    source: str   # name of the retriever that produced this hit
class EnsembleRetriever:
    """Fuse the outputs of several retrievers into a single ranking.

    Each retriever contributes a ranked list; the lists are merged with
    weighted Reciprocal Rank Fusion (RRF).
    """

    def __init__(self, retrievers: list, weights: "list[float]" = None):
        self.retrievers = retrievers
        # Default to equal weighting when no weights are supplied.
        self.weights = weights or [1.0] * len(retrievers)

    def reciprocal_rank_fusion(
        self,
        ranked_lists: "list[list[RetrievalResult]]",
        k: int = 60,
    ) -> "list[RetrievalResult]":
        """
        Combine ranked lists using RRF.
        RRF_score(d) = Σ weight_i / (k + rank_i(d))
        """
        fused_scores = defaultdict(float)
        representative = {}
        for weight, hits in zip(self.weights, ranked_lists):
            for position, hit in enumerate(hits, 1):
                fused_scores[hit.doc_id] += weight / (k + position)
                # Remember the first occurrence so the text can be recovered later.
                representative.setdefault(hit.doc_id, hit)
        # Highest fused score first.
        ranking = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
        return [
            RetrievalResult(
                doc_id=doc_id,
                text=representative[doc_id].text,
                score=fused_score,
                source="ensemble",
            )
            for doc_id, fused_score in ranking
        ]

    def retrieve(self, query: str, top_k: int = 10) -> "list[RetrievalResult]":
        """Query every retriever (over-fetching 2x), then fuse with RRF."""
        per_retriever = [r.search(query, k=top_k * 2) for r in self.retrievers]
        fused = self.reciprocal_rank_fusion(per_retriever)
return fused[:top_k]def linear_fusion(
results_list: list[list[dict]],
weights: list[float],
normalize: bool = True
) -> list[dict]:
"""
Combine results using weighted linear combination.
"""
# Normalize scores within each result set
normalized_results = []
for results in results_list:
if not results:
normalized_results.append([])
continue
scores = [r["score"] for r in results]
min_s, max_s = min(scores), max(scores)
range_s = max_s - min_s or 1
if normalize:
for r in results:
r["norm_score"] = (r["score"] - min_s) / range_s
else:
for r in results:
r["norm_score"] = r["score"]
normalized_results.append(results)
# Combine scores
doc_scores = defaultdict(lambda: {"score": 0, "doc": None})
for weight, results in zip(weights, normalized_results):
for r in results:
doc_scores[r["id"]]["score"] += weight * r["norm_score"]
if doc_scores[r["id"]]["doc"] is None:
doc_scores[r["id"]]["doc"] = r
# Sort and return
sorted_docs = sorted(doc_scores.values(), key=lambda x: x["score"], reverse=True)
return [{"**d["doc"], "final_score": d["score"]} for d in sorted_docs]Use different indices for different types of content:
class MultiIndexRetriever:
    """Query multiple specialized indices and merge their results."""

    def __init__(self):
        # One purpose-built index per content type.
        self.indices = {
            "documents": DocumentIndex(), # General docs
            "code": CodeIndex(), # Code snippets
            "faq": FAQIndex(), # Q&A pairs
            "api_docs": APIDocIndex(), # API documentation
        }
        self.router = QueryRouter()

    def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
        """Search every index the router deems relevant, then merge."""
        hits = []
        for index_name in self.router.route(query):
            batch = self.indices[index_name].search(query, k=top_k)
            # Tag each hit with the index it came from.
            for hit in batch:
                hit["source_index"] = index_name
            hits.extend(batch)
        # Deduplicate and rank
        return self._merge_and_rank(hits, top_k)

    def _merge_and_rank(self, results: list[dict], top_k: int) -> list[dict]:
        """Drop near-duplicate hits (same leading text) and rank by score."""
        seen_content = set()
        unique_results = []
        for hit in results:
            # Hash only the first 100 chars as a cheap duplicate signature.
            signature = hash(hit["text"][:100])
            if signature not in seen_content:
                seen_content.add(signature)
                unique_results.append(hit)
        ranked = sorted(unique_results, key=lambda h: h.get("score", 0), reverse=True)
        return ranked[:top_k]
class QueryRouter:
    """Route queries to appropriate indices."""

    def __init__(self, llm=None):
        # LLM hook reserved for smarter routing; current logic is keyword-based.
        self.llm = llm

    def route(self, query: str) -> list[str]:
        """Pick which indices to search via simple keyword heuristics."""
        normalized = query.lower()
        indices = ["documents"] # Always search general docs
        # Keyword buckets that trigger each specialized index.
        triggers = {
            "code": ["code", "function", "class", "error", "bug"],
            "faq": ["how do i", "how to", "what is"],
            "api_docs": ["api", "endpoint", "request", "response"],
        }
        for index_name, keywords in triggers.items():
            if any(kw in normalized for kw in keywords):
                indices.append(index_name)
        return indices

Agentic RAG uses an LLM as an intelligent agent that decides when and how to retrieve, enabling adaptive multi-step retrieval.
from openai import OpenAI
import json
class AgenticRAG:
    """Tool-using RAG agent: the LLM decides when and what to search."""

    def __init__(self, retrievers: dict, llm_client: "OpenAI"):
        self.retrievers = retrievers
        self.client = llm_client
        # JSON-schema tool definitions exposed to the model.
        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "search_documents",
                    "description": "Search internal documentation",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"},
                            "top_k": {"type": "integer", "default": 5},
                        },
                        "required": ["query"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "search_web",
                    "description": "Search the web for recent information",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"}
                        },
                        "required": ["query"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "search_code",
                    "description": "Search code repositories",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Code search query"},
                            "language": {"type": "string", "description": "Programming language"},
                        },
                        "required": ["query"],
                    },
                },
            },
        ]

    def _execute_tool(self, name: str, arguments: dict) -> str:
        """Dispatch one tool call to the matching retriever; JSON-encode the hits."""
        if name == "search_documents":
            hits = self.retrievers["documents"].search(
                arguments["query"],
                k=arguments.get("top_k", 5),
            )
        elif name == "search_web":
            hits = self.retrievers["web"].search(arguments["query"])
        elif name == "search_code":
            hits = self.retrievers["code"].search(
                arguments["query"],
                language=arguments.get("language"),
            )
        else:
            return f"Unknown tool: {name}"
        return json.dumps([{"text": h["text"], "source": h.get("source", "")} for h in hits])

    def query(self, user_query: str, max_iterations: int = 3) -> str:
        """Run the agent loop: let the model call tools, then answer."""
        messages = [
            {
                "role": "system",
                "content": """You are a helpful assistant with access to search tools.
Use the tools to find relevant information before answering.
You can search multiple times if needed to gather complete information.
Always cite your sources."""
            },
            {"role": "user", "content": user_query},
        ]
        for _ in range(max_iterations):
            response = self.client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                tools=self.tools,
                tool_choice="auto",
            )
            message = response.choices[0].message
            if not message.tool_calls:
                # The model is done searching; its content is the final answer.
                return message.content
            # Record the assistant turn, then answer each tool call in order.
            messages.append(message)
            for tool_call in message.tool_calls:
                tool_output = self._execute_tool(
                    tool_call.function.name,
                    json.loads(tool_call.function.arguments),
                )
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_output,
                })
        # Tool budget exhausted: ask for a final answer without tools.
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        return response.choices[0].message.content

Use specialized agents for different tasks:
class MultiAgentRAG:
    """Multi-agent system for complex RAG tasks.

    Pipeline: plan -> research each sub-task -> synthesize -> fact-check.
    """

    def __init__(self, llm):
        self.llm = llm
        # Specialized agents, one per pipeline stage.
        # NOTE(review): ResearcherAgent.__init__ also expects a `retrievers`
        # dict - confirm this wiring before use.
        self.agents = {
            "planner": PlannerAgent(llm),
            "researcher": ResearcherAgent(llm),
            "synthesizer": SynthesizerAgent(llm),
            "fact_checker": FactCheckerAgent(llm)
        }

    def query(self, user_query: str) -> dict:
        """Run the four-stage pipeline and return the verified answer."""
        # Step 1: Planner breaks down the query
        plan = self.agents["planner"].create_plan(user_query)
        # Step 2: Researcher gathers information for each sub-task
        findings = [self.agents["researcher"].research(task) for task in plan["tasks"]]
        # Step 3: Synthesizer combines findings
        synthesis = self.agents["synthesizer"].synthesize(
            query=user_query,
            research=findings,
        )
        # Step 4: Fact checker validates claims
        verified = self.agents["fact_checker"].verify(
            response=synthesis,
            sources=findings,
        )
        return {
            "response": verified["response"],
            "confidence": verified["confidence"],
            "sources": verified["sources"],
            "unverified_claims": verified.get("unverified", []),
        }
class PlannerAgent:
    """Decompose a complex query into smaller research tasks."""

    def __init__(self, llm):
        # BUG FIX: __init__ was missing entirely, yet create_plan reads
        # self.llm and MultiAgentRAG instantiates PlannerAgent(llm).
        self.llm = llm

    def create_plan(self, query: str) -> dict:
        """Ask the LLM for a JSON plan with ``tasks`` and ``dependencies``.

        Raises:
            json.JSONDecodeError: if the model reply is not valid JSON.
        """
        prompt = f"""Break down this complex query into research tasks.
Query: {query}
Return as JSON with:
- tasks: list of specific research questions
- dependencies: which tasks depend on others
"""
        response = self.llm.invoke(prompt)
        return json.loads(response.content)
class ResearcherAgent:
def __init__(self, llm, retrievers: dict):
self.llm = llm
self.retrievers = retrievers
def research(self, task: str) -> dict:
# Multi-source retrieval
all_results = []
for name, retriever in self.retrievers.items():
results = retriever.search(task, k=5)
for r in results:
r["source_type"] = name
all_results.extend(results)
# Synthesize findings for this task
context = "\n".join([r["text"] for r in all_results[:10]])
prompt = f"""Based on the following information, answer: {task}
Information:
{context}
Provide a focused answer with source citations."""
response = self.llm.invoke(prompt)
return {
"task": task,
"answer": response.content,
"sources": all_results[:10]
        }

The model reflects on whether retrieval is needed and evaluates retrieved document relevance:
class SelfRAG:
    """Self-reflective RAG: decide whether to retrieve, then filter by relevance."""

    def __init__(self, llm, retriever):
        self.llm = llm
        self.retriever = retriever

    def should_retrieve(self, query: str) -> bool:
        """Decide if retrieval is needed."""
        prompt = f"""Does this query require external information to answer accurately?
Query: {query}
Answer with YES or NO only."""
        response = self.llm.invoke(prompt)
        return "YES" in response.content.upper()

    def evaluate_relevance(self, query: str, document: str) -> float:
        """Score document relevance to query on a 0-1 scale (0.5 if unparseable)."""
        prompt = f"""Rate the relevance of this document to the query on a scale of 0-10.
Query: {query}
Document: {document}
Answer with a number only."""
        response = self.llm.invoke(prompt)
        try:
            return float(response.content.strip()) / 10
        except ValueError:
            # BUG FIX: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit; only a non-numeric reply is expected here.
            return 0.5

    def query(self, user_query: str) -> str:
        """Answer the query, retrieving context only when it helps."""
        # Check if retrieval is needed
        if not self.should_retrieve(user_query):
            return self.llm.invoke(user_query).content
        # Retrieve, keeping only documents the LLM judges relevant (> 0.5)
        results = self.retriever.search(user_query, k=10)
        relevant_docs = []
        for doc in results:
            relevance = self.evaluate_relevance(user_query, doc["text"])
            if relevance > 0.5:
                doc["relevance"] = relevance
                relevant_docs.append(doc)
        # Generate with the top relevant context
        context = "\n".join([d["text"] for d in relevant_docs[:5]])
        prompt = f"""Answer based on the provided context.
Context:
{context}
Question: {user_query}
"""
return self.llm.invoke(prompt).contentclass CorrectiveRAG:
"""CRAG: Corrective Retrieval Augmented Generation."""
def __init__(self, llm, retriever, web_search):
self.llm = llm
self.retriever = retriever
self.web_search = web_search
def evaluate_retrieval(self, query: str, documents: list[dict]) -> str:
"""Evaluate retrieval quality: CORRECT, INCORRECT, or AMBIGUOUS."""
doc_summaries = "\n".join([
f"Doc {i+1}: {d['text'][:200]}..."
for i, d in enumerate(documents[:5])
])
prompt = f"""Evaluate if these documents can answer the query.
Query: {query}
Documents:
{doc_summaries}
Answer with one of:
- CORRECT: Documents contain relevant information
- INCORRECT: Documents are not relevant
- AMBIGUOUS: Partially relevant, may need more info"""
response = self.llm.invoke(prompt)
return response.content.strip().upper()
def query(self, user_query: str) -> str:
# Initial retrieval
results = self.retriever.search(user_query, k=10)
# Evaluate retrieval quality
evaluation = self.evaluate_retrieval(user_query, results)
if "INCORRECT" in evaluation:
# Fall back to web search
results = self.web_search.search(user_query)
elif "AMBIGUOUS" in evaluation:
# Augment with web search
web_results = self.web_search.search(user_query)
results = results + web_results
# Generate response
context = "\n".join([r["text"] for r in results[:5]])
prompt = f"""Answer based on context. If unsure, say so.
Context:
{context}
Question: {user_query}
"""
        return self.llm.invoke(prompt).content

RAPTOR builds a tree of summarized content for multi-level retrieval:
class RAPTOR:
    """Recursive Abstractive Processing for Tree-Organized Retrieval.

    Builds a hierarchy: level 0 holds raw chunks, each higher level holds
    LLM summaries of clusters from the level below.
    """

    def __init__(self, llm, embedding_model, cluster_size: int = 5):
        self.llm = llm
        self.embedding_model = embedding_model
        self.cluster_size = cluster_size  # target number of docs per cluster
        self.tree = {}  # level number -> list of node dicts

    def build_tree(self, documents: list[dict]):
        """Build hierarchical summary tree bottom-up until one node remains."""
        # Level 0: Original chunks
        current_level = documents
        level = 0
        self.tree[level] = current_level
        while len(current_level) > 1:
            level += 1
            # Cluster, then summarize each cluster into a parent node.
            summaries = [
                {"text": self._summarize_cluster(cluster), "children": cluster}
                for cluster in self._cluster_documents(current_level)
            ]
            self.tree[level] = summaries
            current_level = summaries

    def _cluster_documents(self, documents: list[dict]) -> list[list[dict]]:
        """Cluster documents by embedding similarity."""
        from sklearn.cluster import KMeans
        import numpy as np
        embeddings = self.embedding_model.encode([d["text"] for d in documents])
        n_clusters = max(1, len(documents) // self.cluster_size)
        labels = KMeans(n_clusters=n_clusters).fit_predict(embeddings)
        grouped = [[] for _ in range(n_clusters)]
        for doc, label in zip(documents, labels):
            grouped[label].append(doc)
        return [c for c in grouped if c] # Remove empty clusters

    def _summarize_cluster(self, cluster: list[dict]) -> str:
        """Generate summary for a cluster of documents."""
        texts = "\n---\n".join([d["text"] for d in cluster])
        prompt = f"""Summarize the key information from these documents into a
coherent paragraph that captures all important details:
{texts}
Summary:"""
        return self.llm.invoke(prompt).content

    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        """Retrieve the best matches from every level of the tree."""
        from sklearn.metrics.pairwise import cosine_similarity
        all_results = []
        for level, documents in self.tree.items():
            embeddings = self.embedding_model.encode([d["text"] for d in documents])
            query_emb = self.embedding_model.encode(query)
            # Take the 3 most similar nodes from this level.
            similarities = cosine_similarity([query_emb], embeddings)[0]
            for idx in similarities.argsort()[-3:][::-1]:
                all_results.append({
                    "text": documents[idx]["text"],
                    "level": level,
                    "score": similarities[idx],
                })
        # Sort by score and return top k
        all_results.sort(key=lambda x: x["score"], reverse=True)
        return all_results[:top_k]

| Aspect | Vanilla RAG | Agentic RAG |
|---|---|---|
| Retrieval Decision | Always retrieve | Decide dynamically |
| Source Selection | Fixed sources | Agent chooses sources |
| Iteration | Single retrieval | Multiple retrieval rounds |
| Reasoning | Direct answer | Multi-step reasoning |
| Complexity | Simple pipeline | Complex orchestration |
| Latency | Lower | Higher (multiple LLM calls) |
| Cost | Lower | Higher (more tokens) |
The final stage of RAG is synthesizing retrieved context into a coherent response. This is often overlooked but critically affects quality.
def format_context_for_generation(
    documents: list[dict],
    strategy: str = "numbered"
) -> str:
    """Format retrieved documents for LLM context.

    Strategies:
        numbered          - "[1] text" entries for easy citation.
        xml_tagged        - <document> tags so models can separate sources.
        relevance_ordered - highest score first, "---" separators.
        interleaved       - alternate top-half and bottom-half documents.
    """
    if strategy == "numbered":
        # Number each source for easy citation
        numbered = [f"[{n}] {doc['text']}" for n, doc in enumerate(documents, start=1)]
        return "\n\n".join(numbered)
    elif strategy == "xml_tagged":
        # XML tags help models distinguish sources
        tagged = []
        for n, doc in enumerate(documents, start=1):
            src = doc.get('source', 'unknown')
            tagged.append(f"<document id=\"{n}\" source=\"{src}\">\n{doc['text']}\n</document>")
        return "\n".join(tagged)
    elif strategy == "relevance_ordered":
        # Most relevant first (handles "lost in middle" problem)
        by_score = sorted(documents, key=lambda d: d.get('score', 0), reverse=True)
        return "\n\n---\n\n".join([doc['text'] for doc in by_score])
    elif strategy == "interleaved":
        # Alternate high and medium relevance to spread attention
        midpoint = len(documents) // 2
        interleaved = []
        for strong, weak in zip(documents[:midpoint], documents[midpoint:]):
            interleaved.append(strong)
            interleaved.append(weak)
return "\n\n".join([doc['text'] for doc in interleaved])RAG_SYSTEM_PROMPTS = {
"strict_grounding": """You are a helpful assistant that answers questions
based ONLY on the provided context. If the answer is not in the context,
say "I don't have enough information to answer that."
NEVER make up information. ALWAYS cite your sources using [1], [2], etc.""",
"balanced": """You are a helpful assistant. Answer the question using
the provided context. You may supplement with general knowledge, but clearly
distinguish between information from the context (cite with [1], [2]) and
your own knowledge (prefix with "Generally speaking...").""",
"synthesis": """You are an expert analyst. Synthesize information from
the provided sources to give a comprehensive answer. Identify agreements
and conflicts between sources. Cite each claim with the source number.""",
"qa_focused": """Answer the question concisely and directly. Use only
the provided context. Format your response as:
- Direct answer (1-2 sentences)
- Supporting details
- Sources used: [list numbers]"""
}
def generate_response(
    query: str,
    context: str,
    llm,
    prompt_style: str = "strict_grounding"
) -> str:
    """Generate a grounded answer using one of RAG_SYSTEM_PROMPTS.

    Raises:
        KeyError: if ``prompt_style`` is not a key of RAG_SYSTEM_PROMPTS.
    """
    user_prompt = f"""Context:
{context}
Question: {query}
Answer:"""
    response = llm.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": RAG_SYSTEM_PROMPTS[prompt_style]},
            {"role": "user", "content": user_prompt}
        ]
    )
    return response.choices[0].message.content

Research shows LLMs struggle with information in the middle of long contexts. Strategies to mitigate:
def mitigate_lost_in_middle(documents: list[dict]) -> list[dict]:
    """Reorder documents to place most relevant at beginning and end.

    LLMs attend best to the start and end of a long context, so the
    highest-scored documents go to the edges and the weakest to the middle.

    Returns:
        A reordered list; for len <= 2 the input is returned unchanged.
    """
    if len(documents) <= 2:
        return documents
    # Sort by relevance, best first.
    sorted_docs = sorted(documents, key=lambda x: x.get('score', 0), reverse=True)
    # BUG FIX: the previous interleaving buried the top-scored document in the
    # middle of the output (e.g. scores [5,4,3,2,1] -> [1,2,5,4,3]) - the
    # opposite of the stated intent. Deal the sorted docs alternately to the
    # front and the back so the edges hold the strongest documents.
    result = [None] * len(sorted_docs)
    front, back = 0, len(sorted_docs) - 1
    for i, doc in enumerate(sorted_docs):
        if i % 2 == 0:
            result[front] = doc
            front += 1
        else:
            result[back] = doc
            back -= 1
    return result
# Alternative: Use explicit position markers
def add_position_markers(documents: list[dict]) -> str:
    """Prefix each document with its ordinal and a BEGINNING/MIDDLE/END marker."""
    n = len(documents)
    formatted = []
    for i, doc in enumerate(documents):
        # First third, middle third, final third of the context window.
        if i < n // 3:
            position = "BEGINNING"
        elif i < 2 * n // 3:
            position = "MIDDLE"
        else:
            position = "END"
        formatted.append(f"[DOCUMENT {i+1} - {position}]\n{doc['text']}")
return "\n\n".join(formatted)class ResponseValidator:
"""Validate and post-process RAG responses."""
def __init__(self, llm):
self.llm = llm
def validate_citations(
self,
response: str,
num_sources: int
) -> dict:
"""Check that citations reference valid sources."""
import re
# Find all citations [1], [2], etc.
citations = re.findall(r'\[(\d+)\]', response)
cited_sources = set(int(c) for c in citations)
# Check for invalid citations
invalid = [c for c in cited_sources if c < 1 or c > num_sources]
return {
"valid": len(invalid) == 0,
"cited_sources": list(cited_sources),
"invalid_citations": invalid,
"uncited_sources": [i for i in range(1, num_sources+1) if i not in cited_sources]
}
def check_hallucination(
self,
response: str,
context: str
) -> dict:
"""Use LLM to check for potential hallucinations."""
prompt = f"""Compare this response to the context and identify any claims
that are NOT supported by the context.
Context:
{context}
Response:
{response}
List any unsupported claims (or say "None found"):"""
result = self.llm.invoke(prompt).content
return {
"has_potential_hallucinations": "none found" not in result.lower(),
"unsupported_claims": result
}
def add_disclaimers(
self,
response: str,
validation: dict
) -> str:
"""Add appropriate disclaimers based on validation."""
if validation.get("has_potential_hallucinations"):
response += "\n\n*Note: Some information may require verification.*"
if validation.get("uncited_sources"):
response += f"\n\n*Additional relevant sources: {validation['uncited_sources']}*"
return responseasync def stream_rag_response(
query: str,
documents: list[dict],
llm_client
):
"""Stream RAG response for better UX."""
context = format_context_for_generation(documents)
# Stream response
stream = await llm_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": RAG_SYSTEM_PROMPTS["strict_grounding"]},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
],
stream=True
)
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
yield content
# Post-process after streaming complete
validation = validate_response(full_response, documents)
if validation["issues"]:
        yield f"\n\n---\n*{validation['disclaimer']}*"

In the next lesson, we'll explore Multimodal RAG for handling images, tables, and complex documents.