RAG Systems

0 of 13 lessons completed

Implementing the Retrieval Layer

The retrieval layer is where queries meet documents. In this lesson, we'll implement production-ready retrieval systems, covering vector store operations, search patterns, and advanced retrieval techniques.

Vector Store Operations

Core Operations

Every vector store supports these fundamental operations:

  • Insert/Upsert - Add or update vectors with metadata
  • Search - Find k-nearest neighbors to a query vector
  • Delete - Remove vectors by ID or filter
  • Filter - Constrain search by metadata conditions

Implementing with ChromaDB

import chromadb
from chromadb.utils import embedding_functions

class VectorStore:
    def __init__(self, collection_name: str = "documents"):
        self.client = chromadb.PersistentClient(path="./chroma_db")
        
        # Use OpenAI embeddings
        self.ef = embedding_functions.OpenAIEmbeddingFunction(
            model_name="text-embedding-3-small"
        )
        
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.ef,
            metadata={"hnsw:space": "cosine"}
        )
    
    def add_documents(self, documents: list[dict]):
        """Add documents with text and metadata."""
        self.collection.add(
            documents=[d["text"] for d in documents],
            metadatas=[d.get("metadata", {}) for d in documents],
            ids=[d["id"] for d in documents]
        )
    
    def search(
        self,
        query: str,
        k: int = 5,
        filter: dict = None
    ) -> list[dict]:
        """Search for similar documents."""
        results = self.collection.query(
            query_texts=[query],
            n_results=k,
            where=filter
        )
        
        return [
            {
                "id": results["ids"][0][i],
                "text": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "distance": results["distances"][0][i]
            }
            for i in range(len(results["ids"][0]))
        ]
    
    def delete(self, ids: list[str] = None, filter: dict = None):
        """Delete documents by ID or filter."""
        if ids:
            self.collection.delete(ids=ids)
        elif filter:
            self.collection.delete(where=filter)

Implementing with Pinecone

from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI

class PineconeVectorStore:
    def __init__(self, index_name: str = "rag-index"):
        self.pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
        self.openai = OpenAI()
        
        # Create index if not exists
        if index_name not in self.pc.list_indexes().names():
            self.pc.create_index(
                name=index_name,
                dimension=1536,  # text-embedding-3-small
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
        
        self.index = self.pc.Index(index_name)
    
    def _embed(self, texts: list[str]) -> list[list[float]]:
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
        return [r.embedding for r in response.data]
    
    def add_documents(self, documents: list[dict], namespace: str = ""):
        """Add documents in batches."""
        batch_size = 100
        embeddings = self._embed([d["text"] for d in documents])
        
        vectors = [
            {
                "id": d["id"],
                "values": emb,
                "metadata": {"text": d["text"], **d.get("metadata", {})}
            }
            for d, emb in zip(documents, embeddings)
        ]
        
        for i in range(0, len(vectors), batch_size):
            self.index.upsert(
                vectors=vectors[i:i+batch_size],
                namespace=namespace
            )
    
    def search(
        self,
        query: str,
        k: int = 5,
        filter: dict = None,
        namespace: str = ""
    ) -> list[dict]:
        query_embedding = self._embed([query])[0]
        
        results = self.index.query(
            vector=query_embedding,
            top_k=k,
            filter=filter,
            namespace=namespace,
            include_metadata=True
        )
        
        return [
            {
                "id": match.id,
                "score": match.score,
                "text": match.metadata.get("text", ""),
                "metadata": match.metadata
            }
            for match in results.matches
        ]

Search Strategies

1. Basic Similarity Search

Return the k most similar documents:

# Plain top-k similarity search: the 5 nearest chunks to the query,
# with no diversity or metadata filtering applied.
results = vector_store.search(
    query="What is machine learning?",
    k=5
)

2. Maximum Marginal Relevance (MMR)

MMR balances relevance with diversity to avoid redundant results:

def mmr_search(
    query_embedding: list[float],
    candidate_embeddings: list[list[float]],
    k: int = 5,
    lambda_param: float = 0.5
) -> list[int]:
    """
    Maximum Marginal Relevance search.
    
    MMR = λ * sim(q, d) - (1-λ) * max(sim(d, d_selected))
    
    Args:
        query_embedding: Query vector
        candidate_embeddings: List of document vectors
        k: Number of results to return
        lambda_param: Balance between relevance (1) and diversity (0)
    
    Returns:
        Indices of selected documents
    """
    import numpy as np
    from sklearn.metrics.pairwise import cosine_similarity
    
    query = np.array(query_embedding).reshape(1, -1)
    candidates = np.array(candidate_embeddings)
    
    # Relevance to query
    relevance = cosine_similarity(query, candidates)[0]
    
    selected = []
    remaining = list(range(len(candidates)))
    
    for _ in range(k):
        if not remaining:
            break
        
        if not selected:
            # First selection: most relevant
            idx = remaining[np.argmax(relevance[remaining])]
        else:
            # MMR selection
            mmr_scores = []
            for idx in remaining:
                rel = relevance[idx]
                # Similarity to already selected docs
                sim_to_selected = max(
                    cosine_similarity(
                        candidates[idx].reshape(1, -1),
                        candidates[selected]
                    )[0]
                )
                mmr = lambda_param * rel - (1 - lambda_param) * sim_to_selected
                mmr_scores.append((idx, mmr))
            
            idx = max(mmr_scores, key=lambda x: x[1])[0]
        
        selected.append(idx)
        remaining.remove(idx)
    
    return selected

# Usage with LangChain
from langchain_community.vectorstores import Chroma

# fetch_k candidates are retrieved first; MMR then chooses k of them.
results = vectorstore.max_marginal_relevance_search(
    query="What is machine learning?",
    k=5,
    fetch_k=20,  # Fetch 20, select 5 with diversity
    lambda_mult=0.5  # Balance relevance and diversity
)

3. Filtered Search

Constrain results by metadata attributes:

# ChromaDB filter syntax: conditions are combined with an explicit $and/$or.
results = collection.query(
    query_texts=["security best practices"],
    n_results=10,
    where={
        "$and": [
            {"department": {"$eq": "engineering"}},
            {"date": {"$gte": "2024-01-01"}},  # string compare — store ISO dates
            {"document_type": {"$in": ["policy", "guide"]}}
        ]
    }
)

# Pinecone filter syntax: top-level keys are implicitly ANDed together.
results = index.query(
    vector=query_embedding,
    top_k=10,
    filter={
        "department": {"$eq": "engineering"},
        "date": {"$gte": "2024-01-01"},
        "document_type": {"$in": ["policy", "guide"]}
    }
)

# Qdrant filter syntax: typed condition objects instead of operator dicts.
from qdrant_client.models import Filter, FieldCondition, MatchValue

results = client.search(
    collection_name="documents",
    query_vector=query_embedding,
    query_filter=Filter(
        must=[  # "must" has AND semantics
            FieldCondition(key="department", match=MatchValue(value="engineering")),
        ]
    ),
    limit=10
)

Advanced Retrieval Patterns

1. Multi-Query Retrieval

Generate multiple query variations to improve recall:

from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_openai import ChatOpenAI

# Mild temperature so the generated query variants actually differ.
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

# Create multi-query retriever: the LLM rewrites the question into several
# phrasings, each is searched, and the union of hits is returned.
multi_retriever = MultiQueryRetriever.from_llm(
    retriever=base_retriever,
    llm=llm
)

# It generates multiple queries from the original
# "What are the benefits of RAG?" becomes:
# - "What advantages does RAG provide?"
# - "Why should I use retrieval-augmented generation?"
# - "How does RAG improve AI systems?"

results = multi_retriever.invoke("What are the benefits of RAG?")

2. Parent Document Retrieval

Retrieve small chunks but return the full parent document:

from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Store for parent documents (in-memory here; use a persistent store in prod)
parent_store = InMemoryStore()

# Small chunks for embedding: precise matching against focused passages
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)

# Larger chunks to return: enough surrounding context for the LLM
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)

retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=parent_store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)

# Add documents (splits into children for the vectorstore, stores parents in the docstore)
retriever.add_documents(documents)

# Search matches against small chunks but returns their parent documents
results = retriever.invoke("What is RAG?")

3. Self-Query Retrieval

Use an LLM to parse natural language queries into structured filters:

from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo

# Define filterable attributes; descriptions guide the LLM's filter extraction
metadata_field_info = [
    AttributeInfo(
        name="author",
        description="The author of the document",
        type="string"
    ),
    AttributeInfo(
        name="date",
        description="The date the document was published",
        type="string"
    ),
    AttributeInfo(
        name="category",
        description="The category: 'technical', 'business', 'legal'",
        type="string"
    ),
]

retriever = SelfQueryRetriever.from_llm(
    llm=llm,
    vectorstore=vectorstore,
    document_contents="Technical documentation",  # one-line corpus description for the LLM
    metadata_field_info=metadata_field_info,
)

# Natural language query with implicit filter
results = retriever.invoke(
    "Find technical documents by John Smith from 2024"
)
# LLM extracts: filter = {author: "John Smith", date >= "2024-01-01", category: "technical"}

4. Contextual Compression

Extract only relevant portions from retrieved documents:

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# Compressor extracts relevant sentences (one extra LLM call per retrieved doc)
compressor = LLMChainExtractor.from_llm(llm)

compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=base_retriever
)

# Returns only the query-relevant portions of each retrieved document
results = compression_retriever.invoke("What is the refund policy?")
# Instead of full page, returns just: "Refunds are processed within 5-7 business days..."

Hybrid Retrieval Implementation

from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    """Combine semantic (vector) and lexical (BM25) search with Reciprocal
    Rank Fusion (RRF): each document scores sum(1 / (RRF_K + rank)) over the
    rankings it appears in, so only ranks matter — raw scores never mix.
    """

    # RRF smoothing constant; 60 is the standard value from the RRF paper.
    RRF_K = 60

    def __init__(
        self,
        vector_store,
        documents: list[dict],
        alpha: float = 0.5
    ):
        self.vector_store = vector_store
        self.documents = documents
        # NOTE(review): alpha is kept for interface compatibility but is
        # unused — RRF fuses by rank, not by weighted scores. Either remove
        # it or switch to weighted score fusion that actually applies it.
        self.alpha = alpha

        # Build BM25 index over naive whitespace tokens.
        self.tokenized_docs = [
            doc["text"].lower().split()
            for doc in documents
        ]
        self.bm25 = BM25Okapi(self.tokenized_docs)

        # Map doc id back to its list position for result assembly.
        self.id_to_idx = {doc["id"]: i for i, doc in enumerate(documents)}

    def search(self, query: str, k: int = 10) -> list[dict]:
        """Return the top-k documents fused from semantic and BM25 rankings."""
        # Over-fetch from each ranker so fusion has enough candidates.
        fetch_k = k * 2

        # Semantic ranking (already ordered by the vector store).
        semantic_results = self.vector_store.search(query, k=fetch_k)

        # Lexical ranking via BM25, using the same naive tokenization.
        bm25_scores = self.bm25.get_scores(query.lower().split())
        ranked_bm25 = np.argsort(bm25_scores)[::-1][:fetch_k]

        # RRF fusion. No score normalization needed: only ranks contribute.
        doc_scores: dict[str, float] = {}
        for rank, result in enumerate(semantic_results, 1):
            doc_id = result["id"]
            doc_scores[doc_id] = doc_scores.get(doc_id, 0.0) + 1 / (self.RRF_K + rank)
        for rank, idx in enumerate(ranked_bm25, 1):
            doc_id = self.documents[idx]["id"]
            doc_scores[doc_id] = doc_scores.get(doc_id, 0.0) + 1 / (self.RRF_K + rank)

        # Sort by fused score, best first.
        sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)

        results = []
        for doc_id, score in sorted_docs[:k]:
            idx = self.id_to_idx.get(doc_id)
            if idx is None:
                # Vector store returned a doc we don't hold locally; skip it
                # rather than raising KeyError mid-assembly.
                continue
            results.append({
                "id": doc_id,
                "text": self.documents[idx]["text"],
                "score": score
            })

        return results

Ensemble Retrieval with Re-ranking

from sentence_transformers import CrossEncoder

class EnsembleRetriever:
    """Fan a query out to several retrievers, then re-rank the pooled,
    de-duplicated candidates with a cross-encoder."""

    def __init__(
        self,
        retrievers: list,
        reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    ):
        self.retrievers = retrievers
        self.reranker = CrossEncoder(reranker_model)

    def search(
        self,
        query: str,
        k: int = 5,
        fetch_k: int = 20
    ) -> list[dict]:
        """Return the top-k candidates across all retrievers by reranker score."""
        # Pool candidates, keeping the first occurrence of each document id.
        pooled: dict[str, dict] = {}
        for source in self.retrievers:
            for hit in source.search(query, k=fetch_k):
                pooled.setdefault(hit["id"], hit)

        candidates = list(pooled.values())
        if not candidates:
            return []

        # Score every (query, text) pair with the cross-encoder in one batch.
        scores = self.reranker.predict([[query, hit["text"]] for hit in candidates])

        # Highest score first; sort is stable for ties.
        order = sorted(zip(scores, candidates), key=lambda pair: pair[0], reverse=True)

        return [
            {**hit, "rerank_score": float(score)}
            for score, hit in order[:k]
        ]

RAG in Multi-Turn Chatbots

When building conversational RAG, you must decide how to construct the retrieval query from the dialogue history — the latest message alone is often ambiguous (e.g. "How does it compare to fine-tuning?" never names its subject):

Strategy 1: Embed Latest User Turn Only

def retrieve_for_chat(messages: list[dict], retriever) -> list:
    """Retrieve using only the newest message's text as the query."""
    return retriever.search(messages[-1]["content"])

Limitation: Loses context from earlier turns — a follow-up like "How does it compare?" retrieves poorly because the referent was established in a previous message.

Strategy 2: Concatenate Recent Turns

def retrieve_for_chat(messages: list[dict], retriever, max_turns: int = 3) -> list:
    """Search with the user-side text from the last `max_turns` exchanges.

    A turn is a user/assistant pair, so the window spans max_turns * 2
    messages; assistant messages inside the window are dropped.
    """
    window = messages[-max_turns * 2:]
    user_parts = []
    for message in window:
        if message["role"] == "user":
            user_parts.append(message["content"])
    return retriever.search(" ".join(user_parts))

Strategy 3: Query Reformulation (Recommended)

def reformulate_query(messages: list[dict], llm) -> str:
    """Collapse a conversation into one standalone question via the LLM."""

    prompt = """Given the following conversation, reformulate the last user 
question into a standalone question that captures all necessary context.

Conversation:
{conversation}

Standalone question:"""

    # Render each message as "Role: content", one per line.
    transcript_lines = []
    for message in messages:
        transcript_lines.append(f"{message['role'].title()}: {message['content']}")

    response = llm.invoke(prompt.format(conversation="\n".join(transcript_lines)))
    return response.content

# Example:
# User: "What is RAG?"
# Assistant: "RAG is Retrieval Augmented Generation..."
# User: "How does it compare to fine-tuning?"
# 
# Reformulated: "How does Retrieval Augmented Generation (RAG) compare to fine-tuning?"

Production Considerations

1. Caching

from functools import lru_cache
import hashlib

class CachedRetriever:
    def __init__(self, retriever, cache_size: int = 1000):
        self.retriever = retriever
        self._cache = {}
        self.cache_size = cache_size
    
    def _cache_key(self, query: str, k: int, filter: dict = None) -> str:
        content = f"{query}:{k}:{json.dumps(filter, sort_keys=True)}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def search(self, query: str, k: int = 5, filter: dict = None):
        key = self._cache_key(query, k, filter)
        
        if key in self._cache:
            return self._cache[key]
        
        results = self.retriever.search(query, k, filter)
        
        # LRU eviction
        if len(self._cache) >= self.cache_size:
            oldest = next(iter(self._cache))
            del self._cache[oldest]
        
        self._cache[key] = results
        return results

2. Async/Concurrent Retrieval

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncRetriever:
    """Adapt a synchronous retriever for asyncio code by running its
    blocking search calls on a thread pool."""

    def __init__(self, retriever):
        self.retriever = retriever
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def search(self, query: str, k: int = 5):
        """Run one blocking search off the event-loop thread."""
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() here is deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self.retriever.search(query, k)
        )

    async def multi_search(self, queries: list[str], k: int = 5):
        """Search multiple queries concurrently; results keep query order."""
        tasks = [self.search(q, k) for q in queries]
        return await asyncio.gather(*tasks)

3. Observability

import time
import logging

class InstrumentedRetriever:
    """Wrap a retriever with structured success/error logging and latency."""

    def __init__(self, retriever):
        self.retriever = retriever
        self.logger = logging.getLogger(__name__)

    def search(self, query: str, k: int = 5, **kwargs):
        """Delegate to the wrapped retriever, logging outcome and timing."""
        start = time.time()

        try:
            results = self.retriever.search(query, k, **kwargs)
        except Exception as e:
            # Truncate the query so error logs can't balloon on huge inputs.
            self.logger.error(
                "retrieval_error",
                extra={"error": str(e), "query": query[:100]}
            )
            raise

        self.logger.info(
            "retrieval_success",
            extra={
                "query_length": len(query),
                "k": k,
                "num_results": len(results),
                "latency_ms": (time.time() - start) * 1000,
                # .get(): not every retriever includes a "score" field (e.g.
                # Chroma-style results carry "distance"), and observability
                # must never crash an otherwise-successful search.
                "top_score": results[0].get("score") if results else None
            }
        )

        return results

Key Takeaways

  • Understand your vector store's query syntax for filtering and search
  • Use MMR when diversity in results matters
  • Implement hybrid retrieval combining lexical and semantic search
  • Use parent document retrieval for small-to-large context expansion
  • Re-rank with cross-encoders for precision-critical applications
  • Reformulate queries in chatbots to capture conversation context
  • Add caching, async, and observability for production

In the next module, we'll explore advanced RAG techniques including query rewriting, re-ranking, and hybrid RAG architectures.