"""
Engine part detection pipeline.

Combines OpenCV-based image-quality gating and contour segmentation with an
AI classification backend (lazily imported from ``hf_client``), a camera
capture loop for continuous auto-inspection, and a CLI entry point for
single-image or camera-driven runs.
"""

import argparse
import asyncio
import base64
import io
import logging
import os
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple

import cv2
import numpy as np
from PIL import Image

# ─────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s │ %(name)-12s │ %(levelname)-7s │ %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("detection")


# ─────────────────────────────────────────────────────────────────
# Data Models
# ─────────────────────────────────────────────────────────────────
class Verdict(str, Enum):
    PASS = "PASS"
    FAIL = "FAIL"
    UNKNOWN = "UNKNOWN"
    ERROR = "ERROR"


@dataclass
class QualityMetrics:
    """Image quality measurements."""

    brightness: float = 0.0
    contrast: float = 0.0
    sharpness: float = 0.0       # Laplacian variance; higher = sharper
    is_blurred: bool = False
    resolution: Tuple[int, int] = (0, 0)

    @property
    def quality_score(self) -> float:
        # Map sharpness onto a 0-100 scale, capped at 100.
        return min(100.0, self.sharpness / 2.0)


@dataclass
class SegmentedROI:
    """A detected region of interest from segmentation."""

    bbox: Tuple[int, int, int, int]  # x, y, w, h
    contour: Any = None
    cropped_image: Optional[Image.Image] = None
    mask: Optional[np.ndarray] = None
    area: float = 0.0
    circularity: float = 0.0
    label: str = "part"


@dataclass
class DetectionResult:
    """Complete result of a single detection pass."""

    verdict: Verdict = Verdict.UNKNOWN
    confidence: float = 0.0
    matched_class: str = ""
    quality: QualityMetrics = field(default_factory=QualityMetrics)
    visualization_b64: Optional[str] = None
    all_scores: Dict[str, float] = field(default_factory=dict)
    segments_found: int = 0
    status_detail: str = ""
    timestamp: str = ""
    elapsed_ms: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the result to a JSON-friendly dict (rounded floats)."""
        return {
            "verdict": self.verdict.value,
            "confidence": round(self.confidence, 4),
            "matched_class": self.matched_class,
            "quality": {
                "brightness": round(self.quality.brightness, 2),
                "contrast": round(self.quality.contrast, 2),
                "sharpness": round(self.quality.sharpness, 2),
                "is_blurred": self.quality.is_blurred,
                "quality_score": round(self.quality.quality_score, 2),
                "resolution": list(self.quality.resolution),
            },
            "visualization": self.visualization_b64,
            "all_scores": self.all_scores,
            "segments_found": self.segments_found,
            "status_detail": self.status_detail,
            "timestamp": self.timestamp,
            "elapsed_ms": round(self.elapsed_ms, 1),
        }


@dataclass
class SessionStats:
    """Running totals for an auto-inspection session."""

    total: int = 0
    passed: int = 0
    failed: int = 0
    unknown: int = 0
    errors: int = 0
    start_time: Optional[float] = None

    @property
    def elapsed_seconds(self) -> float:
        if self.start_time is None:
            return 0.0
        return time.time() - self.start_time

    def record(self, verdict: Verdict):
        """Tally one verdict into the session counters."""
        self.total += 1
        if verdict == Verdict.PASS:
            self.passed += 1
        elif verdict == Verdict.FAIL:
            self.failed += 1
        elif verdict == Verdict.UNKNOWN:
            self.unknown += 1
        else:
            self.errors += 1

    def to_dict(self) -> Dict[str, Any]:
        return {
            "total": self.total,
            "passed": self.passed,
            "failed": self.failed,
            "unknown": self.unknown,
            "errors": self.errors,
            "elapsed_seconds": round(self.elapsed_seconds, 1),
        }


# ─────────────────────────────────────────────────────────────────
# Image Analyzer — Validation, Quality, and Segmentation
# ─────────────────────────────────────────────────────────────────
class ImageAnalyzer:
    """
    Handles all pre-AI image analysis:
      - Quality validation (brightness, contrast, sharpness)
      - Part segmentation via contour + morphological analysis
      - ROI extraction for focused detection
    """

    # Thresholds
    MIN_RESOLUTION = (320, 240)
    MAX_INPUT_DIM = 1024
    BRIGHTNESS_FLOOR = 15
    BRIGHTNESS_CEIL = 245
    CONTRAST_FLOOR = 5
    BLUR_THRESHOLD = 100.0  # Laplacian variance below this = blurry

    # Segmentation tunables
    MORPHO_KERNEL = 5
    MIN_CONTOUR_AREA_RATIO = 0.005  # Minimum area relative to image area
    MAX_CONTOUR_AREA_RATIO = 0.85   # Maximum area relative to image area
    CIRCULARITY_THRESHOLD = 0.15    # Minimum circularity for a valid part contour

    def measure_quality(self, img: Image.Image) -> QualityMetrics:
        """Compute image quality metrics without modifying the image."""
        arr = np.array(img.convert("RGB"))
        gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
        laplacian_var = float(cv2.Laplacian(gray, cv2.CV_64F).var())
        return QualityMetrics(
            brightness=float(np.mean(arr)),
            contrast=float(np.std(arr)),
            sharpness=laplacian_var,
            is_blurred=laplacian_var < self.BLUR_THRESHOLD,
            resolution=(img.width, img.height),
        )

    def validate(self, img: Image.Image) -> Tuple[bool, str]:
        """Return (ok, reason). Rejects tiny, extreme-aspect, too-dark/bright,
        or low-contrast images before any AI call."""
        w, h = img.size
        if w < self.MIN_RESOLUTION[0] or h < self.MIN_RESOLUTION[1]:
            return False, (
                f"Resolution too low: {w}×{h} "
                f"(need {self.MIN_RESOLUTION[0]}×{self.MIN_RESOLUTION[1]})"
            )
        aspect = w / h
        if aspect < 0.2 or aspect > 5.0:
            return False, f"Unusual aspect ratio: {aspect:.2f}"
        metrics = self.measure_quality(img)
        if metrics.brightness < self.BRIGHTNESS_FLOOR:
            return False, "Image too dark"
        if metrics.brightness > self.BRIGHTNESS_CEIL:
            return False, "Image too bright / overexposed"
        if metrics.contrast < self.CONTRAST_FLOOR:
            return False, "Insufficient contrast — blank or uniform image"
        return True, "OK"

    def prepare(self, img: Image.Image) -> Image.Image:
        """Return an RGB copy capped at MAX_INPUT_DIM on the longest side.

        FIX: the original called ``img.thumbnail(...)`` directly, which
        mutates the caller's image in place whenever the input is already
        RGB (``convert`` only copies when the mode changes). Copy first.
        """
        if img.mode != "RGB":
            img = img.convert("RGB")
        else:
            img = img.copy()
        img.thumbnail((self.MAX_INPUT_DIM, self.MAX_INPUT_DIM), Image.Resampling.LANCZOS)
        return img

    # ── Part Segmentation ────────────────────────────────────────
    def segment_parts(self, img: Image.Image) -> List[SegmentedROI]:
        """Find candidate part regions via adaptive threshold + contours.

        Bounding boxes and masks are in the coordinate space of ``img``.
        """
        arr = np.array(img.convert("RGB"))
        gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
        img_area = gray.shape[0] * gray.shape[1]

        # Adaptive threshold deals better with shadows than global Otsu
        binary = cv2.adaptiveThreshold(
            gray, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            blockSize=31,
            C=10,
        )

        # Morphological closing fills holes inside parts
        kernel = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE,
            (self.MORPHO_KERNEL, self.MORPHO_KERNEL),
        )
        closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=3)
        # Optional: small opening to remove noise specks
        opened = cv2.morphologyEx(closed, cv2.MORPH_OPEN, kernel, iterations=1)

        contours, _ = cv2.findContours(opened, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        rois: List[SegmentedROI] = []
        for cnt in contours:
            area = cv2.contourArea(cnt)
            ratio = area / img_area
            # Filter by relative area
            if ratio < self.MIN_CONTOUR_AREA_RATIO or ratio > self.MAX_CONTOUR_AREA_RATIO:
                continue

            # Circularity = 4π × area / perimeter² (1.0 for perfect circle)
            perimeter = cv2.arcLength(cnt, True)
            circularity = (4 * np.pi * area / (perimeter ** 2)) if perimeter > 0 else 0
            if circularity < self.CIRCULARITY_THRESHOLD:
                continue

            x, y, w, h = cv2.boundingRect(cnt)

            # Create a mask for this contour and crop
            mask = np.zeros(gray.shape, dtype=np.uint8)
            cv2.drawContours(mask, [cnt], -1, 255, thickness=cv2.FILLED)

            # Crop the bounding box region
            crop_arr = arr[y:y + h, x:x + w].copy()
            crop_mask = mask[y:y + h, x:x + w]
            # Apply mask — set background to black
            crop_arr[crop_mask == 0] = 0
            cropped_pil = Image.fromarray(crop_arr)

            rois.append(SegmentedROI(
                bbox=(x, y, w, h),
                contour=cnt,
                cropped_image=cropped_pil,
                mask=crop_mask,
                area=area,
                circularity=circularity,
                label=f"part_{len(rois)}",
            ))

        # Sort by area descending — largest part first
        rois.sort(key=lambda r: r.area, reverse=True)
        log.info(f"Segmentation: found {len(rois)} part region(s) from {len(contours)} contours")
        return rois

    def draw_segmentation_overlay(
        self,
        img: Image.Image,
        rois: List[SegmentedROI],
        verdict: Optional[Verdict] = None,
    ) -> Image.Image:
        """Draw ROI boxes/labels (and an optional verdict stamp) on a copy of ``img``.

        ``rois`` must be in the same coordinate space as ``img``.
        """
        arr = np.array(img.convert("RGB")).copy()
        color_map = {
            Verdict.PASS: (0, 200, 100),
            Verdict.FAIL: (220, 60, 60),
            Verdict.UNKNOWN: (220, 180, 0),
            Verdict.ERROR: (128, 128, 128),
            None: (100, 180, 255),
        }
        color = color_map.get(verdict, (100, 180, 255))

        for roi in rois:
            x, y, w, h = roi.bbox
            cv2.rectangle(arr, (x, y), (x + w, y + h), color, 2)
            # Label with area info
            label = f"{roi.label} ({roi.circularity:.2f})"
            font_scale = max(0.4, min(1.0, w / 300))
            cv2.putText(arr, label, (x, max(y - 8, 15)),
                        cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, 1, cv2.LINE_AA)

        # Verdict stamp in top-right
        if verdict is not None:
            stamp = verdict.value
            (tw, th), _ = cv2.getTextSize(stamp, cv2.FONT_HERSHEY_SIMPLEX, 1.2, 3)
            sx = arr.shape[1] - tw - 20
            sy = th + 20
            cv2.rectangle(arr, (sx - 10, sy - th - 10), (sx + tw + 10, sy + 10),
                          color, cv2.FILLED)
            cv2.putText(arr, stamp, (sx, sy), cv2.FONT_HERSHEY_SIMPLEX,
                        1.2, (255, 255, 255), 3, cv2.LINE_AA)

        return Image.fromarray(arr)


# ─────────────────────────────────────────────────────────────────
# Detection Engine — Orchestrates the full pipeline
# ─────────────────────────────────────────────────────────────────
class DetectionEngine:
    def __init__(self, hf_client=None, threshold: float = 0.70):
        self.analyzer = ImageAnalyzer()
        self.threshold = threshold
        # Lazy-init the HF client so index.py stays importable
        # without triggering network calls at import time.
        self._hf_client = hf_client
        self._hf_initialized = hf_client is not None

    @property
    def hf(self):
        """Lazily construct the HuggingFace client on first access."""
        if not self._hf_initialized:
            from hf_client import HuggingFaceClient
            self._hf_client = HuggingFaceClient()
            self._hf_initialized = True
        return self._hf_client

    async def run(self, img: Image.Image, threshold: Optional[float] = None) -> DetectionResult:
        """Run the full pipeline on one image: quality gate → prepare →
        segment → AI classify → verdict → visualization.

        Returns a DetectionResult; never raises for expected failures
        (quality rejection, backend errors) — those become Verdict.ERROR.
        """
        t0 = time.time()
        # FIX: datetime.utcnow() is deprecated; use an aware UTC timestamp.
        result = DetectionResult(timestamp=datetime.now(timezone.utc).isoformat())
        thr = threshold if threshold is not None else self.threshold

        # ── Step 1: Quality Gate ─────────────────────────────────
        valid, reason = self.analyzer.validate(img)
        result.quality = self.analyzer.measure_quality(img)
        if not valid:
            result.verdict = Verdict.ERROR
            result.status_detail = f"Quality rejected: {reason}"
            result.elapsed_ms = (time.time() - t0) * 1000
            log.warning(f"Quality gate failed: {reason}")
            return result

        # ── Step 2: Prepare for AI ───────────────────────────────
        # Downscale first so that segmentation coordinates match the
        # image the overlay is drawn on.
        prepared = self.analyzer.prepare(img)

        # ── Step 3: Segment Parts ────────────────────────────────
        # FIX: the original segmented the full-resolution image but drew
        # the boxes on the downscaled one, misaligning the overlay for
        # any input larger than MAX_INPUT_DIM. Segment the prepared image
        # so bboxes and overlay share one coordinate space. (The backend
        # has its own ROI logic; segmentation here is for local overlay
        # + future use.)
        rois = self.analyzer.segment_parts(prepared)
        result.segments_found = len(rois)
        if len(rois) == 0:
            log.info("No part segments found — sending full image to AI")

        # ── Step 4: AI Classification ────────────────────────────
        try:
            ai_result = await self.hf.detect_part(prepared, thr)
        except Exception as exc:
            result.verdict = Verdict.ERROR
            result.status_detail = f"AI backend error: {exc}"
            result.elapsed_ms = (time.time() - t0) * 1000
            log.error(f"AI call failed: {exc}")
            return result

        if not ai_result.get("success"):
            result.verdict = Verdict.ERROR
            result.status_detail = f"AI returned failure: {ai_result.get('error', 'unknown')}"
            result.elapsed_ms = (time.time() - t0) * 1000
            return result

        # ── Step 5: Interpret Verdict ────────────────────────────
        best_match = str(ai_result.get("best_match", "")).strip()
        confidence = float(ai_result.get("confidence", 0.0))
        all_scores = ai_result.get("all_scores", {})
        status_text = str(ai_result.get("status_text", ""))

        result.confidence = confidence
        result.matched_class = best_match
        result.all_scores = all_scores
        result.status_detail = status_text
        result.verdict = self._interpret_verdict(best_match, status_text)

        # ── Step 6: Visualization ────────────────────────────────
        # Build a composite overlay: segmentation boxes + verdict stamp
        vis_img = self.analyzer.draw_segmentation_overlay(prepared, rois, result.verdict)

        # Convert overlay to base64 for transport
        buf = io.BytesIO()
        vis_img.save(buf, format="JPEG", quality=85)
        result.visualization_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")

        # Also attach the AI backend's visualization if available
        # (it takes precedence over the local overlay).
        vis_path = ai_result.get("visualization")
        if vis_path and os.path.exists(vis_path):
            try:
                with open(vis_path, "rb") as f:
                    result.visualization_b64 = base64.b64encode(f.read()).decode("utf-8")
            finally:
                # Best-effort temp-file cleanup.
                try:
                    os.remove(vis_path)
                except OSError:
                    pass

        # Cleanup any temp files from the HF client
        for tmp in ai_result.get("_temp_paths", []):
            if tmp and tmp != vis_path:
                try:
                    os.remove(tmp)
                except OSError:
                    pass

        result.elapsed_ms = (time.time() - t0) * 1000
        log.info(
            f"Detection: {result.verdict.value} │ "
            f"class={best_match} │ conf={confidence:.3f} │ "
            f"segments={len(rois)} │ {result.elapsed_ms:.0f}ms"
        )
        return result

    @staticmethod
    def _interpret_verdict(best_match: str, status_text: str) -> Verdict:
        """Map backend class name + status text onto a Verdict.

        Priority: localization-failure markers → UNKNOWN; empty/none match
        → UNKNOWN; explicit PASS/FAIL in status text; otherwise class-name
        heuristic (anything not "perfect" is a FAIL).
        """
        match_upper = best_match.upper()
        status_lower = status_text.lower()

        # Localization failures (bolt holes not found, etc.)
        failure_markers = ["no bolt holes", "localization failed", "insufficient hole"]
        if any(marker in status_lower for marker in failure_markers):
            return Verdict.UNKNOWN

        # Empty / none match
        if not match_upper or match_upper == "NONE" or match_upper == "UNKNOWN":
            return Verdict.UNKNOWN

        # Explicit verdict from backend status text
        status_upper = status_text.upper()
        if "PASS" in status_upper:
            return Verdict.PASS
        if "FAIL" in status_upper:
            return Verdict.FAIL

        # Fallback: class-name heuristic
        if "PERFECT" in match_upper:
            return Verdict.PASS
        # Everything else (Defect, Damaged, etc.) is a FAIL
        return Verdict.FAIL


# ─────────────────────────────────────────────────────────────────
# Camera Source — Manages OpenCV camera lifecycle
# ─────────────────────────────────────────────────────────────────
class CameraSource:
    WARMUP_FRAMES = 5  # Discard first N frames (often garbled)

    def __init__(self, camera_id: int = 0):
        self.camera_id = camera_id
        self._cap: Optional[cv2.VideoCapture] = None

    @staticmethod
    def detect_available(max_check: int = 5) -> List[int]:
        """Probe for available camera indices."""
        available = []
        for idx in range(max_check):
            cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW if os.name == "nt" else cv2.CAP_ANY)
            if cap.isOpened():
                ret, _ = cap.read()
                if ret:
                    available.append(idx)
            cap.release()
        return available

    def open(self) -> bool:
        """Open the camera and discard warm-up frames."""
        backend = cv2.CAP_DSHOW if os.name == "nt" else cv2.CAP_ANY
        self._cap = cv2.VideoCapture(self.camera_id, backend)
        if not self._cap.isOpened():
            log.error(f"Cannot open camera {self.camera_id}")
            return False
        # Set resolution hints
        self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        # Discard warm-up frames
        for _ in range(self.WARMUP_FRAMES):
            self._cap.read()
        log.info(f"Camera {self.camera_id} opened — "
                 f"{int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))}×"
                 f"{int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}")
        return True

    def grab(self) -> Optional[Image.Image]:
        """Capture a single frame as a PIL Image (RGB)."""
        if self._cap is None or not self._cap.isOpened():
            return None
        ret, frame = self._cap.read()
        if not ret or frame is None:
            return None
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return Image.fromarray(rgb)

    def release(self):
        """Release the camera resource."""
        if self._cap is not None:
            self._cap.release()
            self._cap = None
            log.info(f"Camera {self.camera_id} released")

    @property
    def is_open(self) -> bool:
        return self._cap is not None and self._cap.isOpened()


# ─────────────────────────────────────────────────────────────────
# Auto Inspector — Continuous detection loop
# ─────────────────────────────────────────────────────────────────
class AutoInspector:
    def __init__(
        self,
        engine: DetectionEngine,
        camera_id: int = 0,
        interval: float = 3.0,
    ):
        self.engine = engine
        self.camera = CameraSource(camera_id)
        self.interval = max(1.0, interval)  # Floor at 1 second
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self.stats = SessionStats()

    @property
    def is_running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()

    def start(self, on_result: Optional[Callable] = None):
        """Start the background capture/detect loop (no-op if running)."""
        if self.is_running:
            log.warning("Auto-inspection is already running")
            return
        self._stop_event.clear()
        self.stats = SessionStats(start_time=time.time())
        self._thread = threading.Thread(
            target=self._run_loop,
            args=(on_result,),
            daemon=True,
            name="auto-inspector",
        )
        self._thread.start()
        log.info(f"Auto-inspection started — camera={self.camera.camera_id}, interval={self.interval}s")

    def stop(self):
        """Signal the loop to stop and wait for cleanup."""
        if not self.is_running:
            return
        log.info("Stopping auto-inspection...")
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=10)
        self.camera.release()
        log.info(f"Auto-inspection stopped — {self.stats.to_dict()}")

    def _run_loop(self, on_result: Optional[Callable]):
        """Internal loop that runs in a background thread."""
        # Create a new event loop for this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._loop = loop
        try:
            if not self.camera.open():
                log.error("Failed to open camera — aborting auto-inspection")
                return
            while not self._stop_event.is_set():
                frame = self.camera.grab()
                if frame is None:
                    log.warning("Frame grab failed — retrying in 1s")
                    self._stop_event.wait(1.0)
                    continue
                # Run detection synchronously within this thread's event loop
                result = loop.run_until_complete(self.engine.run(frame))
                self.stats.record(result.verdict)
                if on_result:
                    try:
                        on_result(result, self.stats)
                    except Exception as cb_err:
                        log.error(f"Callback error: {cb_err}")
                # Wait for the interval (interruptible)
                self._stop_event.wait(self.interval)
        except Exception as exc:
            log.error(f"Auto-inspection loop crashed: {exc}", exc_info=True)
        finally:
            self.camera.release()
            loop.close()


# ─────────────────────────────────────────────────────────────────
# Single-Image Detection (convenience function)
# ─────────────────────────────────────────────────────────────────
async def detect_image(
    image_path: str,
    threshold: float = 0.70,
    engine: Optional[DetectionEngine] = None,
) -> DetectionResult:
    """Run one detection pass on an image file.

    Raises FileNotFoundError if the path does not exist.
    """
    if not os.path.isfile(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")
    img = Image.open(image_path)
    eng = engine or DetectionEngine(threshold=threshold)
    return await eng.run(img, threshold)


# ─────────────────────────────────────────────────────────────────
# CLI — Run as standalone script
# ─────────────────────────────────────────────────────────────────
def _print_result(result: DetectionResult, stats: Optional[SessionStats] = None):
    """Pretty-print one detection result (ANSI-colored) to stdout."""
    v = result.verdict.value
    color = {"PASS": "\033[92m", "FAIL": "\033[91m", "UNKNOWN": "\033[93m", "ERROR": "\033[90m"}
    reset = "\033[0m"
    c = color.get(v, "")
    print(f"\n{'─' * 50}")
    print(f"  {c}█ {v}{reset} │ class: {result.matched_class or '—'} │ conf: {result.confidence:.1%}")
    print(f"  segments: {result.segments_found} │ quality: {result.quality.quality_score:.0f} │ {result.elapsed_ms:.0f}ms")
    if result.all_scores:
        scores_str = "  ".join(f"{k}: {v:.1%}" for k, v in result.all_scores.items())
        print(f"  scores: {scores_str}")
    if result.status_detail:
        detail = result.status_detail[:120].replace("\n", " ")
        print(f"  detail: {detail}")
    if stats:
        s = stats
        print(f"  session: {s.total} total │ ✓{s.passed} ✗{s.failed} ?{s.unknown} │ {s.elapsed_seconds:.0f}s")
    print(f"{'─' * 50}")


def main():
    """CLI entry point: list cameras, single-image mode, or auto-inspection."""
    parser = argparse.ArgumentParser(
        description="Engine Part Detection — Standalone Detection Pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python index.py                          # Auto-detect camera, run continuous
  python index.py --camera 0 --interval 5  # Camera 0, every 5 seconds
  python index.py --image part.jpg         # Single image detection
  python index.py --list-cameras           # Show available cameras
""",
    )
    grp = parser.add_mutually_exclusive_group()
    grp.add_argument("--image", "-i", type=str,
                     help="Path to a single image file for detection")
    grp.add_argument("--camera", "-c", type=int, default=None,
                     help="Camera index (default: auto-detect)")
    grp.add_argument("--list-cameras", action="store_true",
                     help="List available cameras and exit")
    parser.add_argument("--threshold", "-t", type=float, default=0.70,
                        help="Detection threshold (default: 0.70)")
    parser.add_argument("--interval", type=float, default=3.0,
                        help="Seconds between captures in auto mode (default: 3.0)")
    parser.add_argument("--quiet", "-q", action="store_true",
                        help="Suppress verbose output")
    args = parser.parse_args()

    if args.quiet:
        logging.getLogger("detection").setLevel(logging.WARNING)

    # ── List cameras ─────────────────────────────────────────────
    if args.list_cameras:
        print("Scanning for cameras...")
        cams = CameraSource.detect_available()
        if cams:
            print(f"Found {len(cams)} camera(s): {cams}")
        else:
            print("No cameras detected.")
        sys.exit(0)

    # ── Single image mode ────────────────────────────────────────
    if args.image:
        print(f"Analyzing: {args.image}")
        result = asyncio.run(detect_image(args.image, args.threshold))
        _print_result(result)
        sys.exit(0 if result.verdict != Verdict.ERROR else 1)

    # ── Auto inspection mode (camera) ────────────────────────────
    camera_id = args.camera
    if camera_id is None:
        print("Auto-detecting cameras...")
        available = CameraSource.detect_available()
        if not available:
            print("No cameras found. Use --image for file-based detection.")
            sys.exit(1)
        camera_id = available[0]
        print(f"Using camera {camera_id}")

    engine = DetectionEngine(threshold=args.threshold)
    inspector = AutoInspector(engine, camera_id=camera_id, interval=args.interval)

    print(f"\n  Auto Inspection Mode")
    print(f"  Camera: {camera_id} │ Interval: {args.interval}s │ Threshold: {args.threshold}")
    print(f"  Press Ctrl+C to stop\n")

    inspector.start(on_result=_print_result)
    try:
        while inspector.is_running:
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\n\nStopping...")
    finally:
        inspector.stop()
        print(f"\nSession summary: {inspector.stats.to_dict()}")


if __name__ == "__main__":
    main()