import logging
import os
import shutil
import tempfile
import time
import uuid
from pathlib import Path
from typing import Dict

import PyPDF2
from PIL import Image

from config.settings import settings


logger = logging.getLogger(__name__)


class FileHandler:
    """Validate, store, preview and expire user-uploaded files.

    Uploads are written to ``<temp_dir>/<session_id>/input/<filename>``.
    The storage root and the size limit are read from the project
    ``settings`` object when the handler is constructed.
    """

    def __init__(self):
        """Read the storage root and the upload size limit from ``settings``."""
        self.temp_dir = Path(settings.TEMP_DIR)
        self.max_size_mb = settings.MAX_FILE_SIZE_MB

    @staticmethod
    def _extension_of(name: str) -> str:
        """Return the lower-cased extension of *name* without the dot.

        Only the final path component is considered, and a name without a
        dot yields ``""`` (a bare ``name.split('.')[-1]`` would instead
        return the whole name, so a file called ``pdf`` looked like a PDF).
        """
        return Path(name).suffix.lstrip(".").lower()

    @staticmethod
    def _size_in_bytes(uploaded_file) -> int:
        """Return the size of an upload held in memory or on disk.

        Raises:
            OSError: If the upload is path-based and the path cannot be read.
        """
        if hasattr(uploaded_file, "getbuffer"):
            return len(uploaded_file.getbuffer())
        # Path-style uploads (the kind save_uploaded_file also accepts) only
        # expose ``.name``; measure the file on disk instead.
        return os.path.getsize(uploaded_file.name)

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Return *text* cut to *limit* characters, with "..." if shortened."""
        return text[:limit] + "..." if len(text) > limit else text

    def validate_file(self, uploaded_file) -> Dict:
        """Check an upload against the size limit and supported types.

        Args:
            uploaded_file: Object with a ``name`` attribute and either a
                ``getbuffer()`` method or a ``name`` that is a readable path.

        Returns:
            A dict with keys ``valid`` (bool), ``error`` (message or ``None``)
            and ``file_info`` (``None`` on failure, otherwise a dict with
            ``name``, ``size_mb`` and ``type``).
        """
        validation = {"valid": False, "error": None, "file_info": None}
        if not uploaded_file:
            validation["error"] = "No file"
            return validation

        try:
            file_size_mb = self._size_in_bytes(uploaded_file) / (1024 * 1024)
        except OSError:
            validation["error"] = "Unable to read file"
            return validation
        if file_size_mb > self.max_size_mb:
            validation["error"] = "File too large"
            return validation

        file_extension = self._extension_of(uploaded_file.name)
        if not file_extension or file_extension not in settings.SUPPORTED_FILE_TYPES:
            validation["error"] = "Unsupported type"
            return validation

        validation["valid"] = True
        validation["file_info"] = {
            # Report only the final path component, never the client path.
            "name": os.path.basename(uploaded_file.name),
            "size_mb": file_size_mb,
            "type": file_extension,
        }
        return validation

    def save_uploaded_file(self, uploaded_file, session_id: str) -> str:
        """Copy an upload into the session's ``input`` directory.

        Args:
            uploaded_file: Object with a ``name`` attribute plus one of
                ``getbuffer()``, ``read()``, or a ``name`` that is a
                readable path (tried in that order).
            session_id: Directory name for the session; a random 8-character
                id is generated when it is empty.

        Returns:
            The path of the stored file as a string.

        Raises:
            ValueError: If ``session_id`` would place the file outside
                ``temp_dir`` (for example ``"../x"`` or an absolute path).
        """
        if not session_id:
            session_id = uuid.uuid4().hex[:8]

        session_dir = self.temp_dir / session_id / "input"
        # session_id becomes a path component, so refuse values that would
        # resolve to somewhere outside the storage root.
        if self.temp_dir.resolve() not in session_dir.resolve().parents:
            raise ValueError(f"Invalid session_id: {session_id!r}")
        session_dir.mkdir(parents=True, exist_ok=True)

        # Keep only the final component of the client-supplied name.
        filename = os.path.basename(uploaded_file.name)
        file_path = session_dir / filename

        logger.info("Moving file from Gradio temp: %s", uploaded_file.name)
        logger.info("To session directory: %s", file_path)

        # In each branch the source is read BEFORE the destination is opened
        # for writing, so a source that already lives at ``file_path`` is
        # never truncated.
        if hasattr(uploaded_file, "getbuffer"):
            file_path.write_bytes(uploaded_file.getbuffer())
        elif hasattr(uploaded_file, "read"):
            file_path.write_bytes(uploaded_file.read())
        else:
            try:
                shutil.copyfile(uploaded_file.name, file_path)
            except shutil.SameFileError:
                # Already in place; nothing to copy.
                logger.info("File already in session directory: %s", file_path)
        return str(file_path)

    def get_file_preview(self, file_path: str, file_type: str, max_chars: int = 500) -> str:
        """Return a short text preview of a stored file.

        Args:
            file_path: Path of the file to preview.
            file_type: ``"pdf"`` or ``"txt"``; any other value has no preview.
            max_chars: Maximum number of characters before "..." is appended.

        Returns:
            The preview text, or a "... not available" message when the file
            cannot be read or the type is unsupported. Never raises for an
            unreadable file.
        """
        if file_type == 'pdf':
            try:
                with open(file_path, 'rb') as file:
                    reader = PyPDF2.PdfReader(file)
                    if len(reader.pages) > 0:
                        # Only the first page is used for the preview.
                        return self._truncate(reader.pages[0].extract_text(), max_chars)
            except Exception:
                # Best-effort boundary: a broken PDF must not break the caller.
                logger.warning("PDF preview failed for %s", file_path, exc_info=True)
                return "PDF preview not available"
        elif file_type == 'txt':
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    # One extra character is enough to know whether the text
                    # was cut, without loading the whole file.
                    text = file.read(max_chars + 1)
            except Exception:
                logger.warning("Text preview failed for %s", file_path, exc_info=True)
                return "Text preview not available"
            return self._truncate(text, max_chars)

        # Unsupported type, or a PDF with no pages.
        return "Preview not available"

    def cleanup_temp_files(self, max_age_hours: float = 24) -> None:
        """Delete session directories not modified within ``max_age_hours``.

        Best-effort: a directory that cannot be inspected or removed is
        logged and skipped so the remaining directories are still processed.
        Does nothing when ``temp_dir`` does not exist.

        Args:
            max_age_hours: Age threshold, based on the directory's own mtime.
        """
        if not self.temp_dir.is_dir():
            return

        cutoff = time.time() - max_age_hours * 3600
        try:
            # Snapshot the listing because entries are deleted while looping.
            entries = list(self.temp_dir.iterdir())
        except OSError:
            logger.warning("Could not list %s", self.temp_dir, exc_info=True)
            return

        for session_dir in entries:
            try:
                if session_dir.is_dir() and session_dir.stat().st_mtime < cutoff:
                    shutil.rmtree(session_dir)
            except OSError:
                logger.warning("Could not remove %s", session_dir, exc_info=True)