| | import os |
| | import uuid |
| | import shutil |
| | import re |
| | from datetime import datetime, timedelta, date |
| | from io import BytesIO |
| | from typing import Dict, List, Optional,Any |
| | import numpy as np |
| | from fastapi import ( |
| | FastAPI, |
| | UploadFile, |
| | File, |
| | HTTPException, |
| | Depends, |
| | Header, |
| | Request, |
| | Form, |
| | ) |
| | from fastapi.responses import FileResponse, JSONResponse |
| | from pydantic import BaseModel |
| | from PIL import Image, UnidentifiedImageError |
| | import cv2 |
| | import logging |
| | from gridfs import GridFS |
| | from gridfs.errors import NoFile |
| |
|
| | from bson import ObjectId |
| | from pymongo import MongoClient |
| | import time |
| |
|
| | |
# Best-effort .env loading: if python-dotenv cannot be imported, or
# load_dotenv() raises, the exception is ignored and the process simply
# relies on whatever is already in the environment.
try:
    from dotenv import load_dotenv

    load_dotenv()
except Exception:
    pass

# Root logger at INFO level; everything in this module logs through the
# "api" logger bound to `log`.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("api")
| |
|
| | from src.core import process_inpaint |
| |
|
| | |
# Storage root: DATA_DIR from the environment (default "/data"). When that
# path is not an existing directory the service falls back to "/tmp".
BASE_DIR = os.getenv("DATA_DIR", "/data")
if not os.path.isdir(BASE_DIR):
    BASE_DIR = "/tmp"

# Working directories for uploads and generated outputs, created eagerly at
# import time so request handlers can write into them directly.
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)

OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
| |
|
| | |
# Optional bearer token. bearer_auth() skips authentication entirely when
# this is unset or empty.
ENV_TOKEN = os.getenv("API_TOKEN")

app = FastAPI(title="Photo Object Removal API", version="1.0.0")

# In-process bookkeeping (not persisted; lost on restart).
file_store: Dict[str, Dict[str, str]] = {}
logs: List[Dict[str, str]] = []

# Primary MongoDB connection string: MONGO_URI wins, MONGODB_URI is the
# fallback. The handles below stay None until initialization succeeds.
MONGO_URI = os.getenv("MONGO_URI") or os.getenv("MONGODB_URI")
mongo_client = None
mongo_db = None
mongo_logs = None
grid_fs = None
| |
|
# Wire up the primary MongoDB handles (database, api_logs collection, GridFS).
# Any failure is logged and leaves the service running with GridFS disabled.
if not MONGO_URI:
    log.warning("MONGO_URI not set. GridFS operations will be disabled. Upload endpoints will not work.")
else:
    try:
        mongo_client = MongoClient(MONGO_URI)

        # Prefer the database named in the connection string, if there is one.
        try:
            mongo_db = mongo_client.get_default_database()
            log.info("Using database from connection string: %s", mongo_db.name)
        except Exception as db_err:
            mongo_db = None
            log.warning("Could not extract database from connection string: %s", db_err)

        # Otherwise fall back to the fixed "object_remover" database.
        if mongo_db is None:
            mongo_db = mongo_client["object_remover"]
            log.info("Using default database: object_remover")

        mongo_logs = mongo_db["api_logs"]
        grid_fs = GridFS(mongo_db)
        log.info(
            "MongoDB connection initialized successfully - Database: %s, Collection: %s",
            mongo_db.name,
            mongo_logs.name,
        )
    except Exception as err:
        log.error("Failed to initialize MongoDB connection: %s", err, exc_info=True)
        log.warning("GridFS operations will be disabled. Set MONGO_URI or MONGODB_URI environment variable.")
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
# Separate MongoDB deployment used only for per-request API logs
# (database "logs", collection "objectRemover").
API_LOGS_MONGO_URI = os.getenv("API_LOGS_MONGODB_URL")

api_logs_client = None
api_logs_db = None
api_logs_collection = None

if not API_LOGS_MONGO_URI:
    log.warning("API_LOGS_MONGODB_URL not set. API logging disabled.")
else:
    try:
        api_logs_client = MongoClient(API_LOGS_MONGO_URI)
        api_logs_db = api_logs_client["logs"]
        api_logs_collection = api_logs_db["objectRemover"]
        log.info("API Logs Mongo initialized → logs/objectRemover")
    except Exception as init_err:
        log.error("Failed to initialize API Logs MongoDB: %s", init_err)
        # A None collection is what request handlers check to skip logging.
        api_logs_collection = None
| | |
# --- Admin panel (default target for media-click logging) -------------------
ADMIN_MONGO_URI = os.getenv("MONGODB_ADMIN")
# Category used by _coerce_category_id() when a request supplies none.
DEFAULT_CATEGORY_ID = "69368f722e46bd68ae188984"
admin_media_clicks = None

# --- Collage-maker ----------------------------------------------------------
COLLAGE_MAKER_MONGO_URI = os.getenv("MONGODB_COLLAGE_MAKER")
COLLAGE_MAKER_DB_NAME = os.getenv("MONGODB_COLLAGE_MAKER_DB_NAME", "collage-maker")
COLLAGE_MAKER_ADMIN_DB_NAME = os.getenv("MONGODB_COLLAGE_MAKER_ADMIN_DB_NAME", "adminPanel")
# Handles populated lazily by the collage-maker helper functions.
collage_maker_client = None
collage_maker_db = None
collage_maker_admin_db = None
collage_maker_media_clicks = None
collage_maker_categories = None

# --- AI-Enhancer ------------------------------------------------------------
AI_ENHANCER_MONGO_URI = os.getenv("MONGODB_AI_ENHANCER")
AI_ENHANCER_DB_NAME = os.getenv("MONGODB_AI_ENHANCER_DB_NAME", "ai-enhancer")
AI_ENHANCER_ADMIN_DB_NAME = os.getenv("MONGODB_AI_ENHANCER_ADMIN_DB_NAME", "test")
# Handles populated lazily by the AI-Enhancer helper functions.
ai_enhancer_client = None
ai_enhancer_db = None
ai_enhancer_admin_db = None
ai_enhancer_media_clicks = None
| |
|
| |
|
def get_collage_maker_client() -> Optional[MongoClient]:
    """Return the shared collage-maker MongoClient, creating it on first use.

    Returns:
        The cached client, or None when COLLAGE_MAKER_MONGO_URI is not set
        or constructing the client failed (the failure is logged).
    """
    global collage_maker_client
    # Nothing to do if a client is already cached or no URI was configured.
    if collage_maker_client is not None or not COLLAGE_MAKER_MONGO_URI:
        return collage_maker_client
    try:
        collage_maker_client = MongoClient(COLLAGE_MAKER_MONGO_URI)
        log.info("Collage-maker MongoDB client initialized")
    except Exception as err:
        log.error("Failed to initialize collage-maker MongoDB client: %s", err)
        collage_maker_client = None
    return collage_maker_client
| |
|
| |
|
def get_collage_maker_database() -> Optional[Any]:
    """Return the cached collage-maker database handle, resolving it on first use.

    Returns:
        The database named by COLLAGE_MAKER_DB_NAME, or None when no client
        is available or the lookup failed (the failure is logged).
    """
    global collage_maker_db
    client = get_collage_maker_client()
    if client is None:
        return None
    if collage_maker_db is not None:
        return collage_maker_db
    try:
        collage_maker_db = client[COLLAGE_MAKER_DB_NAME]
        log.info("Collage-maker database initialized: %s", COLLAGE_MAKER_DB_NAME)
    except Exception as err:
        log.error("Failed to get collage-maker database: %s", err)
        collage_maker_db = None
    return collage_maker_db
| |
|
| |
|
def _init_collage_maker_mongo() -> None:
    """Bind the collage-maker admin database and its two collections.

    Populates the module globals collage_maker_admin_db,
    collage_maker_media_clicks and collage_maker_categories. When no client
    is available the function only logs and returns; on any error all three
    globals are reset to None.
    """
    global collage_maker_admin_db, collage_maker_media_clicks, collage_maker_categories
    client = get_collage_maker_client()
    if client is None:
        log.info("Collage-maker Mongo URI not provided; collage-maker features disabled")
        return
    try:
        admin_db = client[COLLAGE_MAKER_ADMIN_DB_NAME]
        collage_maker_admin_db = admin_db
        collage_maker_media_clicks = admin_db["media_clicks"]
        collage_maker_categories = admin_db["categories"]
        log.info(
            "Collage-maker admin initialized: db=%s, media_clicks=%s, categories=%s",
            COLLAGE_MAKER_ADMIN_DB_NAME,
            collage_maker_media_clicks.name,
            collage_maker_categories.name,
        )
    except Exception as err:
        log.error("Failed to init collage-maker admin Mongo: %s", err)
        collage_maker_admin_db = collage_maker_media_clicks = collage_maker_categories = None


# Runs once at import time so the collage-maker handles are ready for requests.
_init_collage_maker_mongo()
| |
|
| |
|
def get_ai_enhancer_client() -> Optional[MongoClient]:
    """Return the shared AI-Enhancer MongoClient, creating it on first use.

    Returns:
        The cached client, or None when AI_ENHANCER_MONGO_URI is not set or
        constructing the client failed (the failure is logged).
    """
    global ai_enhancer_client
    # Nothing to do if a client is already cached or no URI was configured.
    if ai_enhancer_client is not None or not AI_ENHANCER_MONGO_URI:
        return ai_enhancer_client
    try:
        ai_enhancer_client = MongoClient(AI_ENHANCER_MONGO_URI)
        log.info("AI-Enhancer MongoDB client initialized")
    except Exception as err:
        log.error("Failed to initialize AI-Enhancer MongoDB client: %s", err)
        ai_enhancer_client = None
    return ai_enhancer_client
| |
|
| |
|
def get_ai_enhancer_database() -> Optional[Any]:
    """Return the cached AI-Enhancer database handle, resolving it on first use.

    Returns:
        The database named by AI_ENHANCER_DB_NAME, or None when no client is
        available or the lookup failed (the failure is logged).
    """
    global ai_enhancer_db
    client = get_ai_enhancer_client()
    if client is None:
        return None
    if ai_enhancer_db is not None:
        return ai_enhancer_db
    try:
        ai_enhancer_db = client[AI_ENHANCER_DB_NAME]
        log.info("AI-Enhancer database initialized: %s", AI_ENHANCER_DB_NAME)
    except Exception as err:
        log.error("Failed to get AI-Enhancer database: %s", err)
        ai_enhancer_db = None
    return ai_enhancer_db
| |
|
| |
|
def _init_ai_enhancer_mongo() -> None:
    """Bind the AI-Enhancer admin database and its media_clicks collection.

    Populates the module globals ai_enhancer_admin_db and
    ai_enhancer_media_clicks. When no client is available the function only
    logs and returns; on any error both globals are reset to None.
    """
    global ai_enhancer_admin_db, ai_enhancer_media_clicks
    client = get_ai_enhancer_client()
    if client is None:
        log.info("AI-Enhancer Mongo URI not provided; AI-Enhancer features disabled")
        return
    try:
        admin_db = client[AI_ENHANCER_ADMIN_DB_NAME]
        ai_enhancer_admin_db = admin_db
        ai_enhancer_media_clicks = admin_db["media_clicks"]
        log.info(
            "AI-Enhancer admin initialized: db=%s, media_clicks=%s",
            AI_ENHANCER_ADMIN_DB_NAME,
            ai_enhancer_media_clicks.name,
        )
    except Exception as err:
        log.error("Failed to init AI-Enhancer admin Mongo: %s", err)
        ai_enhancer_admin_db = ai_enhancer_media_clicks = None


# Runs once at import time so the AI-Enhancer handles are ready for requests.
_init_ai_enhancer_mongo()
| |
|
| |
|
def get_category_id_from_collage_maker() -> Optional[str]:
    """Look up a category id in the collage-maker ``categories`` collection.

    The id of whichever document ``find_one()`` returns (no filter, no sort)
    is used.

    Returns:
        The document's ``_id`` as a string, or None when the collection is
        not initialized, is empty, or the query fails (each case is logged).
    """
    if collage_maker_categories is None:
        log.warning("Collage-maker categories collection not initialized")
        return None
    try:
        first_category = collage_maker_categories.find_one()
        if not first_category:
            log.warning("No categories found in collage-maker collection")
            return None
        category_id = str(first_category.get("_id", ""))
        log.info("Found category ID from collage-maker: %s", category_id)
        return category_id
    except Exception as err:
        log.error("Failed to query collage-maker categories: %s", err)
        return None
| |
|
| |
|
def _init_admin_mongo() -> None:
    """Connect the admin ``media_clicks`` collection used for click logging.

    Sets the module global admin_media_clicks. With no ADMIN_MONGO_URI the
    function only logs and returns. The database comes from the connection
    string when it names one, otherwise "object_remover" is used. A legacy
    index is dropped on a best-effort basis. Any other failure is logged and
    leaves admin_media_clicks as None.
    """
    global admin_media_clicks
    if not ADMIN_MONGO_URI:
        log.info("Admin Mongo URI not provided; media click logging disabled")
        return

    def _resolve_database(client):
        # Database named in the URI if present, else the fixed default.
        try:
            database = client.get_default_database()
        except Exception as db_err:
            database = None
            log.warning("Admin Mongo URI has no default DB; error=%s", db_err)
        if database is None:
            database = client["object_remover"]
            log.warning("No database in connection string, defaulting to 'object_remover'")
        return database

    try:
        admin_db = _resolve_database(MongoClient(ADMIN_MONGO_URI))
        admin_media_clicks = admin_db["media_clicks"]
        log.info(
            "Admin media click logging initialized: db=%s collection=%s",
            admin_db.name,
            admin_media_clicks.name,
        )
        try:
            admin_media_clicks.drop_index("user_id_1_header_1_media_id_1")
            log.info("Dropped legacy index user_id_1_header_1_media_id_1")
        except Exception as idx_err:
            # Permission errors are expected for restricted users and stay
            # silent; everything else is noted at INFO level.
            if "Unauthorized" not in str(idx_err):
                log.info("Skipping legacy index drop: %s", idx_err)
    except Exception as err:
        log.error("Failed to init admin Mongo client: %s", err)
        admin_media_clicks = None


# Runs once at import time so admin click logging is ready for requests.
_init_admin_mongo()
| |
|
| |
|
def _admin_logging_status() -> Dict[str, object]:
    """Describe whether admin media-click logging is wired up.

    Returns:
        A dict with ``enabled`` plus the ``db`` and ``collection`` names;
        both names are None when logging is disabled.
    """
    clicks = admin_media_clicks
    enabled = clicks is not None
    return {
        "enabled": enabled,
        "db": clicks.database.name if enabled else None,
        "collection": clicks.name if enabled else None,
    }
| |
|
| |
|
def _save_upload_to_gridfs(upload: UploadFile, file_type: str) -> str:
    """Persist an uploaded file in GridFS and return its id as a string.

    Args:
        upload: The incoming multipart file.
        file_type: Label stored in the GridFS metadata under ``type`` and
            used for the fallback filename and the error message.

    Raises:
        HTTPException: 503 when GridFS is not configured, 400 when the
            upload contains no bytes.
    """
    if grid_fs is None:
        raise HTTPException(
            status_code=503,
            detail="MongoDB/GridFS not configured. Set MONGO_URI or MONGODB_URI environment variable."
        )

    payload = upload.file.read()
    if not payload:
        raise HTTPException(status_code=400, detail=f"{file_type} file is empty")

    stored_name = upload.filename or f"{file_type}.bin"
    stored_id = grid_fs.put(
        payload,
        filename=stored_name,
        contentType=upload.content_type,
        metadata={"type": file_type},
    )
    return str(stored_id)
| |
|
| |
|
def _read_gridfs_bytes(file_id: str, expected_type: str) -> bytes:
    """Read a GridFS file's bytes after checking its stored ``type`` metadata.

    Args:
        file_id: String form of the GridFS ObjectId.
        expected_type: The ``type`` the file was stored with.

    Raises:
        HTTPException: 503 when GridFS is not configured; 404 when the id is
            malformed, unknown, or stored under a different type. A file with
            no stored type is accepted.
    """
    if grid_fs is None:
        raise HTTPException(
            status_code=503,
            detail="MongoDB/GridFS not configured. Set MONGO_URI or MONGODB_URI environment variable."
        )

    try:
        object_id = ObjectId(file_id)
    except Exception:
        raise HTTPException(status_code=404, detail=f"{expected_type}_id invalid")

    try:
        stored_file = grid_fs.get(object_id)
    except NoFile:
        raise HTTPException(status_code=404, detail=f"{expected_type}_id not found")

    # A type mismatch is reported exactly like a missing file.
    stored_type = (stored_file.metadata or {}).get("type")
    if stored_type and stored_type != expected_type:
        raise HTTPException(status_code=404, detail=f"{expected_type}_id not found")

    return stored_file.read()
| |
|
| |
|
def _load_rgba_image_from_gridfs(file_id: str, expected_type: str) -> Image.Image:
    """Load an image stored in GridFS and return it converted to RGBA.

    Args:
        file_id: String form of the GridFS ObjectId.
        expected_type: Stored ``type`` to validate against; also used in the
            error message.

    Raises:
        HTTPException: whatever _read_gridfs_bytes raises, or 422 when the
            bytes cannot be identified or decoded as an image.
    """
    data = _read_gridfs_bytes(file_id, expected_type)
    try:
        # Image.open() only reads the header; the pixel data is decoded by
        # convert(). Keeping both calls inside the try means truncated or
        # corrupt files are reported as 422 rather than an unhandled OSError.
        return Image.open(BytesIO(data)).convert("RGBA")
    except (UnidentifiedImageError, OSError) as err:
        raise HTTPException(
            status_code=422, detail=f"{expected_type} is not a valid image"
        ) from err
| |
|
| |
|
| | def _build_ai_edit_daily_count( |
| | existing: Optional[List[Dict[str, object]]], |
| | today: date, |
| | ) -> List[Dict[str, object]]: |
| | """ |
| | Build / extend the ai_edit_daily_count array with the following rules: |
| | |
| | - Case A (no existing data): return [{date: today, count: 1}] |
| | - Case B (today already recorded): return list unchanged |
| | - Case C (gap in days): fill missing days with count=0 and append today with count=1 |
| | |
| | Additionally, the returned list is capped to the most recent 32 entries. |
| | |
| | The stored "date" value is a midnight UTC (naive UTC) datetime for the given day. |
| | """ |
| |
|
| | def _to_date_only(value: object) -> date: |
| | if isinstance(value, datetime): |
| | return value.date() |
| | if isinstance(value, date): |
| | return value |
| | |
| | try: |
| | text = str(value) |
| | if len(text) == 10: |
| | return datetime.strptime(text, "%Y-%m-%d").date() |
| | return datetime.fromisoformat(text).date() |
| | except Exception: |
| | |
| | return today |
| |
|
| | |
| | if not existing: |
| | return [ |
| | { |
| | "date": datetime(today.year, today.month, today.day), |
| | "count": 1, |
| | } |
| | ] |
| |
|
| | |
| | result: List[Dict[str, object]] = list(existing) |
| |
|
| | last_entry = result[-1] if result else None |
| | if not last_entry or "date" not in last_entry: |
| | |
| | return [ |
| | { |
| | "date": datetime(today.year, today.month, today.day), |
| | "count": 1, |
| | } |
| | ] |
| |
|
| | last_date = _to_date_only(last_entry["date"]) |
| |
|
| | |
| | if last_date > today: |
| | return result |
| |
|
| | |
| | if last_date == today: |
| | return result |
| |
|
| | |
| | cursor = last_date + timedelta(days=1) |
| | while cursor < today: |
| | result.append( |
| | { |
| | "date": datetime(cursor.year, cursor.month, cursor.day), |
| | "count": 0, |
| | } |
| | ) |
| | cursor += timedelta(days=1) |
| |
|
| | |
| | result.append( |
| | { |
| | "date": datetime(today.year, today.month, today.day), |
| | "count": 1, |
| | } |
| | ) |
| |
|
| | |
| | |
| | try: |
| | result.sort(key=lambda entry: _to_date_only(entry.get("date"))) |
| | except Exception: |
| | |
| | pass |
| |
|
| | |
| | if len(result) > 32: |
| | result = result[-32:] |
| |
|
| | return result |
| |
|
def bearer_auth(authorization: Optional[str] = Header(default=None)) -> None:
    """FastAPI dependency that enforces the optional bearer token.

    When ENV_TOKEN is unset or empty, authentication is disabled and every
    request passes.

    Raises:
        HTTPException: 401 when the Authorization header is missing or is not
            a ``Bearer`` credential, 403 when the token does not match.
    """
    import hmac  # function-scope import keeps this block self-contained

    if not ENV_TOKEN:
        return
    if authorization is None or not authorization.lower().startswith("bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")
    token = authorization.split(" ", 1)[1]
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Encoding to bytes keeps non-ASCII header values from
    # raising TypeError inside compare_digest.
    if not hmac.compare_digest(token.encode("utf-8"), ENV_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Forbidden")
| |
|
| |
|
class InpaintRequest(BaseModel):
    """JSON body accepted by the /inpaint and /inpaint-url endpoints."""

    # GridFS id of the source image (as returned by /upload-image).
    image_id: str
    # GridFS id of the mask (as returned by /upload-mask).
    mask_id: str
    # Forwarded unchanged to process_inpaint(invert_mask=...).
    invert_mask: bool = True
    # When true, process_inpaint is skipped and the image is returned as RGB.
    passthrough: bool = False
    # Forwarded unchanged to process_inpaint(prompt=...).
    prompt: Optional[str] = None
    # Passed to log_media_click; no click is logged when this is empty.
    user_id: Optional[str] = None
    # Passed to log_media_click; a default category is used when omitted.
    category_id: Optional[str] = None
    # Selects which media_clicks collection log_media_click writes to.
    appname: Optional[str] = None
| |
|
| |
|
class SimpleRemoveRequest(BaseModel):
    """Request body that carries only an image id."""

    # NOTE(review): presumably a GridFS image id like InpaintRequest.image_id;
    # no endpoint in the visible part of this file uses this model — confirm.
    image_id: str
| |
|
| |
|
def _coerce_object_id(value: Optional[str]) -> ObjectId:
    """Turn an arbitrary identifier into an ObjectId.

    Rules, in order:
    - None -> a freshly generated ObjectId.
    - 24 hex characters -> that exact ObjectId.
    - A decimal number -> its hexadecimal form, truncated to the last 24
      digits and left-padded with zeros to 24.
    - Anything else -> a freshly generated ObjectId (so such values do not
      map to a stable id across calls).
    """
    if value is None:
        return ObjectId()
    text = str(value).strip()
    if re.fullmatch(r"[0-9a-fA-F]{24}", text):
        return ObjectId(text)
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() cannot parse, which raised ValueError here.
    if text.isdecimal():
        try:
            number = int(text)
        except ValueError:
            # e.g. the interpreter's limit on int-from-str digit count.
            return ObjectId()
        return ObjectId(format(number, "x")[-24:].rjust(24, "0"))
    return ObjectId()
| |
|
| |
|
def _coerce_category_id(category_id: Optional[str]) -> ObjectId:
    """Convert a category id to an ObjectId, using DEFAULT_CATEGORY_ID when empty.

    A 24-character hex string is used as-is; any other value is handed to
    _coerce_object_id for its fallback rules.
    """
    candidate = str(category_id or DEFAULT_CATEGORY_ID).strip()
    if re.fullmatch(r"[0-9a-fA-F]{24}", candidate) is None:
        return _coerce_object_id(candidate)
    return ObjectId(candidate)
| |
|
| |
|
def log_media_click(user_id: Optional[str], category_id: Optional[str], appname: Optional[str] = None) -> None:
    """Record one completed AI edit in the appropriate ``media_clicks`` collection.

    The target collection is chosen from ``appname`` (case-insensitive):
    "collage-maker" and "ai-enhancer" have their own collections, anything
    else goes to the admin collection. Nothing is written when the target
    collection is not initialized or when ``user_id`` is empty or blank.

    For an existing user document the matching category's click counter is
    incremented (or the category is appended), ``ai_edit_complete`` is
    incremented and the daily-count array is refreshed. For a new user the
    document is created through an upsert. Every failure is logged as a
    warning and swallowed, so click logging never fails the request.
    """
    app_key = appname.lower() if appname else None

    # Pick the collection, the label used in failure logs, and the warning
    # (if any) emitted when that collection is unavailable.
    if app_key == "collage-maker":
        target_media_clicks = collage_maker_media_clicks
        target_name = "collage-maker"
        unavailable_msg = "Collage-maker media_clicks not initialized, skipping log"
    elif app_key == "ai-enhancer":
        target_media_clicks = ai_enhancer_media_clicks
        target_name = "AI-Enhancer"
        unavailable_msg = "AI-Enhancer media_clicks not initialized, skipping log"
    else:
        target_media_clicks = admin_media_clicks
        target_name = "admin"
        unavailable_msg = None

    if target_media_clicks is None:
        if unavailable_msg is not None:
            log.warning(unavailable_msg)
        return

    if not user_id or not user_id.strip():
        return

    try:
        user_obj = _coerce_object_id(user_id)
        category_obj = _coerce_category_id(category_id)
        now = datetime.utcnow()
        today = now.date()

        doc = target_media_clicks.find_one({"userId": user_obj})
        if not doc:
            # First click for this user: create the document via upsert.
            daily_for_new = _build_ai_edit_daily_count(None, today)
            target_media_clicks.update_one(
                {"userId": user_obj},
                {
                    "$setOnInsert": {
                        "userId": user_obj,
                        "categories": [
                            {
                                "categoryId": category_obj,
                                "click_count": 1,
                                "lastClickedAt": now,
                            }
                        ],
                        "createdAt": now,
                        "ai_edit_daily_count": daily_for_new,
                    },
                    "$inc": {"ai_edit_complete": 1},
                    "$set": {
                        "updatedAt": now,
                        "ai_edit_last_date": now,
                    },
                },
                upsert=True,
            )
            return

        updated_daily = _build_ai_edit_daily_count(doc.get("ai_edit_daily_count"), today)
        known_categories = doc.get("categories") or []
        category_known = any(cat.get("categoryId") == category_obj for cat in known_categories)

        if category_known:
            # Bump the counter of the matching array element.
            selector = {"_id": doc["_id"], "categories.categoryId": category_obj}
            update = {
                "$inc": {
                    "categories.$.click_count": 1,
                    "ai_edit_complete": 1,
                },
                "$set": {
                    "categories.$.lastClickedAt": now,
                    "updatedAt": now,
                    "ai_edit_last_date": now,
                    "ai_edit_daily_count": updated_daily,
                },
            }
        else:
            # First click in this category for an existing user.
            selector = {"_id": doc["_id"]}
            update = {
                "$push": {
                    "categories": {
                        "categoryId": category_obj,
                        "click_count": 1,
                        "lastClickedAt": now,
                    }
                },
                "$inc": {"ai_edit_complete": 1},
                "$set": {
                    "updatedAt": now,
                    "ai_edit_last_date": now,
                    "ai_edit_daily_count": updated_daily,
                },
            }
        target_media_clicks.update_one(selector, update)
    except Exception as err:
        err_str = str(err)
        if "Unauthorized" in err_str or "not authorized" in err_str.lower():
            log.warning(
                "%s media click logging failed (permissions): user lacks read/write on db=%s collection=%s. "
                "Check MongoDB user permissions.",
                target_name,
                target_media_clicks.database.name,
                target_media_clicks.name,
            )
        else:
            log.warning("%s media click logging failed: %s", target_name, err)
| |
|
| |
|
@app.get("/")
def root() -> Dict[str, Any]:
    """Return static service and product information."""
    product_info = {
        "version": "1.0.0",
        "product_name": "Beauty Camera - GlowCam AI Studio",
        "released_by": "LogicGo Infotech"
    }
    return {
        "success": True,
        "message": "Object Remover API",
        "data": product_info,
    }
| |
|
| |
|
| |
|
@app.get("/health")
def health() -> Dict[str, str]:
    """Liveness probe: always reports a healthy status."""
    payload = {"status": "healthy"}
    return payload
| |
|
| |
|
@app.get("/logging-status")
def logging_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
    """Report whether admin media-click logging is wired up (no secrets exposed)."""
    admin_status = _admin_logging_status()
    return admin_status
| |
|
| |
|
@app.get("/mongo-status")
def mongo_status(_: None = Depends(bearer_auth)) -> Dict[str, object]:
    """Report MongoDB wiring and a sample of the most recent API log rows.

    Returns:
        A dict with the configuration flags, database/collection names and
        admin logging status. When the api_logs collection is available it
        also carries ``api_logs_count``, ``recent_logs`` (up to five rows,
        newest first) and ``latest_log``; a query failure is reported under
        ``api_logs_error`` instead of failing the request.
    """

    def _format_timestamp(value: object) -> str:
        return value.isoformat() if isinstance(value, datetime) else str(value)

    def _summarize(doc: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "_id": str(doc.get("_id")),
            "output_id": doc.get("output_id"),
            "status": doc.get("status"),
            "timestamp": _format_timestamp(doc.get("timestamp")),
        }

    # PyMongo Database and Collection objects raise NotImplementedError on
    # bool(), so they must be compared with None explicitly.
    status: Dict[str, object] = {
        "mongo_configured": MONGO_URI is not None,
        "mongo_connected": mongo_client is not None,
        "database": mongo_db.name if mongo_db is not None else None,
        "collection": mongo_logs.name if mongo_logs is not None else None,
        "admin_logging": _admin_logging_status(),
    }

    if mongo_logs is not None:
        try:
            status["api_logs_count"] = mongo_logs.count_documents({})

            latest_docs = list(mongo_logs.find().sort("timestamp", -1).limit(5))
            recent_logs = []
            status["recent_logs"] = recent_logs
            for doc in latest_docs:
                row = _summarize(doc)
                # Optional fields are copied only when the stored row has them.
                for optional_key in ("input_image_id", "input_mask_id", "error"):
                    if optional_key in doc:
                        row[optional_key] = doc.get(optional_key)
                recent_logs.append(row)

            if latest_docs:
                status["latest_log"] = _summarize(latest_docs[0])
        except Exception as err:
            status["api_logs_error"] = str(err)
            log.error("Error querying MongoDB: %s", err, exc_info=True)

    return status
| |
|
| |
|
@app.post("/upload-image")
def upload_image(image: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Store an uploaded image in GridFS and return its id and filename.

    The upload is also appended to the in-memory ``logs`` list.
    """
    stored_id = _save_upload_to_gridfs(image, "image")
    uploaded_at = datetime.utcnow().isoformat()
    logs.append(
        {"id": stored_id, "filename": image.filename, "type": "image", "timestamp": uploaded_at}
    )
    return {"id": stored_id, "filename": image.filename}
| |
|
| |
|
@app.post("/upload-mask")
def upload_mask(mask: UploadFile = File(...), _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Store an uploaded mask in GridFS and return its id and filename.

    The upload is also appended to the in-memory ``logs`` list.
    """
    stored_id = _save_upload_to_gridfs(mask, "mask")
    uploaded_at = datetime.utcnow().isoformat()
    logs.append(
        {"id": stored_id, "filename": mask.filename, "type": "mask", "timestamp": uploaded_at}
    )
    return {"id": stored_id, "filename": mask.filename}
| |
|
| |
|
def _compress_image(image_path: str, output_path: str, quality: int = 85) -> None:
    """
    Write a JPEG copy of an image to reduce file size.

    RGBA input is flattened onto a white background using its alpha channel;
    any other non-RGB mode is converted to RGB.

    Args:
        image_path: Path of the image to read.
        output_path: Path the JPEG is written to.
        quality: JPEG quality passed to Pillow's save().
    """
    # The context manager closes the source file handle even when conversion
    # or saving fails; previously the handle was never closed explicitly.
    with Image.open(image_path) as source:
        if source.mode == "RGBA":
            flattened = Image.new("RGB", source.size, (255, 255, 255))
            flattened.paste(source, mask=source.split()[3])
        elif source.mode != "RGB":
            flattened = source.convert("RGB")
        else:
            flattened = source

        flattened.save(output_path, "JPEG", quality=quality, optimize=True)
| |
|
| |
|
def _load_rgba_mask_from_image(img: Image.Image) -> np.ndarray:
    """
    Normalize a mask image to a black/white RGBA array.

    Convention: white (255) = area to remove, black (0) = area to keep.
    The returned array has the binary mask in all three RGB channels and
    alpha fixed at 255.

    Three input shapes are handled:
    - non-RGBA image: grayscale value > 128 is white;
    - mostly opaque RGBA (mean alpha > 200): grayscale of RGB > 128, or an
      exact magenta pixel (255, 0, 255), is white;
    - otherwise: pixels with alpha < 128 are white.
    """

    def _expand(mask: np.ndarray) -> np.ndarray:
        # Replicate the single-channel mask into RGB and make it fully opaque.
        out = np.zeros(mask.shape + (4,), dtype=np.uint8)
        for channel in range(3):
            out[:, :, channel] = mask
        out[:, :, 3] = 255
        return out

    if img.mode != "RGBA":
        luminance = np.array(img.convert("L"))
        mask_bw = np.where(luminance > 128, 255, 0).astype(np.uint8)
        rgba = _expand(mask_bw)
        log.info(f"Loaded {img.mode} mask: {int((mask_bw > 0).sum())} white pixels (to remove)")
        return rgba

    pixels = np.array(img)
    rgb = pixels[:, :, :3]
    alpha = pixels[:, :, 3]

    if alpha.mean() > 200:
        # Mostly opaque: the mask is painted into the colour channels.
        gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
        bright = np.where(gray > 128, 255, 0).astype(np.uint8)
        magenta = np.all(rgb == [255, 0, 255], axis=2).astype(np.uint8) * 255
        mask_bw = np.maximum(bright, magenta)
        rgba = _expand(mask_bw)
        log.info(f"Loaded RGBA mask (RGB-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
        return rgba

    # Otherwise transparency carries the mask: transparent pixels are removed.
    mask_bw = np.where(alpha < 128, 255, 0).astype(np.uint8)
    rgba = _expand(mask_bw)
    log.info(f"Loaded RGBA mask (alpha-based): {int((mask_bw > 0).sum())} white pixels (to remove)")
    return rgba
| |
|
@app.post("/inpaint")
def inpaint(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Run object removal on a stored image/mask pair and save the result.

    The image and mask are loaded from GridFS, inpainted (or passed through
    unchanged when ``req.passthrough`` is set), and written as a PNG into
    OUTPUT_DIR. A JPEG copy is created on a best-effort basis.

    Returns:
        ``{"result": <png filename>}`` plus ``"Compressed_Image_URL"`` when
        the JPEG copy was created.

    Raises:
        Whatever the loading or processing steps raise; the failure is
        recorded in the API log before being re-raised.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    compressed_url = None

    try:
        # Collage-maker requests without a category borrow one from the
        # collage-maker database. The match is case-insensitive so it agrees
        # with how log_media_click selects its target collection.
        category_id = req.category_id
        if (req.appname or "").lower() == "collage-maker" and not category_id:
            category_id = get_category_id_from_collage_maker()
            if category_id:
                log.info("Using category_id from collage-maker: %s", category_id)

        img_rgba = _load_rgba_image_from_gridfs(req.image_id, "image")
        mask_img = _load_rgba_image_from_gridfs(req.mask_id, "mask")
        mask_rgba = _load_rgba_mask_from_image(mask_img)

        if req.passthrough:
            result = np.array(img_rgba.convert("RGB"))
        else:
            result = process_inpaint(
                np.array(img_rgba),
                mask_rgba,
                invert_mask=req.invert_mask,
                prompt=req.prompt,
            )

        output_name = f"output_{uuid.uuid4().hex}.png"
        output_path = os.path.join(OUTPUT_DIR, output_name)

        # Low compression level: favours write speed over file size.
        Image.fromarray(result).save(
            output_path, "PNG", optimize=False, compress_level=1
        )

        # Best-effort JPEG copy; a failure only drops the extra URL.
        compressed_name = f"compressed_{output_name.replace('.png', '.jpg')}"
        compressed_path = os.path.join(OUTPUT_DIR, compressed_name)
        try:
            _compress_image(output_path, compressed_path, quality=85)
            compressed_url = str(
                request.url_for("download_file", filename=compressed_name)
            ).replace("http://", "https://")
        except Exception as compress_err:
            log.warning("Failed to create compressed image: %s", compress_err)
            compressed_url = None

        log_media_click(req.user_id, category_id, req.appname)
        response = {"result": output_name}
        if compressed_url:
            response["Compressed_Image_URL"] = compressed_url
        return response

    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise

    finally:
        # Always record the outcome, for both success and failure.
        response_time_ms = (time.time() - start_time) * 1000
        log_doc = {
            "endpoint": "/inpaint",
            "status": status,
            "response_time_ms": float(response_time_ms),
            "timestamp": datetime.utcnow(),
            "appname": req.appname or "None",
            "error": error_msg,
        }

        if api_logs_collection is not None:
            try:
                api_logs_collection.insert_one(log_doc)
                log.info("API log inserted into logs/objectRemover")
            except Exception as e:
                log.error("Failed to insert API log: %s", e)
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| |
|
@app.post("/inpaint-url")
def inpaint_url(req: InpaintRequest, request: Request, _: None = Depends(bearer_auth)) -> Dict[str, str]:
    """Same as /inpaint but returns a JSON with a public download URL instead of image bytes.

    Returns:
        ``{"result": <png filename>, "url": <download URL>}``.

    Raises:
        Whatever the loading or processing steps raise; the failure is
        recorded in the Mongo log before being re-raised.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None

    try:
        # Collage-maker requests without a category borrow one from the
        # collage-maker database. The match is case-insensitive so it agrees
        # with how log_media_click selects its target collection.
        category_id = req.category_id
        if (req.appname or "").lower() == "collage-maker" and not category_id:
            category_id = get_category_id_from_collage_maker()
            if category_id:
                log.info("Using category_id from collage-maker: %s", category_id)

        img_rgba = _load_rgba_image_from_gridfs(req.image_id, "image")
        mask_img = _load_rgba_image_from_gridfs(req.mask_id, "mask")
        mask_rgba = _load_rgba_mask_from_image(mask_img)

        if req.passthrough:
            result = np.array(img_rgba.convert("RGB"))
        else:
            result = process_inpaint(
                np.array(img_rgba),
                mask_rgba,
                invert_mask=req.invert_mask,
                prompt=req.prompt,
            )
        result_name = f"output_{uuid.uuid4().hex}.png"
        result_path = os.path.join(OUTPUT_DIR, result_name)
        Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)

        url = str(request.url_for("download_file", filename=result_name))
        logs.append({"result": result_name, "url": url, "timestamp": datetime.utcnow().isoformat()})
        log_media_click(req.user_id, category_id, req.appname)
        return {"result": result_name, "url": url}
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always record the outcome, for both success and failure.
        response_time_ms = (time.time() - start_time) * 1000
        log_doc = {
            "input_image_id": req.image_id,
            "input_mask_id": req.mask_id,
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": response_time_ms,
        }
        if req.appname:
            log_doc["appname"] = req.appname
        if error_msg:
            log_doc["error"] = error_msg

        if mongo_logs is not None:
            try:
                log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name)
                insert_result = mongo_logs.insert_one(log_doc)
                # This line previously logged an undefined name (output_name),
                # so every successful insert raised NameError, was reported as
                # a failed insert, and skipped the verification below.
                log.info(
                    "Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s",
                    insert_result.inserted_id,
                    result_name,
                    status,
                    mongo_logs.database.name,
                    mongo_logs.name,
                )

                # Read the row back to confirm it was actually stored.
                try:
                    verify_doc = mongo_logs.find_one({"_id": insert_result.inserted_id})
                    if verify_doc:
                        log.info("Verified: Document exists in MongoDB after insert")
                    else:
                        log.error("WARNING: Document not found after insert! _id=%s", insert_result.inserted_id)
                except Exception as verify_err:
                    log.warning("Could not verify insert: %s", verify_err)
            except Exception as mongo_err:
                log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True)
        else:
            log.warning("MongoDB not configured, skipping log insert")
| |
|
| |
|
@app.post("/inpaint-multipart")
def inpaint_multipart(
    image: UploadFile = File(...),
    mask: UploadFile = File(...),
    request: Request = None,
    invert_mask: bool = True,
    mask_is_painted: bool = False,
    passthrough: bool = False,
    prompt: Optional[str] = Form(None),
    user_id: Optional[str] = Form(None),
    category_id: Optional[str] = Form(None),
    appname: Optional[str] = Form(None),
    _: None = Depends(bearer_auth),
) -> Dict[str, str]:
    """Inpaint an uploaded image with an uploaded mask and save the result as a PNG.

    Args:
        image: Uploaded picture to process; decoded with PIL and converted to RGBA.
        mask: Uploaded mask, or (with ``mask_is_painted``) a painted copy of
            the picture.
        request: Current request, used only to build the download URL.
        invert_mask: Forwarded unchanged to ``process_inpaint``.
        mask_is_painted: When true, the mask is derived from ``mask`` itself:
            pixels with R in 150..255, G in 0..120 and B in 100..255, OR-ed
            (when both uploads have the same shape) with pixels whose gray
            difference from ``image`` exceeds 50, then cleaned up with a
            morphological close and open.
        passthrough: When true, skip inpainting and save the picture as RGB.
        prompt: Forwarded unchanged to ``process_inpaint``.
        user_id: Passed to ``log_media_click``.
        category_id: Passed to ``log_media_click``; when empty and ``appname``
            is ``"collage-maker"`` it is looked up through
            ``get_category_id_from_collage_maker``.
        appname: Stored in the Mongo log document and passed to
            ``log_media_click``.

    Returns:
        ``{"result": <file name>}`` plus ``"url"`` when a download URL could be
        built. If fewer than 50 painted pixels are detected, the untouched
        picture is saved and the response carries an ``"error"`` message
        instead of a URL.

    Raises:
        Exception: Any failure is re-raised after being recorded as a failed
            request in the log document.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None

    def _save_png(pixels: np.ndarray, name: str) -> None:
        # Write the array as a lightly-compressed PNG inside OUTPUT_DIR.
        Image.fromarray(pixels).save(
            os.path.join(OUTPUT_DIR, name), "PNG", optimize=False, compress_level=1
        )

    def _finish(name: str, click_category: Optional[str]) -> Dict[str, str]:
        # Record the output in the in-memory log, register the media click
        # and build the response payload.
        url: Optional[str] = None
        if request is not None:
            try:
                url = str(request.url_for("download_file", filename=name))
            except Exception as url_err:
                # Best effort: the response is still useful without a URL.
                log.warning("Could not build download URL for %s: %s", name, url_err)

        entry: Dict[str, str] = {"result": name, "timestamp": datetime.utcnow().isoformat()}
        resp: Dict[str, str] = {"result": name}
        if url:
            entry["url"] = url
            resp["url"] = url
        logs.append(entry)
        log_media_click(user_id, click_category, appname)
        return resp

    try:
        final_category_id = category_id
        if appname == "collage-maker" and not final_category_id:
            final_category_id = get_category_id_from_collage_maker()
            if final_category_id:
                log.info("Using category_id from collage-maker: %s", final_category_id)

        img = Image.open(image.file).convert("RGBA")
        m = Image.open(mask.file).convert("RGBA")

        if passthrough:
            # No inpainting requested: hand the picture back as plain RGB.
            result = np.array(img.convert("RGB"))
            result_name = f"output_{uuid.uuid4().hex}.png"
            _save_png(result, result_name)
            return _finish(result_name, final_category_id)

        if mask_is_painted:
            log.info("Auto-detecting pink/magenta paint from uploaded image...")
            m_rgb = cv2.cvtColor(np.array(m), cv2.COLOR_RGBA2RGB)

            # Inclusive per-channel RGB bounds for the paint colour.
            lower = np.array([150, 0, 100], dtype=np.uint8)
            upper = np.array([255, 120, 255], dtype=np.uint8)
            binmask = cv2.inRange(m_rgb, lower, upper)

            # When both uploads line up, also mark every pixel that clearly
            # differs between the picture and its painted copy.
            img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)
            if img_rgb.shape == m_rgb.shape:
                gray_diff = cv2.cvtColor(cv2.absdiff(img_rgb, m_rgb), cv2.COLOR_RGB2GRAY)
                diff_mask = (gray_diff > 50).astype(np.uint8) * 255
                binmask = cv2.bitwise_or(binmask, diff_mask)

            # Close small holes first, then drop isolated specks.
            kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
            binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
            binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)

            nonzero = int((binmask > 0).sum())
            log.info("Pink/magenta paint detected: %d pixels marked for removal (white)", nonzero)

            if nonzero < 50:
                log.error("CRITICAL: Could not detect pink/magenta paint! Returning original image.")
                result = np.array(img.convert("RGB"))
                result_name = f"output_{uuid.uuid4().hex}.png"
                _save_png(result, result_name)
                return {"result": result_name, "error": "pink/magenta paint detection failed - very few pixels detected"}

            # RGB channels carry the binary mask; alpha is its complement, so
            # painted pixels end up with alpha 0.
            # NOTE(review): presumably the encoding process_inpaint expects - confirm.
            mask_rgba = np.dstack([binmask, binmask, binmask, 255 - binmask])

            log.info("Successfully created binary mask: %d pink pixels → white (255), %d pixels → black (0)",
                     nonzero, binmask.shape[0] * binmask.shape[1] - nonzero)
        else:
            mask_rgba = _load_rgba_mask_from_image(m)

        actual_invert = invert_mask
        log.info("Using invert_mask=%s (mask_is_painted=%s)", actual_invert, mask_is_painted)

        result = process_inpaint(
            np.array(img),
            mask_rgba,
            invert_mask=actual_invert,
            prompt=prompt,
        )
        result_name = f"output_{uuid.uuid4().hex}.png"
        _save_png(result, result_name)
        return _finish(result_name, final_category_id)
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always record the outcome, whether the request succeeded or failed.
        end_time = time.time()
        response_time_ms = (end_time - start_time) * 1000
        log_doc = {
            "endpoint": "inpaint-multipart",
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": response_time_ms,
        }
        if appname:
            log_doc["appname"] = appname
        if error_msg:
            log_doc["error"] = error_msg

        if mongo_logs is None:
            log.warning("MongoDB not configured, skipping log insert")
        else:
            try:
                log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name)
                insert_result = mongo_logs.insert_one(log_doc)
                # Report result_name here: the previous `output_name` was never
                # defined in this function, so the NameError made every
                # successful insert look like a failure.
                log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s",
                         insert_result.inserted_id, result_name, status, mongo_logs.database.name, mongo_logs.name)

                # Read the document back to confirm the write is visible.
                try:
                    verify_doc = mongo_logs.find_one({"_id": insert_result.inserted_id})
                    if verify_doc:
                        log.info("Verified: Document exists in MongoDB after insert")
                    else:
                        log.error("WARNING: Document not found after insert! _id=%s", insert_result.inserted_id)
                except Exception as verify_err:
                    log.warning("Could not verify insert: %s", verify_err)
            except Exception as mongo_err:
                log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True)
| |
|
| |
|
@app.post("/remove-pink")
def remove_pink_segments(
    image: UploadFile = File(...),
    request: Request = None,
    user_id: Optional[str] = Form(None),
    category_id: Optional[str] = Form(None),
    appname: Optional[str] = Form(None),
    _: None = Depends(bearer_auth),
) -> Dict[str, str]:
    """Remove pink/magenta painted regions from a single uploaded image.

    Pixels with R in 150..255, G in 0..120 and B in 100..255 are collected
    into a binary mask, cleaned with a morphological close and open, and the
    picture is then passed to ``process_inpaint`` together with that mask.

    Args:
        image: Uploaded picture with the areas to remove painted pink/magenta.
        request: Current request, used only to build the download URL.
        user_id: Passed to ``log_media_click``.
        category_id: Passed to ``log_media_click``; when empty and ``appname``
            is ``"collage-maker"`` it is looked up through
            ``get_category_id_from_collage_maker``.
        appname: Stored in the Mongo log document and passed to
            ``log_media_click``.

    Returns:
        ``{"result": <file name>, "pink_segments_detected": <count>}`` plus
        ``"url"`` when a download URL could be built. If fewer than 50 pink
        pixels are detected, the untouched picture is saved and the response
        carries ``"result"`` and an ``"error"`` message only.

    Raises:
        Exception: Any failure is re-raised after being recorded as a failed
            request in the log document.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    result_name = None

    try:
        final_category_id = category_id
        if appname == "collage-maker" and not final_category_id:
            final_category_id = get_category_id_from_collage_maker()
            if final_category_id:
                log.info("Using category_id from collage-maker: %s", final_category_id)

        log.info("Simple remove-pink: processing image %s", image.filename)

        img = Image.open(image.file).convert("RGBA")
        img_rgb = cv2.cvtColor(np.array(img), cv2.COLOR_RGBA2RGB)

        # Inclusive per-channel RGB bounds for the paint colour.
        lower = np.array([150, 0, 100], dtype=np.uint8)
        upper = np.array([255, 120, 255], dtype=np.uint8)
        binmask = cv2.inRange(img_rgb, lower, upper)

        # Close small holes first, then drop isolated specks.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        binmask = cv2.morphologyEx(binmask, cv2.MORPH_CLOSE, kernel, iterations=2)
        binmask = cv2.morphologyEx(binmask, cv2.MORPH_OPEN, kernel, iterations=1)

        nonzero = int((binmask > 0).sum())
        total_pixels = binmask.shape[0] * binmask.shape[1]
        log.info("Detected %d pink pixels (%.2f%% of image) to remove", nonzero, 100 * nonzero / total_pixels)
        log.info("Pink detection bounds used: lower=[150,0,100], upper=[255,120,255]")

        if nonzero < 50:
            log.error("No pink segments detected! Returning original image.")
            result = np.array(img.convert("RGB"))
            result_name = f"output_{uuid.uuid4().hex}.png"
            result_path = os.path.join(OUTPUT_DIR, result_name)
            Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)
            return {
                "result": result_name,
                "error": "No pink/magenta segments detected. Please paint areas to remove with magenta/pink color (RGB 255,0,255)."
            }

        # RGB channels carry the binary mask; alpha is its complement, so
        # painted pixels end up with alpha 0.
        # NOTE(review): presumably the encoding process_inpaint expects with
        # invert_mask=True - confirm against src.core.
        mask_rgba = np.dstack([binmask, binmask, binmask, 255 - binmask])

        alpha_zero_count = int((mask_rgba[:, :, 3] == 0).sum())
        alpha_255_count = int((mask_rgba[:, :, 3] == 255).sum())
        log.info("Mask encoding: %d pixels with alpha=0 (pink), %d pixels with alpha=255 (keep)",
                 alpha_zero_count, alpha_255_count)
        log.info("After 255-alpha conversion: %d will become white (255/remove), %d will become black (0/keep)",
                 alpha_zero_count, alpha_255_count)

        # img is already RGBA, so it can be handed over without another convert.
        img_clean = np.array(img)

        log.info("Starting inpainting process...")
        result = process_inpaint(img_clean, mask_rgba, invert_mask=True)
        log.info("Inpainting complete, result shape: %s", result.shape)
        result_name = f"output_{uuid.uuid4().hex}.png"
        result_path = os.path.join(OUTPUT_DIR, result_name)
        Image.fromarray(result).save(result_path, "PNG", optimize=False, compress_level=1)

        url: Optional[str] = None
        if request is not None:
            try:
                url = str(request.url_for("download_file", filename=result_name))
            except Exception as url_err:
                # Best effort: the response is still useful without a URL.
                log.warning("Could not build download URL for %s: %s", result_name, url_err)

        logs.append({
            "result": result_name,
            "filename": image.filename,
            "pink_pixels": nonzero,
            "timestamp": datetime.utcnow().isoformat()
        })

        resp: Dict[str, str] = {"result": result_name, "pink_segments_detected": str(nonzero)}
        if url:
            resp["url"] = url
        log_media_click(user_id, final_category_id, appname)
        return resp
    except Exception as e:
        status = "fail"
        error_msg = str(e)
        raise
    finally:
        # Always record the outcome, whether the request succeeded or failed.
        end_time = time.time()
        response_time_ms = (end_time - start_time) * 1000
        log_doc = {
            "endpoint": "remove-pink",
            "output_id": result_name,
            "status": status,
            "timestamp": datetime.utcnow(),
            "ts": int(time.time()),
            "response_time_ms": response_time_ms,
        }
        if appname:
            log_doc["appname"] = appname
        if error_msg:
            log_doc["error"] = error_msg

        if mongo_logs is None:
            log.warning("MongoDB not configured, skipping log insert")
        else:
            try:
                log.info("Inserting log to MongoDB - Database: %s, Collection: %s", mongo_logs.database.name, mongo_logs.name)
                insert_result = mongo_logs.insert_one(log_doc)
                # Report result_name here: the previous `output_name` was never
                # defined in this function, so the NameError made every
                # successful insert look like a failure.
                log.info("Mongo log inserted successfully: _id=%s, output_id=%s, status=%s, db=%s, collection=%s",
                         insert_result.inserted_id, result_name, status, mongo_logs.database.name, mongo_logs.name)

                # Read the document back to confirm the write is visible.
                try:
                    verify_doc = mongo_logs.find_one({"_id": insert_result.inserted_id})
                    if verify_doc:
                        log.info("Verified: Document exists in MongoDB after insert")
                    else:
                        log.error("WARNING: Document not found after insert! _id=%s", insert_result.inserted_id)
                except Exception as verify_err:
                    log.warning("Could not verify insert: %s", verify_err)
            except Exception as mongo_err:
                log.error("Mongo log insert failed: %s, log_doc=%s", mongo_err, log_doc, exc_info=True)
| |
|
| |
|
@app.get("/download/{filename}")
def download_file(filename: str):
    """Serve a generated output file from OUTPUT_DIR.

    Args:
        filename: Bare name of a file inside OUTPUT_DIR.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or no
            such file exists in OUTPUT_DIR.
    """
    # Accept bare names only: os.path.join would let an absolute path or a
    # name containing a separator point outside OUTPUT_DIR.
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(OUTPUT_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(path)
| |
|
| |
|
@app.get("/result/{filename}")
def view_result(filename: str):
    """View a result image directly in the browser.

    Serves the file from OUTPUT_DIR with an ``image/png`` media type.

    Args:
        filename: Bare name of a file inside OUTPUT_DIR.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or no
            such file exists in OUTPUT_DIR.
    """
    # Accept bare names only: os.path.join would let an absolute path or a
    # name containing a separator point outside OUTPUT_DIR.
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="file not found")
    path = os.path.join(OUTPUT_DIR, filename)
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(path, media_type="image/png")
| |
|
| |
|
@app.get("/logs")
def get_logs(_: None = Depends(bearer_auth)) -> JSONResponse:
    """Return the entries of the module-level ``logs`` list as a JSON response.

    The route is guarded by the ``bearer_auth`` dependency.
    """
    response = JSONResponse(content=logs)
    return response
| |
|