Vector Databases & Embeddings

0 of 12 lessons completed

Production Deployment of Vector Databases

Deploying vector search to production requires careful consideration of scaling, performance, monitoring, and reliability. This lesson covers production-ready architectures.

Deployment Options

1. Managed Services (Recommended for Most)

ServiceProsBest For
PineconeZero ops, serverless, scales automaticallyQuick start, variable workloads
Weaviate CloudBuilt-in vectorization, multimodalFull-featured needs
Qdrant CloudHigh performance, good filteringPerformance critical
Zilliz Cloud (Milvus)Enterprise scale, GPU supportBillion-scale datasets

2. Self-Hosted on Kubernetes

# qdrant-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: qdrant
spec:
  replicas: 3
  selector:
    matchLabels:
      app: qdrant
  template:
    metadata:
      labels:
        app: qdrant
    spec:
      containers:
      - name: qdrant
        image: qdrant/qdrant:latest
        ports:
        - containerPort: 6333
        - containerPort: 6334
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
          limits:
            memory: "8Gi"
            cpu: "4"
        volumeMounts:
        - name: qdrant-storage
          mountPath: /qdrant/storage
      volumes:
      - name: qdrant-storage
        persistentVolumeClaim:
          claimName: qdrant-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: qdrant-service
spec:
  selector:
    app: qdrant
  ports:
  - name: rest
    port: 6333
    targetPort: 6333
  - name: grpc
    port: 6334
    targetPort: 6334
  type: LoadBalancer

Production Architecture

┌─────────────────────────────────────────────────────────────────┐
│                         Load Balancer                           │
└─────────────────────────────────────────────────────────────────┘
                              │
       ┌──────────────────────┼──────────────────────┐
       ▼                      ▼                      ▼
┌─────────────┐        ┌─────────────┐        ┌─────────────┐
│   API Pod   │        │   API Pod   │        │   API Pod   │
│  + Cache    │        │  + Cache    │        │  + Cache    │
└─────────────┘        └─────────────┘        └─────────────┘
       │                      │                      │
       └──────────────────────┼──────────────────────┘
                              ▼
              ┌───────────────────────────────┐
              │      Embedding Service        │
              │   (OpenAI / Local Models)     │
              └───────────────────────────────┘
                              │
       ┌──────────────────────┼──────────────────────┐
       ▼                      ▼                      ▼
┌─────────────┐        ┌─────────────┐        ┌─────────────┐
│  Qdrant     │        │  Qdrant     │        │  Qdrant     │
│  Node 1     │◄──────►│  Node 2     │◄──────►│  Node 3     │
└─────────────┘        └─────────────┘        └─────────────┘

Production API Service

# app/main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import redis.asyncio as redis
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient
import json
import hashlib
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global clients
qdrant: AsyncQdrantClient = None
openai_client: AsyncOpenAI = None
redis_client: redis.Redis = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global qdrant, openai_client, redis_client
    
    # Initialize connections
    qdrant = AsyncQdrantClient(
        host="qdrant-service",
        port=6333,
        timeout=30
    )
    openai_client = AsyncOpenAI()
    redis_client = redis.Redis(host="redis-service", port=6379)
    
    logger.info("Connections initialized")
    yield
    
    # Cleanup
    await qdrant.close()
    await redis_client.close()
    logger.info("Connections closed")

app = FastAPI(lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Caching layer
async def get_cached_embedding(text: str) -> list[float] | None:
    key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"
    cached = await redis_client.get(key)
    if cached:
        return json.loads(cached)
    return None

async def cache_embedding(text: str, embedding: list[float]):
    key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"
    await redis_client.setex(key, 3600, json.dumps(embedding))  # 1 hour TTL

async def embed(text: str) -> list[float]:
    # Check cache
    cached = await get_cached_embedding(text)
    if cached:
        return cached
    
    # Generate embedding
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    embedding = response.data[0].embedding
    
    # Cache it
    await cache_embedding(text, embedding)
    return embedding

@app.get("/health")
async def health():
    # Check all dependencies
    try:
        await qdrant.get_collections()
        await redis_client.ping()
        return {"status": "healthy"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=str(e))

@app.post("/search")
async def search(query: str, collection: str = "documents", limit: int = 10):
    try:
        query_embedding = await embed(query)
        
        results = await qdrant.search(
            collection_name=collection,
            query_vector=query_embedding,
            limit=limit
        )
        
        return {
            "results": [
                {
                    "id": str(hit.id),
                    "score": hit.score,
                    "content": hit.payload.get("content"),
                    "metadata": {k: v for k, v in hit.payload.items() if k != "content"}
                }
                for hit in results
            ]
        }
    except Exception as e:
        logger.error(f"Search error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

Monitoring and Observability

# Prometheus metrics
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
import time

# Metrics
SEARCH_REQUESTS = Counter(
    'search_requests_total',
    'Total search requests',
    ['collection', 'status']
)

SEARCH_LATENCY = Histogram(
    'search_latency_seconds',
    'Search request latency',
    ['collection']
)

EMBEDDING_LATENCY = Histogram(
    'embedding_latency_seconds',
    'Embedding generation latency'
)

CACHE_HITS = Counter(
    'embedding_cache_hits_total',
    'Embedding cache hits'
)

@app.get("/metrics")
async def metrics():
    return Response(
        generate_latest(),
        media_type="text/plain"
    )

# Instrumented search
@app.post("/search")
async def search(query: str, collection: str = "documents", limit: int = 10):
    start = time.time()
    
    try:
        # Embedding with metrics
        emb_start = time.time()
        query_embedding = await embed(query)
        EMBEDDING_LATENCY.observe(time.time() - emb_start)
        
        results = await qdrant.search(
            collection_name=collection,
            query_vector=query_embedding,
            limit=limit
        )
        
        SEARCH_REQUESTS.labels(collection=collection, status="success").inc()
        SEARCH_LATENCY.labels(collection=collection).observe(time.time() - start)
        
        return {"results": [...]}
        
    except Exception as e:
        SEARCH_REQUESTS.labels(collection=collection, status="error").inc()
        raise

Scaling Strategies

1. Horizontal Scaling (Sharding)

# Qdrant distributed mode
from qdrant_client.models import VectorParams, Distance

# Create sharded collection
client.create_collection(
    collection_name="large_collection",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    shard_number=6,  # Distribute across 6 shards
    replication_factor=2,  # Each shard has 2 replicas
    on_disk_payload=True  # Payload on disk to save RAM
)

2. Memory Optimization (Quantization)

from qdrant_client.models import (
    ScalarQuantization, 
    ScalarQuantizationConfig,
    QuantizationSearchParams
)

# Create collection with quantization
client.create_collection(
    collection_name="optimized_collection",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(
            type="int8",
            quantile=0.99,
            always_ram=True  # Keep quantized vectors in RAM
        )
    )
)

# Search with oversampling for accuracy
results = client.search(
    collection_name="optimized_collection",
    query_vector=query_embedding,
    limit=10,
    search_params=QuantizationSearchParams(
        oversampling=2.0,  # Fetch 2x candidates, then re-rank
        rescore=True  # Use original vectors for final ranking
    )
)

3. Index Optimization

from qdrant_client.models import HnswConfigDiff

# Tune HNSW parameters for your use case
client.update_collection(
    collection_name="documents",
    hnsw_config=HnswConfigDiff(
        m=16,  # Number of links (higher = more accuracy, more memory)
        ef_construct=200,  # Build-time accuracy (higher = slower build)
        full_scan_threshold=10000,  # Use brute force for small results
        on_disk=False  # Keep index in RAM for speed
    )
)

# Set search-time parameters
results = client.search(
    collection_name="documents",
    query_vector=query_embedding,
    limit=10,
    search_params=SearchParams(
        hnsw_ef=128  # Search-time accuracy (higher = slower, more accurate)
    )
)

Backup and Recovery

import subprocess
from datetime import datetime

def backup_collection(collection_name: str, backup_path: str):
    """Create snapshot of collection"""
    # Qdrant snapshots
    snapshot = client.create_snapshot(collection_name=collection_name)
    
    print(f"Snapshot created: {snapshot.name}")
    
    # Download snapshot
    snapshot_url = f"http://qdrant:6333/collections/{collection_name}/snapshots/{snapshot.name}"
    subprocess.run([
        "curl", "-o", f"{backup_path}/{collection_name}_{datetime.now().isoformat()}.snapshot",
        snapshot_url
    ])

def restore_collection(collection_name: str, snapshot_path: str):
    """Restore collection from snapshot"""
    client.recover_snapshot(
        collection_name=collection_name,
        location=snapshot_path
    )

# Scheduled backup
import schedule

def daily_backup():
    for collection in ["documents", "users", "products"]:
        backup_collection(collection, "/backups")

schedule.every().day.at("02:00").do(daily_backup)

Security Best Practices

# Qdrant with API key authentication
# config.yaml
service:
  api_key: ${QDRANT_API_KEY}
  enable_tls: true
  
storage:
  storage_path: /qdrant/storage
  
log_level: INFO
# Client with authentication
from qdrant_client import QdrantClient

client = QdrantClient(
    host="qdrant.example.com",
    port=6333,
    api_key="your-secret-api-key",
    https=True
)

# Rate limiting
from fastapi import Request
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/search")
@limiter.limit("100/minute")
async def search(request: Request, query: str):
    ...

Docker Compose for Development

# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - QDRANT_HOST=qdrant
      - REDIS_HOST=redis
    depends_on:
      - qdrant
      - redis

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_storage:/qdrant/storage
    environment:
      - QDRANT__SERVICE__API_KEY=${QDRANT_API_KEY}

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  qdrant_storage:
  redis_data:

Checklist: Production Readiness

  • High Availability - Replication factor ≥ 2
  • Monitoring - Prometheus + Grafana dashboards
  • Alerting - Latency, error rate, disk usage
  • Backups - Automated daily snapshots
  • Security - API keys, TLS, network policies
  • Rate Limiting - Protect against abuse
  • Caching - Redis for embeddings and results
  • Capacity Planning - Know your memory needs
  • Disaster Recovery - Tested restore procedures

Key Takeaways

  • Managed services reduce operational burden significantly
  • Cache embeddings - they're expensive to compute
  • Quantization reduces memory 4x with minimal quality loss
  • Monitor latency - P99 matters more than average
  • Plan for growth - Sharding is hard to add later

Congratulations! You've completed the Vector Databases course. You now have the knowledge to build, deploy, and scale production vector search systems.