Deploying RAG systems to production requires careful consideration of scalability, reliability, security, and operational concerns. This lesson covers architecture patterns, deployment strategies, and production best practices.
┌─────────────────┐
│ Load Balancer │
└────────┬─────────┘
│
┌─────────────┴─────────────┐
│ │
┌──────▼──────┐ ┌──────▼──────┐
│ API Server │ │ API Server │
│ (Stateless)│ │ (Stateless)│
└──────┬──────┘ └──────┬──────┘
│ │
└─────────────┬─────────────┘
│
┌───────────────────────┼───────────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Redis │ │ Vector │ │ LLM API │
│ Cache │ │ Database │ │ Gateway │
└─────────────┘ └─────────────┘ └─────────────┘

# Example microservices structure
"""
rag-system/
├── services/
│ ├── api-gateway/ # Request routing, auth, rate limiting
│ ├── embedding-service/ # Embedding generation
│ ├── retrieval-service/ # Vector search and retrieval
│ ├── reranking-service/ # Re-ranking candidates
│ ├── generation-service/ # LLM orchestration
│ └── ingestion-service/ # Document processing
├── shared/
│ ├── schemas/ # Pydantic models
│ ├── clients/ # Service clients
│ └── utils/ # Shared utilities
├── infrastructure/
│ ├── docker/
│ ├── kubernetes/
│ └── terraform/
└── docker-compose.yml
"""
# API Gateway (FastAPI)
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
app = FastAPI(title="RAG API Gateway")
class QueryRequest(BaseModel):
    """Incoming payload for the /v1/query endpoint."""

    # Natural-language user question.
    query: str
    # Number of documents to retrieve.
    top_k: int = 5
    # Optional metadata filters. The original `filters: dict = None` pairs a
    # non-optional annotation with a None default; declare it explicitly
    # optional so type checkers and pydantic agree (default unchanged).
    filters: dict | None = None
class QueryResponse(BaseModel):
    """Response returned to API clients by /v1/query."""

    # Final answer text produced by the generation service.
    answer: str
    # Retrieved source documents backing the answer.
    sources: list[dict]
    # End-to-end processing time in milliseconds.
    latency_ms: float
@app.post("/v1/query", response_model=QueryResponse)
async def query(
    request: QueryRequest,
    api_key: str = Depends(verify_api_key)
):
    """Orchestrate the pipeline: embed -> retrieve -> generate.

    Calls the three internal services in sequence and returns the generation
    service's payload as a QueryResponse. Raises httpx.HTTPStatusError if any
    internal service responds with an error status.
    """
    import httpx  # local import: `httpx` is used but missing from the module imports

    # Rate limiting (per API key)
    await check_rate_limit(api_key)

    # Call internal services
    async with httpx.AsyncClient() as client:
        # Embedding
        emb_response = await client.post(
            "http://embedding-service/embed",
            json={"text": request.query}
        )
        # Fail fast on a bad upstream status instead of a confusing KeyError
        # when reading the JSON body below.
        emb_response.raise_for_status()

        # Retrieval
        ret_response = await client.post(
            "http://retrieval-service/search",
            json={
                "embedding": emb_response.json()["embedding"],
                "top_k": request.top_k,
                "filters": request.filters
            }
        )
        ret_response.raise_for_status()

        # Generation
        gen_response = await client.post(
            "http://generation-service/generate",
            json={
                "query": request.query,
                "contexts": ret_response.json()["documents"]
            }
        )
        gen_response.raise_for_status()

    return QueryResponse(**gen_response.json())

# kubernetes/deployment.yaml
# Kubernetes manifests for the RAG API: Deployment + Service + HPA.
# (Indentation restored — the flattened form was not valid YAML.)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-api
  labels:
    app: rag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-api
  template:
    metadata:
      labels:
        app: rag-api
    spec:
      containers:
        - name: rag-api
          image: myregistry/rag-api:v1.0.0
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            # Secret-backed API key — never a plain ConfigMap value.
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: rag-secrets
                  key: openai-api-key
            - name: VECTOR_DB_URL
              valueFrom:
                configMapKeyRef:
                  name: rag-config
                  key: vector-db-url
          # Restart the container if /health stops responding.
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 10
          # Remove from Service endpoints until /ready succeeds.
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: rag-api
spec:
  selector:
    app: rag-api
  ports:
    - port: 80
      targetPort: 8000
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rag-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rag-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

from pydantic_settings import BaseSettings
from functools import lru_cache
class RAGSettings(BaseSettings):
    """Centralised runtime configuration loaded from env vars / .env.

    Variables are read with the ``RAG_`` prefix (e.g. ``RAG_OPENAI_API_KEY``).
    Fields without defaults are required at startup.
    """

    # API Keys
    openai_api_key: str
    # Fixed: `str = None` pairs a non-optional annotation with a None
    # default; declare the field explicitly optional instead.
    cohere_api_key: str | None = None
    # Vector Database
    vector_db_url: str
    vector_db_collection: str = "documents"
    # Model Configuration
    embedding_model: str = "text-embedding-3-small"
    llm_model: str = "gpt-4o"
    reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    # Performance
    retrieval_top_k: int = 10
    rerank_top_k: int = 5
    max_context_tokens: int = 4000
    # Caching
    redis_url: str = "redis://localhost:6379"
    cache_ttl_seconds: int = 3600
    # Rate Limiting
    rate_limit_requests: int = 100
    rate_limit_window_seconds: int = 60

    class Config:
        env_file = ".env"
        env_prefix = "RAG_"
@lru_cache()
def get_settings() -> RAGSettings:
    """Return a process-wide cached RAGSettings instance (lru_cache makes it a singleton)."""
    return RAGSettings()

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit
class ResilientRAG:
    """RAG system with production-grade error handling.

    Wraps each pipeline stage (embed, retrieve, generate) with retries,
    circuit breaking, timeouts, and user-facing fallback messages.
    NOTE(review): relies on ``self.embedder``, ``self.retriever`` and
    ``self.generate`` being provided elsewhere — confirm wiring.
    """

    def __init__(self, config: RAGSettings):
        self.config = config
        # Canned answers returned to users instead of surfacing raw errors.
        self.fallback_responses = {
            "retrieval_failed": "I'm having trouble accessing our knowledge base. Please try again.",
            "generation_failed": "I couldn't generate a response. Please try again.",
            "timeout": "The request took too long. Please try with a simpler question."
        }

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10)
    )
    async def embed_with_retry(self, text: str):
        """Retry embedding on transient failures (up to 3 attempts, exp. backoff)."""
        return await self.embedder.embed(text)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def retrieve_with_circuit_breaker(self, embedding, k: int):
        """Circuit breaker for the vector DB: opens after 5 failures, retries after 30s."""
        return await self.retriever.search(embedding, k=k)

    async def query(self, user_query: str) -> dict:
        """Run the full pipeline, degrading gracefully at every stage.

        Returns a dict with keys ``answer``, ``error`` and, on success,
        ``sources``; never raises to the caller.
        """
        try:
            # Embedding with a hard 5s timeout.
            try:
                embedding = await asyncio.wait_for(
                    self.embed_with_retry(user_query),
                    timeout=5.0
                )
            except asyncio.TimeoutError:
                return {"answer": self.fallback_responses["timeout"], "error": True}

            # Retrieval guarded by the circuit breaker.
            try:
                documents = await self.retrieve_with_circuit_breaker(
                    embedding,
                    k=self.config.retrieval_top_k
                )
            except Exception:
                return {"answer": self.fallback_responses["retrieval_failed"], "error": True}

            # Generation with fallback: retry once with a smaller context.
            try:
                response = await asyncio.wait_for(
                    self.generate(user_query, documents),
                    timeout=30.0
                )
            except Exception:  # (TimeoutError, Exception) was redundant — Exception covers both
                try:
                    response = await self.generate(user_query, documents[:2])
                except Exception:
                    # Was a bare `except:` — that would also swallow
                    # SystemExit/KeyboardInterrupt; narrow to Exception.
                    return {"answer": self.fallback_responses["generation_failed"], "error": True}

            return {"answer": response, "sources": documents, "error": False}
        except Exception as e:
            import logging  # local import: `logging` is not imported at module level
            logging.error(f"Unexpected error: {e}")
            return {"answer": "An unexpected error occurred.", "error": True}

from fastapi import Security, HTTPException
from fastapi.security import APIKeyHeader
import secrets
import hashlib
# API Key Authentication
# Clients must send their key in the `X-API-Key` request header.
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    """Resolve an API key to its owning user id, or reject with HTTP 403.

    Only a SHA-256 digest of the key is looked up, so plaintext keys are
    never stored or queried server-side.
    """
    digest = hashlib.sha256(api_key.encode()).hexdigest()
    user = await db.get_user_by_api_key_hash(digest)
    # Reject unknown keys and deactivated accounts alike.
    if not user or not user.is_active:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return user.id
# Input Sanitization
import re
# Prompt-injection phrases, compiled once at import time instead of
# recompiling four patterns on every request.
_INJECTION_PATTERNS = [
    re.compile(r"ignore previous instructions", re.IGNORECASE),
    re.compile(r"disregard above", re.IGNORECASE),
    re.compile(r"system:", re.IGNORECASE),
    re.compile(r"\[INST\]", re.IGNORECASE),
]

def sanitize_query(query: str) -> str:
    """Sanitize user input to mitigate prompt-injection attacks.

    Strips known injection phrases case-insensitively, then truncates to
    2000 characters and trims surrounding whitespace (truncate-then-strip,
    matching the original behavior). NOTE: a denylist is best-effort only —
    it cannot catch novel injection phrasings.
    """
    sanitized = query
    for pattern in _INJECTION_PATTERNS:
        sanitized = pattern.sub("", sanitized)
    # Limit length
    return sanitized[:2000].strip()
# Document Access Control
class SecureRetriever:
    """Retriever that enforces per-user document access control."""

    async def search(
        self,
        query: str,
        user_id: str,
        k: int = 10
    ) -> list[dict]:
        """Search the vector store, restricted to documents the user may read."""
        # Resolve the set of document ids this user is permitted to see.
        allowed_ids = await self.get_user_permissions(user_id)
        # Constrain the vector search with an access-control filter so
        # unauthorized documents are excluded at query time.
        acl_filter = {"document_id": {"$in": allowed_ids}}
        return await self.vector_store.search(query, k=k, filter=acl_filter)

from prometheus_client import Counter, Histogram, Gauge
import structlog
# Metrics
# Counter: one increment per query, labelled by outcome and LLM model.
rag_queries_total = Counter(
    'rag_queries_total',
    'Total RAG queries',
    ['status', 'model']
)
# Histogram: per-stage latency (embedding / retrieval / generation / total).
rag_latency = Histogram(
    'rag_query_latency_seconds',
    'RAG query latency',
    ['stage'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
# Counter: token usage, labelled by direction.
rag_tokens = Counter(
    'rag_tokens_total',
    'Total tokens used',
    ['type']  # input, output
)
# Structured Logging
logger = structlog.get_logger()

class ObservableRAG:
    """RAG pipeline instrumented with Prometheus metrics and structured logs.

    NOTE(review): expects ``self.embed``, ``self.retrieve``, ``self.generate``
    and ``self.model`` to be provided elsewhere — confirm wiring.
    """

    async def query(self, user_query: str, user_id: str) -> dict:
        """Run the pipeline, timing each stage and counting tokens/outcomes."""
        import uuid  # local import: `uuid` is used but missing from the module imports
        request_id = str(uuid.uuid4())
        # Bind request context once so every subsequent log line carries it.
        log = logger.bind(request_id=request_id, user_id=user_id)
        log.info("query_started", query_length=len(user_query))
        with rag_latency.labels(stage="total").time():
            try:
                # Embedding
                with rag_latency.labels(stage="embedding").time():
                    embedding = await self.embed(user_query)
                # Retrieval
                with rag_latency.labels(stage="retrieval").time():
                    documents = await self.retrieve(embedding)
                log.info("retrieval_complete", num_documents=len(documents))
                # Generation
                with rag_latency.labels(stage="generation").time():
                    response = await self.generate(user_query, documents)
                rag_queries_total.labels(status="success", model=self.model).inc()
                rag_tokens.labels(type="input").inc(response["input_tokens"])
                rag_tokens.labels(type="output").inc(response["output_tokens"])
                log.info(
                    "query_complete",
                    input_tokens=response["input_tokens"],
                    output_tokens=response["output_tokens"]
                )
                return response
            except Exception as e:
                rag_queries_total.labels(status="error", model=self.model).inc()
                log.error("query_failed", error=str(e))
                raise

# .github/workflows/deploy.yml
# GitHub Actions pipeline: test -> build -> deploy.
# (Indentation restored — the flattened form was not valid YAML.)
name: Deploy RAG System

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  # Unit tests plus the RAG quality evaluation gate every change.
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-asyncio
      - name: Run tests
        run: pytest tests/ -v
      - name: Run RAG evaluation
        run: python scripts/evaluate_rag.py --dataset eval_data.json
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

  # Build and push the image, tagged with the commit SHA.
  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: docker build -t rag-api:$GITHUB_SHA .
      - name: Push to registry
        run: |
          docker tag rag-api:$GITHUB_SHA myregistry/rag-api:$GITHUB_SHA
          docker push myregistry/rag-api:$GITHUB_SHA

  # Roll the deployment only on pushes to main.
  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/rag-api \
            rag-api=myregistry/rag-api:$GITHUB_SHA

from celery import Celery
import boto3
celery_app = Celery('rag_tasks', broker='redis://localhost:6379')
@celery_app.task
def process_document(document_id: str, s3_key: str):
    """Async document ingestion task.

    Pipeline: S3 download -> text extraction -> chunking -> embedding ->
    vector-store upsert -> status update. On any failure the document is
    marked "failed" and the exception re-raised so Celery records it.
    """
    try:
        # Download from S3
        s3_client = boto3.client('s3')
        raw = s3_client.get_object(Bucket='documents', Key=s3_key)['Body'].read()
        # Extract text, then split into chunks
        chunks = chunk_document(extract_text(raw))
        # Generate embeddings for every chunk
        vectors = embed_batch(chunks)
        # Store in the vector DB; chunk ids derive from the parent document id
        chunk_ids = [f"{document_id}_{i}" for i in range(len(chunks))]
        vector_store.upsert(
            ids=chunk_ids,
            embeddings=vectors,
            documents=chunks,
            metadata=[{"document_id": document_id, "chunk_idx": i} for i in range(len(chunks))]
        )
        # Mark the document searchable
        db.update_document_status(document_id, "indexed")
    except Exception as e:
        db.update_document_status(document_id, "failed", error=str(e))
        raise
# API endpoint to trigger ingestion
@app.post("/v1/documents")
async def ingest_document(file: UploadFile):
document_id = str(uuid.uuid4())
# Upload to S3
s3_key = f"uploads/{document_id}/{file.filename}"
s3.upload_fileobj(file.file, 'documents', s3_key)
# Queue processing
process_document.delay(document_id, s3_key)
return {"document_id": document_id, "status": "processing"}Pre-Deployment:
□ Run full evaluation suite on staging
□ Load test with expected traffic patterns
□ Security audit (API keys, access control, input validation)
□ Review error handling and fallbacks
□ Set up monitoring dashboards and alerts
Infrastructure:
□ Configure auto-scaling rules
□ Set up health checks and readiness probes
□ Configure load balancer and SSL
□ Set up Redis cluster for caching
□ Configure vector database replication
Observability:
□ Structured logging to central system
□ Prometheus metrics for latency, errors, tokens
□ Distributed tracing for request flow
□ Alerting for error rates and latency spikes
Operations:
□ Document runbook for common issues
□ Set up on-call rotation
□ Configure backup and disaster recovery
□ Plan for zero-downtime deployments

Congratulations! You've completed the RAG Systems course. You now have the knowledge to design, build, and deploy production-grade RAG systems.