# arch / app.py
# (Hugging Face Space page residue preserved as a comment:
#  "eho69's picture β€” Rename ap.py to app.py β€” a461c13 verified")
import os
import sys
import cv2
import time
import asyncio
import logging
import argparse
import threading
import numpy as np
from enum import Enum
from typing import Optional, Dict, Any, Callable, Tuple, List
from dataclasses import dataclass, field
from datetime import datetime
from PIL import Image
# ─────────────────────────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────────────────────────
# Configure root logging once at import time; every pipeline component
# below logs through the shared "detection" logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s β”‚ %(name)-12s β”‚ %(levelname)-7s β”‚ %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("detection")
# ─────────────────────────────────────────────────────────────────
# Data Models
# ─────────────────────────────────────────────────────────────────
class Verdict(str, Enum):
    """Outcome of one inspection pass (str-valued so it JSON-serializes cleanly)."""
    PASS = "PASS"        # part matched a known-good class (e.g. "PERFECT")
    FAIL = "FAIL"        # part matched a defect class
    UNKNOWN = "UNKNOWN"  # AI could not decide / localization failed
    ERROR = "ERROR"      # quality gate rejection or backend error
@dataclass
class QualityMetrics:
    """Per-image quality measurements produced by the pre-AI quality gate."""
    brightness: float = 0.0                # mean pixel intensity over the RGB array
    contrast: float = 0.0                  # std-dev of pixel intensities
    sharpness: float = 0.0                 # variance of the Laplacian (focus measure)
    is_blurred: bool = False               # sharpness fell under the blur threshold
    resolution: Tuple[int, int] = (0, 0)   # (width, height) in pixels

    @property
    def quality_score(self) -> float:
        """Map sharpness onto a 0-100 score, capped at 100."""
        scaled = self.sharpness / 2.0
        return min(100.0, scaled)
@dataclass
class SegmentedROI:
    """A detected region of interest from segmentation."""
    bbox: Tuple[int, int, int, int]  # x, y, w, h
    contour: Any = None                          # raw OpenCV contour points
    cropped_image: Optional[Image.Image] = None  # bbox crop with background zeroed
    mask: Optional[np.ndarray] = None            # binary mask cropped to the bbox
    area: float = 0.0                            # contour area in pixels
    circularity: float = 0.0                     # 4π·area/perimeter²; 1.0 = perfect circle
    label: str = "part"                          # e.g. "part_0", assigned in discovery order
@dataclass
class DetectionResult:
    """Complete result of a single detection pass."""
    verdict: Verdict = Verdict.UNKNOWN
    confidence: float = 0.0
    matched_class: str = ""
    quality: QualityMetrics = field(default_factory=QualityMetrics)
    visualization_b64: Optional[str] = None
    all_scores: Dict[str, float] = field(default_factory=dict)
    segments_found: int = 0
    status_detail: str = ""
    timestamp: str = ""
    elapsed_ms: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize into a JSON-friendly dict (floats rounded for transport)."""
        q = self.quality
        quality_payload = {
            "brightness": round(q.brightness, 2),
            "contrast": round(q.contrast, 2),
            "sharpness": round(q.sharpness, 2),
            "is_blurred": q.is_blurred,
            "quality_score": round(q.quality_score, 2),
            "resolution": list(q.resolution),
        }
        payload: Dict[str, Any] = {
            "verdict": self.verdict.value,
            "confidence": round(self.confidence, 4),
            "matched_class": self.matched_class,
            "quality": quality_payload,
            "visualization": self.visualization_b64,
            "all_scores": self.all_scores,
            "segments_found": self.segments_found,
            "status_detail": self.status_detail,
            "timestamp": self.timestamp,
            "elapsed_ms": round(self.elapsed_ms, 1),
        }
        return payload
@dataclass
class SessionStats:
    """Running totals for an auto-inspection session."""
    total: int = 0
    passed: int = 0
    failed: int = 0
    unknown: int = 0
    errors: int = 0
    start_time: Optional[float] = None

    @property
    def elapsed_seconds(self) -> float:
        """Seconds since the session started; 0.0 if it never started."""
        return 0.0 if self.start_time is None else time.time() - self.start_time

    def record(self, verdict: Verdict):
        """Tally one verdict into the running totals (unknown kinds count as errors)."""
        self.total += 1
        bucket = {
            Verdict.PASS: "passed",
            Verdict.FAIL: "failed",
            Verdict.UNKNOWN: "unknown",
        }.get(verdict, "errors")
        setattr(self, bucket, getattr(self, bucket) + 1)

    def to_dict(self) -> Dict[str, Any]:
        """Snapshot the totals as a plain dict for logging / transport."""
        snapshot: Dict[str, Any] = {
            name: getattr(self, name)
            for name in ("total", "passed", "failed", "unknown", "errors")
        }
        snapshot["elapsed_seconds"] = round(self.elapsed_seconds, 1)
        return snapshot
# ─────────────────────────────────────────────────────────────────
# Image Analyzer β€” Validation, Quality, and Segmentation
# ─────────────────────────────────────────────────────────────────
class ImageAnalyzer:
    """
    Handles all pre-AI image analysis:
    - Quality validation (brightness, contrast, sharpness)
    - Part segmentation via contour + morphological analysis
    - ROI extraction for focused detection
    """
    # Thresholds
    MIN_RESOLUTION = (320, 240)     # minimum accepted (width, height) in pixels
    MAX_INPUT_DIM = 1024            # long-side cap applied by prepare()
    BRIGHTNESS_FLOOR = 15           # mean intensity below this => "too dark"
    BRIGHTNESS_CEIL = 245           # mean intensity above this => "overexposed"
    CONTRAST_FLOOR = 5              # intensity std-dev below this => blank/uniform
    BLUR_THRESHOLD = 100.0  # Laplacian variance below this = blurry
    # Segmentation tunables
    MORPHO_KERNEL = 5               # ellipse kernel size for close/open passes
    MIN_CONTOUR_AREA_RATIO = 0.005  # Minimum area relative to image area
    MAX_CONTOUR_AREA_RATIO = 0.85  # Maximum area relative to image area
    CIRCULARITY_THRESHOLD = 0.15  # Minimum circularity for a valid part contour

    def measure_quality(self, img: Image.Image) -> QualityMetrics:
        """Compute image quality metrics without modifying the image.

        Brightness/contrast are the mean/std-dev of the RGB pixel array;
        sharpness is the variance of the Laplacian of the grayscale image
        (a standard focus measure: few edges => low variance => blur).
        """
        arr = np.array(img.convert("RGB"))
        gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
        laplacian_var = float(cv2.Laplacian(gray, cv2.CV_64F).var())
        return QualityMetrics(
            brightness=float(np.mean(arr)),
            contrast=float(np.std(arr)),
            sharpness=laplacian_var,
            is_blurred=laplacian_var < self.BLUR_THRESHOLD,
            resolution=(img.width, img.height),
        )

    def validate(self, img: Image.Image) -> Tuple[bool, str]:
        """Gate an image before it is sent to the AI backend.

        Returns:
            (True, "OK") when usable, otherwise (False, rejection reason).
        """
        w, h = img.size
        if w < self.MIN_RESOLUTION[0] or h < self.MIN_RESOLUTION[1]:
            return False, f"Resolution too low: {w}Γ—{h} (need {self.MIN_RESOLUTION[0]}Γ—{self.MIN_RESOLUTION[1]})"
        aspect = w / h
        if aspect < 0.2 or aspect > 5.0:
            return False, f"Unusual aspect ratio: {aspect:.2f}"
        metrics = self.measure_quality(img)
        if metrics.brightness < self.BRIGHTNESS_FLOOR:
            return False, "Image too dark"
        if metrics.brightness > self.BRIGHTNESS_CEIL:
            return False, "Image too bright / overexposed"
        if metrics.contrast < self.CONTRAST_FLOOR:
            return False, "Insufficient contrast β€” blank or uniform image"
        return True, "OK"

    def prepare(self, img: Image.Image) -> Image.Image:
        """Normalize to RGB and downscale to fit MAX_INPUT_DIM.

        NOTE: thumbnail() resizes the image object in place when it is
        already RGB (no convert() copy was made).
        """
        if img.mode != "RGB":
            img = img.convert("RGB")
        img.thumbnail((self.MAX_INPUT_DIM, self.MAX_INPUT_DIM), Image.Resampling.LANCZOS)
        return img

    # ── Part Segmentation ────────────────────────────────────────
    def segment_parts(self, img: Image.Image) -> List[SegmentedROI]:
        """Find candidate part regions via adaptive threshold + contours.

        Returns:
            ROIs sorted by area, largest first; each carries a crop of its
            bounding box with the background (outside the contour) zeroed.
        """
        arr = np.array(img.convert("RGB"))
        gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
        img_area = gray.shape[0] * gray.shape[1]
        # Adaptive threshold deals better with shadows than global Otsu
        binary = cv2.adaptiveThreshold(
            gray, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            blockSize=31,
            C=10,
        )
        # Morphological closing fills holes inside parts
        kernel = cv2.getStructuringElement(
            cv2.MORPH_ELLIPSE,
            (self.MORPHO_KERNEL, self.MORPHO_KERNEL),
        )
        closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=3)
        # Optional: small opening to remove noise specks
        opened = cv2.morphologyEx(closed, cv2.MORPH_OPEN, kernel, iterations=1)
        contours, _ = cv2.findContours(opened, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        rois: List[SegmentedROI] = []
        for cnt in contours:
            area = cv2.contourArea(cnt)
            ratio = area / img_area
            # Filter by relative area (rejects specks and near-full-frame blobs)
            if ratio < self.MIN_CONTOUR_AREA_RATIO or ratio > self.MAX_CONTOUR_AREA_RATIO:
                continue
            # Circularity = 4Ο€ Γ— area / perimeterΒ² (1.0 for perfect circle)
            perimeter = cv2.arcLength(cnt, True)
            circularity = (4 * np.pi * area / (perimeter ** 2)) if perimeter > 0 else 0
            if circularity < self.CIRCULARITY_THRESHOLD:
                continue
            x, y, w, h = cv2.boundingRect(cnt)
            # Create a full-size mask for this contour, then crop it
            mask = np.zeros(gray.shape, dtype=np.uint8)
            cv2.drawContours(mask, [cnt], -1, 255, thickness=cv2.FILLED)
            # Crop the bounding box region (copy so the mask write is local)
            crop_arr = arr[y:y + h, x:x + w].copy()
            crop_mask = mask[y:y + h, x:x + w]
            # Apply mask β€” set background to black
            crop_arr[crop_mask == 0] = 0
            cropped_pil = Image.fromarray(crop_arr)
            rois.append(SegmentedROI(
                bbox=(x, y, w, h),
                contour=cnt,
                cropped_image=cropped_pil,
                mask=crop_mask,
                area=area,
                circularity=circularity,
                label=f"part_{len(rois)}",
            ))
        # Sort by area descending β€” largest part first
        rois.sort(key=lambda r: r.area, reverse=True)
        log.info(f"Segmentation: found {len(rois)} part region(s) from {len(contours)} contours")
        return rois

    def draw_segmentation_overlay(
        self, img: Image.Image, rois: List[SegmentedROI], verdict: Optional[Verdict] = None
    ) -> Image.Image:
        """Render ROI boxes (and an optional verdict stamp) onto a copy of img.

        Box color tracks the verdict; a neutral blue is used when no verdict
        is supplied.
        """
        arr = np.array(img.convert("RGB")).copy()
        color_map = {
            Verdict.PASS: (0, 200, 100),
            Verdict.FAIL: (220, 60, 60),
            Verdict.UNKNOWN: (220, 180, 0),
            Verdict.ERROR: (128, 128, 128),
            None: (100, 180, 255),
        }
        color = color_map.get(verdict, (100, 180, 255))
        for roi in rois:
            x, y, w, h = roi.bbox
            cv2.rectangle(arr, (x, y), (x + w, y + h), color, 2)
            # Label with the circularity score
            label = f"{roi.label} ({roi.circularity:.2f})"
            font_scale = max(0.4, min(1.0, w / 300))  # scale text with box width
            cv2.putText(arr, label, (x, max(y - 8, 15)),
                        cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, 1, cv2.LINE_AA)
        # Verdict stamp in top-right
        if verdict is not None:
            stamp = verdict.value
            (tw, th), _ = cv2.getTextSize(stamp, cv2.FONT_HERSHEY_SIMPLEX, 1.2, 3)
            sx = arr.shape[1] - tw - 20
            sy = th + 20
            cv2.rectangle(arr, (sx - 10, sy - th - 10), (sx + tw + 10, sy + 10), color, cv2.FILLED)
            cv2.putText(arr, stamp, (sx, sy),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 255, 255), 3, cv2.LINE_AA)
        return Image.fromarray(arr)
# ─────────────────────────────────────────────────────────────────
# Detection Engine β€” Orchestrates the full pipeline
# ─────────────────────────────────────────────────────────────────
class DetectionEngine:
    """Orchestrates the full detection pipeline.

    Steps: quality gate -> part segmentation -> AI classification ->
    verdict interpretation -> visualization overlay.
    """

    def __init__(self, hf_client=None, threshold: float = 0.70):
        """
        Args:
            hf_client: Optional pre-built backend client (useful for tests);
                when None, one is created lazily on first use.
            threshold: Default confidence threshold forwarded to the backend.
        """
        self.analyzer = ImageAnalyzer()
        self.threshold = threshold
        # Lazy-init the HF client so index.py stays importable
        # without triggering network calls at import time.
        self._hf_client = hf_client
        self._hf_initialized = hf_client is not None

    @property
    def hf(self):
        """The Hugging Face backend client, created on first access."""
        if not self._hf_initialized:
            from hf_client import HuggingFaceClient
            self._hf_client = HuggingFaceClient()
            self._hf_initialized = True
        return self._hf_client

    async def run(self, img: Image.Image, threshold: Optional[float] = None) -> DetectionResult:
        """Run one detection pass over `img`.

        Quality-gate rejections and backend failures are reported via
        verdict == ERROR on the returned result rather than raised.

        Args:
            img: Input image (any PIL mode; normalized internally).
            threshold: Per-call threshold; falls back to the engine default.

        Returns:
            A populated DetectionResult, including a base64 JPEG
            visualization when one could be produced.
        """
        t0 = time.time()
        # NOTE(review): datetime.utcnow() is deprecated in 3.12+; kept so the
        # timestamp format (naive ISO string, no offset suffix) is unchanged.
        result = DetectionResult(timestamp=datetime.utcnow().isoformat())
        thr = threshold if threshold is not None else self.threshold
        # ── Step 1: Quality Gate ─────────────────────────────────
        valid, reason = self.analyzer.validate(img)
        result.quality = self.analyzer.measure_quality(img)
        if not valid:
            result.verdict = Verdict.ERROR
            result.status_detail = f"Quality rejected: {reason}"
            result.elapsed_ms = (time.time() - t0) * 1000
            log.warning(f"Quality gate failed: {reason}")
            return result
        # ── Step 2: Segment Parts ────────────────────────────────
        rois = self.analyzer.segment_parts(img)
        result.segments_found = len(rois)
        if len(rois) == 0:
            log.info("No part segments found β€” sending full image to AI")
        # ── Step 3: Prepare for AI ───────────────────────────────
        # Send the full image (the backend has its own ROI logic).
        # The segmentation here is for local overlay + future use.
        prepared = self.analyzer.prepare(img)
        # ── Step 4: AI Classification ────────────────────────────
        try:
            ai_result = await self.hf.detect_part(prepared, thr)
        except Exception as exc:
            result.verdict = Verdict.ERROR
            result.status_detail = f"AI backend error: {exc}"
            result.elapsed_ms = (time.time() - t0) * 1000
            log.error(f"AI call failed: {exc}")
            return result
        if not ai_result.get("success"):
            result.verdict = Verdict.ERROR
            result.status_detail = f"AI returned failure: {ai_result.get('error', 'unknown')}"
            result.elapsed_ms = (time.time() - t0) * 1000
            return result
        # ── Step 5: Interpret Verdict ────────────────────────────
        best_match = str(ai_result.get("best_match", "")).strip()
        confidence = float(ai_result.get("confidence", 0.0))
        all_scores = ai_result.get("all_scores", {})
        status_text = str(ai_result.get("status_text", ""))
        result.confidence = confidence
        result.matched_class = best_match
        result.all_scores = all_scores
        result.status_detail = status_text
        result.verdict = self._interpret_verdict(best_match, status_text)
        # ── Step 6: Visualization ────────────────────────────────
        # Build a composite overlay: segmentation boxes + verdict stamp
        vis_img = self.analyzer.draw_segmentation_overlay(prepared, rois, result.verdict)
        # Convert overlay to base64 for transport (local import keeps the
        # module import cheap, matching the lazy-init style above)
        import io, base64
        buf = io.BytesIO()
        vis_img.save(buf, format="JPEG", quality=85)
        result.visualization_b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        # Also attach the AI backend's visualization if available
        vis_path = ai_result.get("visualization")
        if vis_path and os.path.exists(vis_path):
            try:
                with open(vis_path, "rb") as f:
                    result.visualization_b64 = base64.b64encode(f.read()).decode("utf-8")
            except OSError as read_err:
                # BUGFIX: a bare try/finally here let a read failure escape
                # run() and crash the caller even though the detection itself
                # succeeded; fall back to the local overlay instead.
                log.warning(f"Could not read backend visualization: {read_err}")
            finally:
                # Best-effort cleanup of the backend's temp file
                try:
                    os.remove(vis_path)
                except OSError:
                    pass
        # Cleanup any temp files from the HF client
        for tmp in ai_result.get("_temp_paths", []):
            if tmp and tmp != vis_path:
                try:
                    os.remove(tmp)
                except OSError:
                    pass
        result.elapsed_ms = (time.time() - t0) * 1000
        log.info(
            f"Detection: {result.verdict.value} β”‚ "
            f"class={best_match} β”‚ conf={confidence:.3f} β”‚ "
            f"segments={len(rois)} β”‚ {result.elapsed_ms:.0f}ms"
        )
        return result

    @staticmethod
    def _interpret_verdict(best_match: str, status_text: str) -> Verdict:
        """Map the backend's class name + status text onto a Verdict.

        Priority: localization-failure markers -> UNKNOWN; empty/none match
        -> UNKNOWN; explicit PASS/FAIL in the status text; else a class-name
        heuristic ("PERFECT" passes, everything else fails).
        """
        match_upper = best_match.upper()
        status_lower = status_text.lower()
        # Localization failures (bolt holes not found, etc.)
        failure_markers = ["no bolt holes", "localization failed", "insufficient hole"]
        if any(marker in status_lower for marker in failure_markers):
            return Verdict.UNKNOWN
        # Empty / none match
        if not match_upper or match_upper == "NONE" or match_upper == "UNKNOWN":
            return Verdict.UNKNOWN
        # Explicit verdict from backend status text
        status_upper = status_text.upper()
        if "PASS" in status_upper:
            return Verdict.PASS
        if "FAIL" in status_upper:
            return Verdict.FAIL
        # Fallback: class-name heuristic
        if "PERFECT" in match_upper:
            return Verdict.PASS
        # Everything else (Defect, Damaged, etc.) is a FAIL
        return Verdict.FAIL
# ─────────────────────────────────────────────────────────────────
# Camera Source β€” Manages OpenCV camera lifecycle
# ─────────────────────────────────────────────────────────────────
class CameraSource:
    """Manages the OpenCV camera lifecycle: probe, open, grab, release."""
    WARMUP_FRAMES = 5 # Discard first N frames (often garbled)

    def __init__(self, camera_id: int = 0):
        self.camera_id = camera_id
        self._cap: Optional[cv2.VideoCapture] = None  # None until open() succeeds

    @staticmethod
    def detect_available(max_check: int = 5) -> List[int]:
        """Probe for available camera indices.

        Opens indices 0..max_check-1 in turn and keeps those that actually
        deliver a frame (isOpened alone can be a false positive).
        """
        available = []
        for idx in range(max_check):
            # CAP_DSHOW avoids long probe delays on Windows
            cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW if os.name == "nt" else cv2.CAP_ANY)
            if cap.isOpened():
                ret, _ = cap.read()
                if ret:
                    available.append(idx)
                cap.release()
        return available

    def open(self) -> bool:
        """Open the camera and discard warm-up frames.

        Returns:
            True on success; logs an error and returns False otherwise.
        """
        backend = cv2.CAP_DSHOW if os.name == "nt" else cv2.CAP_ANY
        self._cap = cv2.VideoCapture(self.camera_id, backend)
        if not self._cap.isOpened():
            log.error(f"Cannot open camera {self.camera_id}")
            return False
        # Set resolution hints (the driver may silently pick another mode)
        self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        # Discard warm-up frames
        for _ in range(self.WARMUP_FRAMES):
            self._cap.read()
        log.info(f"Camera {self.camera_id} opened β€” "
                 f"{int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH))}Γ—"
                 f"{int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}")
        return True

    def grab(self) -> Optional[Image.Image]:
        """Capture a single frame as a PIL Image (RGB).

        Returns None when the camera is closed or the read fails.
        """
        if self._cap is None or not self._cap.isOpened():
            return None
        ret, frame = self._cap.read()
        if not ret or frame is None:
            return None
        # OpenCV delivers BGR; convert for PIL
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return Image.fromarray(rgb)

    def release(self):
        """Release the camera resource (idempotent)."""
        if self._cap is not None:
            self._cap.release()
            self._cap = None
            log.info(f"Camera {self.camera_id} released")

    @property
    def is_open(self) -> bool:
        # True only while an opened VideoCapture is held
        return self._cap is not None and self._cap.isOpened()
# ─────────────────────────────────────────────────────────────────
# Auto Inspector β€” Continuous detection loop
# ─────────────────────────────────────────────────────────────────
class AutoInspector:
    """Continuous inspection loop: grab frame -> detect -> report, repeat.

    Runs in a daemon thread that owns its own asyncio event loop so the
    async DetectionEngine.run() can be driven synchronously from there.
    """

    def __init__(
        self,
        engine: DetectionEngine,
        camera_id: int = 0,
        interval: float = 3.0,
    ):
        self.engine = engine
        self.camera = CameraSource(camera_id)
        self.interval = max(1.0, interval) # Floor at 1 second
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self.stats = SessionStats()

    @property
    def is_running(self) -> bool:
        # Running means the worker thread exists and has not finished
        return self._thread is not None and self._thread.is_alive()

    def start(self, on_result: Optional[Callable] = None):
        """Start the background inspection thread (no-op if already running).

        Args:
            on_result: Optional callback invoked as on_result(result, stats)
                after each detection; exceptions in it are logged, not raised.
        """
        if self.is_running:
            log.warning("Auto-inspection is already running")
            return
        self._stop_event.clear()
        self.stats = SessionStats(start_time=time.time())
        self._thread = threading.Thread(
            target=self._run_loop,
            args=(on_result,),
            daemon=True,
            name="auto-inspector",
        )
        self._thread.start()
        log.info(f"Auto-inspection started β€” camera={self.camera.camera_id}, interval={self.interval}s")

    def stop(self):
        """Signal the loop to stop and wait for cleanup."""
        if not self.is_running:
            return
        log.info("Stopping auto-inspection...")
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=10)  # bounded wait so stop() cannot hang
        self.camera.release()
        log.info(f"Auto-inspection stopped β€” {self.stats.to_dict()}")

    def _run_loop(self, on_result: Optional[Callable]):
        """Internal loop that runs in a background thread."""
        # Create a new event loop for this thread; asyncio loops are
        # per-thread and the main thread's loop must not be reused here
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._loop = loop
        try:
            if not self.camera.open():
                log.error("Failed to open camera β€” aborting auto-inspection")
                return
            while not self._stop_event.is_set():
                frame = self.camera.grab()
                if frame is None:
                    log.warning("Frame grab failed β€” retrying in 1s")
                    self._stop_event.wait(1.0)
                    continue
                # Run detection synchronously within this thread's event loop
                result = loop.run_until_complete(self.engine.run(frame))
                self.stats.record(result.verdict)
                if on_result:
                    try:
                        on_result(result, self.stats)
                    except Exception as cb_err:
                        log.error(f"Callback error: {cb_err}")
                # Wait for the interval (interruptible via the stop event)
                self._stop_event.wait(self.interval)
        except Exception as exc:
            log.error(f"Auto-inspection loop crashed: {exc}", exc_info=True)
        finally:
            self.camera.release()
            loop.close()
# ─────────────────────────────────────────────────────────────────
# Single-Image Detection (convenience function)
# ─────────────────────────────────────────────────────────────────
async def detect_image(
    image_path: str,
    threshold: float = 0.70,
    engine: Optional[DetectionEngine] = None,
) -> DetectionResult:
    """Run the detection pipeline on a single image file.

    Args:
        image_path: Path to an image file readable by PIL.
        threshold: Confidence threshold forwarded to the engine.
        engine: Optional pre-built engine (reuse across calls); a fresh
            one is created when omitted.

    Returns:
        The DetectionResult from DetectionEngine.run().

    Raises:
        FileNotFoundError: If image_path does not point to a file.
    """
    if not os.path.isfile(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")
    # BUGFIX: Image.open() is lazy and keeps the file handle open; decode
    # inside a context manager so the handle is always released. convert()
    # returns an in-memory copy that stays valid after the file closes
    # (the pipeline normalizes to RGB anyway, so behavior is unchanged).
    with Image.open(image_path) as handle:
        img = handle.convert("RGB")
    eng = engine or DetectionEngine(threshold=threshold)
    return await eng.run(img, threshold)
# ─────────────────────────────────────────────────────────────────
# CLI β€” Run as standalone script
# ─────────────────────────────────────────────────────────────────
def _print_result(result: DetectionResult, stats: Optional[SessionStats] = None):
    """Pretty-print one detection result (and optional session totals) to stdout."""
    verdict_name = result.verdict.value
    ansi = {
        "PASS": "\033[92m",
        "FAIL": "\033[91m",
        "UNKNOWN": "\033[93m",
        "ERROR": "\033[90m",
    }
    c = ansi.get(verdict_name, "")
    reset = "\033[0m"
    rule = "─" * 50
    print(f"\n{rule}")
    print(f" {c}β–ˆ {verdict_name}{reset} β”‚ class: {result.matched_class or 'β€”'} β”‚ conf: {result.confidence:.1%}")
    print(f" segments: {result.segments_found} β”‚ quality: {result.quality.quality_score:.0f} β”‚ {result.elapsed_ms:.0f}ms")
    if result.all_scores:
        formatted = " ".join(f"{name}: {score:.1%}" for name, score in result.all_scores.items())
        print(f" scores: {formatted}")
    if result.status_detail:
        detail = result.status_detail[:120].replace("\n", " ")
        print(f" detail: {detail}")
    if stats:
        print(
            f" session: {stats.total} total β”‚ βœ“{stats.passed} βœ—{stats.failed} "
            f"?{stats.unknown} β”‚ {stats.elapsed_seconds:.0f}s"
        )
    print(f"{rule}")
def main():
    """CLI entry point: camera listing, single-image mode, or continuous auto-inspection."""
    parser = argparse.ArgumentParser(
        description="Engine Part Detection β€” Standalone Detection Pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python index.py # Auto-detect camera, run continuous
python index.py --camera 0 --interval 5 # Camera 0, every 5 seconds
python index.py --image part.jpg # Single image detection
python index.py --list-cameras # Show available cameras
""",
    )
    # --image / --camera / --list-cameras select one mutually exclusive mode
    grp = parser.add_mutually_exclusive_group()
    grp.add_argument("--image", "-i", type=str, help="Path to a single image file for detection")
    grp.add_argument("--camera", "-c", type=int, default=None, help="Camera index (default: auto-detect)")
    grp.add_argument("--list-cameras", action="store_true", help="List available cameras and exit")
    parser.add_argument("--threshold", "-t", type=float, default=0.70, help="Detection threshold (default: 0.70)")
    parser.add_argument("--interval", type=float, default=3.0, help="Seconds between captures in auto mode (default: 3.0)")
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress verbose output")
    args = parser.parse_args()
    if args.quiet:
        # Only warnings/errors from the pipeline logger
        logging.getLogger("detection").setLevel(logging.WARNING)
    # ── List cameras ─────────────────────────────────────────────
    if args.list_cameras:
        print("Scanning for cameras...")
        cams = CameraSource.detect_available()
        if cams:
            print(f"Found {len(cams)} camera(s): {cams}")
        else:
            print("No cameras detected.")
        sys.exit(0)
    # ── Single image mode ────────────────────────────────────────
    if args.image:
        print(f"Analyzing: {args.image}")
        result = asyncio.run(detect_image(args.image, args.threshold))
        _print_result(result)
        # Exit code 1 only for hard errors (quality gate / backend failure)
        sys.exit(0 if result.verdict != Verdict.ERROR else 1)
    # ── Auto inspection mode (camera) ────────────────────────────
    camera_id = args.camera
    if camera_id is None:
        print("Auto-detecting cameras...")
        available = CameraSource.detect_available()
        if not available:
            print("No cameras found. Use --image for file-based detection.")
            sys.exit(1)
        camera_id = available[0]
        print(f"Using camera {camera_id}")
    engine = DetectionEngine(threshold=args.threshold)
    inspector = AutoInspector(engine, camera_id=camera_id, interval=args.interval)
    print(f"\n Auto Inspection Mode")
    print(f" Camera: {camera_id} β”‚ Interval: {args.interval}s β”‚ Threshold: {args.threshold}")
    print(f" Press Ctrl+C to stop\n")
    inspector.start(on_result=_print_result)
    try:
        # Keep the main thread alive while the worker runs; Ctrl+C lands here
        while inspector.is_running:
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\n\nStopping...")
    finally:
        inspector.stop()
        print(f"\nSession summary: {inspector.stats.to_dict()}")


if __name__ == "__main__":
    main()