The retrieval layer is where queries meet documents. In this lesson, we'll implement production-ready retrieval systems, covering vector store operations, search patterns, and advanced retrieval techniques.
Every vector store supports these fundamental operations:
import os

import chromadb
from chromadb.utils import embedding_functions
class VectorStore:
def __init__(self, collection_name: str = "documents"):
self.client = chromadb.PersistentClient(path="./chroma_db")
        # Use OpenAI embeddings (requires an OpenAI API key)
        self.ef = embedding_functions.OpenAIEmbeddingFunction(
            api_key=os.environ["OPENAI_API_KEY"],
            model_name="text-embedding-3-small"
        )
self.collection = self.client.get_or_create_collection(
name=collection_name,
embedding_function=self.ef,
metadata={"hnsw:space": "cosine"}
)
def add_documents(self, documents: list[dict]):
"""Add documents with text and metadata."""
self.collection.add(
documents=[d["text"] for d in documents],
metadatas=[d.get("metadata", {}) for d in documents],
ids=[d["id"] for d in documents]
)
def search(
self,
query: str,
k: int = 5,
filter: dict = None
) -> list[dict]:
"""Search for similar documents."""
results = self.collection.query(
query_texts=[query],
n_results=k,
where=filter
)
return [
{
"id": results["ids"][0][i],
"text": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"distance": results["distances"][0][i]
}
for i in range(len(results["ids"][0]))
]
def delete(self, ids: list[str] = None, filter: dict = None):
"""Delete documents by ID or filter."""
if ids:
self.collection.delete(ids=ids)
elif filter:
            self.collection.delete(where=filter)

The same operations with Pinecone as a managed vector store, computing embeddings explicitly with the OpenAI client:

import os

from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
class PineconeVectorStore:
def __init__(self, index_name: str = "rag-index"):
self.pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
self.openai = OpenAI()
# Create index if not exists
if index_name not in self.pc.list_indexes().names():
self.pc.create_index(
name=index_name,
dimension=1536, # text-embedding-3-small
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
self.index = self.pc.Index(index_name)
def _embed(self, texts: list[str]) -> list[list[float]]:
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=texts
)
return [r.embedding for r in response.data]
def add_documents(self, documents: list[dict], namespace: str = ""):
"""Add documents in batches."""
batch_size = 100
embeddings = self._embed([d["text"] for d in documents])
vectors = [
{
"id": d["id"],
"values": emb,
"metadata": {"text": d["text"], **d.get("metadata", {})}
}
for d, emb in zip(documents, embeddings)
]
for i in range(0, len(vectors), batch_size):
self.index.upsert(
vectors=vectors[i:i+batch_size],
namespace=namespace
)
def search(
self,
query: str,
k: int = 5,
filter: dict = None,
namespace: str = ""
) -> list[dict]:
query_embedding = self._embed([query])[0]
results = self.index.query(
vector=query_embedding,
top_k=k,
filter=filter,
namespace=namespace,
include_metadata=True
)
return [
{
"id": match.id,
"score": match.score,
"text": match.metadata.get("text", ""),
"metadata": match.metadata
}
for match in results.matches
        ]

Return the k most similar documents:
results = vector_store.search(
query="What is machine learning?",
k=5
)

Maximum Marginal Relevance (MMR) balances relevance with diversity to avoid redundant results:
def mmr_search(
query_embedding: list[float],
candidate_embeddings: list[list[float]],
k: int = 5,
lambda_param: float = 0.5
) -> list[int]:
"""
Maximum Marginal Relevance search.
MMR = λ * sim(q, d) - (1-λ) * max(sim(d, d_selected))
Args:
query_embedding: Query vector
candidate_embeddings: List of document vectors
k: Number of results to return
lambda_param: Balance between relevance (1) and diversity (0)
Returns:
Indices of selected documents
"""
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
query = np.array(query_embedding).reshape(1, -1)
candidates = np.array(candidate_embeddings)
# Relevance to query
relevance = cosine_similarity(query, candidates)[0]
selected = []
remaining = list(range(len(candidates)))
for _ in range(k):
if not remaining:
break
if not selected:
# First selection: most relevant
idx = remaining[np.argmax(relevance[remaining])]
else:
# MMR selection
mmr_scores = []
for idx in remaining:
rel = relevance[idx]
# Similarity to already selected docs
sim_to_selected = max(
cosine_similarity(
candidates[idx].reshape(1, -1),
candidates[selected]
)[0]
)
mmr = lambda_param * rel - (1 - lambda_param) * sim_to_selected
mmr_scores.append((idx, mmr))
idx = max(mmr_scores, key=lambda x: x[1])[0]
selected.append(idx)
remaining.remove(idx)
return selected
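As a rough sketch of using mmr_search directly (the embed helper and candidate_docs list below are illustrative assumptions, not part of the function above):

# Hypothetical glue code: fetch a candidate pool first, then diversify it.
query_embedding = embed(["What is machine learning?"])[0]
candidate_embeddings = embed([d["text"] for d in candidate_docs])

selected = mmr_search(query_embedding, candidate_embeddings, k=5, lambda_param=0.7)
diverse_docs = [candidate_docs[i] for i in selected]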
# Usage with LangChain
from langchain_community.vectorstores import Chroma

# `vectorstore` is an existing Chroma vector store built from your documents
results = vectorstore.max_marginal_relevance_search(
query="What is machine learning?",
k=5,
fetch_k=20, # Fetch 20, select 5 with diversity
lambda_mult=0.5 # Balance relevance and diversity
)

Constrain results by metadata attributes:
# ChromaDB filter syntax
results = collection.query(
query_texts=["security best practices"],
n_results=10,
where={
"$and": [
{"department": {"$eq": "engineering"}},
{"date": {"$gte": "2024-01-01"}},
{"document_type": {"$in": ["policy", "guide"]}}
]
}
)
# Pinecone filter syntax
results = index.query(
vector=query_embedding,
top_k=10,
filter={
"department": {"$eq": "engineering"},
"date": {"$gte": "2024-01-01"},
"document_type": {"$in": ["policy", "guide"]}
}
)
# Qdrant filter syntax
from qdrant_client.models import Filter, FieldCondition, MatchValue
results = client.search(
collection_name="documents",
query_vector=query_embedding,
query_filter=Filter(
must=[
FieldCondition(key="department", match=MatchValue(value="engineering")),
]
),
limit=10
)

Generate multiple query variations to improve recall:
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
# Create multi-query retriever
multi_retriever = MultiQueryRetriever.from_llm(
retriever=base_retriever,
llm=llm
)
# It generates multiple queries from the original
# "What are the benefits of RAG?" becomes:
# - "What advantages does RAG provide?"
# - "Why should I use retrieval-augmented generation?"
# - "How does RAG improve AI systems?"
results = multi_retriever.invoke("What are the benefits of RAG?")

Retrieve small chunks but return the full parent document:
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Store for parent documents
parent_store = InMemoryStore()
# Small chunks for embedding
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
# Larger chunks to return
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=parent_store,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Add documents (splits into children, stores parents)
retriever.add_documents(documents)
# Search returns parent documents, not small chunks
results = retriever.invoke("What is RAG?")

Use an LLM to parse natural language queries into structured filters:
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
# Define filterable attributes
metadata_field_info = [
AttributeInfo(
name="author",
description="The author of the document",
type="string"
),
AttributeInfo(
name="date",
description="The date the document was published",
type="string"
),
AttributeInfo(
name="category",
description="The category: 'technical', 'business', 'legal'",
type="string"
),
]
retriever = SelfQueryRetriever.from_llm(
llm=llm,
vectorstore=vectorstore,
document_contents="Technical documentation",
metadata_field_info=metadata_field_info,
)
# Natural language query with implicit filter
results = retriever.invoke(
"Find technical documents by John Smith from 2024"
)
# LLM extracts: filter = {author: "John Smith", date >= "2024-01-01", category: "technical"}

Extract only relevant portions from retrieved documents:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
# Compressor extracts relevant sentences
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
# Returns only relevant portions of documents
results = compression_retriever.invoke("What is the refund policy?")
# Instead of a full page, returns just: "Refunds are processed within 5-7 business days..."

Hybrid retrieval combines semantic (vector) search with lexical (BM25) search and fuses the two rankings:

from rank_bm25 import BM25Okapi
import numpy as np
class HybridRetriever:
def __init__(
self,
vector_store,
documents: list[dict],
alpha: float = 0.5
):
self.vector_store = vector_store
self.documents = documents
        self.alpha = alpha  # Weight for semantic vs lexical (reserved; the RRF fusion below uses ranks only)
# Build BM25 index
self.tokenized_docs = [
doc["text"].lower().split()
for doc in documents
]
self.bm25 = BM25Okapi(self.tokenized_docs)
# Build doc_id to index mapping
self.id_to_idx = {doc["id"]: i for i, doc in enumerate(documents)}
def search(self, query: str, k: int = 10) -> list[dict]:
# Semantic search
semantic_results = self.vector_store.search(query, k=k*2)
# Lexical search (BM25)
tokenized_query = query.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
        # Combine rankings with Reciprocal Rank Fusion (RRF): each document
        # earns 1 / (60 + rank) from every ranking it appears in.
        doc_scores = {}
        # Contribution from the semantic ranking
for rank, result in enumerate(semantic_results, 1):
doc_id = result["id"]
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1 / (60 + rank)
# Add BM25 scores
ranked_bm25 = np.argsort(bm25_scores)[::-1][:k*2]
for rank, idx in enumerate(ranked_bm25, 1):
doc_id = self.documents[idx]["id"]
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1 / (60 + rank)
# Sort by combined score
sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
# Return top k
results = []
for doc_id, score in sorted_docs[:k]:
idx = self.id_to_idx[doc_id]
results.append({
"id": doc_id,
"text": self.documents[idx]["text"],
"score": score
})
        return results

An ensemble retriever gathers candidates from several retrievers and re-ranks them with a cross-encoder:

from sentence_transformers import CrossEncoder
class EnsembleRetriever:
def __init__(
self,
retrievers: list,
reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
):
self.retrievers = retrievers
self.reranker = CrossEncoder(reranker_model)
def search(
self,
query: str,
k: int = 5,
fetch_k: int = 20
) -> list[dict]:
# Gather candidates from all retrievers
all_candidates = {}
for retriever in self.retrievers:
results = retriever.search(query, k=fetch_k)
for result in results:
doc_id = result["id"]
if doc_id not in all_candidates:
all_candidates[doc_id] = result
# Deduplicated candidates
candidates = list(all_candidates.values())
if len(candidates) == 0:
return []
# Re-rank with cross-encoder
pairs = [[query, c["text"]] for c in candidates]
scores = self.reranker.predict(pairs)
# Sort by reranker score
ranked = sorted(
zip(scores, candidates),
reverse=True,
key=lambda x: x[0]
)
# Return top k
return [
{**doc, "rerank_score": float(score)}
for score, doc in ranked[:k]
        ]

When building conversational RAG, you need to decide what text to embed as the query across the dialogue history:
def retrieve_for_chat(messages: list[dict], retriever) -> list:
# Only use the latest user message
latest_query = messages[-1]["content"]
    return retriever.search(latest_query)

Limitation: this loses context from earlier in the conversation. A simple improvement is to concatenate recent user turns into the query:
def retrieve_for_chat(messages: list[dict], retriever, max_turns: int = 3) -> list:
# Concatenate recent turns
recent = messages[-max_turns*2:] # User and assistant messages
query = " ".join([m["content"] for m in recent if m["role"] == "user"])
    return retriever.search(query)

A more robust approach uses an LLM to rewrite the latest question into a standalone query that carries the conversational context:

def reformulate_query(messages: list[dict], llm) -> str:
"""Use LLM to create a standalone query from conversation."""
prompt = """Given the following conversation, reformulate the last user
question into a standalone question that captures all necessary context.
Conversation:
{conversation}
Standalone question:"""
conversation = "\n".join([
f"{m['role'].title()}: {m['content']}"
for m in messages
])
response = llm.invoke(prompt.format(conversation=conversation))
return response.content
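A minimal sketch of wiring this into retrieval (messages, llm, and retriever here are the objects assumed in the surrounding examples):

# Rewrite the conversation into a standalone query, then retrieve with it
standalone_question = reformulate_query(messages, llm)
results = retriever.search(standalone_question, k=5)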
# Example:
# User: "What is RAG?"
# Assistant: "RAG is Retrieval Augmented Generation..."
# User: "How does it compare to fine-tuning?"
#
# Reformulated: "How does Retrieval Augmented Generation (RAG) compare to fine-tuning?"

Retrieval calls are often repeated, so caching identical queries saves latency and embedding cost:

import hashlib
import json
class CachedRetriever:
def __init__(self, retriever, cache_size: int = 1000):
self.retriever = retriever
self._cache = {}
self.cache_size = cache_size
def _cache_key(self, query: str, k: int, filter: dict = None) -> str:
content = f"{query}:{k}:{json.dumps(filter, sort_keys=True)}"
return hashlib.md5(content.encode()).hexdigest()
def search(self, query: str, k: int = 5, filter: dict = None):
key = self._cache_key(query, k, filter)
        if key in self._cache:
            # Re-insert on hit so recently used entries move to the back (LRU order)
            self._cache[key] = self._cache.pop(key)
            return self._cache[key]
        results = self.retriever.search(query, k, filter)
        # Evict the least recently used entry when the cache is full
        if len(self._cache) >= self.cache_size:
            oldest = next(iter(self._cache))
            del self._cache[oldest]
self._cache[key] = results
        return results

Wrapping a synchronous retriever in a thread pool lets you run searches concurrently:

import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncRetriever:
def __init__(self, retriever):
self.retriever = retriever
self.executor = ThreadPoolExecutor(max_workers=4)
async def search(self, query: str, k: int = 5):
        loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.executor,
lambda: self.retriever.search(query, k)
)
async def multi_search(self, queries: list[str], k: int = 5):
"""Search multiple queries concurrently."""
tasks = [self.search(q, k) for q in queries]
        return await asyncio.gather(*tasks)

Instrument retrieval with logging so latency and result quality can be monitored in production:

import time
import logging
class InstrumentedRetriever:
def __init__(self, retriever):
self.retriever = retriever
self.logger = logging.getLogger(__name__)
def search(self, query: str, k: int = 5, **kwargs):
start = time.time()
try:
results = self.retriever.search(query, k, **kwargs)
self.logger.info(
"retrieval_success",
extra={
"query_length": len(query),
"k": k,
"num_results": len(results),
"latency_ms": (time.time() - start) * 1000,
"top_score": results[0]["score"] if results else None
}
)
return results
except Exception as e:
self.logger.error(
"retrieval_error",
extra={"error": str(e), "query": query[:100]}
)
            raise

In the next module, we'll explore advanced RAG techniques including query rewriting, re-ranking, and hybrid RAG architectures.