Spaces:
Running
Running
| """ | |
| Database connection helpers for Supabase. | |
| Uses psycopg (v3) for direct PostgreSQL/pgvector queries (search), | |
| and supabase-py for standard CRUD operations. | |
| """ | |
| import os | |
| import psycopg | |
| from psycopg.rows import dict_row | |
| import numpy as np | |
| from supabase import create_client, Client | |
| from config import SUPABASE_URL, SUPABASE_KEY, DATABASE_URL | |
# --- Supabase Client (for CRUD operations) ---
_supabase_client: Client = None


def get_supabase() -> Client:
    """Return the shared Supabase client, building it on the first call.

    The URL and key are taken from the ``SUPABASE_URL`` / ``SUPABASE_KEY``
    environment variables when those are set to a non-empty value, otherwise
    from the constants imported from ``config``. Surrounding whitespace is
    stripped from both before the client is created. The client is cached in
    the module-level ``_supabase_client`` so later calls reuse it.
    """
    global _supabase_client

    # Fast path: a client was already created by an earlier call.
    if _supabase_client is not None:
        return _supabase_client

    raw_url = os.environ.get("SUPABASE_URL") or SUPABASE_URL
    raw_key = os.environ.get("SUPABASE_KEY") or SUPABASE_KEY
    _supabase_client = create_client(raw_url.strip(), raw_key.strip())
    return _supabase_client
# --- Direct PostgreSQL connection (for pgvector queries) ---
def get_db_connection():
    """Open and return a fresh psycopg (v3) connection.

    The connection string comes from the ``DATABASE_URL`` environment
    variable when it is set to a non-empty value, otherwise from the
    ``DATABASE_URL`` constant imported from ``config``; surrounding
    whitespace is stripped. Rows are produced as dicts (``dict_row``).
    A new connection is created on every call, so the caller is
    responsible for closing it.
    """
    dsn = os.environ.get("DATABASE_URL") or DATABASE_URL
    return psycopg.connect(dsn.strip(), row_factory=dict_row)
def close_db_pool():
    """No-op kept so shutdown code that calls it keeps working."""
    # Nothing to release here: this module opens one connection per call.
    return None
# --- Helper: embedding <-> database conversion ---
def embedding_to_pgvector(embedding: np.ndarray) -> str:
    """Serialise a numpy embedding as pgvector text, e.g. '[0.1,0.2,...]'.

    Each component is written with exactly eight digits after the decimal
    point, comma-separated with no spaces, wrapped in square brackets.
    """
    components = [format(value, ".8f") for value in embedding.tolist()]
    return "[{}]".format(",".join(components))
def pgvector_to_embedding(pgvector_str: str) -> np.ndarray:
    """Parse pgvector text such as '[0.1,0.2,...]' into a float32 numpy array.

    Args:
        pgvector_str: Text form of a vector: comma-separated numbers wrapped
            in square brackets. Surrounding whitespace is ignored.

    Returns:
        A 1-D ``np.float32`` array. An empty vector ('[]') yields an empty
        array, so the output of ``embedding_to_pgvector`` always round-trips.

    Raises:
        ValueError: If a component is not a valid number.
    """
    # Strip whitespace first so the bracket strip actually reaches the brackets.
    inner = pgvector_str.strip().strip("[]").strip()
    if not inner:
        # "".split(",") would give [""] and float("") raises; an empty
        # vector is a legitimate value, not an error.
        return np.empty(0, dtype=np.float32)
    return np.array([float(part) for part in inner.split(",")], dtype=np.float32)
# --- pgvector similarity search ---
def search_similar_products(query_embedding: np.ndarray, top_k: int = 50):
    """Find the ``top_k`` products most similar to ``query_embedding``.

    Runs a pgvector cosine similarity search over rows of ``products`` that
    are active, approved, in stock and have an embedding.

    Args:
        query_embedding: Query vector; serialised with
            ``embedding_to_pgvector`` before being sent to the database.
        top_k: Maximum number of rows to return (SQL ``LIMIT``).

    Returns:
        A list of dicts, ordered by ascending ``<=>`` distance, with keys
        ``id``, ``seller_id``, ``title``, ``description``, ``price``,
        ``stock``, ``images``, ``embedding`` (numpy array) and
        ``similarity`` (``1 - distance``).
    """
    # Serialise BEFORE opening the connection: if the embedding is malformed
    # and serialisation raises, no connection has been opened that would
    # otherwise be left unclosed.
    embedding_str = embedding_to_pgvector(query_embedding)

    query = """
        SELECT
            id, seller_id, title, description, price, stock, images,
            embedding::text as embedding_text,
            1 - (embedding <=> %s::vector) as similarity
        FROM products
        WHERE is_active = true AND status = 'approved' AND stock > 0 AND embedding IS NOT NULL
        ORDER BY embedding <=> %s::vector
        LIMIT %s
    """

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            # The embedding is bound twice: once for the similarity column,
            # once for the ORDER BY expression.
            cur.execute(query, (embedding_str, embedding_str, top_k))
            rows = cur.fetchall()
    finally:
        conn.close()

    # Normalise database types into plain Python / numpy values.
    return [
        {
            "id": str(row["id"]),
            "seller_id": str(row["seller_id"]),
            "title": row["title"],
            "description": row["description"],
            "price": float(row["price"]),
            "stock": int(row["stock"]),
            "images": row["images"] or [],
            "embedding": pgvector_to_embedding(row["embedding_text"]),
            "similarity": float(row["similarity"]),
        }
        for row in rows
    ]
def _like_contains_pattern(term: str) -> str:
    """Return a '%term%' ILIKE pattern in which *term* is matched literally."""
    # Backslash is PostgreSQL's default LIKE escape character; escape it
    # first so the escapes added for % and _ are not themselves doubled.
    escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{escaped}%"


def search_similar_products_filtered(
    query_embedding: np.ndarray,
    top_k: int = 50,
    price_min: float = None,
    price_max: float = None,
    brand: str = None,
    color: str = None,
):
    """Find the ``top_k`` most similar products with optional structured filters.

    Same search as ``search_similar_products`` (active, approved, in-stock
    products that have an embedding, ordered by pgvector ``<=>`` distance),
    with extra WHERE conditions for each filter that is supplied.

    Args:
        query_embedding: Query vector; serialised with
            ``embedding_to_pgvector`` before being sent to the database.
        top_k: Maximum number of rows to return (SQL ``LIMIT``).
        price_min: If not ``None``, keep only rows with ``price > price_min``.
        price_max: If not ``None``, keep only rows with ``price < price_max``.
        brand: If non-empty, keep only rows whose title or description
            contains this text (case-insensitive).
        color: If non-empty, keep only rows whose title or description
            contains this text (case-insensitive).

    Returns:
        A list of dicts with keys ``id``, ``seller_id``, ``title``,
        ``description``, ``price``, ``stock``, ``images``, ``embedding``
        (numpy array) and ``similarity`` (``1 - distance``).
    """
    # Serialise BEFORE opening the connection so a failure here cannot
    # leave an opened connection unclosed.
    embedding_str = embedding_to_pgvector(query_embedding)

    # Build the dynamic WHERE clause. Only fixed SQL fragments are ever
    # added to `conditions`; every caller-supplied value is a bound parameter.
    conditions = [
        "is_active = true",
        "status = 'approved'",
        "stock > 0",
        "embedding IS NOT NULL",
    ]
    filter_params = []

    # NOTE(review): both price bounds are exclusive (strict > and <), kept
    # as in the original; confirm callers do not expect inclusive bounds.
    if price_min is not None:
        conditions.append("price > %s")
        filter_params.append(price_min)
    if price_max is not None:
        conditions.append("price < %s")
        filter_params.append(price_max)

    # Brand and color are both substring matches on title/description.
    # Escaping keeps '%' and '_' in the input from acting as wildcards.
    for keyword in (brand, color):
        if keyword:
            pattern = _like_contains_pattern(keyword)
            conditions.append("(title ILIKE %s OR description ILIKE %s)")
            filter_params.extend((pattern, pattern))

    where_clause = " AND ".join(conditions)

    # Params must match SQL placeholder order:
    #   1) embedding for SELECT similarity, 2) filter params for WHERE,
    #   3) embedding for ORDER BY, 4) top_k for LIMIT
    params = [embedding_str, *filter_params, embedding_str, top_k]

    query = f"""
        SELECT
            id, seller_id, title, description, price, stock, images,
            embedding::text as embedding_text,
            1 - (embedding <=> %s::vector) as similarity
        FROM products
        WHERE {where_clause}
        ORDER BY embedding <=> %s::vector
        LIMIT %s
    """

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, tuple(params))
            rows = cur.fetchall()
    finally:
        conn.close()

    # Normalise database types into plain Python / numpy values.
    return [
        {
            "id": str(row["id"]),
            "seller_id": str(row["seller_id"]),
            "title": row["title"],
            "description": row["description"],
            "price": float(row["price"]),
            "stock": int(row["stock"]),
            "images": row["images"] or [],
            "embedding": pgvector_to_embedding(row["embedding_text"]),
            "similarity": float(row["similarity"]),
        }
        for row in rows
    ]
def store_product_embedding(product_id: str, embedding: np.ndarray):
    """Store/update the embedding for a product.

    Runs ``UPDATE products SET embedding = ... WHERE id = ...`` on a fresh
    connection, commits, and closes the connection.

    Args:
        product_id: Value matched against ``products.id``.
        embedding: Vector to store; serialised with ``embedding_to_pgvector``.

    Returns:
        None. No check is made that a row was actually updated.
    """
    # Serialise BEFORE opening the connection so a malformed embedding
    # cannot leave an opened connection unclosed.
    embedding_str = embedding_to_pgvector(embedding)

    query = """
        UPDATE products SET embedding = %s::vector WHERE id = %s
    """

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, (embedding_str, product_id))
        conn.commit()
    finally:
        # Runs on success and on error; closing without a commit discards
        # the pending update.
        conn.close()