# final_v2/models/L2CS-Net/l2cs/pipeline.py
# (VCS export residue, kept as comments: commits k22056537 / ac0baac —
#  "feat: UI nav, onboarding, L2CS weights path + torch.load; trim dev files")
import pathlib
import time
from typing import Union
import cv2
import numpy as np
import torch
import torch.nn as nn
from dataclasses import dataclass
from face_detection import RetinaFace
from .utils import prep_input_numpy, getArch
from .results import GazeResultContainer
class Pipeline:
    """End-to-end gaze estimation pipeline.

    Optionally runs RetinaFace detection, crops each face, and regresses
    continuous pitch/yaw angles (radians) with an L2CS-Net backbone.
    """

    def __init__(
        self,
        weights: pathlib.Path,
        arch: str,
        device: Union[str, torch.device] = 'cpu',
        include_detector: bool = True,
        confidence_threshold: float = 0.5,
    ):
        """Load the L2CS model (and optionally a face detector) onto *device*.

        Args:
            weights: Path to an L2CS state-dict checkpoint.
            arch: Backbone architecture name understood by ``getArch``.
            device: Target device; accepts either a string (e.g. ``'cpu'``,
                ``'cuda:0'``) or a ``torch.device``.
            include_detector: Whether to create a RetinaFace detector.
            confidence_threshold: Minimum detection score for a face to be kept.
        """
        # Save input parameters
        self.weights = weights
        self.include_detector = include_detector
        # BUG FIX: the original annotated device as str (default 'cpu') but then
        # accessed device.type / device.index, which plain strings lack — the
        # documented default crashed. torch.device() accepts both forms.
        self.device = torch.device(device)
        self.confidence_threshold = confidence_threshold

        # Frame counter for throttled timing logs (was lazily created in step()).
        self._step_count = 0

        # Create L2CS model: 90 bins, decoded to degrees in predict_gaze().
        self.model = getArch(arch, 90)
        # PyTorch 2.6+ defaults weights_only=True; these checkpoints need full unpickle.
        # NOTE(review): weights_only=False unpickles arbitrary objects — load
        # only trusted checkpoint files.
        self.model.load_state_dict(
            torch.load(self.weights, map_location=self.device, weights_only=False)
        )
        self.model.to(self.device)
        self.model.eval()

        # Half precision on GPU for ~2x speedup
        self._use_half = (self.device.type != 'cpu')
        if self._use_half:
            self.model.half()

        # Create RetinaFace if requested
        if self.include_detector:
            if self.device.type == 'cpu':
                self.detector = RetinaFace()
            else:
                # device.index is None for a bare 'cuda' device; default to GPU 0
                # instead of passing None through to RetinaFace.
                gpu_id = self.device.index if self.device.index is not None else 0
                self.detector = RetinaFace(gpu_id=gpu_id)

        self.softmax = nn.Softmax(dim=1)
        # Bin indices used to collapse the softmax distribution into one angle.
        self.idx_tensor = torch.FloatTensor(list(range(90))).to(self.device)

        # Warmup: dummy forward pass to avoid cold-start latency
        self._warmup()

    def _warmup(self):
        """Run a dummy forward pass to warm up the model and CUDA kernels.

        Failures are deliberately non-fatal: warmup is an optimization only.
        """
        dummy = np.zeros((224, 224, 3), dtype=np.uint8)
        try:
            with torch.no_grad():
                self.predict_gaze(dummy)
            print("[L2CS] Model warmup complete")
        except Exception as e:
            print(f"[L2CS] Warmup failed (non-fatal): {e}")

    def step(self, frame: np.ndarray) -> GazeResultContainer:
        """Detect faces in *frame* (BGR) and estimate gaze for each.

        Returns:
            GazeResultContainer with per-face pitch/yaw (radians), bboxes,
            landmarks and scores. All arrays are empty when no face passes
            the confidence threshold.
        """
        # Per-frame containers
        face_imgs = []
        bboxes = []
        landmarks = []
        scores = []

        if self.include_detector:
            t0 = time.perf_counter()
            faces = self.detector(frame)
            t_detect = (time.perf_counter() - t0) * 1000
            t_preprocess = 0.0
            t_inference = 0.0

            if faces is not None:
                t0 = time.perf_counter()
                for box, landmark, score in faces:
                    # Apply threshold
                    if score < self.confidence_threshold:
                        continue

                    # Clamp crop origin to the image (detector boxes may be negative).
                    x_min = max(int(box[0]), 0)
                    y_min = max(int(box[1]), 0)
                    x_max = int(box[2])
                    y_max = int(box[3])

                    # Crop, convert BGR->RGB, and resize to the model's 224x224 input.
                    img = frame[y_min:y_max, x_min:x_max]
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = cv2.resize(img, (224, 224))
                    face_imgs.append(img)

                    # Save data
                    bboxes.append(box)
                    landmarks.append(landmark)
                    scores.append(score)
                t_preprocess = (time.perf_counter() - t0) * 1000

            # BUG FIX: np.stack([]) raises ValueError; only run inference when
            # at least one face survived the confidence threshold.
            if face_imgs:
                t0 = time.perf_counter()
                with torch.no_grad():
                    pitch, yaw = self.predict_gaze(np.stack(face_imgs))
                t_inference = (time.perf_counter() - t0) * 1000

                # Log timing every 30 frames (avoid spamming)
                self._step_count += 1
                if self._step_count % 30 == 1:
                    print(f"[L2CS timing] detect={t_detect:.1f}ms preprocess={t_preprocess:.1f}ms inference={t_inference:.1f}ms total={t_detect+t_preprocess+t_inference:.1f}ms")
            else:
                pitch = np.empty((0, 1))
                yaw = np.empty((0, 1))
        else:
            # No detector: treat the whole frame as the (already cropped) face.
            with torch.no_grad():
                pitch, yaw = self.predict_gaze(frame)

        # BUG FIX: the original np.stack(...) calls raised on empty lists
        # (always in the include_detector=False path, and whenever no face was
        # kept). Fall back to empty arrays so callers get a consistent container.
        # NOTE(review): empty landmark/score shapes assume RetinaFace's usual
        # per-face layout — confirm against the detector if downstream code
        # indexes them.
        return GazeResultContainer(
            pitch=pitch,
            yaw=yaw,
            bboxes=np.stack(bboxes) if bboxes else np.empty((0, 4)),
            landmarks=np.stack(landmarks) if landmarks else np.empty((0,)),
            scores=np.stack(scores) if scores else np.empty((0,)),
        )

    def predict_gaze(self, frame: Union[np.ndarray, torch.Tensor]):
        """Run the L2CS forward pass on prepared face crops.

        Args:
            frame: Either a numpy image/batch (converted via prep_input_numpy)
                or an already-prepared torch tensor on the right device.

        Returns:
            Tuple ``(pitch, yaw)`` of numpy arrays in radians.

        Raises:
            RuntimeError: If *frame* is neither ndarray nor Tensor.
        """
        # Prepare input
        if isinstance(frame, np.ndarray):
            img = prep_input_numpy(frame, self.device)
        elif isinstance(frame, torch.Tensor):
            img = frame
        else:
            raise RuntimeError("Invalid dtype for input")

        # Half precision on GPU
        if self._use_half:
            img = img.half()

        # Forward pass (caller should wrap in torch.no_grad())
        gaze_pitch, gaze_yaw = self.model(img)
        # .float() before softmax keeps the decode numerically stable under fp16.
        pitch_predicted = self.softmax(gaze_pitch.float())
        yaw_predicted = self.softmax(gaze_yaw.float())

        # Expected value over 90 bins, each 4 degrees wide, centered on [-180, 180).
        pitch_predicted = torch.sum(pitch_predicted.data * self.idx_tensor, dim=1) * 4 - 180
        yaw_predicted = torch.sum(yaw_predicted.data * self.idx_tensor, dim=1) * 4 - 180

        # Degrees -> radians for the returned numpy arrays.
        pitch_predicted = pitch_predicted.cpu().detach().numpy() * np.pi / 180.0
        yaw_predicted = yaw_predicted.cpu().detach().numpy() * np.pi / 180.0

        return pitch_predicted, yaw_predicted