import pathlib
import time
from dataclasses import dataclass
from typing import Union

import cv2
import numpy as np
import torch
import torch.nn as nn
from face_detection import RetinaFace

from .results import GazeResultContainer
from .utils import prep_input_numpy, getArch


class Pipeline:
    """Face detection + L2CS gaze estimation pipeline.

    The pipeline optionally runs a RetinaFace detector on each frame, crops
    every detection whose score reaches ``confidence_threshold``, and feeds
    the crops to the gaze model. Pitch and yaw are returned in radians.
    """

    # Number of classification bins per angle (passed to ``getArch``).
    _NUM_BINS = 90
    # The expected bin index is mapped to degrees as ``idx * 4 - 180``.
    _BIN_WIDTH_DEG = 4
    _ANGLE_OFFSET_DEG = 180
    # Spatial size (width, height) each face crop is resized to.
    _FACE_SIZE = (224, 224)
    # Timing is printed on the first of every ``_LOG_EVERY`` inference steps.
    _LOG_EVERY = 30

    def __init__(
        self,
        weights: pathlib.Path,
        arch: str,
        device: Union[str, torch.device] = 'cpu',
        include_detector: bool = True,
        confidence_threshold: float = 0.5
    ):
        """Load the gaze model (and optionally the face detector).

        Args:
            weights: Path to the checkpoint holding the model state dict.
            arch: Architecture name forwarded to ``getArch``.
            device: Target device, either a string such as ``'cpu'`` /
                ``'cuda:0'`` or a ``torch.device``.
            include_detector: When true, ``step`` runs RetinaFace first and
                estimates gaze on each detected face crop.
            confidence_threshold: Detections scoring below this are dropped.
        """
        # Accept both strings and torch.device objects; the code below needs
        # the ``.type`` / ``.index`` attributes that a plain str lacks.
        device = torch.device(device)

        # Save input parameters
        self.weights = weights
        self.include_detector = include_detector
        self.device = device
        self.confidence_threshold = confidence_threshold
        self._step_count = 0

        # Create L2CS model
        self.model = getArch(arch, self._NUM_BINS)

        # PyTorch 2.6+ defaults weights_only=True; these checkpoints need full
        # unpickle.
        # SECURITY: weights_only=False unpickles arbitrary objects and can
        # execute code. Only load checkpoints from a trusted source.
        self.model.load_state_dict(
            torch.load(self.weights, map_location=device, weights_only=False)
        )
        self.model.to(self.device)
        self.model.eval()

        # Half precision on GPU for ~2x speedup
        self._use_half = (device.type != 'cpu')
        if self._use_half:
            self.model.half()

        # Create RetinaFace if requested
        if self.include_detector:
            if device.type == 'cpu':
                self.detector = RetinaFace()
            else:
                # NOTE(review): ``device.index`` is None for an index-less
                # device such as 'cuda' -- confirm RetinaFace accepts that.
                self.detector = RetinaFace(gpu_id=device.index)

        self.softmax = nn.Softmax(dim=1)
        self.idx_tensor = torch.arange(
            self._NUM_BINS, dtype=torch.float32, device=self.device
        )

        # Warmup: dummy forward pass to avoid cold-start latency
        self._warmup()

    def _warmup(self):
        """Run a dummy forward pass to warm up the model and CUDA kernels."""
        width, height = self._FACE_SIZE
        dummy = np.zeros((height, width, 3), dtype=np.uint8)
        # Deliberately best-effort: a failed warmup must not prevent the
        # pipeline from being constructed.
        try:
            with torch.no_grad():
                self.predict_gaze(dummy)
            print("[L2CS] Model warmup complete")
        except Exception as e:
            print(f"[L2CS] Warmup failed (non-fatal): {e}")

    def _extract_faces(self, frame: np.ndarray, faces):
        """Crop, colour-convert and resize every detection above threshold.

        Returns four parallel lists: face crops, boxes, landmarks, scores.
        Detections whose clamped box yields an empty crop are skipped.
        """
        face_imgs = []
        bboxes = []
        landmarks = []
        scores = []

        if faces is None:
            return face_imgs, bboxes, landmarks, scores

        for box, landmark, score in faces:
            # Apply threshold
            if score < self.confidence_threshold:
                continue

            # Clamp to non-negative: a negative slice bound would wrap around
            # to the other side of the image instead of stopping at the edge.
            x_min = max(int(box[0]), 0)
            y_min = max(int(box[1]), 0)
            x_max = max(int(box[2]), 0)
            y_max = max(int(box[3]), 0)

            # Crop image; skip degenerate boxes that cv2 cannot process.
            img = frame[y_min:y_max, x_min:x_max]
            if img.size == 0:
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, self._FACE_SIZE)
            face_imgs.append(img)

            # Save data
            bboxes.append(box)
            landmarks.append(landmark)
            scores.append(score)

        return face_imgs, bboxes, landmarks, scores

    @staticmethod
    def _stack_or_empty(items, empty_shape):
        """Stack ``items``; return an empty array when there are none.

        ``np.stack`` raises ``ValueError`` on an empty sequence.
        """
        if items:
            return np.stack(items)
        return np.empty(empty_shape)

    def _build_results(self, pitch, yaw, bboxes, landmarks, scores):
        """Pack predictions and detection data into a GazeResultContainer."""
        # NOTE(review): the empty shapes assume 4-value boxes and 5x2
        # landmarks from RetinaFace -- confirm against the detector output.
        return GazeResultContainer(
            pitch=pitch,
            yaw=yaw,
            bboxes=self._stack_or_empty(bboxes, (0, 4)),
            landmarks=self._stack_or_empty(landmarks, (0, 5, 2)),
            scores=self._stack_or_empty(scores, (0,))
        )

    def step(self, frame: np.ndarray) -> GazeResultContainer:
        """Process one frame and return gaze predictions.

        With the detector enabled, gaze is predicted for every kept face and
        the matching boxes, landmarks and scores are returned. When nothing
        is kept, pitch and yaw are empty ``(0, 1)`` arrays and the detection
        arrays are empty. With the detector disabled, ``frame`` is passed to
        the model directly and the detection arrays are empty.
        """
        if not self.include_detector:
            with torch.no_grad():
                pitch, yaw = self.predict_gaze(frame)
            return self._build_results(pitch, yaw, [], [], [])

        t0 = time.perf_counter()
        faces = self.detector(frame)
        t_detect = (time.perf_counter() - t0) * 1000

        t0 = time.perf_counter()
        face_imgs, bboxes, landmarks, scores = self._extract_faces(frame, faces)
        t_preprocess = (time.perf_counter() - t0) * 1000

        if not face_imgs:
            # No usable face: skip inference instead of stacking nothing.
            pitch = np.empty((0, 1))
            yaw = np.empty((0, 1))
            return self._build_results(pitch, yaw, bboxes, landmarks, scores)

        # Predict gaze
        t0 = time.perf_counter()
        with torch.no_grad():
            pitch, yaw = self.predict_gaze(np.stack(face_imgs))
        t_inference = (time.perf_counter() - t0) * 1000

        # Log timing every 30 frames (avoid spamming)
        self._step_count += 1
        if self._step_count % self._LOG_EVERY == 1:
            print(f"[L2CS timing] detect={t_detect:.1f}ms preprocess={t_preprocess:.1f}ms inference={t_inference:.1f}ms total={t_detect+t_preprocess+t_inference:.1f}ms")

        return self._build_results(pitch, yaw, bboxes, landmarks, scores)

    def predict_gaze(self, frame: Union[np.ndarray, torch.Tensor]):
        """Run the gaze model and return ``(pitch, yaw)`` in radians.

        Args:
            frame: A NumPy array (converted with ``prep_input_numpy``) or an
                already prepared tensor, which is moved to the model device.

        Returns:
            Two NumPy arrays, pitch and yaw, one value per input sample.

        Raises:
            RuntimeError: If ``frame`` is neither an ndarray nor a Tensor.

        The caller should wrap the call in ``torch.no_grad()``.
        """
        # Prepare input
        if isinstance(frame, np.ndarray):
            img = prep_input_numpy(frame, self.device)
        elif isinstance(frame, torch.Tensor):
            img = frame.to(self.device)
        else:
            raise RuntimeError("Invalid dtype for input")

        # Half precision on GPU
        if self._use_half:
            img = img.half()

        # Forward pass; softmax is computed in float32 even in half mode.
        gaze_pitch, gaze_yaw = self.model(img)
        pitch_probs = self.softmax(gaze_pitch.float())
        yaw_probs = self.softmax(gaze_yaw.float())

        # Get continuous predictions in degrees: expected bin index scaled
        # by the bin width and shifted by the offset.
        pitch_deg = (
            torch.sum(pitch_probs * self.idx_tensor, dim=1)
            * self._BIN_WIDTH_DEG - self._ANGLE_OFFSET_DEG
        )
        yaw_deg = (
            torch.sum(yaw_probs * self.idx_tensor, dim=1)
            * self._BIN_WIDTH_DEG - self._ANGLE_OFFSET_DEG
        )

        # Convert to radians on the host.
        pitch_predicted = pitch_deg.detach().cpu().numpy() * np.pi / 180.0
        yaw_predicted = yaw_deg.detach().cpu().numpy() * np.pi / 180.0

        return pitch_predicted, yaw_predicted