"""
Database Module - MongoDB integration for file metadata storage
Uses Motor async driver for high-performance operations
"""
import logging
import os
from datetime import datetime
from typing import Optional, List, Dict
from dataclasses import dataclass, field, asdict

from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo.errors import DuplicateKeyError

logger = logging.getLogger(__name__)
@dataclass
class FileMetadata:
    """File metadata structure persisted to MongoDB.

    Attributes:
        unique_id: Unique identifier used as the lookup key.
        filename: Original file name.
        total_size: Total file size in bytes.
        parts: Per-part descriptors (dicts; schema defined by the uploader).
        part_count: Number of parts the file was split into.
        uploaded_at: Upload timestamp (naive UTC); defaults to now.
    """
    unique_id: str
    filename: str
    total_size: int
    parts: List[Dict]
    part_count: int
    uploaded_at: datetime = field(default_factory=datetime.utcnow)

    def to_dict(self) -> dict:
        """Convert to a plain dictionary for MongoDB storage.

        The timestamp is serialized as an ISO-8601 string so that the
        stored document round-trips through ``from_dict``.
        """
        data = asdict(self)
        data["uploaded_at"] = self.uploaded_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "FileMetadata":
        """Create an instance from a MongoDB document.

        The input dict is copied so the caller's document is not mutated
        when the timestamp string is parsed back into a ``datetime``.
        """
        data = dict(data)
        data["uploaded_at"] = datetime.fromisoformat(data["uploaded_at"])
        return cls(**data)
class Database:
    """MongoDB database manager for file metadata.

    Wraps a Motor (async MongoDB) client and exposes CRUD, listing,
    aggregation and cleanup helpers for the ``files`` collection.

    Environment variables:
        MONGO_URI: required connection string.
        MONGO_DATABASE: database name (default ``"telegram_streamer"``).
    """

    def __init__(self):
        # Annotations are string forward references so this module can be
        # imported (e.g. for tests) without motor installed.
        self.client: Optional["AsyncIOMotorClient"] = None
        self.db: Optional["AsyncIOMotorDatabase"] = None
        self.files_collection = None
        self.mongo_uri = os.getenv("MONGO_URI")
        self.database_name = os.getenv("MONGO_DATABASE", "telegram_streamer")

    async def connect(self):
        """Connect to MongoDB, verify the connection, and create indexes.

        Raises:
            ValueError: if MONGO_URI is not set.
            Exception: any driver error from connecting/pinging is re-raised.
        """
        if not self.mongo_uri:
            raise ValueError("MONGO_URI environment variable is required")
        try:
            logger.info("Connecting to MongoDB...")
            self.client = AsyncIOMotorClient(
                self.mongo_uri,
                serverSelectionTimeoutMS=5000
            )
            # Test connection — AsyncIOMotorClient does not connect eagerly.
            await self.client.admin.command('ping')
            self.db = self.client[self.database_name]
            self.files_collection = self.db["files"]
            # Create indexes
            await self._create_indexes()
            logger.info(f"Connected to MongoDB: {self.database_name}")
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {str(e)}")
            raise

    async def _create_indexes(self):
        """Create database indexes for performance (best-effort).

        Index creation failure is logged as a warning rather than raised,
        so a missing index never blocks startup.
        """
        try:
            # Unique index on unique_id — enforces one document per file.
            await self.files_collection.create_index(
                "unique_id",
                unique=True,
                name="unique_id_index"
            )
            # Index on uploaded_at for cleanup queries
            await self.files_collection.create_index(
                "uploaded_at",
                name="uploaded_at_index"
            )
            logger.info("Database indexes created")
        except Exception as e:
            logger.warning(f"Failed to create indexes: {str(e)}")

    async def disconnect(self):
        """Disconnect from MongoDB and reset connection state.

        The cached handles are cleared so ``is_connected()`` reports False
        afterwards (previously the stale client kept it reporting True).
        """
        if self.client:
            self.client.close()
            self.client = None
            self.db = None
            self.files_collection = None
            logger.info("Disconnected from MongoDB")

    def is_connected(self) -> bool:
        """Return True if ``connect()`` has succeeded and not been undone."""
        return self.client is not None and self.db is not None

    async def save_file_metadata(self, metadata: "FileMetadata") -> bool:
        """Save file metadata to the database.

        Returns:
            True on success.

        Raises:
            ValueError: if a document with the same unique_id already exists.
            Exception: any other driver error is logged and re-raised.
        """
        try:
            await self.files_collection.insert_one(metadata.to_dict())
            logger.info(f"Saved metadata: unique_id={metadata.unique_id}")
            return True
        except DuplicateKeyError:
            logger.error(f"Duplicate unique_id: {metadata.unique_id}")
            raise ValueError("File with this unique_id already exists")
        except Exception as e:
            logger.error(f"Failed to save metadata: {str(e)}")
            raise

    async def get_file_metadata(self, unique_id: str) -> Optional["FileMetadata"]:
        """Retrieve file metadata by unique_id.

        Returns:
            The FileMetadata, or None if not found or on any error
            (errors are logged, not raised).
        """
        try:
            doc = await self.files_collection.find_one({"unique_id": unique_id})
            if doc:
                # Remove MongoDB _id field before building the dataclass.
                doc.pop("_id", None)
                return FileMetadata.from_dict(doc)
            return None
        except Exception as e:
            logger.error(f"Failed to get metadata: {str(e)}")
            return None

    async def update_file_metadata(
        self,
        unique_id: str,
        updates: dict
    ) -> bool:
        """Apply a ``$set`` update to one file's metadata.

        Returns:
            True if a document was actually modified, False otherwise
            (including on error, which is logged).
        """
        try:
            result = await self.files_collection.update_one(
                {"unique_id": unique_id},
                {"$set": updates}
            )
            return result.modified_count > 0
        except Exception as e:
            logger.error(f"Failed to update metadata: {str(e)}")
            return False

    async def delete_file_metadata(self, unique_id: str) -> bool:
        """Delete one file's metadata.

        Returns:
            True if a document was deleted, False otherwise
            (including on error, which is logged).
        """
        try:
            result = await self.files_collection.delete_one(
                {"unique_id": unique_id}
            )
            logger.info(f"Deleted metadata: unique_id={unique_id}")
            return result.deleted_count > 0
        except Exception as e:
            logger.error(f"Failed to delete metadata: {str(e)}")
            return False

    async def list_files(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List["FileMetadata"]:
        """List files with pagination, newest first.

        Returns:
            A list of FileMetadata (empty on error; errors are logged).
        """
        try:
            cursor = self.files_collection.find().skip(skip).limit(limit)
            cursor = cursor.sort("uploaded_at", -1)
            files = []
            async for doc in cursor:
                doc.pop("_id", None)
                files.append(FileMetadata.from_dict(doc))
            return files
        except Exception as e:
            logger.error(f"Failed to list files: {str(e)}")
            return []

    async def get_total_storage(self) -> int:
        """Return the total storage (bytes) used across all files.

        Uses a ``$group``/``$sum`` aggregation; returns 0 when the
        collection is empty or on error (errors are logged).
        """
        try:
            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_size": {"$sum": "$total_size"}
                    }
                }
            ]
            result = await self.files_collection.aggregate(pipeline).to_list(1)
            if result:
                return result[0]["total_size"]
            return 0
        except Exception as e:
            logger.error(f"Failed to get total storage: {str(e)}")
            return 0

    async def cleanup_old_files(self, days: int = 30) -> int:
        """Delete metadata for files older than ``days`` days.

        ``uploaded_at`` is stored as an ISO-8601 string (see
        ``FileMetadata.to_dict``), so a lexicographic ``$lt`` comparison
        against the cutoff's isoformat string orders correctly.

        Returns:
            Number of documents deleted (0 on error; errors are logged).
        """
        try:
            from datetime import timedelta
            cutoff_date = datetime.utcnow() - timedelta(days=days)
            result = await self.files_collection.delete_many(
                {"uploaded_at": {"$lt": cutoff_date.isoformat()}}
            )
            deleted = result.deleted_count
            logger.info(f"Cleaned up {deleted} old files (older than {days} days)")
            return deleted
        except Exception as e:
            logger.error(f"Failed to cleanup old files: {str(e)}")
            return 0

    async def get_stats(self) -> dict:
        """Return database statistics.

        Returns:
            Dict with total_files, total_storage (bytes),
            total_storage_gb (formatted string), and largest_file
            (dict or None). Empty dict on error (errors are logged).
        """
        try:
            total_files = await self.files_collection.count_documents({})
            total_storage = await self.get_total_storage()
            # Get largest file
            largest = await self.files_collection.find_one(
                {},
                sort=[("total_size", -1)]
            )
            return {
                "total_files": total_files,
                "total_storage": total_storage,
                "total_storage_gb": f"{total_storage / (1024**3):.2f}",
                "largest_file": {
                    "unique_id": largest.get("unique_id"),
                    "filename": largest.get("filename"),
                    "size": largest.get("total_size")
                } if largest else None
            }
        except Exception as e:
            logger.error(f"Failed to get stats: {str(e)}")
            return {}