Deploying vector search to production requires careful consideration of scaling, performance, monitoring, and reliability. This lesson covers production-ready architectures.
| Service | Pros | Best For |
|---|---|---|
| Pinecone | Zero ops, serverless, scales automatically | Quick start, variable workloads |
| Weaviate Cloud | Built-in vectorization, multimodal | Full-featured needs |
| Qdrant Cloud | High performance, good filtering | Performance critical |
| Zilliz Cloud (Milvus) | Enterprise scale, GPU support | Billion-scale datasets |
# qdrant-deployment.yaml
# Three-replica Qdrant Deployment backed by a PersistentVolumeClaim, plus a
# Service exposing the REST (6333) and gRPC (6334) ports.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: qdrant
spec:
  replicas: 3
  selector:
    matchLabels:
      app: qdrant
  template:
    metadata:
      labels:
        app: qdrant
    spec:
      containers:
        - name: qdrant
          # NOTE(review): pin a specific image tag in production — ":latest"
          # makes rollbacks and reproducible deploys impossible.
          image: qdrant/qdrant:latest
          ports:
            - containerPort: 6333   # REST API
            - containerPort: 6334   # gRPC
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          volumeMounts:
            - name: qdrant-storage
              mountPath: /qdrant/storage
      volumes:
        - name: qdrant-storage
          persistentVolumeClaim:
            claimName: qdrant-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: qdrant-service
spec:
  selector:
    app: qdrant
  ports:
    - name: rest
      port: 6333
      targetPort: 6333
    - name: grpc
      port: 6334
      targetPort: 6334
  type: LoadBalancer
┌─────────────────────────────────────────────────────────────────┐
│ Load Balancer │
└─────────────────────────────────────────────────────────────────┘
│
┌──────────────────────┼──────────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ API Pod │ │ API Pod │ │ API Pod │
│ + Cache │ │ + Cache │ │ + Cache │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
└──────────────────────┼──────────────────────┘
▼
┌───────────────────────────────┐
│ Embedding Service │
│ (OpenAI / Local Models) │
└───────────────────────────────┘
│
┌──────────────────────┼──────────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Qdrant │ │ Qdrant │ │ Qdrant │
│ Node 1 │◄──────►│ Node 2 │◄──────►│ Node 3 │
└─────────────┘        └─────────────┘        └─────────────┘

# app/main.py
import hashlib
import json
import logging
import os
from contextlib import asynccontextmanager

import redis.asyncio as redis
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI
from qdrant_client import AsyncQdrantClient
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global clients, created in lifespan() at startup; None until then.
qdrant: AsyncQdrantClient | None = None
openai_client: AsyncOpenAI | None = None
redis_client: redis.Redis | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open shared client connections at startup and close them at shutdown.

    Hosts/ports are read from the environment (QDRANT_HOST, REDIS_HOST — the
    variables the docker-compose file in this lesson sets), falling back to
    the in-cluster service names used previously.
    """
    global qdrant, openai_client, redis_client
    # Initialize connections
    qdrant = AsyncQdrantClient(
        host=os.getenv("QDRANT_HOST", "qdrant-service"),
        port=int(os.getenv("QDRANT_PORT", "6333")),
        timeout=30,
    )
    openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    redis_client = redis.Redis(
        host=os.getenv("REDIS_HOST", "redis-service"),
        port=int(os.getenv("REDIS_PORT", "6379")),
    )
    logger.info("Connections initialized")
    yield
    # Cleanup
    await qdrant.close()
    await redis_client.close()
    logger.info("Connections closed")
app = FastAPI(lifespan=lifespan)
# NOTE(review): wildcard CORS ("*") is convenient for development but should
# be restricted to known origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Caching layer
async def get_cached_embedding(text: str) -> list[float] | None:
    """Return the cached embedding for *text*, or None on a cache miss."""
    digest = hashlib.md5(text.encode()).hexdigest()
    payload = await redis_client.get(f"emb:{digest}")
    return json.loads(payload) if payload else None
async def cache_embedding(text: str, embedding: list[float]):
    """Store *embedding* under a key derived from *text* with a 1-hour TTL."""
    digest = hashlib.md5(text.encode()).hexdigest()
    await redis_client.setex(f"emb:{digest}", 3600, json.dumps(embedding))
async def embed(text: str) -> list[float]:
    """Embed *text* with OpenAI, serving from the Redis cache when possible."""
    # Fast path: cache hit.
    cached = await get_cached_embedding(text)
    if cached:
        return cached
    # Miss: generate a fresh embedding, then cache it for subsequent calls.
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    vector = response.data[0].embedding
    await cache_embedding(text, vector)
    return vector
@app.get("/health")
async def health():
    """Readiness probe: verify both Qdrant and Redis are reachable."""
    try:
        await qdrant.get_collections()
        await redis_client.ping()
    except Exception as exc:
        # Any dependency failure makes the service unhealthy.
        raise HTTPException(status_code=503, detail=str(exc))
    return {"status": "healthy"}
@app.post("/search")
async def search(query: str, collection: str = "documents", limit: int = 10):
    """Semantic search endpoint.

    Embeds *query* (cache-backed) and runs a vector search against the given
    Qdrant *collection*, returning up to *limit* hits.
    """
    try:
        query_embedding = await embed(query)
        results = await qdrant.search(
            collection_name=collection,
            query_vector=query_embedding,
            limit=limit,
        )
        return {
            "results": [
                {
                    "id": str(hit.id),
                    "score": hit.score,
                    # A point's payload may be absent — guard against None
                    # instead of crashing with AttributeError.
                    "content": (hit.payload or {}).get("content"),
                    "metadata": {
                        k: v
                        for k, v in (hit.payload or {}).items()
                        if k != "content"
                    },
                }
                for hit in results
            ]
        }
    except Exception as e:
        logger.error(f"Search error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Prometheus metrics
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
import time
# Metrics
# Counter of /search calls, labeled by target collection and outcome.
SEARCH_REQUESTS = Counter(
    'search_requests_total',
    'Total search requests',
    ['collection', 'status']
)
# End-to-end /search latency, labeled by collection.
SEARCH_LATENCY = Histogram(
    'search_latency_seconds',
    'Search request latency',
    ['collection']
)
# Time spent generating (or fetching) the query embedding.
EMBEDDING_LATENCY = Histogram(
    'embedding_latency_seconds',
    'Embedding generation latency'
)
# NOTE(review): never incremented anywhere in this lesson — wire it into
# the cache-hit path of embed()/get_cached_embedding.
CACHE_HITS = Counter(
    'embedding_cache_hits_total',
    'Embedding cache hits'
)
@app.get("/metrics")
async def metrics():
    """Expose Prometheus metrics in the text exposition format."""
    return Response(
        generate_latest(),
        # Prometheus scrapers expect the full exposition content type, not
        # bare "text/plain" (this matches prometheus_client.CONTENT_TYPE_LATEST).
        media_type="text/plain; version=0.0.4; charset=utf-8"
    )
# Instrumented search
# NOTE(review): this re-registers the "/search" path declared earlier in the
# lesson; in a real application keep only one registration of the route.
@app.post("/search")
async def search(query: str, collection: str = "documents", limit: int = 10):
    """Semantic search with Prometheus instrumentation."""
    start = time.time()
    try:
        # Embedding with metrics
        emb_start = time.time()
        query_embedding = await embed(query)
        EMBEDDING_LATENCY.observe(time.time() - emb_start)
        results = await qdrant.search(
            collection_name=collection,
            query_vector=query_embedding,
            limit=limit,
        )
        SEARCH_REQUESTS.labels(collection=collection, status="success").inc()
        SEARCH_LATENCY.labels(collection=collection).observe(time.time() - start)
        # Serialize the hits (the original left a literal "[...]" placeholder
        # here, which would have returned an Ellipsis to the client).
        return {
            "results": [
                {"id": str(hit.id), "score": hit.score, "payload": hit.payload}
                for hit in results
            ]
        }
    except Exception as e:
        SEARCH_REQUESTS.labels(collection=collection, status="error").inc()
        raise
# Qdrant distributed mode
from qdrant_client.models import VectorParams, Distance

# Create sharded collection (assumes a configured QdrantClient as `client`).
client.create_collection(
    collection_name="large_collection",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    shard_number=6,          # Distribute across 6 shards
    replication_factor=2,    # Each shard has 2 replicas
    on_disk_payload=True,    # Payload on disk to save RAM
)

from qdrant_client.models import (
    ScalarQuantization,
    ScalarQuantizationConfig,
    QuantizationSearchParams,
    SearchParams,
)

# Create collection with scalar (int8) quantization
client.create_collection(
    collection_name="optimized_collection",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(
            type="int8",
            quantile=0.99,    # clip outliers when fitting the int8 range
            always_ram=True,  # Keep quantized vectors in RAM
        )
    ),
)

# Search with oversampling for accuracy. Quantization options must be wrapped
# in SearchParams(quantization=...) — QuantizationSearchParams is not itself
# a valid `search_params` value.
results = client.search(
    collection_name="optimized_collection",
    query_vector=query_embedding,
    limit=10,
    search_params=SearchParams(
        quantization=QuantizationSearchParams(
            oversampling=2.0,  # Fetch 2x candidates, then re-rank
            rescore=True,      # Use original vectors for final ranking
        )
    ),
)
from qdrant_client.models import HnswConfigDiff
# Tune HNSW parameters for your use case
from qdrant_client.models import SearchParams  # was used below but never imported

client.update_collection(
    collection_name="documents",
    hnsw_config=HnswConfigDiff(
        m=16,                       # Number of links (higher = more accuracy, more memory)
        ef_construct=200,           # Build-time accuracy (higher = slower build)
        full_scan_threshold=10000,  # Use brute force for small results
        on_disk=False,              # Keep index in RAM for speed
    ),
)

# Set search-time parameters
results = client.search(
    collection_name="documents",
    query_vector=query_embedding,
    limit=10,
    search_params=SearchParams(
        hnsw_ef=128  # Search-time accuracy (higher = slower, more accurate)
    ),
)
import subprocess
from datetime import datetime
def backup_collection(collection_name: str, backup_path: str,
                      qdrant_url: str = "http://qdrant:6333"):
    """Create a snapshot of *collection_name* and download it to *backup_path*.

    The snapshot is fetched over Qdrant's REST API; *qdrant_url* defaults to
    the in-cluster service address used throughout this lesson.
    """
    import urllib.request

    # Qdrant snapshots
    snapshot = client.create_snapshot(collection_name=collection_name)
    print(f"Snapshot created: {snapshot.name}")
    # Download the snapshot directly instead of shelling out to curl (which
    # had no error checking). Use a timestamp without ":" so the file name is
    # valid on every filesystem — isoformat() contains colons.
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    snapshot_url = f"{qdrant_url}/collections/{collection_name}/snapshots/{snapshot.name}"
    urllib.request.urlretrieve(
        snapshot_url,
        f"{backup_path}/{collection_name}_{stamp}.snapshot",
    )
def restore_collection(collection_name: str, snapshot_path: str):
    """Restore *collection_name* from the snapshot at *snapshot_path*."""
    client.recover_snapshot(collection_name=collection_name, location=snapshot_path)
# Scheduled backup
import schedule

def daily_backup():
    """Back up every known collection to /backups."""
    for collection in ["documents", "users", "products"]:
        backup_collection(collection, "/backups")

schedule.every().day.at("02:00").do(daily_backup)
# NOTE(review): schedule only *registers* the job — a runner loop such as
# `while True: schedule.run_pending(); time.sleep(60)` must also be running.
# Qdrant with API key authentication
# config.yaml — Qdrant server configuration with API-key auth and TLS.
service:
  api_key: ${QDRANT_API_KEY}   # inject via environment; never commit the key
  enable_tls: true
storage:
  storage_path: /qdrant/storage
log_level: INFO
# Client with authentication
import os

from qdrant_client import QdrantClient

client = QdrantClient(
    host="qdrant.example.com",
    port=6333,
    # Read the key from the environment instead of embedding a secret
    # literal in source control.
    api_key=os.environ["QDRANT_API_KEY"],
    https=True,
)
# Rate limiting
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
# slowapi requires the limiter on app.state plus a 429 exception handler;
# without these the @limiter.limit decorator below has no effect.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/search")
@limiter.limit("100/minute")
async def search(request: Request, query: str):
    ...
# docker-compose.yml
# docker-compose stack: API + Qdrant + Redis with persistent volumes.
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - QDRANT_HOST=qdrant
      - REDIS_HOST=redis
    depends_on:
      - qdrant
      - redis

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_storage:/qdrant/storage
    environment:
      - QDRANT__SERVICE__API_KEY=${QDRANT_API_KEY}

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

volumes:
  qdrant_storage:
  redis_data:

Congratulations! You've completed the Vector Databases course. You now have the knowledge to build, deploy, and scale production vector search systems.