"""
Database Module - MongoDB integration for file metadata storage.

Uses Motor async driver for high-performance operations.
"""

import logging
import os
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo.errors import DuplicateKeyError

logger = logging.getLogger(__name__)


@dataclass
class FileMetadata:
    """File metadata structure stored in the ``files`` collection."""

    # Application-level primary key; backed by a unique index in MongoDB.
    unique_id: str
    filename: str
    # Total size in bytes across all parts.
    total_size: int
    parts: List[Dict]
    part_count: int
    uploaded_at: datetime = field(default_factory=datetime.utcnow)

    def to_dict(self) -> dict:
        """Convert to a dictionary for MongoDB (datetime -> ISO-8601 string)."""
        data = asdict(self)
        data["uploaded_at"] = self.uploaded_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "FileMetadata":
        """Create from a MongoDB document.

        Works on a shallow copy so the caller's dict is not mutated
        (the original implementation overwrote ``data["uploaded_at"]``
        in place).
        """
        data = dict(data)
        data["uploaded_at"] = datetime.fromisoformat(data["uploaded_at"])
        return cls(**data)


class Database:
    """MongoDB database manager for file metadata.

    All public methods are coroutine functions; ``connect()`` must be
    awaited before any other operation is used.
    """

    def __init__(self):
        self.client: Optional[AsyncIOMotorClient] = None
        self.db: Optional[AsyncIOMotorDatabase] = None
        self.files_collection = None
        # Connection settings come from the environment; MONGO_URI is
        # required, the database name defaults to "telegram_streamer".
        self.mongo_uri = os.getenv("MONGO_URI")
        self.database_name = os.getenv("MONGO_DATABASE", "telegram_streamer")

    async def connect(self):
        """Connect to MongoDB, verify the connection, and create indexes.

        Raises:
            ValueError: if MONGO_URI is not set.
            Exception: any driver error from the connection attempt.
        """
        if not self.mongo_uri:
            raise ValueError("MONGO_URI environment variable is required")
        try:
            logger.info("Connecting to MongoDB...")
            self.client = AsyncIOMotorClient(
                self.mongo_uri,
                serverSelectionTimeoutMS=5000
            )
            # Test connection eagerly; Motor otherwise connects lazily.
            await self.client.admin.command('ping')
            self.db = self.client[self.database_name]
            self.files_collection = self.db["files"]
            # Create indexes
            await self._create_indexes()
            logger.info(f"Connected to MongoDB: {self.database_name}")
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {str(e)}")
            raise

    async def _create_indexes(self):
        """Create database indexes for performance (best-effort)."""
        try:
            # Unique index on unique_id — enforces one document per file.
            await self.files_collection.create_index(
                "unique_id", unique=True, name="unique_id_index"
            )
            # Index on uploaded_at for cleanup queries
            await self.files_collection.create_index(
                "uploaded_at", name="uploaded_at_index"
            )
            logger.info("Database indexes created")
        except Exception as e:
            # Index creation failure is non-fatal; queries still work,
            # just slower, and duplicates are caught by insert errors.
            logger.warning(f"Failed to create indexes: {str(e)}")

    async def disconnect(self):
        """Disconnect from MongoDB and reset connection state.

        Resets ``client``/``db``/``files_collection`` to ``None`` so that
        ``is_connected()`` correctly reports ``False`` afterwards (the
        original left stale references behind).
        """
        if self.client:
            self.client.close()
            self.client = None
            self.db = None
            self.files_collection = None
            logger.info("Disconnected from MongoDB")

    def is_connected(self) -> bool:
        """Check if database is connected (handles held, not a live ping)."""
        return self.client is not None and self.db is not None

    async def save_file_metadata(self, metadata: FileMetadata) -> bool:
        """Save file metadata to database.

        Returns:
            True on successful insert.

        Raises:
            ValueError: if a document with the same unique_id exists.
            Exception: any other driver error (logged and re-raised).
        """
        try:
            await self.files_collection.insert_one(metadata.to_dict())
            logger.info(f"Saved metadata: unique_id={metadata.unique_id}")
            return True
        except DuplicateKeyError:
            logger.error(f"Duplicate unique_id: {metadata.unique_id}")
            raise ValueError("File with this unique_id already exists")
        except Exception as e:
            logger.error(f"Failed to save metadata: {str(e)}")
            raise

    async def get_file_metadata(self, unique_id: str) -> Optional[FileMetadata]:
        """Retrieve file metadata by unique_id; None if absent or on error."""
        try:
            doc = await self.files_collection.find_one({"unique_id": unique_id})
            if doc:
                # Remove MongoDB _id field — not part of FileMetadata.
                doc.pop("_id", None)
                return FileMetadata.from_dict(doc)
            return None
        except Exception as e:
            logger.error(f"Failed to get metadata: {str(e)}")
            return None

    async def update_file_metadata(
        self, unique_id: str, updates: dict
    ) -> bool:
        """Apply a $set update to one document; True if a field changed."""
        try:
            result = await self.files_collection.update_one(
                {"unique_id": unique_id},
                {"$set": updates}
            )
            return result.modified_count > 0
        except Exception as e:
            logger.error(f"Failed to update metadata: {str(e)}")
            return False

    async def delete_file_metadata(self, unique_id: str) -> bool:
        """Delete file metadata; True if a document was removed."""
        try:
            result = await self.files_collection.delete_one(
                {"unique_id": unique_id}
            )
            logger.info(f"Deleted metadata: unique_id={unique_id}")
            return result.deleted_count > 0
        except Exception as e:
            logger.error(f"Failed to delete metadata: {str(e)}")
            return False

    async def list_files(
        self, limit: int = 100, skip: int = 0
    ) -> List[FileMetadata]:
        """List all files with pagination, newest first.

        Returns an empty list on error (best-effort read).
        """
        try:
            cursor = self.files_collection.find().skip(skip).limit(limit)
            # MongoDB applies sort before skip/limit regardless of the
            # order the cursor modifiers are chained in.
            cursor = cursor.sort("uploaded_at", -1)
            files = []
            async for doc in cursor:
                doc.pop("_id", None)
                files.append(FileMetadata.from_dict(doc))
            return files
        except Exception as e:
            logger.error(f"Failed to list files: {str(e)}")
            return []

    async def get_total_storage(self) -> int:
        """Get total storage (bytes) used across all files; 0 on error."""
        try:
            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_size": {"$sum": "$total_size"}
                    }
                }
            ]
            result = await self.files_collection.aggregate(pipeline).to_list(1)
            if result:
                return result[0]["total_size"]
            # Empty collection: $group over no documents yields no result.
            return 0
        except Exception as e:
            logger.error(f"Failed to get total storage: {str(e)}")
            return 0

    async def cleanup_old_files(self, days: int = 30) -> int:
        """Delete files older than the specified number of days.

        Returns the number of deleted documents (0 on error).
        """
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=days)
            # uploaded_at is stored as an ISO-8601 string, so the $lt
            # comparison is lexicographic; ISO strings of equal precision
            # sort chronologically. NOTE(review): assumes all stored
            # timestamps carry microseconds (utcnow() virtually always
            # does) — confirm if other writers exist.
            result = await self.files_collection.delete_many(
                {"uploaded_at": {"$lt": cutoff_date.isoformat()}}
            )
            deleted = result.deleted_count
            logger.info(f"Cleaned up {deleted} old files (older than {days} days)")
            return deleted
        except Exception as e:
            logger.error(f"Failed to cleanup old files: {str(e)}")
            return 0

    async def get_stats(self) -> dict:
        """Get database statistics (counts, storage, largest file).

        Returns an empty dict on error.
        """
        try:
            total_files = await self.files_collection.count_documents({})
            total_storage = await self.get_total_storage()
            # Get largest file
            largest = await self.files_collection.find_one(
                {}, sort=[("total_size", -1)]
            )
            return {
                "total_files": total_files,
                "total_storage": total_storage,
                "total_storage_gb": f"{total_storage / (1024**3):.2f}",
                "largest_file": {
                    "unique_id": largest.get("unique_id"),
                    "filename": largest.get("filename"),
                    "size": largest.get("total_size")
                } if largest else None
            }
        except Exception as e:
            logger.error(f"Failed to get stats: {str(e)}")
            return {}