| import numpy as np |
| from deepface import DeepFace |
| from PIL import Image |
| from io import BytesIO |
| from typing import Union, List |
| import os |
| from fastapi import HTTPException |
| import uuid |
| import cv2 |
| from .preprocess import load_dataset, RegNet |
| from sklearn.linear_model import LogisticRegression |
| from concrete.ml.torch.compile import compile_torch_model |
| import logging |
|
|
|
|
class InferenceService:
    """Store an uploaded user video, extract frames from it and train a model.

    All artefacts are written below ``temp/<user_id>/`` relative to the
    current working directory.
    """

    def __init__(self):
        # Initialised to None; no method visible in this class assigns it.
        self.model = None

    def save_video(self, video_content: bytes, user_id: str) -> None:
        """Write the raw video bytes to ``temp/<user_id>/video.mp4``.

        The per-user directory is created if it does not exist yet; an
        existing ``video.mp4`` is overwritten.

        Args:
            video_content: Raw bytes of the video file.
            user_id: Identifier used as the per-user directory name.

        Raises:
            HTTPException: Status 500 if the directory or file cannot be
                written (the original error is chained as the cause).
        """
        # NOTE(review): user_id is interpolated into a filesystem path
        # unchanged -- confirm that callers validate/sanitise it.
        user_dir = f"temp/{user_id}"
        try:
            os.makedirs(user_dir, exist_ok=True)
            with open(file=f"{user_dir}/video.mp4", mode="wb") as f:
                f.write(video_content)
        except Exception as e:
            raise HTTPException(
                status_code=500, detail=f"Error saving video: {e}"
            ) from e

    def save_frames(self, user_id: str) -> str:
        """Save up to the first 10 frames of the user's video as JPEG files.

        Frames are read sequentially (not randomly) from
        ``temp/<user_id>/video.mp4`` and written next to it as
        ``frame0.jpg`` ... ``frame9.jpg``. Fewer files are written when the
        video is shorter or cannot be decoded.

        Args:
            user_id: Identifier used as the per-user directory name.

        Returns:
            The directory containing the frames, ``"temp/<user_id>/"``.

        Raises:
            HTTPException: Status 404 if the video file does not exist.
        """
        user_dir = f"temp/{user_id}"
        video_path = f"{user_dir}/video.mp4"
        if not os.path.exists(video_path):
            raise HTTPException(status_code=404, detail="Video not found")

        max_frames = 10
        vidcap = cv2.VideoCapture(video_path)
        try:
            for count in range(max_frames):
                success, image = vidcap.read()
                if not success:
                    break
                # Build the name with a single f-string: combining an
                # f-string with "%" formatting breaks when user_id itself
                # contains a "%" character.
                cv2.imwrite(f"{user_dir}/frame{count}.jpg", image)
        finally:
            # Always free the underlying capture handle.
            vidcap.release()
        return f"{user_dir}/"

    def train_model(self, frames_path: str, user_id: str) -> str:
        """Train a model on the frames and return the path to the model.

        Training is currently disabled: both arguments are ignored and a
        fixed placeholder path is returned.

        Args:
            frames_path: Directory containing the extracted frames (unused).
            user_id: Identifier of the user the model is for (unused).

        Returns:
            The placeholder path ``"temp/model/fake_id.zip"``.
        """
        # TODO: re-enable the real training pipeline. The disabled
        # implementation is preserved below for reference.
        #
        # embeddings, labels = load_dataset(frames_path, cache=True)
        # embeddings = (embeddings - np.mean(embeddings, axis=0)) / np.std(
        #     embeddings, axis=0
        # )
        # model = LogisticRegression(C=1 / 5)
        # model.fit(embeddings, y=labels)
        # nb_sample = 100
        # W = model.coef_
        # b = model.intercept_.reshape(-1, 1)
        # X_train_rand = np.random.normal(0, 1, [nb_sample, embeddings.shape[1]])
        # W_rand = np.random.normal(0, 1, [nb_sample, W.shape[1]])
        # X_rand_stack = np.hstack([X_train_rand, W_rand])
        # reg_net = RegNet(b)
        # quantized_module = compile_torch_model(
        #     reg_net,  # our model
        #     X_rand_stack,  # a representative input-set to be used for both quantization and compilation
        #     n_bits=6,
        #     rounding_threshold_bits={"n_bits": 6, "method": "approximate"},
        # )
        # save_model_path: str = f"temp/model/{user_id}.zip"
        # quantized_module.fhe_circuit.server.save(save_model_path)
        # return save_model_path
        return "temp/model/fake_id.zip"

    def push_model(self, model_path: str, user_id: str) -> None:
        """Push the model to the server.

        Not implemented yet: the method currently does nothing and returns
        ``None``.

        Args:
            model_path: Path of the saved model (unused).
            user_id: Identifier of the user the model belongs to (unused).
        """
| |
|
|