|
|
| from datetime import datetime |
| from fastapi import FastAPI, HTTPException, BackgroundTasks |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import FileResponse |
| from pydantic import BaseModel |
| from google import genai |
| from dotenv import load_dotenv |
| import os |
| import json |
| import logging |
| import re |
| import mistune |
| import requests |
| from data_service import DataService |
|
|
| |
| load_dotenv() |
| load_dotenv("../.env.local") |
|
|
| GROQ_API_KEY = os.getenv("GROQ_API_KEY") |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| app = FastAPI(title="3GPP Innovation Backend") |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
| |
| data_service = DataService() |
|
|
|
|
| def ask_gemini(prompt, content): |
| MAX_LEN = 10000 |
|
|
| if len(prompt) + len(content) <= MAX_LEN: |
| client = genai.Client(api_key=GEMINI_API_KEY) |
| response = client.models.generate_content( |
| model="gemma-3-27b-it", |
| contents=prompt + "\n\n" + content |
| ) |
| return response.text |
|
|
| chunk = content[:MAX_LEN - len(prompt)] |
| rest = content[MAX_LEN - len(prompt):] |
|
|
| first_answer = ask_gemini(prompt, chunk) |
| |
|
|
| return first_answer |
|
|
| PROCESS_PROMPT = """ |
| Task : |
| Using the text provided, |
| create chunk that are dense in relevant information and minimize near-duplicate or |
| loosely related passages, provide a paragraph on whats new to this document using |
| the SUGGESTION START and END. |
| """ |
|
|
| def format_answer(answer): |
| return f"We obtained the following methodology:"+answer["methodology"]+"\n\nThe context is :"+answer["context"]+"\n\nThe problem description is :"+answer["problem"] |
|
|
| def extract_json(text: str) -> dict: |
| match = re.search(r'\{.*\}', text, re.DOTALL) |
| if not match: |
| raise ValueError("Aucun JSON trouvé") |
| return json.loads(match.group()) |
|
|
| |
|
|
| class ProcessRequest(BaseModel): |
| file_id: str |
| filename: str |
| working_group: str |
| meeting: str |
| type: str |
| status: str |
| agenda_item: str |
| url: str |
|
|
| class InnovationResponse(BaseModel): |
| id: str |
| file_name: str |
| answer: str |
| classification: str |
|
|
| class PatternResponse(BaseModel): |
| pattern_id: int |
| pattern_name: str |
| prompt: str |
|
|
| class AnalyzeRequest(BaseModel): |
| file_id: str = None |
| text: str = None |
| pattern_id: int |
|
|
| class AnalyzeResponse(BaseModel): |
| id: int |
| file_name: str |
| content: str |
| methodology: str |
| context: str |
| problem: str |
| pattern_name: str |
|
|
| class ClassificationRequest(BaseModel): |
| result_id: int |
| classification: str |
|
|
| class ResultResponse(BaseModel): |
| id: int |
| file_name: str |
| content: str |
| classification: str |
| pattern_name: str |
| methodology: str |
| context: str |
| problem: str |
|
|
| |
|
|
| def fetch_text_content(req: AnalyzeRequest): |
| """ |
| fetches text content from request or database. |
| """ |
| if req.text: |
| return req.text |
| elif req.file_id: |
| content = data_service.get_file_content(req.file_id) |
| if content: |
| return content |
| else: |
| refined = data_service.get_refined_output(req.file_id) |
| if refined: |
| return refined |
| return None |
|
|
| |
|
|
| @app.get("/get_all") |
| def get_all(): |
| return data_service.get_all_files() |
|
|
| @app.get("/patterns", response_model=list[PatternResponse]) |
| def get_patterns(): |
| return data_service.get_patterns() |
|
|
| class PatternRequest(BaseModel): |
| pattern_name: str |
| prompt: str |
|
|
| @app.post("/patterns", response_model=PatternResponse) |
| def create_pattern(req: PatternRequest): |
| try: |
| pattern_id = data_service.add_pattern(req.pattern_name, req.prompt) |
| return { |
| "pattern_id": pattern_id, |
| "pattern_name": req.pattern_name, |
| "prompt": req.prompt |
| } |
| except Exception as e: |
| logger.error(f"Error creating pattern: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.put("/patterns/{pattern_id}", response_model=PatternResponse) |
| def update_pattern(pattern_id: int, req: PatternRequest): |
| try: |
| updated = data_service.update_pattern(pattern_id, req.pattern_name, req.prompt) |
| if not updated: |
| raise HTTPException(status_code=404, detail="Pattern not found") |
|
|
| return { |
| "pattern_id": pattern_id, |
| "pattern_name": req.pattern_name, |
| "prompt": req.prompt |
| } |
| except HTTPException as he: |
| raise he |
| except Exception as e: |
| logger.error(f"Error updating pattern: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/analyze", response_model=AnalyzeResponse) |
| async def analyze_content(req: AnalyzeRequest): |
| print("Start of analyse") |
| |
| try: |
| |
| existing_result, refined_id, file_name = data_service.get_existing_result(req.file_id) |
| |
| if existing_result: |
| |
| return { |
| "id": existing_result['result_id'], |
| "file_name": file_name, |
| "content": existing_result['content'], |
| "methodology": existing_result['methodology'], |
| "context": existing_result['context'], |
| "problem": existing_result['problem'], |
| "pattern_name": existing_result['pattern_name'] |
| } |
|
|
| |
| print('Performing new analysis') |
| text_content = fetch_text_content(req) |
| |
| if not text_content: |
| raise HTTPException(status_code=400, detail="No content found to analyze") |
|
|
| pattern = data_service.get_pattern(req.pattern_id) |
| if not pattern: |
| raise HTTPException(status_code=404, detail="Pattern not found") |
|
|
| pattern_name = pattern['pattern_name'] |
| pattern_prompt = pattern['prompt'] |
| |
| |
| response = ask_gemini(f"Pattern: {pattern_name}\nPrompt: {pattern_prompt}\n\nContext:\n", text_content) |
| json_response = extract_json(response) |
| answer = format_answer(json_response) |
| |
| methodology = json_response["methodology"] |
| context = json_response["context"] |
| problem = json_response["problem"] |
|
|
| |
| |
| |
| |
| |
| |
| if not refined_id and req.file_id: |
| ref_row = data_service.get_refined_by_file_id(req.file_id) |
| if ref_row: |
| refined_id = ref_row["refined_id"] |
|
|
| result_id = data_service.add_result(req.pattern_id, refined_id, answer, methodology, context, problem) |
|
|
| print("End of analyse") |
| return { |
| "id": result_id, |
| "file_name": file_name, |
| "content": answer, |
| "methodology": methodology, |
| "context": context, |
| "problem": problem, |
| "pattern_name": pattern_name |
| } |
|
|
| except Exception as e: |
| logger.error(f"Error during analysis: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/classify") |
| def classify_result(req: ClassificationRequest): |
| try: |
| updated = data_service.update_classification(req.result_id, req.classification) |
| if not updated: |
| raise HTTPException(status_code=404, detail="Result not found") |
| return {"id": req.result_id, "status": "updated"} |
| except Exception as e: |
| logger.error(f"Error updating classification: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/results", response_model=list[ResultResponse]) |
| def get_results(): |
| try: |
| return data_service.get_all_results_joined() |
| except Exception as e: |
| logger.error(f"Error fetching results: {e}") |
| return [] |
|
|
|
|
| @app.post("/process", response_model=InnovationResponse) |
| async def process_document(req: ProcessRequest): |
| try: |
| existing_content = data_service.get_file_content(req.file_id) |
| |
| text_content = "" |
| content = "" |
|
|
| if existing_content: |
| logger.info(f"File {req.file_id} found in DB.") |
| text_content = existing_content |
| else: |
| try: |
| print(req.url) |
| hf_response = requests.post( |
| 'https://organizedprogrammers-docxtract.hf.space/docs/extract_text_from_url', |
| json={"url": req.url}, |
| timeout=30 |
| ) |
|
|
| if hf_response.status_code == 200: |
| data = hf_response.json() |
| text_content = data.get('text') or data.get('content') or "" |
| else: |
| logger.error(f"Failed to fetch content from HF: {hf_response.text}") |
| text_content = "Extraction failed." |
| except Exception as e: |
| logger.error(f"Error fetching content: {e}") |
| text_content = "Extraction error." |
|
|
| print(req) |
| |
| data_service.add_file({ |
| "file_id": req.file_id, |
| "working_group": req.working_group, |
| "meeting": req.meeting, |
| "type": req.type, |
| "status": req.status, |
| "agenda_item": req.agenda_item, |
| "content": text_content, |
| "filename": req.filename, |
| "timestamp": datetime.now().isoformat() |
| }) |
|
|
| refined_output = data_service.get_refined_output(req.file_id) |
| |
| md = mistune.create_markdown() |
| if refined_output: |
| content = md(refined_output) |
| else: |
| print(text_content) |
| answer = ask_gemini(PROCESS_PROMPT, text_content) |
|
|
| content = md(answer) |
|
|
| data_service.add_refined(req.file_id, answer) |
| |
| return { |
| "id": req.file_id, |
| "file_name": req.filename, |
| "answer": content, |
| "classification": "UNCLASSIFIED", |
| } |
|
|
| except Exception as e: |
| logger.error(f"Error processing: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| |
| static_dir = "static" |
| if os.path.exists(static_dir): |
| |
| if os.path.exists(os.path.join(static_dir, "assets")): |
| app.mount("/assets", StaticFiles(directory=os.path.join(static_dir, "assets")), name="assets") |
| |
| |
| @app.get("/{full_path:path}") |
| async def serve_frontend(full_path: str): |
| |
| file_path = os.path.join(static_dir, full_path) |
| if os.path.isfile(file_path): |
| return FileResponse(file_path) |
| |
| |
| return FileResponse(os.path.join(static_dir, "index.html")) |