RAG Systems

0 of 13 lessons completed

Deploying RAG Systems to Production

Deploying RAG systems to production requires careful consideration of scalability, reliability, security, and operational concerns. This lesson covers architecture patterns, deployment strategies, and production best practices.

Production Architecture

                         ┌─────────────────┐
                         │   Load Balancer  │
                         └────────┬─────────┘
                                  │
                    ┌─────────────┴─────────────┐
                    │                           │
             ┌──────▼──────┐             ┌──────▼──────┐
             │  API Server │             │  API Server │
             │  (Stateless)│             │  (Stateless)│
             └──────┬──────┘             └──────┬──────┘
                    │                           │
                    └─────────────┬─────────────┘
                                  │
          ┌───────────────────────┼───────────────────────┐
          │                       │                       │
   ┌──────▼──────┐         ┌──────▼──────┐         ┌──────▼──────┐
   │   Redis     │         │   Vector    │         │   LLM API   │
   │   Cache     │         │   Database  │         │   Gateway   │
   └─────────────┘         └─────────────┘         └─────────────┘

Microservices Architecture

# Example microservices structure
"""
rag-system/
├── services/
│   ├── api-gateway/           # Request routing, auth, rate limiting
│   ├── embedding-service/     # Embedding generation
│   ├── retrieval-service/     # Vector search and retrieval
│   ├── reranking-service/     # Re-ranking candidates
│   ├── generation-service/    # LLM orchestration
│   └── ingestion-service/     # Document processing
├── shared/
│   ├── schemas/               # Pydantic models
│   ├── clients/               # Service clients
│   └── utils/                 # Shared utilities
├── infrastructure/
│   ├── docker/
│   ├── kubernetes/
│   └── terraform/
└── docker-compose.yml
"""

# API Gateway (FastAPI)
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel

app = FastAPI(title="RAG API Gateway")

class QueryRequest(BaseModel):
    query: str
    top_k: int = 5
    filters: dict = None

class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    latency_ms: float

@app.post("/v1/query", response_model=QueryResponse)
async def query(
    request: QueryRequest,
    api_key: str = Depends(verify_api_key)
):
    # Rate limiting
    await check_rate_limit(api_key)
    
    # Call internal services
    async with httpx.AsyncClient() as client:
        # Embedding
        emb_response = await client.post(
            "http://embedding-service/embed",
            json={"text": request.query}
        )
        
        # Retrieval
        ret_response = await client.post(
            "http://retrieval-service/search",
            json={
                "embedding": emb_response.json()["embedding"],
                "top_k": request.top_k,
                "filters": request.filters
            }
        )
        
        # Generation
        gen_response = await client.post(
            "http://generation-service/generate",
            json={
                "query": request.query,
                "contexts": ret_response.json()["documents"]
            }
        )
    
    return QueryResponse(**gen_response.json())

Kubernetes Deployment

# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-api
  labels:
    app: rag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-api
  template:
    metadata:
      labels:
        app: rag-api
    spec:
      containers:
      - name: rag-api
        image: myregistry/rag-api:v1.0.0
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: rag-secrets
              key: openai-api-key
        - name: VECTOR_DB_URL
          valueFrom:
            configMapKeyRef:
              name: rag-config
              key: vector-db-url
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: rag-api
spec:
  selector:
    app: rag-api
  ports:
  - port: 80
    targetPort: 8000
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: rag-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: rag-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

Configuration Management

from pydantic_settings import BaseSettings
from functools import lru_cache

class RAGSettings(BaseSettings):
    # API Keys
    openai_api_key: str
    cohere_api_key: str = None
    
    # Vector Database
    vector_db_url: str
    vector_db_collection: str = "documents"
    
    # Model Configuration
    embedding_model: str = "text-embedding-3-small"
    llm_model: str = "gpt-4o"
    reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    
    # Performance
    retrieval_top_k: int = 10
    rerank_top_k: int = 5
    max_context_tokens: int = 4000
    
    # Caching
    redis_url: str = "redis://localhost:6379"
    cache_ttl_seconds: int = 3600
    
    # Rate Limiting
    rate_limit_requests: int = 100
    rate_limit_window_seconds: int = 60
    
    class Config:
        env_file = ".env"
        env_prefix = "RAG_"

@lru_cache()
def get_settings() -> RAGSettings:
    return RAGSettings()

Error Handling and Resilience

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit

class ResilientRAG:
    """RAG system with production-grade error handling."""
    
    def __init__(self, config: RAGSettings):
        self.config = config
        self.fallback_responses = {
            "retrieval_failed": "I'm having trouble accessing our knowledge base. Please try again.",
            "generation_failed": "I couldn't generate a response. Please try again.",
            "timeout": "The request took too long. Please try with a simpler question."
        }
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10)
    )
    async def embed_with_retry(self, text: str):
        """Retry embedding on transient failures."""
        return await self.embedder.embed(text)
    
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def retrieve_with_circuit_breaker(self, embedding, k: int):
        """Circuit breaker for vector DB."""
        return await self.retriever.search(embedding, k=k)
    
    async def query(self, user_query: str) -> dict:
        try:
            # Embedding with timeout
            try:
                embedding = await asyncio.wait_for(
                    self.embed_with_retry(user_query),
                    timeout=5.0
                )
            except asyncio.TimeoutError:
                return {"answer": self.fallback_responses["timeout"], "error": True}
            
            # Retrieval with circuit breaker
            try:
                documents = await self.retrieve_with_circuit_breaker(
                    embedding,
                    k=self.config.retrieval_top_k
                )
            except Exception as e:
                return {"answer": self.fallback_responses["retrieval_failed"], "error": True}
            
            # Generation with fallback
            try:
                response = await asyncio.wait_for(
                    self.generate(user_query, documents),
                    timeout=30.0
                )
            except (asyncio.TimeoutError, Exception):
                # Try with fewer documents
                try:
                    response = await self.generate(user_query, documents[:2])
                except:
                    return {"answer": self.fallback_responses["generation_failed"], "error": True}
            
            return {"answer": response, "sources": documents, "error": False}
        
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            return {"answer": "An unexpected error occurred.", "error": True}

Security Considerations

from fastapi import Security, HTTPException
from fastapi.security import APIKeyHeader
import secrets
import hashlib

# API Key Authentication
api_key_header = APIKeyHeader(name="X-API-Key")

async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    """Verify API key and return associated user."""
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    user = await db.get_user_by_api_key_hash(key_hash)
    
    if not user or not user.is_active:
        raise HTTPException(status_code=403, detail="Invalid API key")
    
    return user.id

# Input Sanitization
import re

def sanitize_query(query: str) -> str:
    """Sanitize user input to prevent injection attacks."""
    # Remove potential prompt injection patterns
    injection_patterns = [
        r"ignore previous instructions",
        r"disregard above",
        r"system:",
        r"\[INST\]",
    ]
    
    sanitized = query
    for pattern in injection_patterns:
        sanitized = re.sub(pattern, "", sanitized, flags=re.IGNORECASE)
    
    # Limit length
    return sanitized[:2000].strip()

# Document Access Control
class SecureRetriever:
    """Retriever with access control."""
    
    async def search(
        self,
        query: str,
        user_id: str,
        k: int = 10
    ) -> list[dict]:
        # Get user's accessible document IDs
        accessible_docs = await self.get_user_permissions(user_id)
        
        # Add access control filter to query
        results = await self.vector_store.search(
            query,
            k=k,
            filter={"document_id": {"$in": accessible_docs}}
        )
        
        return results

Monitoring and Observability

from prometheus_client import Counter, Histogram, Gauge
import structlog

# Metrics
rag_queries_total = Counter(
    'rag_queries_total',
    'Total RAG queries',
    ['status', 'model']
)

rag_latency = Histogram(
    'rag_query_latency_seconds',
    'RAG query latency',
    ['stage'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

rag_tokens = Counter(
    'rag_tokens_total',
    'Total tokens used',
    ['type']  # input, output
)

# Structured Logging
logger = structlog.get_logger()

class ObservableRAG:
    async def query(self, user_query: str, user_id: str) -> dict:
        request_id = str(uuid.uuid4())
        log = logger.bind(request_id=request_id, user_id=user_id)
        
        log.info("query_started", query_length=len(user_query))
        
        with rag_latency.labels(stage="total").time():
            try:
                # Embedding
                with rag_latency.labels(stage="embedding").time():
                    embedding = await self.embed(user_query)
                
                # Retrieval
                with rag_latency.labels(stage="retrieval").time():
                    documents = await self.retrieve(embedding)
                
                log.info("retrieval_complete", num_documents=len(documents))
                
                # Generation
                with rag_latency.labels(stage="generation").time():
                    response = await self.generate(user_query, documents)
                
                rag_queries_total.labels(status="success", model=self.model).inc()
                rag_tokens.labels(type="input").inc(response["input_tokens"])
                rag_tokens.labels(type="output").inc(response["output_tokens"])
                
                log.info(
                    "query_complete",
                    input_tokens=response["input_tokens"],
                    output_tokens=response["output_tokens"]
                )
                
                return response
            
            except Exception as e:
                rag_queries_total.labels(status="error", model=self.model).inc()
                log.error("query_failed", error=str(e))
                raise

CI/CD Pipeline

# .github/workflows/deploy.yml
name: Deploy RAG System

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-asyncio
    
    - name: Run tests
      run: pytest tests/ -v
    
    - name: Run RAG evaluation
      run: python scripts/evaluate_rag.py --dataset eval_data.json
      env:
        OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Build Docker image
      run: docker build -t rag-api:$GITHUB_SHA .
    
    - name: Push to registry
      run: |
        docker tag rag-api:$GITHUB_SHA myregistry/rag-api:$GITHUB_SHA
        docker push myregistry/rag-api:$GITHUB_SHA

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - name: Deploy to Kubernetes
      run: |
        kubectl set image deployment/rag-api \
          rag-api=myregistry/rag-api:$GITHUB_SHA

Document Ingestion Pipeline

from celery import Celery
import boto3

celery_app = Celery('rag_tasks', broker='redis://localhost:6379')

@celery_app.task
def process_document(document_id: str, s3_key: str):
    """Async document ingestion task."""
    
    try:
        # Download from S3
        s3 = boto3.client('s3')
        document = s3.get_object(Bucket='documents', Key=s3_key)
        content = document['Body'].read()
        
        # Extract text
        text = extract_text(content)
        
        # Chunk
        chunks = chunk_document(text)
        
        # Generate embeddings
        embeddings = embed_batch(chunks)
        
        # Store in vector DB
        vector_store.upsert(
            ids=[f"{document_id}_{i}" for i in range(len(chunks))],
            embeddings=embeddings,
            documents=chunks,
            metadata=[{"document_id": document_id, "chunk_idx": i} for i in range(len(chunks))]
        )
        
        # Update status
        db.update_document_status(document_id, "indexed")
        
    except Exception as e:
        db.update_document_status(document_id, "failed", error=str(e))
        raise

# API endpoint to trigger ingestion
@app.post("/v1/documents")
async def ingest_document(file: UploadFile):
    document_id = str(uuid.uuid4())
    
    # Upload to S3
    s3_key = f"uploads/{document_id}/{file.filename}"
    s3.upload_fileobj(file.file, 'documents', s3_key)
    
    # Queue processing
    process_document.delay(document_id, s3_key)
    
    return {"document_id": document_id, "status": "processing"}

Production Deployment Checklist

Pre-Deployment:
□ Run full evaluation suite on staging
□ Load test with expected traffic patterns
□ Security audit (API keys, access control, input validation)
□ Review error handling and fallbacks
□ Set up monitoring dashboards and alerts

Infrastructure:
□ Configure auto-scaling rules
□ Set up health checks and readiness probes
□ Configure load balancer and SSL
□ Set up Redis cluster for caching
□ Configure vector database replication

Observability:
□ Structured logging to central system
□ Prometheus metrics for latency, errors, tokens
□ Distributed tracing for request flow
□ Alerting for error rates and latency spikes

Operations:
□ Document runbook for common issues
□ Set up on-call rotation
□ Configure backup and disaster recovery
□ Plan for zero-downtime deployments

Key Takeaways

  • Use microservices for scalability and independent scaling
  • Implement circuit breakers for external dependencies
  • Use retry with exponential backoff for transient failures
  • Secure all endpoints with API keys and access control
  • Monitor everything - latency, errors, token usage, cache hits
  • Set up auto-scaling based on CPU and request volume
  • Use async processing for document ingestion
  • Plan for failure with graceful degradation and fallbacks

Congratulations! You've completed the RAG Systems course. You now have the knowledge to design, build, and deploy production-grade RAG systems.