# app.py — Flask service for face detection, recognition, and tracking.
from flask import Flask, render_template, Response, jsonify,request
import os
import threading
import time
import cv2
import numpy as np
import torch
import yaml
from torchvision import transforms
from face_alignment.alignment import norm_crop
from face_detection.scrfd.detector import SCRFD
from face_detection.yolov5_face.detector import Yolov5Face
from face_recognition.arcface.model import iresnet_inference
from face_recognition.arcface.utils import compare_encodings, read_features
from face_tracking.tracker.byte_tracker import BYTETracker
from face_tracking.tracker.visualize import plot_tracking
app = Flask(__name__)
# Device configuration: use CUDA when available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
# Face detector (choose one; the SCRFD ONNX model is the active choice).
detector = SCRFD(model_file="face_detection/scrfd/weights/scrfd_2.5g_bnkps.onnx")
# detector = Yolov5Face(model_file="face_detection/yolov5_face/weights/yolov5n-face.pt")
# Face recognizer: ArcFace iResNet-100 embedding network.
recognizer = iresnet_inference(model_name="r100", path="face_recognition/arcface/weights/arcface_r100.pth", device=device)
# Load precomputed gallery embeddings and their corresponding names.
images_names, images_embs = read_features(feature_path="./datasets/face_features/feature")
# Maps a tracker ID -> "name:score" display label.
id_face_mapping = {}
# Latest detection/tracking state, written by process_tracking().
# NOTE(review): mutated from the streaming generator without locking —
# confirm single-worker deployment before scaling out.
data_mapping = {
"raw_image": [],
"tracking_ids": [],
"detection_bboxes": [],
"detection_landmarks": [],
"tracking_bboxes": [],
}
# Global flag flipped by /toggle_pause to pause/resume the video feed.
paused = False
# Load config
def load_config(file_name):
    """Load and parse a YAML configuration file.

    Parameters
    ----------
    file_name : str
        Path to the YAML file.

    Returns
    -------
    The parsed configuration (typically a dict).

    Raises
    ------
    yaml.YAMLError
        If the file is not valid YAML. The original code printed the error
        and implicitly returned None, which made callers (e.g. BYTETracker
        construction in generate_frames) fail later with a confusing
        error; re-raising surfaces the problem at its source.
    """
    with open(file_name, "r") as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            raise
# Recognition and face name mapping
@torch.no_grad()
def get_feature1(face_image):
    """Embed a single BGR face crop as a unit-norm ArcFace vector.

    The crop is converted to RGB, resized to 112x112, normalized to
    [-1, 1], and passed through the recognizer; the first (only) row of
    the output batch is L2-normalized and returned as a 1-D numpy array.
    """
    preprocess = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((112, 112)),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    rgb_crop = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
    batch = preprocess(rgb_crop).unsqueeze(0).to(device)
    embedding = recognizer(batch)[0].cpu().numpy()
    return embedding / np.linalg.norm(embedding)
def add_persons(name, images):
    """Register a new person: detect faces in the uploaded images, save
    the crops, compute embeddings, persist them, and refresh the
    in-memory gallery.

    Parameters
    ----------
    name : str
        Person's label; also used as the dataset sub-directory name.
    images : list
        Uploaded file-like objects (e.g. werkzeug FileStorage) holding
        raw image bytes.

    Returns
    -------
    str
        Human-readable status message.
    """
    global images_names, images_embs
    new_names = []
    new_embs = []
    person_face_path = f"./datasets/data/{name}"
    os.makedirs(person_face_path, exist_ok=True)
    for image in images:
        input_image = cv2.imdecode(np.frombuffer(image.read(), np.uint8), cv2.IMREAD_COLOR)
        if input_image is None:
            # Skip files that are not decodable images instead of crashing.
            continue
        bboxes, _ = detector.detect(image=input_image)
        for bbox in bboxes:
            # Cast to int: the detector may return float coordinates,
            # which cannot be used as slice indices.
            x1, y1, x2, y2 = (int(v) for v in bbox[:4])
            face_image = input_image[y1:y2, x1:x2]
            if face_image.size == 0:
                # Degenerate/out-of-frame box — nothing to embed.
                continue
            cv2.imwrite(os.path.join(person_face_path, f"{len(new_names)}.jpg"), face_image)
            new_embs.append(get_feature1(face_image))
            new_names.append(name)
    if not new_embs:
        return "No valid images found."
    new_embs = np.array(new_embs)
    new_names = np.array(new_names)
    # NOTE(review): startup loads "./datasets/face_features/feature" (no
    # extension) while this path carries ".npz" — verify read_features
    # treats both forms identically.
    features_path = "./datasets/face_features/feature.npz"
    features = read_features(features_path)
    if features is not None:
        old_names, old_embs = features
        new_names = np.hstack((old_names, new_names))
        new_embs = np.vstack((old_embs, new_embs))
    np.savez_compressed(features_path, images_name=new_names, images_emb=new_embs)
    # BUG FIX: refresh the in-memory gallery so the new person is
    # recognized immediately; the original only updated the file, so
    # recognition ignored new enrollments until the server restarted.
    images_names, images_embs = new_names, new_embs
    return "Successfully added new person!"
@app.route('/add_person', methods=['POST'])
def add_person():
    """POST endpoint to enroll a new person.

    Expects multipart form data with a 'name' field and one or more
    'images' files; responds 400 when either is missing.
    """
    # BUG FIX: request.form['name'] raised KeyError (HTTP 500) when the
    # field was absent, bypassing the intended 400 response below.
    name = request.form.get('name', '')
    images = request.files.getlist('images')
    if not name or not images:
        return jsonify({"error": "Name and images are required."}), 400
    message = add_persons(name, images)
    return jsonify({"message": message})
@torch.no_grad()
def get_feature(face_image):
    """Compute a unit-norm ArcFace embedding for a BGR face crop.

    Unlike get_feature1, the batch dimension is retained, so the result
    has shape (1, D) rather than (D,).
    """
    pipeline = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((112, 112)),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    # BGR -> RGB, then preprocess and move to the inference device.
    rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
    tensor = pipeline(rgb_face).unsqueeze(0).to(device)
    # Forward pass, then L2-normalize the raw embedding.
    raw_emb = recognizer(tensor).cpu().numpy()
    return raw_emb / np.linalg.norm(raw_emb)
def recognition(face_image):
    """Match a face crop against the known gallery.

    Returns a (name, score) pair; the name is "UNKNOWN" when the best
    similarity score falls below the 0.25 threshold.
    """
    query_emb = get_feature(face_image)
    scores, best_idx = compare_encodings(query_emb, images_embs)
    best_score = scores[0]
    if best_score < 0.25:
        return "UNKNOWN", best_score
    return images_names[best_idx], best_score
# Tracking logic with landmarks and face name mapping.
# (An earlier commented-out draft of process_tracking was removed here;
# the maintained implementation follows below.)
def process_tracking(frame, detector, tracker, args, frame_id, fps):
    """Run detection + ByteTrack on one frame and draw annotated results.

    Parameters
    ----------
    frame : np.ndarray
        BGR frame from the camera.
    detector
        Face detector exposing detect_tracking(image=...).
    tracker : BYTETracker
        Tracker whose internal state is updated in place.
    args : dict
        Tracking config with 'aspect_ratio_thresh' and 'min_box_area'.
    frame_id : int
        Current frame index (overlay only).
    fps : float
        Most recent FPS estimate (overlay only).

    Returns
    -------
    np.ndarray
        Frame with tracking boxes and recognized-name labels drawn.

    Side effects: updates module-level id_face_mapping and data_mapping.
    """
    outputs, img_info, bboxes, landmarks = detector.detect_tracking(image=frame)
    tracking_tlwhs = []
    tracking_ids = []
    tracking_scores = []
    tracking_bboxes = []
    if outputs is not None and len(bboxes) > 0:
        # Perform face tracking on this frame's detections.
        online_targets = tracker.update(outputs, [img_info["height"], img_info["width"]], (128, 128))
        for t in online_targets:
            tlwh = t.tlwh
            vertical = tlwh[2] / tlwh[3] > args["aspect_ratio_thresh"]
            # Keep only boxes that are large enough and not too tall/thin.
            if tlwh[2] * tlwh[3] > args["min_box_area"] and not vertical:
                x1, y1, w, h = tlwh
                tracking_bboxes.append([x1, y1, x1 + w, y1 + h])
                tracking_tlwhs.append(tlwh)
                tracking_ids.append(t.track_id)
                tracking_scores.append(t.score)
        # Draw tracking results with the names mapped to each track ID.
        tracking_image = plot_tracking(
            img_info["raw_img"],
            tracking_tlwhs,
            tracking_ids,
            names=id_face_mapping,  # maps track IDs to "name:score" labels
            frame_id=frame_id + 1,
            fps=fps,
        )
    else:
        # No detections: return the raw image unchanged.
        tracking_image = img_info["raw_img"]
    # For every detected face, run recognition and label the matching track.
    # NOTE(review): detection index i is assumed to line up with tracking
    # index i; this only holds when no track was filtered out above.
    for i in range(min(len(bboxes), len(landmarks))):
        try:
            face_alignment = norm_crop(img=img_info["raw_img"], landmark=landmarks[i])
            name, score = recognition(face_image=face_alignment)
            if i < len(tracking_ids):
                label = f"{name}:{score:.2f}"
                id_face_mapping[tracking_ids[i]] = label
                # BUG FIX: the original cv2.putText call omitted the
                # mandatory text argument, so it raised on every call and
                # was silently swallowed by the except below — no label
                # was ever drawn on the frame.
                x1, y1, x2, y2 = tracking_bboxes[i]
                cv2.putText(tracking_image, label, (int(x1), int(y1) - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)
            else:
                print(f"Tracking ID {i} not found. Skipping name assignment.")
        except IndexError as e:
            print(f"Index error during face recognition: {e}. Skipping this face.")
        except Exception as e:
            print(f"Unexpected error during recognition: {e}. Skipping this face.")
    # Publish the latest state for other consumers (e.g. the /data route).
    data_mapping["raw_image"] = img_info["raw_img"]
    data_mapping["detection_bboxes"] = bboxes
    data_mapping["detection_landmarks"] = landmarks
    data_mapping["tracking_ids"] = tracking_ids
    data_mapping["tracking_bboxes"] = tracking_bboxes
    return tracking_image
# Flask route to display video stream
@app.route('/video_feed')
def video_feed():
    """Stream the annotated webcam feed as a multipart MJPEG response."""
    mjpeg_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    return Response(generate_frames(), mimetype=mjpeg_mimetype)
# Frame generator
def generate_frames():
    """Yield MJPEG frame chunks from the default webcam with tracking overlays.

    Honors the global `paused` flag; recomputes the FPS estimate every
    30 processed frames.
    """
    config_tracking = load_config("./face_tracking/config/config_tracking.yaml")
    tracker = BYTETracker(args=config_tracking, frame_rate=30)
    frame_id = 0
    cap = cv2.VideoCapture(0)
    start_time = time.time_ns()
    frame_count = 0
    fps = -1
    global paused
    try:
        while True:
            if paused:
                time.sleep(0.1)  # small sleep to avoid busy-waiting
                continue
            # BUG FIX: the original ignored the read-success flag, so a
            # missing/failed camera crashed inside process_tracking.
            ok, img = cap.read()
            if not ok or img is None:
                break
            # NOTE(review): frame_id is never incremented (as in the
            # original), so the overlay frame counter stays constant.
            tracking_image = process_tracking(img, detector, tracker, config_tracking, frame_id, fps)
            frame_count += 1
            if frame_count >= 30:
                fps = 1e9 * frame_count / (time.time_ns() - start_time)
                frame_count = 0
                start_time = time.time_ns()
            # Encode frame as JPEG; skip frames that fail to encode.
            ret, buffer = cv2.imencode('.jpg', tracking_image)
            if not ret:
                continue
            frame = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
    finally:
        cap.release()  # BUG FIX: the original leaked the capture device
# Route to toggle pause state
@app.route('/toggle_pause', methods=['POST'])
def toggle_pause():
    """Flip the global pause flag and report the new state as JSON."""
    global paused
    paused = not paused
    return jsonify(paused=paused)
# Home route
@app.route('/')
def index():
    # Home page with the video-feed UI.
    # NOTE(review): 'indx.html' looks like a typo for 'index.html' —
    # verify against the templates/ directory before changing.
    return render_template('indx.html')
@app.route('/data')
def data():
    # Page for viewing tracking/recognition data (rendered template only;
    # no JSON payload is served from this route).
    return render_template('data.html')
# Main entry point
if __name__ == '__main__':
app.run(debug=True)