Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import shutil | |
| import uuid | |
| import re | |
| import time | |
| import json | |
| import traceback | |
| import inflect | |
| from datetime import datetime, timezone | |
| from urllib.parse import urlparse | |
| from typing import List | |
| from contextlib import asynccontextmanager | |
| from collections import OrderedDict | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import cloudinary | |
| import cloudinary.uploader | |
| import cloudinary.api | |
| from pinecone import Pinecone, ServerlessSpec | |
| # ── loguru for pretty local console logs (optional dep) ────────── | |
try:
    from loguru import logger as _loguru
    # Replace loguru's default sink: write straight to stdout with a compact
    # time | level | message line (colorized for local dev consoles).
    _loguru.remove()
    _loguru.add(
        lambda msg: print(msg, end=""),
        format="<green>{time:HH:mm:ss}</green> | <level>{level:<8}</level> | {message}",
        level="DEBUG", colorize=True,
    )
    # Unified console-logging entry point: _log_fn(level_name, message).
    _log_fn = _loguru.log
except ImportError:
    # loguru not installed — fall back to stdlib logging with the same
    # (level_name, message) call signature so callers don't care which is active.
    import logging as _logging
    _logging.basicConfig(level=_logging.INFO)
    _stdlib = _logging.getLogger("el")
    # getattr(..., 20) maps unknown level names to INFO (numeric 20).
    def _log_fn(level, msg): _stdlib.log(getattr(_logging, level, 20), msg)
# ── Deferred imports ─────────────────────────────────────────────
# AI model manager instance; populated during app startup (see lifespan()).
ai = None
# inflect engine used to singularize category/folder names.
p = inflect.engine()
MAX_CONCURRENT_INFERENCES = int(os.getenv("MAX_CONCURRENT_INFERENCES", "1"))  # InsightFace ONNX is NOT thread-safe
# Guards model inference; created in lifespan() once an event loop exists.
_inference_sem: asyncio.Semaphore
# LRU pool of Pinecone clients keyed by API key (see _get_pinecone()).
_pinecone_pool = OrderedDict()
_POOL_MAX = 64
IDX_FACES = "enterprise-faces"
MAX_FILES_PER_UPLOAD = 20  # cap to prevent memory corruption on large batches
IDX_OBJECTS = "enterprise-objects"
# ── V4 index dimensions ──────────────────────────────────────────
#   enterprise-faces  : 1024-D (ArcFace 512 + AdaFace 512, fused)
#   enterprise-objects: 1536-D (SigLIP 768 + DINOv2 768, fused)
#   ⚠️ If upgrading from V3 (512-D faces), you MUST reset the
#   enterprise-faces index via Settings → Danger Zone → Reset DB
IDX_FACES_DIM = int(os.getenv("IDX_FACES_DIM", "1024"))
IDX_OBJECTS_DIM = int(os.getenv("IDX_OBJECTS_DIM", "1536"))
# V4 face search thresholds
# Cosine similarity thresholds for the fused 1024-D ArcFace+AdaFace space
FACE_THRESHOLD_HIGH = 0.40   # high-quality faces (det_score ≥ 0.85)
FACE_THRESHOLD_LOW = 0.32    # lower-quality faces (det_score < 0.85)
FACE_TOP_K_FETCH = 50        # fetch more candidates, filter after merge
# ════════════════════════════════════════════════════════════════
#  SUPABASE LOGGING — async, fire-and-forget, never crashes API
#  HF Space Secrets needed:
#    SUPABASE_URL          → https://xxxx.supabase.co
#    SUPABASE_SERVICE_KEY  → your Supabase service_role key (not anon!)
# ════════════════════════════════════════════════════════════════
SUPABASE_URL = os.getenv("SUPABASE_URL", "")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY", "")
async def _supabase_log_push(level: str, event: str, data: dict):
    """Fire-and-forget insert into Supabase app_logs table."""
    # Without both secrets configured this is a silent no-op.
    if not (SUPABASE_URL and SUPABASE_SERVICE_KEY):
        return
    try:
        import aiohttp
        # Optional structured columns — left as NULL when absent from data.
        duration = int(data["duration_ms"]) if "duration_ms" in data else None
        error_text = str(data["error"]) if "error" in data else None
        row = {
            "level": level.upper(),
            "event": event,
            "user_id": str(data.get("user_id", "anonymous")),
            "ip": str(data.get("ip", "")),
            "mode": str(data.get("mode", "")),
            "page": str(data.get("page", "")),
            "duration_ms": duration,
            "error": error_text,
            "data": data,
        }
        auth_headers = {
            "Content-Type": "application/json",
            "apikey": SUPABASE_SERVICE_KEY,
            "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}",
            "Prefer": "return=minimal",
        }
        endpoint = f"{SUPABASE_URL}/rest/v1/app_logs"
        request_timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                endpoint, json=row, headers=auth_headers, timeout=request_timeout
            ) as resp:
                if resp.status not in (200, 201):
                    body = await resp.text()
                    _log_fn("WARNING", f"Supabase log insert failed {resp.status}: {body[:200]}")
    except Exception as exc:
        # Logging must never take the API down — downgrade any failure to DEBUG.
        _log_fn("DEBUG", f"Supabase log push skipped: {exc}")
def log(level: str, event: str, **data):
    """
    Log to console + Supabase app_logs table (background task).
    Usage: log("INFO", "upload.complete", user_id="x", files=3, duration_ms=340)
    """
    # Console first — this path must always succeed regardless of Supabase.
    _log_fn(level.upper(), f"[{event}] {json.dumps(data, default=str)}")
    try:
        # get_running_loop() raises RuntimeError when called outside a running
        # event loop; the previous get_event_loop() is deprecated for this use
        # and may implicitly create a loop that never runs the task.
        asyncio.get_running_loop()
        asyncio.create_task(_supabase_log_push(level, event, data))
    except RuntimeError:
        # No running loop (import time / sync context): console-only.
        pass
| # ════════════════════════════════════════════════════════════════ | |
| # HELPERS | |
| # ════════════════════════════════════════════════════════════════ | |
def get_ip(request: Request) -> str:
    """Best-effort client IP: first X-Forwarded-For hop, else the socket peer."""
    forwarded = request.headers.get("X-Forwarded-For", "")
    if forwarded:
        # Leftmost entry is the original client when behind proxies.
        return forwarded.split(",")[0].strip()
    return getattr(request.client, "host", "unknown")
def is_guest(key: str) -> bool:
    """True when *key* is exactly the server's shared guest Pinecone key."""
    guest_key = os.getenv("DEFAULT_PINECONE_KEY", "")
    if not guest_key:
        # No guest key configured → nobody can be a guest.
        return False
    return key.strip() == guest_key.strip()
def _get_pinecone(api_key: str) -> Pinecone:
    """Return a cached Pinecone client for *api_key* (LRU pool, max _POOL_MAX)."""
    client = _pinecone_pool.get(api_key)
    if client is None:
        # Pool full → evict the least-recently-used client first.
        while len(_pinecone_pool) >= _POOL_MAX:
            _pinecone_pool.popitem(last=False)
        client = Pinecone(api_key=api_key)
        _pinecone_pool[api_key] = client
    # Mark as most-recently-used.
    _pinecone_pool.move_to_end(api_key)
    return client
def _cld_auth(creds: dict) -> dict:
    """Expand a parsed Cloudinary credentials dict into per-call auth kwargs.

    Factored out because every Cloudinary call repeated the same three kwargs.
    """
    return {"api_key": creds["api_key"], "api_secret": creds["api_secret"],
            "cloud_name": creds["cloud_name"]}
def _cld_upload(tmp_path, folder, creds):
    """Blocking upload of a local file into *folder* (run via asyncio.to_thread)."""
    return cloudinary.uploader.upload(tmp_path, folder=folder, **_cld_auth(creds))
def _cld_ping(creds):
    """Blocking credential check against the Cloudinary Admin API."""
    return cloudinary.api.ping(**_cld_auth(creds))
def _cld_root_folders(creds):
    """Blocking list of top-level Cloudinary folders (used as categories)."""
    return cloudinary.api.root_folders(**_cld_auth(creds))
def get_cloudinary_creds(env_url: str) -> dict:
    """Parse a CLOUDINARY_URL (cloudinary://key:secret@cloud) into a creds dict.

    Returns {} for an empty URL; absent URL components come back as None.
    """
    if not env_url:
        return {}
    parts = urlparse(env_url)
    return {
        "api_key": parts.username,
        "api_secret": parts.password,
        "cloud_name": parts.hostname,
    }
def standardize_category_name(name: str) -> str:
    """Normalize a category/folder name: lowercase, underscores, singular form."""
    underscored = re.sub(r'\s+', '_', name.strip().lower())
    cleaned = re.sub(r'[^\w]', '', underscored)
    # inflect's singular_noun() returns False when the word is already singular.
    singular = p.singular_noun(cleaned)
    return singular or cleaned
def sanitize_filename(filename: str) -> str:
    """Keep only word chars, dots and dashes; runs of whitespace become '_'."""
    collapsed = re.sub(r'\s+', '_', filename)
    return re.sub(r'[^\w.\-]', '', collapsed)
# Server-wide fallback credentials for guest mode (may be empty strings).
DEFAULT_PC_KEY = os.getenv("DEFAULT_PINECONE_KEY", "")
DEFAULT_CLD_URL = os.getenv("DEFAULT_CLOUDINARY_URL", "")
# NOTE(review): this duplicates is_guest() above, and the endpoints below
# re-read the env vars directly instead of using these constants — consider
# unifying on one helper; confirm nothing relies on late env changes.
def _is_default_key(key: str, default: str) -> bool:
    # True only when a default is configured and *key* matches it exactly.
    return bool(default) and key.strip() == default.strip()
| # ════════════════════════════════════════════════════════════════ | |
| # APP STARTUP / SHUTDOWN | |
| # ════════════════════════════════════════════════════════════════ | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: load AI models before serving; log on shutdown.

    BUG FIX: FastAPI requires the lifespan callable to be an async context
    manager — the @asynccontextmanager decorator was missing even though it
    is imported (and otherwise unused) at the top of this file, so the app
    would fail at startup with a bare async generator.
    """
    global ai, _inference_sem
    # Heavy import deferred so the module imports without the model stack.
    from src.models import AIModelManager
    log("INFO", "server.startup", message="Loading AI models...")
    loop = asyncio.get_event_loop()
    # Model construction is blocking — run it off the event loop.
    ai = await loop.run_in_executor(None, AIModelManager)
    # Semaphore must be created while the loop is running.
    _inference_sem = asyncio.Semaphore(MAX_CONCURRENT_INFERENCES)
    log("INFO", "server.ready", message="All models loaded. API ready.")
    yield
    log("INFO", "server.shutdown", message="API shutting down.")
app = FastAPI(lifespan=lifespan)
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# disallowed by the CORS spec (browsers reject "*" with credentials) —
# confirm whether credentials are needed, or pin explicit origins.
app.add_middleware(CORSMiddleware,
    allow_origins=["*"], allow_credentials=True,
    allow_methods=["*"], allow_headers=["*"])
# Scratch directory for uploaded files while they are processed.
os.makedirs("temp_uploads", exist_ok=True)
| # ════════════════════════════════════════════════════════════════ | |
| # FRONTEND EVENT LOG — React calls this for client-side events | |
| # Logs: page visits, tab switches, mode toggles, search/upload | |
| # initiated, settings changes, errors caught in UI | |
| # ════════════════════════════════════════════════════════════════ | |
async def frontend_log(
    request: Request,
    event: str = Form(...),          # e.g. "page.visit", "search.initiated"
    user_id: str = Form(""),
    page: str = Form(""),
    metadata: str = Form("{}"),      # JSON string with extra context
):
    """Record a client-side UI event (page visit, tab switch, UI error, ...).

    The *metadata* form field is arbitrary JSON from the browser and is
    treated as untrusted before being splatted into log().
    """
    ip = get_ip(request)
    try:
        meta = json.loads(metadata) if metadata else {}
    except Exception:
        meta = {}
    # json.loads may return any JSON type (list, str, number) — only a dict
    # can be **-expanded below; anything else would raise TypeError → 500.
    if not isinstance(meta, dict):
        meta = {}
    # Drop keys that collide with the explicit kwargs below — a client sending
    # {"user_id": ...} would otherwise cause a duplicate-keyword TypeError.
    reserved = {"user_id", "page", "ip", "ua"}
    meta = {k: v for k, v in meta.items() if k not in reserved}
    log("INFO", f"frontend.{event}",
        user_id = user_id or "anonymous",
        page = page,
        ip = ip,
        ua = request.headers.get("User-Agent", "")[:120],
        **meta,
    )
    return {"ok": True}
| # ════════════════════════════════════════════════════════════════ | |
| # 1. VERIFY KEYS & AUTO-BUILD INDEXES | |
| # ════════════════════════════════════════════════════════════════ | |
async def verify_keys(
    request: Request,
    pinecone_key: str = Form(""),
    cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Validate user-supplied Pinecone/Cloudinary credentials and lazily
    create the two Pinecone indexes if they don't exist yet.

    Raises HTTPException(400) on bad credentials; either credential may be
    omitted (empty string skips that provider's check).
    """
    ip = get_ip(request)
    mode = "guest" if is_guest(pinecone_key) else "personal"
    start = time.perf_counter()
    log("INFO", "settings.verify_keys.start",
        user_id=user_id or "anonymous", mode=mode, ip=ip)
    if cloudinary_url:
        try:
            creds_v = get_cloudinary_creds(cloudinary_url)
            if not creds_v.get("cloud_name"): raise ValueError("bad url")
            # Blocking Admin API ping, moved off the event loop.
            await asyncio.to_thread(_cld_ping, creds_v)
        except HTTPException: raise
        except Exception as e:
            log("ERROR", "settings.verify_keys.cloudinary_fail",
                user_id=user_id or "anonymous", ip=ip, error=str(e),
                duration_ms=round((time.perf_counter()-start)*1000))
            raise HTTPException(400, "Invalid Cloudinary Environment URL.")
    indexes_created = []
    if pinecone_key:
        try:
            pc = _get_pinecone(pinecone_key)
            existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
            tasks = []
            # Create only the missing indexes; both creations run concurrently.
            if IDX_OBJECTS not in existing:
                tasks.append(asyncio.to_thread(pc.create_index, name=IDX_OBJECTS,
                    dimension=IDX_OBJECTS_DIM,  # 1536-D SigLIP+DINOv2
                    metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")))
                indexes_created.append(IDX_OBJECTS)
            if IDX_FACES not in existing:
                tasks.append(asyncio.to_thread(pc.create_index, name=IDX_FACES,
                    dimension=IDX_FACES_DIM,  # 1024-D ArcFace+AdaFace (V4)
                    metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")))
                indexes_created.append(IDX_FACES)
            if tasks: await asyncio.gather(*tasks)
        except Exception as e:
            err = str(e)
            # Translate auth failures into a user-friendly message.
            clean = ("Invalid Pinecone API Key. Please check your key and try again."
                     if "401" in err or "unauthorized" in err.lower()
                     else f"Pinecone Error: {err}")
            log("ERROR", "settings.verify_keys.pinecone_fail",
                user_id=user_id or "anonymous", ip=ip, error=clean,
                duration_ms=round((time.perf_counter()-start)*1000))
            raise HTTPException(400, clean)
    log("INFO", "settings.verify_keys.success",
        user_id=user_id or "anonymous", mode=mode, ip=ip,
        indexes_created=indexes_created,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Keys verified and indexes ready!"}
| # ════════════════════════════════════════════════════════════════ | |
| # 2. UPLOAD | |
| # ════════════════════════════════════════════════════════════════ | |
async def upload_new_images(
    request: Request,
    files: List[UploadFile] = File(...),
    folder_name: str = Form(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Upload a batch of images: store each in Cloudinary, run AI embedding,
    and upsert face vectors to the faces index and whole-image vectors to the
    objects index.

    Raises HTTPException on missing keys, oversized batch, bad Cloudinary URL,
    missing Pinecone index (404), or any per-file processing failure (500) —
    a failure aborts the remainder of the batch (already-uploaded files stay).
    """
    ip = get_ip(request)
    start = time.perf_counter()
    # Fall back to server-wide guest credentials when the user supplied none.
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    actual_cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "upload.start",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        folder=folder_name, file_count=len(files),
        file_names=[f.filename for f in files][:10],
        detect_faces=detect_faces)
    if not actual_pc_key or not actual_cld_url:
        log("ERROR", "upload.missing_keys", user_id=user_id or "anonymous", ip=ip, mode=mode)
        raise HTTPException(400, "API Keys are missing. If you are a guest, the server is missing its DEFAULT_ secrets in Hugging Face.")
    if len(files) > MAX_FILES_PER_UPLOAD:
        raise HTTPException(400, f"Maximum {MAX_FILES_PER_UPLOAD} files per upload. Please split into smaller batches.")
    folder = standardize_category_name(folder_name)
    creds = get_cloudinary_creds(actual_cld_url)
    if not creds.get("cloud_name"):
        log("ERROR", "upload.bad_cloudinary_url", user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(400, "Invalid Cloudinary URL format.")
    pc = _get_pinecone(actual_pc_key)
    idx_obj = pc.Index(IDX_OBJECTS)
    idx_face = pc.Index(IDX_FACES)
    uploaded_urls = []
    face_vec_total = 0
    object_vec_total = 0
    # Files are processed sequentially; inference is additionally serialized
    # by _inference_sem (InsightFace ONNX is not thread-safe).
    for file in files:
        # Unique temp name avoids collisions between concurrent requests.
        tmp_path = f"temp_uploads/{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
        file_start = time.perf_counter()
        try:
            with open(tmp_path, "wb") as buf:
                shutil.copyfileobj(file.file, buf)
            res = await asyncio.to_thread(_cld_upload, tmp_path, folder, creds)
            image_url = res["secure_url"]
            uploaded_urls.append(image_url)
            async with _inference_sem:
                vectors = await ai.process_image_async(tmp_path, is_query=False, detect_faces=detect_faces)
            face_upserts = []
            object_upserts = []
            for v in vectors:
                # Embeddings may arrive as numpy arrays — normalize to a list.
                vec_list = v["vector"].tolist() if hasattr(v["vector"], "tolist") else v["vector"]
                if v["type"] == "face":
                    # ── FACE STORE: ArcFace+AdaFace 1024-D fused embedding
                    #    V4: includes face_quality + face_width_px for retrieval scoring
                    face_upserts.append({
                        "id": str(uuid.uuid4()),
                        "values": vec_list,
                        "metadata": {
                            "image_url": image_url,
                            "url": image_url,
                            "folder": folder,
                            "face_idx": v.get("face_idx", 0),
                            "bbox": str(v.get("bbox", [])),
                            "face_crop": v.get("face_crop", ""),
                            "det_score": v.get("det_score", 1.0),
                            "face_quality": v.get("face_quality", v.get("det_score", 1.0)),
                            "face_width_px": v.get("face_width_px", 0),
                        }
                    })
                else:
                    # ── OBJECT STORE: SigLIP+DINOv2 1536-D fused embedding
                    object_upserts.append({
                        "id": str(uuid.uuid4()),
                        "values": vec_list,
                        "metadata": {
                            "image_url": image_url,
                            "url": image_url,
                            "folder": folder,
                        }
                    })
            # Always upsert to BOTH indexes:
            #   - face index gets face embeddings (if any faces detected)
            #   - object index ALWAYS gets full image embedding
            face_vec_total += len(face_upserts)
            object_vec_total += len(object_upserts)
            upsert_tasks = []
            if face_upserts:
                upsert_tasks.append(asyncio.to_thread(idx_face.upsert, vectors=face_upserts))
            if object_upserts:
                upsert_tasks.append(asyncio.to_thread(idx_obj.upsert, vectors=object_upserts))
            if upsert_tasks:
                await asyncio.gather(*upsert_tasks)
            log("INFO", "upload.file.success",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                filename=file.filename, folder=folder, image_url=image_url,
                face_vectors=len(face_upserts), obj_vectors=len(object_upserts),
                detect_faces=detect_faces,
                duration_ms=round((time.perf_counter()-file_start)*1000))
        except Exception as e:
            log("ERROR", "upload.file.error",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                filename=file.filename, folder=folder, error=str(e),
                traceback=traceback.format_exc()[-800:],
                duration_ms=round((time.perf_counter()-file_start)*1000))
            err = str(e)
            # A missing index usually means the user reset/never created it.
            if "not found" in err.lower() or "404" in err:
                raise HTTPException(404, "Pinecone index not found. Please go to Settings and click 'Verify & Save' to recreate your indexes.")
            raise HTTPException(500, f"Upload processing failed: {err}")
        finally:
            # Temp file is removed even when processing failed.
            if os.path.exists(tmp_path): os.remove(tmp_path)
    log("INFO", "upload.complete",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        folder=folder, files_uploaded=len(uploaded_urls),
        face_vectors=face_vec_total, object_vectors=object_vec_total,
        detect_faces=detect_faces,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Done!", "urls": uploaded_urls}
| # ════════════════════════════════════════════════════════════════ | |
| # 3. SEARCH | |
| # ════════════════════════════════════════════════════════════════ | |
async def search_database(
    request: Request,
    file: UploadFile = File(...),
    detect_faces: bool = Form(True),
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """Search the vector DB with a query image.

    Runs embedding inference on the uploaded image, then either:
      - FACE MODE (faces detected and detect_faces=True): per-face queries
        against the faces index, merged across faces with a multi-face boost;
      - OBJECT MODE: whole-image queries against the objects index.

    Returns {"mode", "results", "face_groups"}; raises HTTPException on
    missing key (400), missing index (404), or processing failure (500).
    """
    ip = get_ip(request)
    start = time.perf_counter()
    actual_pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    mode = "guest" if is_guest(actual_pc_key) else "personal"
    log("INFO", "search.start",
        user_id=user_id or "anonymous", ip=ip, mode=mode,
        filename=file.filename, detect_faces=detect_faces)
    if not actual_pc_key:
        log("ERROR", "search.missing_keys", user_id=user_id or "anonymous", ip=ip, mode=mode)
        raise HTTPException(400, "Pinecone Key is missing.")
    tmp_path = f"temp_uploads/query_{uuid.uuid4().hex}_{sanitize_filename(file.filename)}"
    try:
        with open(tmp_path, "wb") as buf:
            shutil.copyfileobj(file.file, buf)
        # Serialize inference (InsightFace ONNX is not thread-safe).
        async with _inference_sem:
            vectors = await ai.process_image_async(tmp_path, is_query=True, detect_faces=detect_faces)
        inference_ms = round((time.perf_counter() - start) * 1000)
        lanes_used = list({v["type"] for v in vectors})
        log("INFO", "search.inference_done",
            user_id=user_id or "anonymous", ip=ip, mode=mode,
            vector_count=len(vectors), lanes=lanes_used, inference_ms=inference_ms)
        pc = _get_pinecone(actual_pc_key)
        idx_obj = pc.Index(IDX_OBJECTS)
        idx_face = pc.Index(IDX_FACES)
        # ── V4: split vectors by type ────────────────────────────
        face_vectors = [v for v in vectors if v["type"] == "face"]
        # NOTE(review): object_vectors is assigned but the object-mode branch
        # below iterates *vectors* instead — confirm whether it should use
        # object_vectors only.
        object_vectors = [v for v in vectors if v["type"] == "object"]
        # ════════════════════════════════════════════════════════
        #  OBJECT MODE helper
        #  Used when no faces detected or face search disabled.
        # ════════════════════════════════════════════════════════
        async def _query_object_one(vec_dict: dict):
            # Normalize a possible numpy array to a plain list.
            vec_list = (vec_dict["vector"].tolist()
                        if hasattr(vec_dict["vector"], "tolist")
                        else vec_dict["vector"])
            try:
                res = await asyncio.to_thread(
                    idx_obj.query, vector=vec_list, top_k=10, include_metadata=True)
            except Exception as e:
                if "404" in str(e):
                    raise HTTPException(404, "Pinecone Index not found. Go to Settings → Verify & Save.")
                raise e
            out = []
            for match in res.get("matches", []):
                # Hard floor on cosine similarity for object results.
                if match["score"] < 0.45:
                    continue
                out.append({
                    "url": match["metadata"].get("url") or match["metadata"].get("image_url", ""),
                    "score": round(match["score"], 4),
                    "caption": match["metadata"].get("folder", "🎯 Visual Match"),
                })
            return out
        if detect_faces and face_vectors:
            # ════════════════════════════════════════════════════
            #  V4 FACE MODE — Multi-face merge retrieval
            #
            #  For a group photo with N detected faces:
            #   1. Query enterprise-faces for EACH face (top_k=50)
            #   2. Build a global image_url → match_data map
            #      • An image is included if ANY face matches
            #      • Score = highest matching face score for that image
            #      • Track WHICH face indices matched each image
            #   3. Group results PER query face (for UI display)
            #   4. Also build a "cross-face" flat list:
            #      images that matched multiple faces rank higher
            #
            #  Threshold logic:
            #   High-quality face (det_score ≥ 0.85) → threshold 0.40
            #   Lower-quality face → threshold 0.32
            #   (Fused 1024-D space has different cosine distribution
            #    than raw ArcFace 512-D — thresholds adjusted accordingly)
            # ════════════════════════════════════════════════════
            async def _query_single_face(face_vec: dict) -> dict:
                """
                Query enterprise-faces for one detected face.
                Returns per-face result group for UI + raw match map.
                """
                vec_list = (face_vec["vector"].tolist()
                            if hasattr(face_vec["vector"], "tolist")
                            else face_vec["vector"])
                # Adaptive threshold: high-quality → stricter
                det_score = face_vec.get("det_score", 1.0)
                threshold = FACE_THRESHOLD_HIGH if det_score >= 0.85 else FACE_THRESHOLD_LOW
                try:
                    face_res = await asyncio.to_thread(
                        idx_face.query,
                        vector=vec_list,
                        top_k=FACE_TOP_K_FETCH,
                        include_metadata=True,
                    )
                except Exception as e:
                    if "404" in str(e):
                        raise HTTPException(404, "Pinecone index not found. Go to Settings → Verify & Save.")
                    raise e
                # Collect matches — keep BEST score per image_url
                # (multiple face vectors stored per image during upload,
                #  we only want the best matching one per image)
                image_map = {}  # image_url → best match data
                for match in face_res.get("matches", []):
                    raw = match["score"]
                    if raw < threshold:
                        continue
                    url = (match["metadata"].get("url") or
                           match["metadata"].get("image_url", ""))
                    if not url:
                        continue
                    if url not in image_map or raw > image_map[url]["raw_score"]:
                        image_map[url] = {
                            "raw_score": raw,
                            "face_crop": match["metadata"].get("face_crop", ""),
                            "folder": match["metadata"].get("folder", ""),
                            "face_quality": match["metadata"].get("face_quality", 1.0),
                            "face_width_px": match["metadata"].get("face_width_px", 0),
                        }
                # Remap raw cosine → UI score (75%–99%)
                # Range is now 0.32–1.0 (wider than old 0.35–1.0)
                def _ui_score(raw: float) -> float:
                    lo, hi = FACE_THRESHOLD_LOW, 1.0
                    return round(min(0.99, 0.75 + ((raw - lo) / (hi - lo)) * 0.24), 4)
                matches = [
                    {
                        "url": url,
                        "score": _ui_score(d["raw_score"]),
                        "raw_score": round(d["raw_score"], 4),
                        "face_crop": d["face_crop"],
                        "folder": d["folder"],
                        "caption": "👤 Verified Identity",
                    }
                    for url, d in image_map.items()
                ]
                matches = sorted(matches, key=lambda x: x["score"], reverse=True)[:15]
                return {
                    "query_face_idx": face_vec.get("face_idx", 0),
                    "query_face_crop": face_vec.get("face_crop", ""),
                    "det_score": det_score,
                    "face_width_px": face_vec.get("face_width_px", 0),
                    "matches": matches,
                    "_image_map": image_map,  # used for cross-face merge below
                }
            # Query all faces in parallel
            raw_groups = await asyncio.gather(
                *[_query_single_face(fv) for fv in face_vectors])
            # ── Cross-face merge ────────────────────────────────
            # Build a global image → {best_score, matched_face_indices}
            # An image appearing for multiple faces gets a boost:
            #   final_score = best_face_score * (1 + 0.05 * extra_face_count)
            # This makes images with multiple searched people rank higher.
            global_image_map = {}  # url → {score, matched_faces, face_crop, folder}
            for gi, group in enumerate(raw_groups):
                for url, d in group["_image_map"].items():
                    raw = d["raw_score"]
                    if url not in global_image_map:
                        global_image_map[url] = {
                            "raw_score": raw,
                            "face_crop": d["face_crop"],
                            "folder": d["folder"],
                            "matched_faces": [gi],
                        }
                    else:
                        existing = global_image_map[url]
                        existing["matched_faces"].append(gi)
                        # Keep the strongest face match (and its crop) per image.
                        if raw > existing["raw_score"]:
                            existing["raw_score"] = raw
                            existing["face_crop"] = d["face_crop"]
            # Apply multi-face boost and build flat merged list
            def _boosted_ui_score(raw: float, n_faces: int) -> float:
                lo = FACE_THRESHOLD_LOW
                base = 0.75 + ((raw - lo) / (1.0 - lo)) * 0.24
                boosted = base * (1.0 + 0.05 * (n_faces - 1))
                return round(min(0.99, boosted), 4)
            merged_results = []
            for url, d in global_image_map.items():
                n = len(d["matched_faces"])
                merged_results.append({
                    "url": url,
                    "score": _boosted_ui_score(d["raw_score"], n),
                    "raw_score": round(d["raw_score"], 4),
                    "face_crop": d["face_crop"],
                    "folder": d["folder"],
                    "matched_faces": d["matched_faces"],
                    "caption": (f"👥 {n} faces matched" if n > 1
                                else "👤 Verified Identity"),
                })
            merged_results = sorted(
                merged_results, key=lambda x: x["score"], reverse=True)[:20]
            # Clean per-group results (remove internal _image_map)
            face_groups = []
            for g in raw_groups:
                clean = {k: v for k, v in g.items() if k != "_image_map"}
                if clean["matches"]:
                    face_groups.append(clean)
            duration_ms = round((time.perf_counter() - start) * 1000)
            total_matches = len(merged_results)
            log("INFO", "search.complete",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                lanes=["face"], detect_faces=detect_faces,
                face_groups=len(face_groups),
                merged_results=total_matches,
                top_score=merged_results[0]["score"] if merged_results else 0,
                duration_ms=duration_ms)
            return {
                "mode": "face",
                "face_groups": face_groups,  # per-face results for UI tabs
                "results": merged_results,   # V4: flat merged cross-face list
            }
        else:
            # ════════════════════════════════════════════════════
            #  OBJECT MODE — flat ranked results from object index
            # ════════════════════════════════════════════════════
            nested = await asyncio.gather(
                *[_query_object_one(v) for v in vectors])
            all_results = [r for sub in nested for r in sub]
            # Deduplicate by URL keeping the best score.
            seen = {}
            for r in all_results:
                url = r["url"]
                if url not in seen or r["score"] > seen[url]["score"]:
                    seen[url] = r
            final = sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:10]
            duration_ms = round((time.perf_counter() - start) * 1000)
            log("INFO", "search.complete",
                user_id=user_id or "anonymous", ip=ip, mode=mode,
                lanes=lanes_used, detect_faces=detect_faces,
                results_count=len(final),
                top_score=final[0]["score"] if final else 0,
                duration_ms=duration_ms)
            return {"mode": "object", "results": final, "face_groups": []}
    except HTTPException: raise
    except Exception as e:
        log("ERROR", "search.error",
            user_id=user_id or "anonymous", ip=ip, mode=mode,
            error=str(e), traceback=traceback.format_exc()[-800:],
            duration_ms=round((time.perf_counter()-start)*1000))
        raise HTTPException(500, str(e))
    finally:
        # Query temp file is always removed.
        if os.path.exists(tmp_path): os.remove(tmp_path)
| # ════════════════════════════════════════════════════════════════ | |
| # 4. CATEGORIES | |
| # ════════════════════════════════════════════════════════════════ | |
async def get_categories(
    request: Request,
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """List top-level Cloudinary folders as categories.

    Never raises — any failure (missing/bad URL, API error) yields [].
    """
    ip = get_ip(request)
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    if not actual_url:
        return {"categories": []}
    try:
        creds = get_cloudinary_creds(actual_url)
        if not creds.get("cloud_name"):
            return {"categories": []}
        result = await asyncio.to_thread(_cld_root_folders, creds)
        categories = [folder["name"] for folder in result.get("folders", [])]
        log("INFO", "categories.fetched",
            user_id=user_id or "anonymous", ip=ip, category_count=len(categories))
        return {"categories": categories}
    except Exception as e:
        log("ERROR", "categories.error", user_id=user_id or "anonymous", ip=ip, error=str(e))
        return {"categories": []}
async def root():
    """Liveness probe — always reports OK."""
    payload = {"status": "ok"}
    return payload
async def health():
    """Health probe with a UTC timestamp for uptime monitors."""
    now = datetime.now(timezone.utc)
    return {"status": "ok", "timestamp": now.isoformat()}
| # ════════════════════════════════════════════════════════════════ | |
| # 5. LIST FOLDER IMAGES | |
| # ════════════════════════════════════════════════════════════════ | |
def _cld_list_folder_images(folder: str, creds: dict, next_cursor: str = None, max_results: int = 100):
    """Blocking Cloudinary Admin API call: fetch one page of uploads under folder/.

    Returns the raw API response dict; pagination continues via its
    next_cursor field. Run via asyncio.to_thread from async handlers.
    """
    kwargs = dict(type="upload", prefix=f"{folder}/", max_results=max_results,
        api_key=creds["api_key"], api_secret=creds["api_secret"], cloud_name=creds["cloud_name"])
    # Only pass the cursor when resuming a previous page.
    if next_cursor: kwargs["next_cursor"] = next_cursor
    return cloudinary.api.resources(**kwargs)
| def _cld_thumb_url(secure_url: str, cloud_name: str) -> str: | |
| """ | |
| Convert full-resolution Cloudinary URL to a small thumbnail URL. | |
| Inserts Cloudinary transformation: 400×400 fill, auto quality, auto format. | |
| Example: | |
| https://res.cloudinary.com/demo/image/upload/v123/folder/img.jpg | |
| → | |
| https://res.cloudinary.com/demo/image/upload/w_400,h_400,c_fill,q_auto,f_auto/v123/folder/img.jpg | |
| """ | |
| try: | |
| marker = f"/image/upload/" | |
| idx = secure_url.find(marker) | |
| if idx == -1: | |
| return secure_url # can't transform — return original | |
| base = secure_url[:idx + len(marker)] | |
| rest = secure_url[idx + len(marker):] | |
| # Skip existing transformation block if present | |
| if rest.startswith("w_") or rest.startswith("h_") or rest.startswith("c_") or rest.startswith("q_"): | |
| return secure_url # already transformed | |
| return f"{base}w_400,h_400,c_fill,q_auto,f_auto/{rest}" | |
| except Exception: | |
| return secure_url | |
async def list_folder_images(
    request: Request,
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
    next_cursor: str = Form(""),
    page_size: int = Form(100),
):
    """Return one page of images in a Cloudinary folder (full URL + thumbnail).

    page_size is capped at 100; the caller paginates by passing back the
    returned next_cursor. Raises HTTPException(400) on a bad Cloudinary URL.
    """
    ip = get_ip(request)
    actual_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(actual_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # Single page fetch — frontend drives pagination via next_cursor
    result = await asyncio.to_thread(
        _cld_list_folder_images, folder_name, creds,
        next_cursor or None, min(page_size, 100)
    )
    images = []
    for r in result.get("resources", []):
        full_url = r["secure_url"]
        thumb_url = _cld_thumb_url(full_url, creds["cloud_name"])
        images.append({
            "url": full_url,          # full-res for lightbox / download
            "thumb_url": thumb_url,   # 400×400 thumbnail for grid display
            "public_id": r["public_id"],
        })
    # Empty string signals "no more pages" to the frontend.
    response_cursor = result.get("next_cursor") or ""
    log("INFO", "explorer.folder_opened",
        user_id=user_id or "anonymous", ip=ip,
        folder_name=folder_name, image_count=len(images),
        has_more=bool(response_cursor))
    return {"images": images, "count": len(images), "next_cursor": response_cursor}
| # ════════════════════════════════════════════════════════════════ | |
| # 6. DELETE SINGLE IMAGE | |
| # ════════════════════════════════════════════════════════════════ | |
def url_to_public_id(image_url: str, cloud_name: str) -> str:
    """Derive the Cloudinary public_id (folder/name, no extension) from a URL.

    Returns "" when the URL doesn't look like a Cloudinary upload URL.
    """
    try:
        segments = urlparse(image_url).path.split("/")
        tail = segments[segments.index("upload") + 1:]
        # Drop the optional version segment ("v12345").
        if tail and tail[0].startswith("v") and tail[0][1:].isdigit():
            tail = tail[1:]
        joined = "/".join(tail)
        return joined.rsplit(".", 1)[0]
    except Exception:
        return ""
def _cld_delete_resource(public_id: str, creds: dict):
    """Destroy a single Cloudinary resource using per-user credentials."""
    return cloudinary.uploader.destroy(
        public_id,
        cloud_name=creds["cloud_name"],
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
    )
async def delete_image(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    image_url: str = Form(""),
    public_id: str = Form(""),
    user_id: str = Form(""),
):
    """Delete one image from Cloudinary and best-effort purge its vectors.

    Uses ``public_id`` when given, otherwise derives it from ``image_url``.
    Pinecone cleanup is non-fatal: a failure there is logged as a warning
    and the endpoint still reports success for the Cloudinary deletion.
    """
    ip = get_ip(request)
    pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    pid = public_id or url_to_public_id(image_url, creds["cloud_name"])
    if not pid:
        raise HTTPException(400, "Could not determine public_id.")
    await asyncio.to_thread(_cld_delete_resource, pid, creds)
    if pc_key and image_url:
        try:
            pc = _get_pinecone(pc_key)
            # Vectors for this image may live in either index; purge both.
            for index_name in (IDX_OBJECTS, IDX_FACES):
                await asyncio.to_thread(
                    pc.Index(index_name).delete,
                    filter={"url": {"$eq": image_url}},
                )
        except Exception as exc:
            _log_fn("WARNING", f"Pinecone delete warning: {exc}")
    log("INFO", "explorer.image_deleted",
        user_id=user_id or "anonymous", ip=ip,
        image_url=image_url, public_id=pid)
    return {"message": "Image deleted successfully."}
| # ════════════════════════════════════════════════════════════════ | |
| # 7. DELETE ENTIRE FOLDER | |
| # ════════════════════════════════════════════════════════════════ | |
def _cld_delete_folder(folder: str, creds: dict):
    """Bulk-delete every Cloudinary resource whose public_id starts with '<folder>/'."""
    prefix = f"{folder}/"
    return cloudinary.api.delete_resources_by_prefix(
        prefix,
        cloud_name=creds["cloud_name"],
        api_key=creds["api_key"],
        api_secret=creds["api_secret"],
    )
def _cld_remove_folder(folder: str, creds: dict):
    """Remove the (presumably empty) Cloudinary folder shell itself.

    Best-effort: returns the API response on success, or None when the
    call fails for any reason (e.g. folder already gone) — callers never
    treat this as fatal.
    """
    try:
        return cloudinary.api.delete_folder(
            folder,
            cloud_name=creds["cloud_name"],
            api_key=creds["api_key"],
            api_secret=creds["api_secret"],
        )
    except Exception:
        return None
def _cld_delete_all_paginated(creds: dict):
    """Delete ALL Cloudinary resources in batches of 100 until none left.

    Each pass re-queries from the start of the listing (just-deleted items
    drop out of it), so the loop terminates when a listing comes back empty
    or when the page just deleted had no next_cursor (i.e. it was the last).

    Best-effort: any API error is logged and the loop stops early.

    Returns:
        int: number of resources deleted.
    """
    deleted = 0
    while True:
        try:
            res = cloudinary.api.resources(
                type="upload", max_results=100,
                api_key=creds["api_key"], api_secret=creds["api_secret"],
                cloud_name=creds["cloud_name"],
            )
            resources = res.get("resources", [])
            if not resources:
                break
            public_ids = [r["public_id"] for r in resources]
            cloudinary.api.delete_resources(
                public_ids,
                api_key=creds["api_key"], api_secret=creds["api_secret"],
                cloud_name=creds["cloud_name"],
            )
            deleted += len(public_ids)
            # Consistency fix: route progress/errors through the shared _log_fn
            # (like every other block in this file) instead of bare print().
            _log_fn("INFO", f"🗑️ Deleted {deleted} resources so far...")
            if not res.get("next_cursor"):
                break  # that was the last page and it has just been wiped
        except Exception as e:
            _log_fn("WARNING", f"Cloudinary batch delete error: {e}")
            break
    return deleted
async def delete_folder(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    folder_name: str = Form(...),
    user_id: str = Form(""),
):
    """Delete a Cloudinary folder, its contents, and matching Pinecone vectors.

    Enumerates every image first (needed for the per-URL Pinecone fallback
    and the deleted_count in the response), wipes the Cloudinary resources
    plus the folder shell, then purges vectors by folder metadata — falling
    back to one delete per image URL if the metadata-filter delete fails.
    Pinecone errors are logged as warnings and never fail the request.
    """
    ip = get_ip(request)
    pc_key = user_pinecone_key or os.getenv("DEFAULT_PINECONE_KEY", "")
    cld_url = user_cloudinary_url or os.getenv("DEFAULT_CLOUDINARY_URL", "")
    creds = get_cloudinary_creds(cld_url)
    if not creds.get("cloud_name"):
        raise HTTPException(400, "Invalid Cloudinary URL.")
    # Collect every resource in the folder across all pages.
    all_images = []
    cursor = None
    while True:
        page = await asyncio.to_thread(_cld_list_folder_images, folder_name, creds, cursor)
        all_images.extend(page.get("resources", []))
        cursor = page.get("next_cursor")
        if not cursor:
            break
    await asyncio.to_thread(_cld_delete_folder, folder_name, creds)
    await asyncio.to_thread(_cld_remove_folder, folder_name, creds)
    if pc_key:
        try:
            pc = _get_pinecone(pc_key)
            for index_name in (IDX_OBJECTS, IDX_FACES):
                index = pc.Index(index_name)
                try:
                    await asyncio.to_thread(index.delete, filter={"folder": {"$eq": folder_name}})
                except Exception:
                    # Filter delete failed — fall back to deleting per image URL.
                    for img in all_images:
                        try:
                            url = img.get("secure_url")
                            if url:
                                await asyncio.to_thread(index.delete, filter={"url": {"$eq": url}})
                        except Exception:
                            pass
        except Exception as e:
            _log_fn("WARNING", f"Pinecone folder delete warning: {e}")
    log("INFO", "explorer.folder_deleted",
        user_id=user_id or "anonymous", ip=ip,
        folder_name=folder_name, deleted_count=len(all_images))
    return {"message": f"Folder '{folder_name}' and all its contents deleted.", "deleted_count": len(all_images)}
| # ════════════════════════════════════════════════════════════════ | |
| # 8. RESET DATABASE ⚠️ DESTRUCTIVE — triple-logged | |
| # ════════════════════════════════════════════════════════════════ | |
async def reset_database(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """⚠️ DESTRUCTIVE: wipe ALL Cloudinary resources/folders and rebuild both
    Pinecone indexes from scratch.

    Refused with 403 when either credential matches the shared demo
    defaults. Cloudinary failures are logged and tolerated; a Pinecone
    failure aborts with 500 because the indexes must exist afterwards.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    # Log the attempt before any validation so even blocked calls leave a trail.
    log("WARNING", "danger.reset_database.attempt",
        user_id=user_id or "anonymous", ip=ip)
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.reset_database.blocked_shared_db",
            user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(403, "Reset is not allowed on the shared demo database.")
    creds = get_cloudinary_creds(user_cloudinary_url)
    if not creds.get("cloud_name"): raise HTTPException(400, "Invalid Cloudinary URL.")
    # ── Cloudinary: paginated delete ALL resources then folders ────
    try:
        deleted = await asyncio.to_thread(_cld_delete_all_paginated, creds)
        _log_fn("INFO", f"Cloudinary: deleted {deleted} resources")
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary wipe: {e}")
    try:
        folders_res = await asyncio.to_thread(_cld_root_folders, creds)
        # Delete all folders in parallel; return_exceptions keeps one bad
        # folder from cancelling the rest.
        folder_tasks = [
            asyncio.to_thread(_cld_remove_folder, f["name"], creds)
            for f in folders_res.get("folders", [])
        ]
        if folder_tasks:
            await asyncio.gather(*folder_tasks, return_exceptions=True)
    except Exception as e:
        _log_fn("WARNING", f"Cloudinary folder cleanup: {e}")
    # ── Pinecone: delete both indexes + recreate ─────────────────
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        tasks = []
        if IDX_OBJECTS in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_OBJECTS))
        if IDX_FACES in existing: tasks.append(asyncio.to_thread(pc.delete_index, IDX_FACES))
        if tasks: await asyncio.gather(*tasks)
        await asyncio.sleep(3)  # wait for Pinecone to fully delete
        # Recreate both serverless indexes with the V4 dimensions declared at
        # the top of the file (objects 1536-D, faces 1024-D).
        await asyncio.gather(
            asyncio.to_thread(pc.create_index, name=IDX_OBJECTS,
                              dimension=IDX_OBJECTS_DIM, metric="cosine",  # 1536-D
                              spec=ServerlessSpec(cloud="aws", region="us-east-1")),
            asyncio.to_thread(pc.create_index, name=IDX_FACES,
                              dimension=IDX_FACES_DIM, metric="cosine",  # 1024-D V4
                              spec=ServerlessSpec(cloud="aws", region="us-east-1")),
        )
    except Exception as e:
        log("ERROR", "danger.reset_database.pinecone_error",
            user_id=user_id or "anonymous", ip=ip, error=str(e))
        raise HTTPException(500, f"Pinecone reset error: {e}")
    log("WARNING", "danger.reset_database.complete",
        user_id=user_id or "anonymous", ip=ip,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Database reset complete. All data wiped and indexes recreated."}
| # ════════════════════════════════════════════════════════════════ | |
| # 9. DELETE ACCOUNT ⚠️ DESTRUCTIVE — triple-logged | |
| # ════════════════════════════════════════════════════════════════ | |
async def delete_account(
    request: Request,
    user_pinecone_key: str = Form(""),
    user_cloudinary_url: str = Form(""),
    user_id: str = Form(""),
):
    """⚠️ DESTRUCTIVE: wipe the user's Cloudinary data and drop both Pinecone
    indexes (no recreation — the account is going away).

    Blocked with 403 on the shared demo credentials. Every cleanup stage
    is best-effort: failures are logged as warnings and the remaining
    stages still run.
    """
    ip = get_ip(request)
    start = time.perf_counter()
    log("WARNING", "danger.delete_account.attempt",
        user_id=user_id or "anonymous", ip=ip)
    if _is_default_key(user_pinecone_key, DEFAULT_PC_KEY) or _is_default_key(user_cloudinary_url, DEFAULT_CLD_URL):
        log("WARNING", "danger.delete_account.blocked_shared_db",
            user_id=user_id or "anonymous", ip=ip)
        raise HTTPException(403, "Account deletion is not allowed on the shared demo database.")
    creds = get_cloudinary_creds(user_cloudinary_url)
    # Stage 1: wipe every Cloudinary resource, page by page.
    try:
        removed = await asyncio.to_thread(_cld_delete_all_paginated, creds)
        _log_fn("INFO", f"Account delete Cloudinary: {removed} resources removed")
    except Exception as e:
        _log_fn("WARNING", f"Account delete Cloudinary: {e}")
    # Stage 2: remove the now-empty root folders in parallel.
    try:
        root = await asyncio.to_thread(_cld_root_folders, creds)
        pending = [
            asyncio.to_thread(_cld_remove_folder, entry["name"], creds)
            for entry in root.get("folders", [])
        ]
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
    except Exception as e:
        _log_fn("WARNING", f"Account delete Cloudinary folders: {e}")
    # Stage 3: drop both Pinecone indexes where they exist.
    try:
        pc = _get_pinecone(user_pinecone_key)
        existing = {idx.name for idx in await asyncio.to_thread(pc.list_indexes)}
        drops = [
            asyncio.to_thread(pc.delete_index, name)
            for name in (IDX_OBJECTS, IDX_FACES)
            if name in existing
        ]
        if drops:
            await asyncio.gather(*drops)
    except Exception as e:
        _log_fn("WARNING", f"Account delete Pinecone: {e}")
    log("WARNING", "danger.delete_account.complete",
        user_id=user_id or "anonymous", ip=ip,
        duration_ms=round((time.perf_counter()-start)*1000))
    return {"message": "Account data deleted. Sign out initiated."}