ishaq101's picture
[NOTICKET] Feat: score card
028a629
from externals.databases.database import get_db
from fastapi import APIRouter, UploadFile, File, Depends, status, HTTPException
from interfaces.api.deps import get_current_user
from services.knowledge.knowledge_setup import KnowledgeService
from typing import List
from utils.logger import get_logger
logger = get_logger("File API")
router = APIRouter(
prefix="/file",
tags=["File"],
)
@router.post(
"/upload",
status_code=status.HTTP_201_CREATED,
summary="Upload multiple PDF knowledge files",
)
async def upload_knowledge_many(
files: List[UploadFile] = File(...),
db=Depends(get_db),
current_user=Depends(get_current_user),
):
"""
Upload multiple PDF files to Azure Blob Storage and log metadata.
"""
knowledge_service = KnowledgeService(
db=db,
user=current_user,
)
results = []
for file in files:
if file.content_type != "application/pdf":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"File '{file.filename}' is not a PDF",
)
else:
result = await knowledge_service.file.upload(file)
results.append(result)
return {
"message": "Upload completed",
"uploaded_by": current_user.email,
"files": results,
}
@router.get(
"/user/{user_id}",
status_code=status.HTTP_200_OK,
summary="Get file metadata by user ID",
)
async def get_file_metadata_by_user(
user_id: str,
db=Depends(get_db),
current_user=Depends(get_current_user),
):
"""
Retrieve all file metadata uploaded by a specific user.
"""
knowledge_service = KnowledgeService(
db=db,
user=current_user,
)
files = await knowledge_service.file.get_files_by_user(user_id)
return {
"message": "File metadata retrieved successfully",
"requested_by": current_user.email,
"files": files,
}
@router.delete(
"/{filename}",
status_code=status.HTTP_200_OK,
summary="Delete knowledge file by filename",
)
async def delete_knowledge_file(
filename: str,
db=Depends(get_db),
current_user=Depends(get_current_user),
):
"""
Delete a PDF knowledge file from Azure Blob and database.
"""
knowledge_service = KnowledgeService(
db=db,
user=current_user,
)
await knowledge_service.delete.delete(filename)
return {
"message": "File deleted successfully",
"filename": filename,
"deleted_by": current_user.email,
}
@router.get(
"/score_card",
status_code=status.HTTP_200_OK,
summary="Get score card data for dashboard summary",
)
async def get_score_card(
db=Depends(get_db),
current_user=Depends(get_current_user),
):
"""
Retrieve all file metadata uploaded by a specific user.
Return:
- total file
- total profile extracted
- percent profile extracted
"""
try:
knowledge_service = KnowledgeService(
db=db,
user=current_user,
)
data = await knowledge_service.score_card.get_score_card()
return {
"status": "success",
"message": "Score card retrieved successfully",
"data": data,
}
except Exception as E:
logger.error(f"get score card error: {E}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"get score card error: {E}"
)