from flask import Flask, render_template, Response, jsonify, request
import os
import threading
import time

import cv2
import numpy as np
import torch
import yaml
from torchvision import transforms

from face_alignment.alignment import norm_crop
from face_detection.scrfd.detector import SCRFD
from face_detection.yolov5_face.detector import Yolov5Face
from face_recognition.arcface.model import iresnet_inference
from face_recognition.arcface.utils import compare_encodings, read_features
from face_tracking.tracker.byte_tracker import BYTETracker
from face_tracking.tracker.visualize import plot_tracking

app = Flask(__name__)

# Device configuration: prefer GPU when available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Face detector (choose one).
detector = SCRFD(model_file="face_detection/scrfd/weights/scrfd_2.5g_bnkps.onnx")
# detector = Yolov5Face(model_file="face_detection/yolov5_face/weights/yolov5n-face.pt")

# Face recognizer.
recognizer = iresnet_inference(
    model_name="r100",
    path="face_recognition/arcface/weights/arcface_r100.pth",
    device=device,
)

# Load precomputed face features and names.
images_names, images_embs = read_features(feature_path="./datasets/face_features/feature")

# Mapping of tracking IDs to recognized face names ("name:score" strings).
id_face_mapping = {}

# Latest detection/tracking state, shared with other parts of the app.
data_mapping = {
    "raw_image": [],
    "tracking_ids": [],
    "detection_bboxes": [],
    "detection_landmarks": [],
    "tracking_bboxes": [],
}

# Global flag controlling pause/resume of the video stream.
paused = False


def load_config(file_name):
    """Load a YAML configuration file.

    Returns the parsed document, or None when the file is not valid YAML
    (the parse error is logged rather than raised, preserving the original
    best-effort behavior — callers must handle a None result).
    """
    with open(file_name, "r") as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            return None


# The preprocessing pipeline is constant — build it once instead of on
# every call to get_feature1.
_enroll_preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((112, 112)),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])


@torch.no_grad()
def get_feature1(face_image):
    """Extract an L2-normalized ArcFace embedding from a BGR face crop.

    Args:
        face_image: HxWx3 BGR image (OpenCV convention).

    Returns:
        1-D numpy array, unit L2 norm.
    """
    # OpenCV images are BGR; the network expects RGB.
    face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
    face_tensor = _enroll_preprocess(face_image).unsqueeze(0).to(device)
    emb_img_face = recognizer(face_tensor)[0].cpu().numpy()
    # Normalize so cosine similarity reduces to a dot product.
    return emb_img_face / np.linalg.norm(emb_img_face)


def add_persons(name, images):
    """Register a new person: detect faces in the uploaded images, save the
    crops to disk, and append their embeddings to the feature store.

    Args:
        name: person identifier; used as the folder name and feature label.
        images: iterable of file-like objects (e.g. werkzeug FileStorage).

    Returns:
        A human-readable status message.
    """
    images_name = []
    images_emb = []
    person_face_path = f"./datasets/data/{name}"
    os.makedirs(person_face_path, exist_ok=True)

    for image in images:
        input_image = cv2.imdecode(np.frombuffer(image.read(), np.uint8), cv2.IMREAD_COLOR)
        if input_image is None:
            # Skip uploads that are not decodable images instead of crashing.
            continue
        bboxes, _ = detector.detect(image=input_image)
        for i in range(len(bboxes)):
            # Detector boxes may be floats; cast to int before slicing.
            x1, y1, x2, y2, _ = map(int, bboxes[i])
            face_image = input_image[y1:y2, x1:x2]
            if face_image.size == 0:
                # Degenerate/out-of-frame box — nothing to embed.
                continue
            cv2.imwrite(os.path.join(person_face_path, f"{len(images_name)}.jpg"), face_image)
            images_emb.append(get_feature1(face_image))
            images_name.append(name)

    if not images_emb:
        return "No valid images found."

    images_emb = np.array(images_emb)
    images_name = np.array(images_name)

    # NOTE(review): the module-level read_features call uses the path without
    # the ".npz" suffix — confirm both resolve to the same file on disk.
    features_path = "./datasets/face_features/feature.npz"
    features = read_features(features_path)
    if features is not None:
        # Merge the new embeddings with the existing feature store.
        old_images_name, old_images_emb = features
        images_name = np.hstack((old_images_name, images_name))
        images_emb = np.vstack((old_images_emb, images_emb))

    np.savez_compressed(features_path, images_name=images_name, images_emb=images_emb)
    return "Successfully added new person!"
@app.route('/add_person', methods=['POST'])
def add_person():
    """Register a new person from the posted form (`name` + `images` files)."""
    name = request.form['name']
    images = request.files.getlist('images')
    if not name or not images:
        return jsonify({"error": "Name and images are required."}), 400
    message = add_persons(name, images)
    return jsonify({"message": message})


@torch.no_grad()
def get_feature(face_image):
    """Extract an L2-normalized embedding from a BGR face crop.

    Unlike get_feature1, the result keeps the leading batch dimension,
    matching what compare_encodings expects.
    """
    face_preprocess = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((112, 112)),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    # Convert BGR (OpenCV) to RGB before feeding the network.
    face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
    face_image = face_preprocess(face_image).unsqueeze(0).to(device)
    # Inference to get the feature vector.
    emb_img_face = recognizer(face_image).cpu().numpy()
    # L2-normalize so cosine similarity reduces to a dot product.
    return emb_img_face / np.linalg.norm(emb_img_face)


def recognition(face_image):
    """Match a face crop against the known-face database.

    Returns:
        (name, score) — "UNKNOWN" when the best similarity is below 0.25.
    """
    query_emb = get_feature(face_image)
    score, id_min = compare_encodings(query_emb, images_embs)
    name = images_names[id_min]
    score = score[0]
    if score < 0.25:
        return "UNKNOWN", score
    return name, score


def process_tracking(frame, detector, tracker, args, frame_id, fps):
    """Detect, track, and recognize faces in a single frame.

    Updates the module-level ``id_face_mapping`` and ``data_mapping`` as a
    side effect and returns the annotated frame.

    Args:
        frame: BGR frame from the capture device.
        detector: face detector exposing detect_tracking().
        tracker: BYTETracker instance.
        args: tracking config mapping (aspect_ratio_thresh, min_box_area).
        frame_id: current frame index (displayed as frame_id + 1).
        fps: most recent FPS estimate, -1 until measured.
    """
    outputs, img_info, bboxes, landmarks = detector.detect_tracking(image=frame)

    tracking_tlwhs = []
    tracking_ids = []
    tracking_scores = []
    tracking_bboxes = []

    if outputs is not None and len(bboxes) > 0:
        # Perform face tracking.
        online_targets = tracker.update(
            outputs, [img_info["height"], img_info["width"]], (128, 128)
        )
        for t in online_targets:
            tlwh = t.tlwh
            vertical = tlwh[2] / tlwh[3] > args["aspect_ratio_thresh"]
            # Keep only boxes that are large enough and not overly vertical.
            if tlwh[2] * tlwh[3] > args["min_box_area"] and not vertical:
                x1, y1, w, h = tlwh
                tracking_bboxes.append([x1, y1, x1 + w, y1 + h])
                tracking_tlwhs.append(tlwh)
                tracking_ids.append(t.track_id)
                tracking_scores.append(t.score)

        # Draw tracking results with the names resolved so far.
        tracking_image = plot_tracking(
            img_info["raw_img"],
            tracking_tlwhs,
            tracking_ids,
            names=id_face_mapping,  # maps tracking IDs -> "name:score"
            frame_id=frame_id + 1,
            fps=fps,
        )
    else:
        # If no detections, just return the raw image.
        tracking_image = img_info["raw_img"]

    # For every detected face, perform recognition and map the face ID.
    # NOTE(review): detection index i is assumed to line up with the filtered
    # track list — confirm the detector/tracker preserve ordering.
    for i, bbox in enumerate(bboxes):
        if i >= len(landmarks):
            continue
        try:
            face_alignment = norm_crop(img=img_info["raw_img"], landmark=landmarks[i])
            name, score = recognition(face_image=face_alignment)
            # Ensure we do not get index errors when assigning names to IDs.
            if i < len(tracking_ids):
                caption = f"{name}:{score:.2f}"
                id_face_mapping[tracking_ids[i]] = caption
                # Draw the name just above the tracked bounding box.
                # BUGFIX: the original putText call omitted the mandatory
                # text argument, raising a TypeError that the broad except
                # below silently swallowed — so names were never drawn.
                x1, y1, x2, y2 = tracking_bboxes[i]
                cv2.putText(
                    tracking_image,
                    caption,
                    (int(x1), int(y1) - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.9,
                    (255, 0, 0),
                    2,
                )
            else:
                print(f"Tracking ID {i} not found. Skipping name assignment.")
        except IndexError as e:
            print(f"Index error during face recognition: {e}. Skipping this face.")
        except Exception as e:
            print(f"Unexpected error during recognition: {e}. Skipping this face.")

    # Publish the latest state for other parts of the system.
    data_mapping["raw_image"] = img_info["raw_img"]
    data_mapping["detection_bboxes"] = bboxes
    data_mapping["detection_landmarks"] = landmarks
    data_mapping["tracking_ids"] = tracking_ids
    data_mapping["tracking_bboxes"] = tracking_bboxes

    return tracking_image


@app.route('/video_feed')
def video_feed():
    """MJPEG stream of the annotated webcam feed."""
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')


def generate_frames():
    """Yield JPEG-encoded annotated frames from the default webcam.

    Honors the global ``paused`` flag; the stream ends when the camera
    stops delivering frames.
    """
    config_tracking = load_config("./face_tracking/config/config_tracking.yaml")
    tracker = BYTETracker(args=config_tracking, frame_rate=30)
    frame_id = 0

    cap = cv2.VideoCapture(0)
    start_time = time.time_ns()
    frame_count = 0
    fps = -1

    global paused
    try:
        while True:
            if paused:
                time.sleep(0.1)  # small sleep to avoid busy-waiting
                continue

            ok, img = cap.read()
            if not ok or img is None:
                # BUGFIX: the original ignored the success flag; a failed
                # read passed None downstream and crashed the stream.
                break

            tracking_image = process_tracking(
                img, detector, tracker, config_tracking, frame_id, fps
            )
            # BUGFIX: frame_id was never incremented, so the overlay always
            # showed frame 1.
            frame_id += 1

            # Refresh the FPS estimate every 30 frames.
            frame_count += 1
            if frame_count >= 30:
                fps = 1e9 * frame_count / (time.time_ns() - start_time)
                frame_count = 0
                start_time = time.time_ns()

            # Encode the frame as JPEG for the multipart stream.
            ret, buffer = cv2.imencode('.jpg', tracking_image)
            if not ret:
                continue
            frame = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
    finally:
        # Release the camera even if the client disconnects mid-stream.
        cap.release()


@app.route('/toggle_pause', methods=['POST'])
def toggle_pause():
    """Toggle the stream's pause state and report the new value."""
    global paused
    paused = not paused
    return jsonify({'paused': paused})


@app.route('/')
def index():
    # NOTE(review): 'indx.html' looks like a typo for 'index.html' — confirm
    # against the templates/ directory before renaming.
    return render_template('indx.html')


@app.route('/data')
def data():
    """Render the auxiliary data page."""
    return render_template('data.html')


# Main entry point
if __name__ == '__main__':
    app.run(debug=True)