| from flask import Flask, render_template, Response, jsonify,request
|
| import os
|
| import threading
|
| import time
|
| import cv2
|
| import numpy as np
|
| import torch
|
| import yaml
|
| from torchvision import transforms
|
| from face_alignment.alignment import norm_crop
|
| from face_detection.scrfd.detector import SCRFD
|
| from face_detection.yolov5_face.detector import Yolov5Face
|
| from face_recognition.arcface.model import iresnet_inference
|
| from face_recognition.arcface.utils import compare_encodings, read_features
|
| from face_tracking.tracker.byte_tracker import BYTETracker
|
| from face_tracking.tracker.visualize import plot_tracking
|
|
|
app = Flask(__name__)

# Run inference on GPU when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(device)

# SCRFD face detector loaded from ONNX weights.
detector = SCRFD(model_file="face_detection/scrfd/weights/scrfd_2.5g_bnkps.onnx")

# ArcFace r100 model used to compute face embeddings.
recognizer = iresnet_inference(model_name="r100", path="face_recognition/arcface/weights/arcface_r100.pth", device=device)

# Known identities: parallel arrays of names and their embeddings,
# loaded from the feature archive on disk.
images_names, images_embs = read_features(feature_path="./datasets/face_features/feature")

# Maps a tracker ID to the "name:score" caption shown on the stream.
id_face_mapping = {}

# Latest per-frame state written by process_tracking().
# NOTE(review): mutated from the streaming generator without locking --
# confirm single-threaded access or guard with a lock if /data reads it
# concurrently.
data_mapping = {
    "raw_image": [],
    "tracking_ids": [],
    "detection_bboxes": [],
    "detection_landmarks": [],
    "tracking_bboxes": [],
}

# When True, generate_frames() idles instead of reading the camera.
paused = False
|
|
|
|
|
def load_config(file_name):
    """Parse a YAML configuration file.

    Returns the parsed document, or ``None`` (after printing the parser
    error) when the file contains invalid YAML.
    """
    with open(file_name, "r") as stream:
        try:
            config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            return None
        return config
|
|
|
|
|
@torch.no_grad()
def get_feature1(face_image):
    """Embed a single BGR face crop with the ArcFace recognizer.

    Returns the first embedding of the batch as an L2-normalized 1-D
    numpy array.
    """
    preprocess = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((112, 112)),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])

    # OpenCV delivers BGR; convert to the RGB layout the model expects.
    rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
    # Batch of one on the active device.
    batch = preprocess(rgb_face).unsqueeze(0).to(device)
    embedding = recognizer(batch)[0].cpu().numpy()
    # Unit-normalize so similarity search reduces to a dot product.
    return embedding / np.linalg.norm(embedding)
|
|
|
def add_persons(name, images):
    """Register face images for *name* and persist their embeddings.

    Detects faces in each uploaded image, writes the crops to
    ``./datasets/data/<name>/`` and appends their embeddings to the
    on-disk feature archive.

    Args:
        name: Person identifier, used as directory name and label.
        images: Iterable of file-like objects (e.g. werkzeug FileStorage).

    Returns:
        A human-readable status message.
    """
    images_name = []
    images_emb = []

    person_face_path = f"./datasets/data/{name}"
    os.makedirs(person_face_path, exist_ok=True)

    for image in images:
        input_image = cv2.imdecode(np.frombuffer(image.read(), np.uint8), cv2.IMREAD_COLOR)
        if input_image is None:
            # Corrupt or non-image upload: skip it rather than crash.
            continue
        bboxes, _ = detector.detect(image=input_image)

        for i in range(len(bboxes)):
            # Detector boxes are typically floats; numpy slicing needs ints
            # (float indices raise TypeError). The 5th element is the score.
            x1, y1, x2, y2 = (int(v) for v in bboxes[i][:4])
            face_image = input_image[y1:y2, x1:x2]
            if face_image.size == 0:
                # Degenerate/out-of-frame box -- nothing to save or embed.
                continue
            cv2.imwrite(os.path.join(person_face_path, f"{len(images_name)}.jpg"), face_image)

            images_emb.append(get_feature1(face_image))
            images_name.append(name)

    if not images_emb:
        return "No valid images found."

    images_emb = np.array(images_emb)
    images_name = np.array(images_name)

    # NOTE(review): this path carries ".npz" while the startup call uses
    # "./datasets/face_features/feature" -- confirm read_features accepts both.
    features_path = "./datasets/face_features/feature.npz"
    features = read_features(features_path)

    if features is not None:
        # Merge the new identities into the existing archive.
        old_images_name, old_images_emb = features
        images_name = np.hstack((old_images_name, images_name))
        images_emb = np.vstack((old_images_emb, images_emb))

    np.savez_compressed(features_path, images_name=images_name, images_emb=images_emb)
    return "Successfully added new person!"
|
|
|
@app.route('/add_person', methods=['POST'])
def add_person():
    """POST endpoint: register a new person from uploaded face images.

    Expects form field ``name`` and one or more files under ``images``.
    Responds 400 with an explicit error when either is missing.
    """
    # Use .get() instead of request.form['name']: the subscript form raises
    # KeyError on a missing field, so the validation below could never run
    # and the client got a generic 400 instead of our error message.
    name = request.form.get('name')
    images = request.files.getlist('images')

    if not name or not images:
        return jsonify({"error": "Name and images are required."}), 400

    message = add_persons(name, images)
    return jsonify({"message": message})
|
@torch.no_grad()
def get_feature(face_image):
    """Compute an L2-normalized ArcFace embedding for a BGR face crop.

    Unlike get_feature1, the batch dimension is kept: the returned numpy
    array is the raw recognizer output normalized as a whole.
    """
    preprocess = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((112, 112)),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])

    # OpenCV frames are BGR; the model was trained on RGB input.
    rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)

    # Single-image batch on the active device.
    tensor = preprocess(rgb_face).unsqueeze(0).to(device)

    raw_emb = recognizer(tensor).cpu().numpy()

    # Unit-normalize for cosine-similarity matching downstream.
    return raw_emb / np.linalg.norm(raw_emb)
|
|
|
def recognition(face_image):
    """Match a face crop against the known identity embeddings.

    Returns:
        ``(name, score)`` -- the best-matching name, or ``"UNKNOWN"``
        when the best score falls below the 0.25 threshold.
    """
    query_emb = get_feature(face_image)
    scores, best_idx = compare_encodings(query_emb, images_embs)
    best_name = images_names[best_idx]
    best_score = scores[0]

    if best_score < 0.25:
        return "UNKNOWN", best_score
    return best_name, best_score
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_tracking(frame, detector, tracker, args, frame_id, fps):
    """Run detection + ByteTrack on one frame and draw the results.

    Updates the module-level ``data_mapping`` and ``id_face_mapping``
    state as a side effect.

    Args:
        frame: BGR frame from the capture device.
        detector: Face detector exposing ``detect_tracking``.
        tracker: BYTETracker instance.
        args: Tracking config mapping with ``aspect_ratio_thresh`` and
            ``min_box_area`` keys.
        frame_id: Zero-based frame counter (shown as ``frame_id + 1``).
        fps: Current FPS estimate for the overlay.

    Returns:
        The frame annotated with tracking boxes and recognized names.
    """
    outputs, img_info, bboxes, landmarks = detector.detect_tracking(image=frame)

    tracking_tlwhs = []
    tracking_ids = []
    tracking_scores = []
    tracking_bboxes = []

    if outputs is not None and len(bboxes) > 0:
        online_targets = tracker.update(outputs, [img_info["height"], img_info["width"]], (128, 128))

        for i in range(len(online_targets)):
            t = online_targets[i]
            tlwh = t.tlwh
            tid = t.track_id
            # Reject extremely tall/thin boxes -- unlikely to be faces.
            vertical = tlwh[2] / tlwh[3] > args["aspect_ratio_thresh"]

            if tlwh[2] * tlwh[3] > args["min_box_area"] and not vertical:
                x1, y1, w, h = tlwh
                tracking_bboxes.append([x1, y1, x1 + w, y1 + h])
                tracking_tlwhs.append(tlwh)
                tracking_ids.append(tid)
                tracking_scores.append(t.score)

        tracking_image = plot_tracking(
            img_info["raw_img"],
            tracking_tlwhs,
            tracking_ids,
            names=id_face_mapping,
            frame_id=frame_id + 1,
            fps=fps,
        )
    else:
        tracking_image = img_info["raw_img"]

    for i, bbox in enumerate(bboxes):
        if i < len(landmarks):
            try:
                face_alignment = norm_crop(img=img_info["raw_img"], landmark=landmarks[i])
                name, score = recognition(face_image=face_alignment)
                caption = f"{name}:{score:.2f}"

                if i < len(tracking_ids):
                    id_face_mapping[tracking_ids[i]] = caption

                    x1, y1, x2, y2 = tracking_bboxes[i]
                    # BUG FIX: cv2.putText was previously called without the
                    # mandatory text argument, so it always raised (silently
                    # swallowed by the broad except below) and the label was
                    # never drawn. Pass the caption explicitly.
                    cv2.putText(tracking_image, caption, (int(x1), int(y1) - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)
                else:
                    print(f"Tracking ID {i} not found. Skipping name assignment.")
            except IndexError as e:
                print(f"Index error during face recognition: {e}. Skipping this face.")
            except Exception as e:
                print(f"Unexpected error during recognition: {e}. Skipping this face.")

    # Publish the latest frame state for other consumers (e.g. /data).
    data_mapping["raw_image"] = img_info["raw_img"]
    data_mapping["detection_bboxes"] = bboxes
    data_mapping["detection_landmarks"] = landmarks
    data_mapping["tracking_ids"] = tracking_ids
    data_mapping["tracking_bboxes"] = tracking_bboxes

    return tracking_image
|
|
|
|
|
@app.route('/video_feed')
def video_feed():
    """Stream the annotated webcam feed as a multipart MJPEG response."""
    mjpeg_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    return Response(generate_frames(), mimetype=mjpeg_mimetype)
|
|
|
|
|
def generate_frames():
    """Yield MJPEG frame chunks from the webcam with tracking overlays.

    Runs until the camera stops delivering frames. Honors the
    module-level ``paused`` flag by idling instead of reading the camera.
    """
    config_tracking = load_config("./face_tracking/config/config_tracking.yaml")
    tracker = BYTETracker(args=config_tracking, frame_rate=30)
    frame_id = 0
    cap = cv2.VideoCapture(0)
    start_time = time.time_ns()
    frame_count = 0
    fps = -1  # unknown until the first 30-frame window completes

    global paused

    try:
        while True:
            if paused:
                # Avoid a busy-wait while the stream is paused.
                time.sleep(0.1)
                continue

            ok, img = cap.read()
            if not ok:
                # Camera unplugged / no frames: stop cleanly instead of
                # passing None downstream (the original crashed here).
                break

            tracking_image = process_tracking(img, detector, tracker, config_tracking, frame_id, fps)
            # Advance the frame counter (the original never incremented it,
            # so the overlay always showed frame 1).
            frame_id += 1

            frame_count += 1
            if frame_count >= 30:
                # Recompute FPS over a 30-frame window (time_ns -> seconds).
                fps = 1e9 * frame_count / (time.time_ns() - start_time)
                frame_count = 0
                start_time = time.time_ns()

            ret, buffer = cv2.imencode('.jpg', tracking_image)
            if not ret:
                # Encoding failed for this frame; skip it.
                continue
            frame = buffer.tobytes()

            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
    finally:
        # Release the camera even if the client disconnects mid-stream.
        cap.release()
|
|
|
|
|
@app.route('/toggle_pause', methods=['POST'])
def toggle_pause():
    """Toggle the video stream's pause state.

    Returns:
        JSON payload ``{"paused": <bool>}`` reflecting the state after
        the toggle.
    """
    global paused
    paused = not paused
    return jsonify({"paused": paused})
|
|
|
|
|
@app.route('/')
def index():
    """Serve the main page."""
    # NOTE(review): template name is 'indx.html' (no 'e') -- confirm the
    # template file is really named that and not 'index.html'.
    return render_template('indx.html')
|
@app.route('/data')
def data():
    """Serve the data page."""
    template_name = 'data.html'
    return render_template(template_name)
|
|
|
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug debugger and reloader --
    # convenient for development, unsafe for production deployment.
    app.run(debug=True)
|
|
|