# Commit note (was pasted in as bare text from the hosting UI — kept as a comment):
# fix: Lazy Loading issues — commit 6c6ffa6, author AdarshDRC
import asyncio
import os
import shutil
import uuid
import re
import time
import json
import traceback
import inflect
from datetime import datetime, timezone
from urllib.parse import urlparse
from typing import List
from contextlib import asynccontextmanager
from collections import OrderedDict
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
import cloudinary
import cloudinary.uploader
import cloudinary.api
from pinecone import Pinecone, ServerlessSpec
# ── loguru for pretty local console logs (optional dep) ──────────
# Either branch binds `_log_fn(level, msg)` so the rest of the module can
# log without caring which backend is installed.
try:
    from loguru import logger as _loguru
    _loguru.remove()  # drop loguru's default stderr sink
    _loguru.add(
        # print with end="" — loguru presumably supplies the trailing newline
        lambda msg: print(msg, end=""),
        format="<green>{time:HH:mm:ss}</green> | <level>{level:<8}</level> | {message}",
        level="DEBUG", colorize=True,
    )
    _log_fn = _loguru.log
except ImportError:
    # Fallback: stdlib logging. getattr maps a level NAME ("INFO") to its
    # numeric constant; unknown names fall back to 20 (INFO).
    import logging as _logging
    _logging.basicConfig(level=_logging.INFO)
    _stdlib = _logging.getLogger("el")
    def _log_fn(level, msg): _stdlib.log(getattr(_logging, level, 20), msg)
# ── Deferred imports ─────────────────────────────────────────────
ai = None  # AIModelManager instance; populated during lifespan() startup
p = inflect.engine()  # shared inflect engine for singular/plural normalization
MAX_CONCURRENT_INFERENCES = int(os.getenv("MAX_CONCURRENT_INFERENCES", "1")) # InsightFace ONNX is NOT thread-safe
_inference_sem: asyncio.Semaphore  # created in lifespan() once the event loop exists
_pinecone_pool = OrderedDict()  # api_key → Pinecone client cache, LRU-evicted
_POOL_MAX = 64  # max cached Pinecone clients before LRU eviction
IDX_FACES = "enterprise-faces"
MAX_FILES_PER_UPLOAD = 20 # cap to prevent memory corruption on large batches
IDX_OBJECTS = "enterprise-objects"
# ── V4 index dimensions ───────────────────────────────────────────
#   enterprise-faces  : 1024-D (ArcFace 512 + AdaFace 512, fused)
#   enterprise-objects: 1536-D (SigLIP 768 + DINOv2 768, fused)
#   ⚠️ If upgrading from V3 (512-D faces), you MUST reset the
#   enterprise-faces index via Settings → Danger Zone → Reset DB
IDX_FACES_DIM = int(os.getenv("IDX_FACES_DIM", "1024"))
IDX_OBJECTS_DIM = int(os.getenv("IDX_OBJECTS_DIM", "1536"))
# V4 face search thresholds
# Cosine similarity thresholds for the fused 1024-D ArcFace+AdaFace space
FACE_THRESHOLD_HIGH = 0.40   # high-quality faces (det_score ≥ 0.85)
FACE_THRESHOLD_LOW = 0.32    # lower-quality faces (det_score < 0.85)
FACE_TOP_K_FETCH = 50        # fetch more candidates, filter after merge
# ════════════════════════════════════════════════════════════════
#  SUPABASE LOGGING — async, fire-and-forget, never crashes API
#  HF Space Secrets needed:
#    SUPABASE_URL         → https://xxxx.supabase.co
#    SUPABASE_SERVICE_KEY → your Supabase service_role key (not anon!)
# ════════════════════════════════════════════════════════════════
SUPABASE_URL = os.getenv("SUPABASE_URL", "")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY", "")
async def _supabase_log_push(level: str, event: str, data: dict):
    """Best-effort insert of one row into the Supabase ``app_logs`` table.

    Does nothing when Supabase credentials are not configured; any network,
    serialization, or import failure is swallowed (logged at DEBUG) so that
    logging can never take down the API.
    """
    if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
        return
    try:
        import aiohttp

        # Structured columns are pulled out of `data`; the full dict is
        # also stored verbatim in the JSONB "data" column.
        payload = {
            "level": level.upper(),
            "event": event,
            "user_id": str(data.get("user_id", "anonymous")),
            "ip": str(data.get("ip", "")),
            "mode": str(data.get("mode", "")),
            "page": str(data.get("page", "")),
            "duration_ms": int(data["duration_ms"]) if "duration_ms" in data else None,
            "error": str(data["error"]) if "error" in data else None,
            "data": data,
        }
        auth_headers = {
            "Content-Type": "application/json",
            "apikey": SUPABASE_SERVICE_KEY,
            "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
            "Prefer": "return=minimal",
        }
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{SUPABASE_URL}/rest/v1/app_logs",
                json=payload,
                headers=auth_headers,
                timeout=timeout,
            ) as resp:
                if resp.status not in (200, 201):
                    body = await resp.text()
                    _log_fn("WARNING", f"Supabase log insert failed {resp.status}: {body[:200]}")
    except Exception as exc:
        _log_fn("DEBUG", f"Supabase log push skipped: {exc}")
# Strong references to in-flight Supabase pushes: asyncio only keeps a weak
# reference to tasks, so without this set a push could be garbage-collected
# before completing (per asyncio.create_task documentation).
_log_tasks: set = set()

def log(level: str, event: str, **data):
    """
    Log to console + Supabase app_logs table (background task).
    Usage: log("INFO", "upload.complete", user_id="x", files=3, duration_ms=340)

    Console logging always happens; the Supabase push is scheduled only when
    called from inside a running event loop, and never raises.
    """
    _log_fn(level.upper(), f"[{event}] {json.dumps(dict(data), default=str)}")
    try:
        # get_running_loop() raises RuntimeError outside a loop (e.g. at
        # import time) — unlike the deprecated get_event_loop(), it never
        # silently creates a new loop.
        asyncio.get_running_loop()
        task = asyncio.create_task(_supabase_log_push(level, event, data))
        _log_tasks.add(task)
        task.add_done_callback(_log_tasks.discard)
    except Exception:
        pass
# ════════════════════════════════════════════════════════════════
# HELPERS
# ════════════════════════════════════════════════════════════════
def get_ip(request: Request) -> str:
    """Best-guess client IP: first X-Forwarded-For hop, else the socket peer."""
    forwarded = request.headers.get("X-Forwarded-For", "")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return getattr(request.client, "host", "unknown")
def is_guest(key: str) -> bool:
    """True when *key* is the shared demo Pinecone key from the environment.

    Returns False when no default key is configured at all.
    """
    shared = os.getenv("DEFAULT_PINECONE_KEY", "")
    if not shared:
        return False
    return key.strip() == shared.strip()
def _get_pinecone(api_key: str) -> Pinecone:
    """Return a cached Pinecone client for *api_key* (LRU pool of _POOL_MAX)."""
    if api_key not in _pinecone_pool:
        # Evict the least-recently-used client when the pool is full.
        while len(_pinecone_pool) >= _POOL_MAX:
            _pinecone_pool.popitem(last=False)
        _pinecone_pool[api_key] = Pinecone(api_key=api_key)
    # Mark this key as most-recently used.
    _pinecone_pool.move_to_end(api_key)
    return _pinecone_pool[api_key]
def _cld_upload(tmp_path, folder, creds):
    """Blocking Cloudinary upload of a local file into *folder* (run via asyncio.to_thread)."""
    return cloudinary.uploader.upload(tmp_path, folder=folder,
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def _cld_ping(creds):
    """Blocking Cloudinary credential check — raises on invalid creds."""
    return cloudinary.api.ping(
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def _cld_root_folders(creds):
    """Blocking fetch of the account's top-level Cloudinary folders."""
    return cloudinary.api.root_folders(
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def get_cloudinary_creds(env_url: str) -> dict:
    """Parse a CLOUDINARY_URL (cloudinary://key:secret@cloud) into a creds dict.

    Returns {} for an empty URL. Components missing from a malformed URL come
    back as None; callers detect that via creds.get("cloud_name").
    """
    if not env_url:
        return {}
    parts = urlparse(env_url)
    return {
        "api_key": parts.username,
        "api_secret": parts.password,
        "cloud_name": parts.hostname,
    }
def standardize_category_name(name: str) -> str:
    """Normalize a category/folder name: lowercase, underscores, singular form."""
    collapsed = re.sub(r'\s+', '_', name.strip().lower())
    collapsed = re.sub(r'[^\w]', '', collapsed)
    # inflect's singular_noun() returns False when the word is already singular.
    singular = p.singular_noun(collapsed)
    return singular or collapsed
def sanitize_filename(filename: str) -> str:
    """Make a filename path-safe: whitespace → underscore, drop exotic characters."""
    underscored = re.sub(r'\s+', '_', filename)
    return re.sub(r'[^\w.\-]', '', underscored)
# Shared demo credentials for guest mode; empty string when not configured.
DEFAULT_PC_KEY = os.getenv("DEFAULT_PINECONE_KEY", "")
DEFAULT_CLD_URL = os.getenv("DEFAULT_CLOUDINARY_URL", "")
def _is_default_key(key: str, default: str) -> bool:
return bool(default) and key.strip() == default.strip()
# ════════════════════════════════════════════════════════════════
# APP STARTUP / SHUTDOWN
# ════════════════════════════════════════════════════════════════
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: load the AI model stack before serving requests."""
    global ai, _inference_sem
    # Imported here (not at module top) so the heavy model stack only loads
    # when the server actually starts — the "Lazy Loading" fix.
    from src.models import AIModelManager
    log("INFO", "server.startup", message="Loading AI models...")
    loop = asyncio.get_event_loop()
    # Model construction is blocking — run it in the default thread executor.
    ai = await loop.run_in_executor(None, AIModelManager)
    # Create the semaphore on the running loop; see MAX_CONCURRENT_INFERENCES.
    _inference_sem = asyncio.Semaphore(MAX_CONCURRENT_INFERENCES)
    log("INFO", "server.ready", message="All models loaded. API ready.")
    yield
    log("INFO", "server.shutdown", message="API shutting down.")
app = FastAPI(lifespan=lifespan)
# CORS wide open — the frontend may be served from any origin.
app.add_middleware(CORSMiddleware,
    allow_origins=["*"], allow_credentials=True,
    allow_methods=["*"], allow_headers=["*"])
# Scratch directory for uploads awaiting Cloudinary push / inference.
os.makedirs("temp_uploads", exist_ok=True)
# ════════════════════════════════════════════════════════════════
# FRONTEND EVENT LOG — React calls this for client-side events
# Logs: page visits, tab switches, mode toggles, search/upload
# initiated, settings changes, errors caught in UI
# ════════════════════════════════════════════════════════════════
@app.post("/api/log")
async def frontend_log(
    request: Request,
    event: str = Form(...),      # e.g. "page.visit", "search.initiated"
    user_id: str = Form(""),
    page: str = Form(""),
    metadata: str = Form("{}"),  # JSON string with extra context
):
    """Ingest a client-side event from the React frontend and log it.

    Hardened to never fail: malformed or non-dict metadata JSON is replaced
    with {}, and metadata keys that collide with the reserved fields are
    dropped (previously a key such as "ip" or "user_id" inside metadata
    raised "got multiple values for keyword argument").
    """
    ip = get_ip(request)
    try:
        meta = json.loads(metadata) if metadata else {}
        if not isinstance(meta, dict):
            meta = {}
    except Exception:
        meta = {}
    # Reserved keys are set explicitly below; shed them from meta so the
    # **meta expansion cannot duplicate a keyword argument.
    meta = {k: v for k, v in meta.items() if k not in ("user_id", "page", "ip", "ua")}
    log("INFO", f"frontend.{event}",
        user_id = user_id or "anonymous",
        page = page,
        ip = ip,
        ua = request.headers.get("User-Agent", "")[:120],
        **meta,
    )
    return {"ok": True}
# ════════════════════════════════════════════════════════════════
# 1. VERIFY KEYS & AUTO-BUILD INDEXES
# ════════════════════════════════════════════════════════════════
@app.post("/api/verify-keys")
async def verify_keys(
    request: Request,
    pinecone_key: str = Form(""),
    cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Validate user-supplied credentials and auto-build missing Pinecone indexes.

    Pings Cloudinary (when a URL is given), then creates the faces/objects
    indexes if absent. Raises 400 with a user-readable message on failure.
    """
    ip = get_ip(request)
    mode = "guest" if is_guest(pinecone_key) else "personal"
    start = time.perf_counter()
    log("INFO", "settings.verify_keys.start",
        user_id=user_id or "anonymous", mode=mode, ip=ip)
    if cloudinary_url:
        try:
            creds_v = get_cloudinary_creds(cloudinary_url)
            if not creds_v.get("cloud_name"): raise ValueError("bad url")
            # Blocking SDK call off the event loop.
            await asyncio.to_thread(_cld_ping, creds_v)
        except HTTPException: raise
        except Exception as e:
            log("ERROR", "settings.verify_keys.cloudinary_fail",
                user_id=user_id or "anonymous", ip=ip, error=str(e),
                duration_ms=round((time.perf_counter()-start)*1000))
            raise HTTPException(400, "Invalid Cloudinary Environment URL.")
    indexes_created = []
    if pinecone_key:
        try:
            pc = _get_pinecone(pinecone_key)
            existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
            tasks = []
            if IDX_OBJECTS not in existing:
                tasks.append(asyncio.to_thread(pc.create_index, name=IDX_OBJECTS,
                    dimension=IDX_OBJECTS_DIM,  # 1536-D SigLIP+DINOv2
                    metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")))
                indexes_created.append(IDX_OBJECTS)
            if IDX_FACES not in existing:
                tasks.append(asyncio.to_thread(pc.create_index, name=IDX_FACES,
                    dimension=IDX_FACES_DIM,  # 1024-D ArcFace+AdaFace (V4)
                    metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")))
                indexes_created.append(IDX_FACES)
            # Both creations run concurrently.
            if tasks: await asyncio.gather(*tasks)
        except Exception as e:
            err = str(e)
            # Translate auth failures into a friendlier message.
            clean = ("Invalid Pinecone API Key. Please check your key and try again."
                     if "401" in err or "unauthorized" in err.lower()
                     else f"Pinecone Error: {err}")
            log("ERROR", "settings.verify_keys.pinecone_fail",
                user_id=user_id or "anonymous", ip=ip, error=clean,
                duration_ms=round((time.perf_counter()-start)*1000))
            raise HTTPException(400, clean)
    log("INFO", "settings.verify_keys.success",
        user_id=user_id or "anonymous", mode=mode, ip=ip,
        indexes_created=indexes_created,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Keys verified and indexes ready!"}
# ════════════════════════════════════════════════════════════════
# 2. UPLOAD
# ════════════════════════════════════════════════════════════════
@app.post("/api/upload")
async def upload_new_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Upload a batch of images: Cloudinary storage + Pinecone embeddings.

    Per file: save to temp dir → upload to Cloudinary → run inference
    (serialized by _inference_sem) → upsert face vectors to IDX_FACES and
    object vectors to IDX_OBJECTS. A single-file failure aborts the whole
    request with 404/500; files already processed remain stored.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    # Fall back to the shared demo credentials when the user supplied none.
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "upload.start",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        folder=folder_name, file_count=len(files),
        file_names=[f.filename for f in files][:10],
        detect_faces=detect_faces)
    if not actual_pc_key or not actual_cld_url:
        log("ERROR", "upload.missing_keys", user_id=user_id or "anonymous", ip=ip, mode=mode)
        raise HTTPException(400, "API Keys are missing. If you are a guest, the server is missing its DEFAULT_ secrets in Hugging Face.")
    if len(files) > MAX_FILES_PER_UPLOAD:
        raise HTTPException(400, f"Maximum {MAX_FILES_PER_UPLOAD} files per upload. Please split into smaller batches.")
    folder = standardize_category_name(folder_name)
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        log("ERROR", "upload.bad_cloudinary_url", user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(400, "Invalid Cloudinary URL format.")
    pc = _get_pinecone(actual_pc_key)
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)
    uploaded_urls = []
    face_vec_total = 0
    object_vec_total = 0
    for file in files:
        # Random prefix prevents collisions between same-named uploads.
        tmp_path = f"temp_uploads/{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
        file_start = time.perf_counter()
        try:
            with open(tmp_path, "wb") as buf:
                shutil.copyfileobj(file.file, buf)
            res = await asyncio.to_thread(_cld_upload, tmp_path, folder, creds)
            image_url = res["secure_url"]
            uploaded_urls.append(image_url)
            # Serialize inference — InsightFace ONNX is not thread-safe.
            async with _inference_sem:
                vectors = await ai.process_image_async(tmp_path, is_query=False, detect_faces=detect_faces)
            face_upserts = []
            object_upserts = []
            for v in vectors:
                # Vectors may be numpy arrays or plain lists.
                vec_list = v["vector"].tolist() if hasattr(v["vector"], "tolist") else v["vector"]
                if v["type"] == "face":
                    # ── FACE STORE: ArcFace+AdaFace 1024-D fused embedding
                    # V4: includes face_quality + face_width_px for retrieval scoring
                    face_upserts.append({
                        "id": str(uuid.uuid4()),
                        "values": vec_list,
                        "metadata": {
                            "image_url": image_url,
                            "url": image_url,
                            "folder": folder,
                            "face_idx": v.get("face_idx", 0),
                            "bbox": str(v.get("bbox", [])),
                            "face_crop": v.get("face_crop", ""),
                            "det_score": v.get("det_score", 1.0),
                            "face_quality": v.get("face_quality", v.get("det_score", 1.0)),
                            "face_width_px": v.get("face_width_px", 0),
                        }
                    })
                else:
                    # ── OBJECT STORE: SigLIP+DINOv2 1536-D fused embedding
                    object_upserts.append({
                        "id": str(uuid.uuid4()),
                        "values": vec_list,
                        "metadata": {
                            "image_url": image_url,
                            "url": image_url,
                            "folder": folder,
                        }
                    })
            # Always upsert to BOTH indexes:
            #   - face index gets face embeddings (if any faces detected)
            #   - object index ALWAYS gets full image embedding
            face_vec_total += len(face_upserts)
            object_vec_total += len(object_upserts)
            upsert_tasks = []
            if face_upserts:
                upsert_tasks.append(asyncio.to_thread(idx_face.upsert, vectors=face_upserts))
            if object_upserts:
                upsert_tasks.append(asyncio.to_thread(idx_obj.upsert, vectors=object_upserts))
            if upsert_tasks:
                await asyncio.gather(*upsert_tasks)
            log("INFO", "upload.file.success",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                filename=file.filename, folder=folder, image_url=image_url,
                face_vectors=len(face_upserts), obj_vectors=len(object_upserts),
                detect_faces=detect_faces,
                duration_ms=round((time.perf_counter()-file_start)*1000))
        except Exception as e:
            log("ERROR", "upload.file.error",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                filename=file.filename, folder=folder, error=str(e),
                traceback=traceback.format_exc()[-800:],
                duration_ms=round((time.perf_counter()-file_start)*1000))
            err = str(e)
            # A missing index is recoverable via Settings — surface as 404.
            if "not found" in err.lower() or "404" in err:
                raise HTTPException(404, "Pinecone index not found. Please go to Settings and click 'Verify & Save' to recreate your indexes.")
            raise HTTPException(500, f"Upload processing failed: {err}")
        finally:
            # Always remove the temp file, success or failure.
            if os.path.exists(tmp_path): os.remove(tmp_path)
    log("INFO", "upload.complete",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        folder=folder, files_uploaded=len(uploaded_urls),
        face_vectors=face_vec_total, object_vectors=object_vec_total,
        detect_faces=detect_faces,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Done!", "urls": uploaded_urls}
# ════════════════════════════════════════════════════════════════
# 3. SEARCH
# ════════════════════════════════════════════════════════════════
@app.post("/api/search")
async def search_database(
    request: Request,
    file: UploadFile = File(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Search the database with an example image.

    FACE MODE (faces detected and detect_faces=True): per-face queries
    against the face index, merged across faces with a multi-face boost.
    OBJECT MODE otherwise: flat ranked results from the object index.
    The temp query file is always cleaned up.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "search.start",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        filename=file.filename, detect_faces=detect_faces)
    if not actual_pc_key:
        log("ERROR", "search.missing_keys", user_id=user_id or "anonymous", ip=ip, mode=mode)
        raise HTTPException(400, "Pinecone Key is missing.")
    tmp_path = f"temp_uploads/query_{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
    try:
        with open(tmp_path, "wb") as buf:
            shutil.copyfileobj(file.file, buf)
        # Serialize inference — the ONNX face stack is not thread-safe.
        async with _inference_sem:
            vectors = await ai.process_image_async(tmp_path, is_query=True, detect_faces=detect_faces)
        inference_ms = round((time.perf_counter() - start) * 1000)
        lanes_used = list({v["type"] for v in vectors})
        log("INFO", "search.inference_done",
            user_id=user_id or "anonymous", ip=ip, mode=mode,
            vector_count=len(vectors), lanes=lanes_used, inference_ms=inference_ms)
        pc = _get_pinecone(actual_pc_key)
        idx_obj = pc.Index(IDX_OBJECTS)
        idx_face = pc.Index(IDX_FACES)
        # ── V4: split vectors by type ────────────────────────────
        face_vectors = [v for v in vectors if v["type"] == "face"]
        object_vectors = [v for v in vectors if v["type"] == "object"]
        # ════════════════════════════════════════════════════════
        # OBJECT MODE helper
        # Used when no faces detected or face search disabled.
        # ════════════════════════════════════════════════════════
        async def _query_object_one(vec_dict: dict):
            """Query the object index for one vector; return filtered matches."""
            vec_list = (vec_dict["vector"].tolist()
                        if hasattr(vec_dict["vector"], "tolist")
                        else vec_dict["vector"])
            try:
                res = await asyncio.to_thread(
                    idx_obj.query, vector=vec_list, top_k=10, include_metadata=True)
            except Exception as e:
                if "404" in str(e):
                    raise HTTPException(404, "Pinecone Index not found. Go to Settings → Verify & Save.")
                raise e
            out = []
            for match in res.get("matches", []):
                # Hard similarity floor for object matches.
                if match["score"] < 0.45:
                    continue
                out.append({
                    "url": match["metadata"].get("url") or match["metadata"].get("image_url", ""),
                    "score": round(match["score"], 4),
                    "caption": match["metadata"].get("folder", "🎯 Visual Match"),
                })
            return out
        if detect_faces and face_vectors:
            # ════════════════════════════════════════════════════
            # V4 FACE MODE — Multi-face merge retrieval
            #
            # For a group photo with N detected faces:
            #   1. Query enterprise-faces for EACH face (top_k=50)
            #   2. Build a global image_url → match_data map
            #      • An image is included if ANY face matches
            #      • Score = highest matching face score for that image
            #      • Track WHICH face indices matched each image
            #   3. Group results PER query face (for UI display)
            #   4. Also build a "cross-face" flat list:
            #      images that matched multiple faces rank higher
            #
            # Threshold logic:
            #   High-quality face (det_score ≥ 0.85) → threshold 0.40
            #   Lower-quality face                   → threshold 0.32
            #   (Fused 1024-D space has different cosine distribution
            #    than raw ArcFace 512-D — thresholds adjusted accordingly)
            # ════════════════════════════════════════════════════
            async def _query_single_face(face_vec: dict) -> dict:
                """
                Query enterprise-faces for one detected face.
                Returns per-face result group for UI + raw match map.
                """
                vec_list = (face_vec["vector"].tolist()
                            if hasattr(face_vec["vector"], "tolist")
                            else face_vec["vector"])
                # Adaptive threshold: high-quality → stricter
                det_score = face_vec.get("det_score", 1.0)
                threshold = FACE_THRESHOLD_HIGH if det_score >= 0.85 else FACE_THRESHOLD_LOW
                try:
                    face_res = await asyncio.to_thread(
                        idx_face.query,
                        vector=vec_list,
                        top_k=FACE_TOP_K_FETCH,
                        include_metadata=True,
                    )
                except Exception as e:
                    if "404" in str(e):
                        raise HTTPException(404, "Pinecone index not found. Go to Settings → Verify & Save.")
                    raise e
                # Collect matches — keep BEST score per image_url
                # (multiple face vectors stored per image during upload,
                #  we only want the best matching one per image)
                image_map = {}  # image_url → best match data
                for match in face_res.get("matches", []):
                    raw = match["score"]
                    if raw < threshold:
                        continue
                    url = (match["metadata"].get("url") or
                           match["metadata"].get("image_url", ""))
                    if not url:
                        continue
                    if url not in image_map or raw > image_map[url]["raw_score"]:
                        image_map[url] = {
                            "raw_score": raw,
                            "face_crop": match["metadata"].get("face_crop", ""),
                            "folder": match["metadata"].get("folder", ""),
                            "face_quality": match["metadata"].get("face_quality", 1.0),
                            "face_width_px": match["metadata"].get("face_width_px", 0),
                        }
                # Remap raw cosine → UI score (75%–99%)
                # Range is now 0.32–1.0 (wider than old 0.35–1.0)
                def _ui_score(raw: float) -> float:
                    lo, hi = FACE_THRESHOLD_LOW, 1.0
                    return round(min(0.99, 0.75 + ((raw - lo) / (hi - lo)) * 0.24), 4)
                matches = [
                    {
                        "url": url,
                        "score": _ui_score(d["raw_score"]),
                        "raw_score": round(d["raw_score"], 4),
                        "face_crop": d["face_crop"],
                        "folder": d["folder"],
                        "caption": "👤 Verified Identity",
                    }
                    for url, d in image_map.items()
                ]
                matches = sorted(matches, key=lambda x: x["score"], reverse=True)[:15]
                return {
                    "query_face_idx": face_vec.get("face_idx", 0),
                    "query_face_crop": face_vec.get("face_crop", ""),
                    "det_score": det_score,
                    "face_width_px": face_vec.get("face_width_px", 0),
                    "matches": matches,
                    "_image_map": image_map,  # used for cross-face merge below
                }
            # Query all faces in parallel
            raw_groups = await asyncio.gather(
                *[_query_single_face(fv) for fv in face_vectors])
            # ── Cross-face merge ────────────────────────────────
            # Build a global image → {best_score, matched_face_indices}
            # An image appearing for multiple faces gets a boost:
            #   final_score = best_face_score * (1 + 0.05 * extra_face_count)
            # This makes images with multiple searched people rank higher.
            global_image_map = {}  # url → {score, matched_faces, face_crop, folder}
            for gi, group in enumerate(raw_groups):
                for url, d in group["_image_map"].items():
                    raw = d["raw_score"]
                    if url not in global_image_map:
                        global_image_map[url] = {
                            "raw_score": raw,
                            "face_crop": d["face_crop"],
                            "folder": d["folder"],
                            "matched_faces": [gi],
                        }
                    else:
                        existing = global_image_map[url]
                        existing["matched_faces"].append(gi)
                        # Keep the best score (and its crop) across faces.
                        if raw > existing["raw_score"]:
                            existing["raw_score"] = raw
                            existing["face_crop"] = d["face_crop"]
            # Apply multi-face boost and build flat merged list
            def _boosted_ui_score(raw: float, n_faces: int) -> float:
                lo = FACE_THRESHOLD_LOW
                base = 0.75 + ((raw - lo) / (1.0 - lo)) * 0.24
                boosted = base * (1.0 + 0.05 * (n_faces - 1))
                return round(min(0.99, boosted), 4)
            merged_results = []
            for url, d in global_image_map.items():
                n = len(d["matched_faces"])
                merged_results.append({
                    "url": url,
                    "score": _boosted_ui_score(d["raw_score"], n),
                    "raw_score": round(d["raw_score"], 4),
                    "face_crop": d["face_crop"],
                    "folder": d["folder"],
                    "matched_faces": d["matched_faces"],
                    "caption": (f"👥 {n} faces matched" if n > 1
                                else "👤 Verified Identity"),
                })
            merged_results = sorted(
                merged_results, key=lambda x: x["score"], reverse=True)[:20]
            # Clean per-group results (remove internal _image_map)
            face_groups = []
            for g in raw_groups:
                clean = {k: v for k, v in g.items() if k != "_image_map"}
                if clean["matches"]:
                    face_groups.append(clean)
            duration_ms = round((time.perf_counter() - start) * 1000)
            total_matches = len(merged_results)
            log("INFO", "search.complete",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                lanes=["face"], detect_faces=detect_faces,
                face_groups=len(face_groups),
                merged_results=total_matches,
                top_score=merged_results[0]["score"] if merged_results else 0,
                duration_ms=duration_ms)
            return {
                "mode": "face",
                "face_groups": face_groups,  # per-face results for UI tabs
                "results": merged_results,   # V4: flat merged cross-face list
            }
        else:
            # ════════════════════════════════════════════════════
            # OBJECT MODE — flat ranked results from object index
            # ════════════════════════════════════════════════════
            nested = await asyncio.gather(
                *[_query_object_one(v) for v in vectors])
            all_results = [r for sub in nested for r in sub]
            # Deduplicate by URL, keeping the best-scoring entry.
            seen = {}
            for r in all_results:
                url = r["url"]
                if url not in seen or r["score"] > seen[url]["score"]:
                    seen[url] = r
            final = sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:10]
            duration_ms = round((time.perf_counter() - start) * 1000)
            log("INFO", "search.complete",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                lanes=lanes_used, detect_faces=detect_faces,
                results_count=len(final),
                top_score=final[0]["score"] if final else 0,
                duration_ms=duration_ms)
            return {"mode": "object", "results": final, "face_groups": []}
    except HTTPException: raise
    except Exception as e:
        log("ERROR", "search.error",
            user_id=user_id or "anonymous", ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:],
            duration_ms=round((time.perf_counter()-start)*1000))
        raise HTTPException(500, str(e))
    finally:
        # Always delete the temp query image.
        if os.path.exists(tmp_path): os.remove(tmp_path)
# ════════════════════════════════════════════════════════════════
# 4. CATEGORIES
# ════════════════════════════════════════════════════════════════
@app.post("/api/categories")
async def get_categories(
    request: Request,
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """List top-level Cloudinary folders as gallery categories.

    Always answers 200: any failure (missing/invalid URL, API error) is
    degraded to an empty category list, with errors logged.
    """
    ip = get_ip(request)
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    if not actual_url:
        return {"categories": []}
    try:
        creds = get_cloudinary_creds(actual_url)
        if not creds.get("cloud_name"):
            return {"categories": []}
        result = await asyncio.to_thread(_cld_root_folders, creds)
        categories = [folder["name"] for folder in result.get("folders", [])]
        log("INFO", "categories.fetched",
            user_id=user_id or "anonymous", ip=ip, category_count=len(categories))
        return {"categories": categories}
    except Exception as e:
        log("ERROR", "categories.error", user_id=user_id or "anonymous", ip=ip, error=str(e))
        return {"categories": []}
@app.get("/")
async def root():
    """Minimal liveness probe."""
    return {"status": "ok"}
@app.get("/api/health")
async def health():
    """Health check returning a timezone-aware UTC timestamp."""
    return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()}
# ════════════════════════════════════════════════════════════════
# 5. LIST FOLDER IMAGES
# ════════════════════════════════════════════════════════════════
def _cld_list_folder_images(folder: str, creds: dict, next_cursor: str = None, max_results: int = 100):
    """Blocking: fetch one page (≤ *max_results*) of Cloudinary resources under folder/."""
    params = {
        "type": "upload",
        "prefix": f"{folder}/",
        "max_results": max_results,
        "api_key": creds["api_key"],
        "api_secret": creds["api_secret"],
        "cloud_name": creds["cloud_name"],
    }
    # Only pass the cursor when resuming a previous listing.
    if next_cursor:
        params["next_cursor"] = next_cursor
    return cloudinary.api.resources(**params)
def _cld_thumb_url(secure_url: str, cloud_name: str) -> str:
"""
Convert full-resolution Cloudinary URL to a small thumbnail URL.
Inserts Cloudinary transformation: 400×400 fill, auto quality, auto format.
Example:
https://res.cloudinary.com/demo/image/upload/v123/folder/img.jpg
https://res.cloudinary.com/demo/image/upload/w_400,h_400,c_fill,q_auto,f_auto/v123/folder/img.jpg
"""
try:
marker = f"/image/upload/"
idx = secure_url.find(marker)
if idx == -1:
return secure_url # can't transform — return original
base = secure_url[:idx + len(marker)]
rest = secure_url[idx + len(marker):]
# Skip existing transformation block if present
if rest.startswith("w_") or rest.startswith("h_") or rest.startswith("c_") or rest.startswith("q_"):
return secure_url # already transformed
return f"{base}w_400,h_400,c_fill,q_auto,f_auto/{rest}"
except Exception:
return secure_url
@app.post("/api/cloudinary/folder-images")
async def list_folder_images(
    request: Request,
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
    next_cursor: str = Form(""),
    page_size: int = Form(100),
):
    """Return one page of a folder's images with grid-sized thumbnail URLs.

    The frontend drives pagination by echoing next_cursor back; an empty
    cursor in the response means there are no more pages.
    """
    ip = get_ip(request)
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # Single page fetch — frontend drives pagination via next_cursor
    result = await asyncio.to_thread(
        _cld_list_folder_images, folder_name, creds,
        next_cursor or None, min(page_size, 100)
    )
    images = []
    for r in result.get("resources", []):
        full_url = r["secure_url"]
        thumb_url = _cld_thumb_url(full_url, creds["cloud_name"])
        images.append({
            "url": full_url,         # full-res for lightbox / download
            "thumb_url": thumb_url,  # 400×400 thumbnail for grid display
            "public_id": r["public_id"],
        })
    response_cursor = result.get("next_cursor") or ""
    log("INFO", "explorer.folder_opened",
        user_id=user_id or "anonymous", ip=ip,
        folder_name=folder_name, image_count=len(images),
        has_more=bool(response_cursor))
    return {"images": images, "count": len(images), "next_cursor": response_cursor}
# ════════════════════════════════════════════════════════════════
# 6. DELETE SINGLE IMAGE
# ════════════════════════════════════════════════════════════════
def url_to_public_id(image_url: str, cloud_name: str) -> str:
    """Derive the Cloudinary public_id from a delivery URL ('' on any failure).

    Keeps everything after the '/upload/' path segment, dropping an optional
    version segment ('v123') and the trailing file extension.
    """
    try:
        segments = urlparse(image_url).path.split("/")
        tail = segments[segments.index("upload") + 1:]
        # Drop the version marker (e.g. "v1712345678") when present.
        if tail and tail[0].startswith("v") and tail[0][1:].isdigit():
            tail = tail[1:]
        return "/".join(tail).rsplit(".", 1)[0]
    except Exception:
        return ""
def _cld_delete_resource(public_id: str, creds: dict):
    """Blocking Cloudinary destroy of a single resource by public_id."""
    return cloudinary.uploader.destroy(public_id,
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
@app.post("/api/delete-image")
async def delete_image(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    image_url: str = Form(""),
    public_id: str = Form(""),
    user_id: str = Form(""),
):
    """Delete one image from Cloudinary and purge its vectors from Pinecone.

    The Pinecone cleanup is best-effort: failures there are logged as a
    warning but the request still succeeds.
    """
    ip = get_ip(request)
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # Prefer an explicit public_id; otherwise derive it from the URL.
    pid = public_id or url_to_public_id(image_url, creds["cloud_name"])
    if not pid: raise HTTPException(400, "Could not determine public_id.")
    await asyncio.to_thread(_cld_delete_resource, pid, creds)
    if actual_pc_key and image_url:
        try:
            pc = _get_pinecone(actual_pc_key)
            # Vectors in both indexes carry a "url" metadata field (see upload).
            for idx_name in [IDX_OBJECTS, IDX_FACES]:
                await asyncio.to_thread(pc.Index(idx_name).delete, filter={"url": {"$eq": image_url}})
        except Exception as e:
            _log_fn("WARNING", f"Pinecone delete warning: {e}")
    log("INFO", "explorer.image_deleted",
        user_id=user_id or "anonymous", ip=ip,
        image_url=image_url, public_id=pid)
    return {"message": "Image deleted successfully."}
# ════════════════════════════════════════════════════════════════
# 7. DELETE ENTIRE FOLDER
# ════════════════════════════════════════════════════════════════
def _cld_delete_folder(folder: str, creds: dict):
    """Blocking delete of every Cloudinary resource whose public_id starts with folder/."""
    return cloudinary.api.delete_resources_by_prefix(f"{folder}/",
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
def _cld_remove_folder(folder: str, creds: dict):
    """Best-effort removal of the (now empty) Cloudinary folder itself.

    Errors are deliberately swallowed — the folder may already be gone or
    still contain resources; returns None in that case.
    """
    try:
        return cloudinary.api.delete_folder(folder,
            api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
    except Exception: pass
def _cld_delete_all_paginated(creds: dict):
    """Delete ALL Cloudinary resources in batches of 100 until none left.

    Best-effort: any API error logs a warning and stops the loop. Returns the
    number of resources deleted. No cursor is passed when re-listing because
    each batch is deleted before the next listing, so page 1 always holds
    not-yet-deleted resources.
    """
    deleted = 0
    while True:
        try:
            res = cloudinary.api.resources(
                type="upload", max_results=100,
                api_key=creds["api_key"], api_secret=creds["api_secret"],
                cloud_name=creds["cloud_name"],
            )
            resources = res.get("resources", [])
            if not resources:
                break
            public_ids = [r["public_id"] for r in resources]
            cloudinary.api.delete_resources(
                public_ids,
                api_key=creds["api_key"], api_secret=creds["api_secret"],
                cloud_name=creds["cloud_name"],
            )
            deleted += len(public_ids)
            # Route progress through the module logger (was a bare print(),
            # inconsistent with logging everywhere else in this file).
            _log_fn("INFO", f"🗑️ Deleted {deleted} resources so far...")
            if not res.get("next_cursor"):
                break
        except Exception as e:
            _log_fn("WARNING", f"Cloudinary batch delete error: {e}")
            break
    return deleted
@app.post("/api/delete-folder")
async def delete_folder(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
):
    """Delete a whole Cloudinary folder plus its Pinecone vectors.

    Lists every image first (needed for the per-URL Pinecone fallback),
    deletes the Cloudinary resources and folder, then clears vectors by a
    folder metadata filter — falling back to per-image URL deletes when the
    filter delete fails. Pinecone cleanup is best-effort.
    """
    ip = get_ip(request)
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # Paginate through every resource in the folder before deleting anything.
    all_images, next_cursor = [], None
    while True:
        result = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, next_cursor)
        all_images.extend(result.get("resources", []))
        next_cursor = result.get("next_cursor")
        if not next_cursor: break
    await asyncio.to_thread(_cld_delete_folder, folder_name, creds)
    await asyncio.to_thread(_cld_remove_folder, folder_name, creds)
    if actual_pc_key:
        try:
            pc = _get_pinecone(actual_pc_key)
            for idx_name in [IDX_OBJECTS, IDX_FACES]:
                idx = pc.Index(idx_name)
                try:
                    # Fast path: delete every vector tagged with this folder.
                    await asyncio.to_thread(idx.delete, filter={"folder": {"$eq": folder_name}})
                except Exception:
                    # Fallback: delete vectors one image URL at a time.
                    for img in all_images:
                        try:
                            if img.get("secure_url"):
                                await asyncio.to_thread(idx.delete, filter={"url": {"$eq": img["secure_url"]}})
                        except Exception: pass
        except Exception as e:
            _log_fn("WARNING", f"Pinecone folder delete warning: {e}")
    log("INFO", "explorer.folder_deleted",
        user_id=user_id or "anonymous", ip=ip,
        folder_name=folder_name, deleted_count=len(all_images))
    return {"message": f"Folder '{folder_name}' and all its contents deleted.", "deleted_count": len(all_images)}
# ════════════════════════════════════════════════════════════════
# 8. RESET DATABASE ⚠️ DESTRUCTIVE — triple-logged
# ════════════════════════════════════════════════════════════════
@app.post("/api/reset-database")
async def reset_database(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """⚠️ DESTRUCTIVE: wipe every Cloudinary resource and folder, then drop
    and recreate both Pinecone indexes at their V4 dimensions.

    Blocked with 403 on the shared demo credentials. Cloudinary failures are
    logged as warnings and do not abort the reset; a Pinecone failure aborts
    with 500. Attempt/complete are both logged for the audit trail.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    log("WARNING", "danger.reset_database.attempt",
        user_id=user_id or "anonymous", ip=ip)
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.reset_database.blocked_shared_db",
            user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(403, "Reset is not allowed on the shared demo database.")
    creds = get_cloudinary_creds(user_cloudinary_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # ── Cloudinary: paginated delete ALL resources then folders ────
    try:
        deleted = await asyncio.to_thread(_cld_delete_all_paginated, creds)
        _log_fn("INFO", f"Cloudinary: deleted {deleted} resources")
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary wipe: {e}")
    try:
        folders_res = await asyncio.to_thread(_cld_root_folders, creds)
        # Delete all root folders in parallel; individual failures tolerated.
        folder_tasks = [
            asyncio.to_thread(_cld_remove_folder, f["name"], creds)
            for f in folders_res.get("folders", [])
        ]
        if folder_tasks:
            await asyncio.gather(*folder_tasks, return_exceptions=True)
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary folder cleanup: {e}")
    # ── Pinecone: delete both indexes + recreate ─────────────────
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        tasks = []
        if IDX_OBJECTS in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_OBJECTS))
        if IDX_FACES in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_FACES))
        if tasks:
            await asyncio.gather(*tasks)
            # Poll until Pinecone reports both indexes gone (bounded ~15 s)
            # instead of a fixed sleep: create_index fails if a same-named
            # index is still being torn down.
            for _ in range(15):
                await asyncio.sleep(1)
                remaining = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
                if IDX_OBJECTS not in remaining and IDX_FACES not in remaining:
                    break
        await asyncio.gather(
            asyncio.to_thread(pc.create_index, name=IDX_OBJECTS,
                              dimension=IDX_OBJECTS_DIM, metric="cosine",  # 1536-D
                              spec=ServerlessSpec(cloud="aws", region="us-east-1")),
            asyncio.to_thread(pc.create_index, name=IDX_FACES,
                              dimension=IDX_FACES_DIM, metric="cosine",  # 1024-D V4
                              spec=ServerlessSpec(cloud="aws", region="us-east-1")),
        )
    except Exception as e:
        log("ERROR", "danger.reset_database.pinecone_error",
            user_id=user_id or "anonymous", ip=ip, error=str(e))
        raise HTTPException(500, f"Pinecone reset error: {e}")
    log("WARNING", "danger.reset_database.complete",
        user_id=user_id or "anonymous", ip=ip,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Database reset complete. All data wiped and indexes recreated."}
# ════════════════════════════════════════════════════════════════
# 9. DELETE ACCOUNT ⚠️ DESTRUCTIVE — triple-logged
# ════════════════════════════════════════════════════════════════
@app.post("/api/delete-account")
async def delete_account(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """⚠️ DESTRUCTIVE: wipe all Cloudinary resources and folders and drop
    both Pinecone indexes as part of account deletion.

    Blocked with 403 on the shared demo credentials. Unlike reset, every
    backend step here is best-effort (warnings only) so the client-side
    sign-out can proceed even if a provider call fails.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    log("WARNING", "danger.delete_account.attempt",
        user_id=user_id or "anonymous", ip=ip)
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.delete_account.blocked_shared_db",
            user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(403, "Account deletion is not allowed on the shared demo database.")
    creds = get_cloudinary_creds(user_cloudinary_url)
    # Consistency with /api/reset-database and /api/delete-folder: reject a
    # malformed Cloudinary URL up-front instead of silently skipping the
    # Cloudinary wipe while still deleting the Pinecone indexes.
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # ── Cloudinary: paginated delete ALL resources then folders ────
    try:
        deleted = await asyncio.to_thread(_cld_delete_all_paginated, creds)
        _log_fn("INFO", f"Account delete Cloudinary: {deleted} resources removed")
    except Exception as e:
        _log_fn("WARNING", f"Account delete Cloudinary: {e}")
    try:
        folders_res = await asyncio.to_thread(_cld_root_folders, creds)
        # Remove all root folders in parallel; individual failures tolerated.
        folder_tasks = [
            asyncio.to_thread(_cld_remove_folder, f["name"], creds)
            for f in folders_res.get("folders", [])
        ]
        if folder_tasks:
            await asyncio.gather(*folder_tasks, return_exceptions=True)
    except Exception as e:
        _log_fn("WARNING", f"Account delete Cloudinary folders: {e}")
    # ── Pinecone: delete both indexes (no recreate — account is gone) ──
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        tasks = []
        if IDX_OBJECTS in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_OBJECTS))
        if IDX_FACES in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_FACES))
        if tasks: await asyncio.gather(*tasks)
    except Exception as e:
        _log_fn("WARNING", f"Account delete Pinecone: {e}")
    log("WARNING", "danger.delete_account.complete",
        user_id=user_id or "anonymous", ip=ip,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Account data deleted. Sign out initiated."}