Spaces:
Running
Running
| import os | |
| import json | |
| from pathlib import Path | |
| from fastapi import HTTPException | |
| import cv2 | |
| import numpy as np | |
| from datetime import datetime | |
| from exif import Image as ExifImage | |
| from io import BytesIO | |
| # ---------------- CONFIG IMPORTS ---------------- | |
| from .config import ( | |
| DETECT_MODEL, | |
| BUCK_DOE_MODEL, | |
| BUCK_TYPE_MODEL, | |
| ALLOWED_EXTENSIONS, | |
| MIN_IMAGES, | |
| MAX_IMAGES, | |
| UPLOAD_DIR, | |
| STORAGE_BACKEND, | |
| gcs_bucket, | |
| GCS_UPLOAD_DIR, | |
| logger | |
| ) | |
| # ---------------- VALIDATION ---------------- | |
| def validate_form(user_id, camera_name, images): | |
| if not user_id or not user_id.strip(): | |
| raise HTTPException(400, "user_id is required") | |
| if not camera_name or not camera_name.strip(): | |
| raise HTTPException(400, "camera_name is required") | |
| if not images or len(images) == 0: | |
| raise HTTPException(400, "At least one image is required") | |
| images = [f for f in images if f.filename and f.filename.strip()] | |
| if len(images) < MIN_IMAGES: | |
| raise HTTPException(400, f"At least {MIN_IMAGES} image(s) required") | |
| if len(images) > MAX_IMAGES: | |
| raise HTTPException(400, f"Maximum {MAX_IMAGES} images allowed") | |
| for f in images: | |
| if "." not in f.filename: | |
| raise HTTPException(400, f"Invalid file: {f.filename}") | |
| ext = f.filename.rsplit(".", 1)[1].lower() | |
| if ext not in ALLOWED_EXTENSIONS: | |
| raise HTTPException(400, f"Invalid file type: {f.filename}") | |
| return images | |
| def make_json_safe(value): | |
| """Convert EXIF values to JSON-serializable types""" | |
| if hasattr(value, "name"): | |
| return value.name | |
| if isinstance(value, (bytes, bytearray)): | |
| return value.decode(errors="ignore") | |
| if isinstance(value, (tuple, list)): | |
| return [make_json_safe(v) for v in value] | |
| if not isinstance(value, (str, int, float, bool, type(None))): | |
| return str(value) | |
| return value | |
| def extract_metadata(image_bytes): | |
| metadata = { | |
| "upload_datetime": datetime.utcnow().isoformat() + "Z" | |
| } | |
| try: | |
| exif_img = ExifImage(BytesIO(image_bytes)) | |
| if not exif_img.has_exif: | |
| return metadata | |
| exif_dict = {} | |
| for tag in exif_img.list_all(): | |
| try: | |
| value = getattr(exif_img, tag) | |
| value = make_json_safe(value) | |
| if value not in ("", None, [], {}): | |
| exif_dict[tag] = value | |
| except Exception: | |
| continue | |
| if exif_dict: | |
| metadata["exif"] = exif_dict | |
| except Exception: | |
| pass | |
| return metadata | |
| # ---------------- IMAGE PROCESSING ---------------- | |
| def process_image(image): | |
| """Run 3-stage detection and classification with dynamic confidence""" | |
| detections = [] | |
| results = DETECT_MODEL(image,conf=0.8 ,iou=0.4,agnostic_nms=True) # Stage 1: Deer detection | |
| for r in results: | |
| for box in r.boxes: | |
| x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
| crop = image[y1:y2, x1:x2] | |
| if crop.size == 0: | |
| continue | |
| # ---------------- Stage 2: Buck/Doe ---------------- | |
| buck_res = BUCK_DOE_MODEL(crop) | |
| buck_probs = buck_res[0].probs | |
| top1_idx = buck_probs.top1 | |
| buck_name = buck_res[0].names[top1_idx] | |
| buck_conf = float(buck_probs.top1conf) | |
| if buck_name.lower() == "buck": | |
| # ---------------- Stage 3: Buck Type ---------------- | |
| type_res = BUCK_TYPE_MODEL(crop) | |
| type_probs = type_res[0].probs | |
| top1_type_idx = type_probs.top1 | |
| type_name = type_res[0].names[top1_type_idx] | |
| type_conf = float(type_probs.top1conf) | |
| label = f"Deer | Buck | {type_name}" | |
| final_conf = type_conf | |
| else: | |
| # Doe: use stage 2 confidence | |
| label = f"Deer | Doe " | |
| final_conf = buck_conf | |
| detections.append({ | |
| "label": label, | |
| "bbox": [x1, y1, x2, y2], | |
| "confidence": final_conf | |
| }) | |
| return detections | |
| # ---------------- CAMERA VALIDATION ---------------- | |
| def validate_user_and_camera(user_id: str, camera_name: str): | |
| if not user_exists(user_id): | |
| raise HTTPException(404, "User not found") | |
| cameras = load_cameras(user_id) | |
| if not any(c["camera_name"] == camera_name for c in cameras): | |
| raise HTTPException(404, "Camera not registered") | |
| # ---------------- IMAGE SAVE ---------------- | |
| def save_image(user_id, camera_name, filename, data): | |
| path = BASE_DIR / user_id / camera_name / "raw" | |
| path.mkdir(parents=True, exist_ok=True) | |
| local_path = path / filename | |
| with open(local_path, "wb") as f: | |
| f.write(data) | |
| if STORAGE_BACKEND == "gcs" and gcs_bucket: | |
| blob = gcs_bucket.blob(f"{GCS_UPLOAD_DIR}{user_id}/{camera_name}/{filename}") | |
| blob.upload_from_filename(local_path) | |
| return blob.public_url | |
| return f"/user_data/{user_id}/{camera_name}/raw/{filename}" | |
| # ---------------- JSON ---------------- | |
| def load_json(path): | |
| if Path(path).exists(): | |
| with open(path, "r") as f: | |
| return json.load(f) | |
| return [] | |
| def save_json(path, data): | |
| with open(path, "w") as f: | |
| json.dump(data, f, indent=4) | |
| # ---------------- USER FOLDERS / CAMERAS ---------------- | |
| BASE_DIR = Path(UPLOAD_DIR) | |
| BASE_DIR.mkdir(exist_ok=True) | |
| def get_user_folder(user_id: str) -> Path: | |
| """Return path to user's folder WITHOUT creating it""" | |
| return BASE_DIR / f"{user_id}" | |
| def get_user_file(user_id: str) -> Path: | |
| """Return path to user's cameras.json WITHOUT creating it""" | |
| return get_user_folder(user_id) / "cameras.json" | |
| def user_exists(user_id: str) -> bool: | |
| return get_user_file(user_id).exists() | |
| def load_cameras(user_id: str) -> list: | |
| path = get_user_file(user_id) | |
| if not path.exists(): | |
| return [] | |
| try: | |
| with open(path, "r") as f: | |
| return json.load(f) | |
| except json.JSONDecodeError: | |
| return [] | |
| def save_cameras(user_id: str, cameras: list): | |
| # Folder only created when we are saving ( Add Camera) | |
| folder = get_user_folder(user_id) | |
| folder.mkdir(exist_ok=True) | |
| with open(folder / "cameras.json", "w") as f: | |
| json.dump(cameras, f, indent=2) | |
| #>>>>>>>>dashboard>>>>>>>>>>>> | |
| def get_user_dashboard(user_id: str, camera_name: str = None) -> dict: | |
| """Return analytics for a user or a specific camera""" | |
| user_folder = Path(UPLOAD_DIR) / user_id | |
| cameras_file = user_folder / "cameras.json" | |
| if not cameras_file.exists(): | |
| raise HTTPException(404, f"User {user_id} not found") | |
| try: | |
| with open(cameras_file, "r") as f: | |
| cameras = json.load(f) | |
| except json.JSONDecodeError: | |
| cameras = [] | |
| total_cameras = len(cameras) | |
| total_images = 0 | |
| total_detections = 0 | |
| buck_type_distribution = {} | |
| buck_doe_distribution = {"Buck": 0, "Doe": 0} | |
| # New dashboard analytics | |
| from collections import defaultdict, Counter | |
| from datetime import datetime | |
| heatmap = defaultdict(lambda: [0]*24) # day -> 24 hours | |
| deer_per_day = Counter() | |
| bucks_per_day = Counter() | |
| does_per_day = Counter() | |
| hour_activity = [0]*24 # 0-23 hours | |
| for cam in cameras: | |
| cam_name = cam["camera_name"] | |
| if camera_name and cam_name != camera_name: | |
| continue | |
| raw_folder = user_folder / cam_name / "raw" | |
| detections_file = user_folder / cam_name / f"{cam_name}_detections.json" | |
| # Count images | |
| if raw_folder.exists(): | |
| total_images += len(list(raw_folder.glob("*.*"))) | |
| # Count detections and distributions | |
| if detections_file.exists(): | |
| try: | |
| dets = json.load(open(detections_file, "r")) | |
| for rec in dets: | |
| # --- Existing Buck/Doe counts --- | |
| for d in rec.get("detections", []): | |
| total_detections += 1 | |
| label = d.get("label", "") | |
| if "|" in label: | |
| parts = [p.strip() for p in label.split("|")] | |
| if len(parts) == 3: # Buck with type | |
| buck_doe_distribution["Buck"] += 1 | |
| buck_type_distribution[parts[2]] = buck_type_distribution.get(parts[2], 0) + 1 | |
| else: # Doe | |
| buck_doe_distribution["Doe"] += 1 | |
| # --- New analytics using datetime_original --- | |
| dt_str = rec.get("metadata", {}).get("exif", {}).get("datetime_original") | |
| if dt_str: | |
| dt = datetime.strptime(dt_str, "%Y:%m:%d %H:%M:%S") | |
| day = dt.date() | |
| hour = dt.hour | |
| # Heatmap count | |
| heatmap[day][hour] += len(rec.get("detections", [])) | |
| # Count deer, bucks, does per day | |
| for d in rec.get("detections", []): | |
| label = d.get("label", "") | |
| if "Deer" in label: | |
| deer_per_day[day] += 1 | |
| if "Buck" in label: | |
| bucks_per_day[day] += 1 | |
| if "Doe" in label: | |
| does_per_day[day] += 1 | |
| # Hourly aggregated activity | |
| hour_activity[hour] += len(rec.get("detections", [])) | |
| except json.JSONDecodeError: | |
| continue | |
| # Average activity by hour (morning/night) | |
| morning_hours = range(6, 18) | |
| night_hours = list(range(0,6)) + list(range(18,24)) | |
| morning_activity = sum(hour_activity[h] for h in morning_hours) / len(morning_hours) | |
| night_activity = sum(hour_activity[h] for h in night_hours) / len(night_hours) | |
| return { | |
| "user_id": user_id, | |
| "selected_camera": camera_name, | |
| "total_cameras": total_cameras, | |
| "images_uploaded": total_images, | |
| "total_detections": total_detections, | |
| "buck_type_distribution": buck_type_distribution, | |
| "buck_doe_distribution": buck_doe_distribution, | |
| # --- New analytics --- | |
| "activity_heatmap": dict(heatmap), | |
| "deer_per_day": dict(deer_per_day), | |
| "bucks_per_day": dict(bucks_per_day), | |
| "does_per_day": dict(does_per_day), | |
| "average_activity": { | |
| "morning": round(morning_activity,2), | |
| "night": round(night_activity,2) | |
| } | |
| } | |