# TelegramSteamBot / database.py
# Source: Hugging Face upload by mrpoddaa ("Upload 13 files", commit 18b952c, verified)
"""
Database Module - MongoDB integration for file metadata storage
Uses Motor async driver for high-performance operations
"""
import logging
import os
from datetime import datetime
from typing import Optional, List, Dict
from dataclasses import dataclass, field, asdict
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo.errors import DuplicateKeyError
logger = logging.getLogger(__name__)
@dataclass
class FileMetadata:
    """Metadata record for one uploaded file, split into streamable parts.

    Attributes:
        unique_id: Application-level unique key (backed by a unique Mongo index).
        filename: Original file name.
        total_size: Total size in bytes across all parts.
        parts: Per-part descriptors (shape defined by the uploader; not validated here).
        part_count: Number of entries expected in ``parts``.
        uploaded_at: Upload timestamp; stored in Mongo as an ISO-8601 string.
    """
    unique_id: str
    filename: str
    total_size: int
    parts: List[Dict]
    part_count: int
    # NOTE(review): datetime.utcnow() is deprecated in 3.12 in favor of
    # datetime.now(timezone.utc), but switching would make stored ISO strings
    # tz-aware and break lexicographic $lt comparisons against existing naive
    # timestamps — keep naive UTC for consistency with stored documents.
    uploaded_at: datetime = field(default_factory=datetime.utcnow)

    def to_dict(self) -> dict:
        """Convert to a plain dict suitable for MongoDB insertion.

        ``uploaded_at`` is serialized to an ISO-8601 string so documents stay
        BSON/JSON friendly and sortable.
        """
        data = asdict(self)
        data["uploaded_at"] = self.uploaded_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: dict) -> "FileMetadata":
        """Create an instance from a MongoDB document.

        Operates on a shallow copy so the caller's dict is NOT mutated
        (the original implementation overwrote ``data["uploaded_at"]`` in place).
        The document must contain exactly the dataclass fields (strip ``_id``
        before calling).
        """
        payload = dict(data)
        payload["uploaded_at"] = datetime.fromisoformat(payload["uploaded_at"])
        return cls(**payload)
class Database:
    """Async MongoDB manager for file metadata (Motor driver).

    Lifecycle: construct, ``await connect()``, use the CRUD/stat helpers,
    ``await disconnect()``. Connection parameters come from the environment:
    ``MONGO_URI`` (required) and ``MONGO_DATABASE`` (default
    ``telegram_streamer``).
    """

    def __init__(self):
        # Handles are None until connect() succeeds; is_connected() keys off them.
        self.client: Optional[AsyncIOMotorClient] = None
        self.db: Optional[AsyncIOMotorDatabase] = None
        self.files_collection = None
        self.mongo_uri = os.getenv("MONGO_URI")
        self.database_name = os.getenv("MONGO_DATABASE", "telegram_streamer")

    async def connect(self):
        """Connect to MongoDB, verify with a ping, and ensure indexes.

        Raises:
            ValueError: if MONGO_URI is not set.
            Exception: any driver error (logged, then re-raised).
        """
        if not self.mongo_uri:
            raise ValueError("MONGO_URI environment variable is required")
        try:
            logger.info("Connecting to MongoDB...")
            self.client = AsyncIOMotorClient(
                self.mongo_uri,
                serverSelectionTimeoutMS=5000
            )
            # Fail fast if the server is unreachable instead of on first query.
            await self.client.admin.command('ping')
            self.db = self.client[self.database_name]
            self.files_collection = self.db["files"]
            await self._create_indexes()
            logger.info(f"Connected to MongoDB: {self.database_name}")
        except Exception as e:
            logger.error(f"Failed to connect to MongoDB: {str(e)}")
            raise

    async def _create_indexes(self):
        """Create indexes; failures are logged but non-fatal (best effort)."""
        try:
            # Enforces application-level uniqueness of unique_id.
            await self.files_collection.create_index(
                "unique_id",
                unique=True,
                name="unique_id_index"
            )
            # Speeds up cleanup_old_files() range scans and list_files() sort.
            await self.files_collection.create_index(
                "uploaded_at",
                name="uploaded_at_index"
            )
            logger.info("Database indexes created")
        except Exception as e:
            logger.warning(f"Failed to create indexes: {str(e)}")

    async def disconnect(self):
        """Close the client and reset connection state.

        Resets the handles so is_connected() correctly reports False after
        disconnect (the original left stale references behind).
        """
        if self.client:
            self.client.close()
            logger.info("Disconnected from MongoDB")
        self.client = None
        self.db = None
        self.files_collection = None

    def is_connected(self) -> bool:
        """Return True if connect() has completed and disconnect() has not."""
        return self.client is not None and self.db is not None

    async def save_file_metadata(self, metadata: FileMetadata) -> bool:
        """Insert a metadata document.

        Returns:
            True on success.

        Raises:
            ValueError: if a document with the same unique_id already exists.
            Exception: any other driver error (logged, then re-raised).
        """
        try:
            await self.files_collection.insert_one(metadata.to_dict())
            logger.info(f"Saved metadata: unique_id={metadata.unique_id}")
            return True
        except DuplicateKeyError:
            logger.error(f"Duplicate unique_id: {metadata.unique_id}")
            raise ValueError("File with this unique_id already exists")
        except Exception as e:
            logger.error(f"Failed to save metadata: {str(e)}")
            raise

    async def get_file_metadata(self, unique_id: str) -> Optional[FileMetadata]:
        """Fetch one document by unique_id; None if missing or on error."""
        try:
            doc = await self.files_collection.find_one({"unique_id": unique_id})
            if doc:
                # _id is Mongo-internal and not a FileMetadata field.
                doc.pop("_id", None)
                return FileMetadata.from_dict(doc)
            return None
        except Exception as e:
            logger.error(f"Failed to get metadata: {str(e)}")
            return None

    async def update_file_metadata(
        self,
        unique_id: str,
        updates: dict
    ) -> bool:
        """Apply a partial $set update to one document.

        Returns:
            True only if a document was actually modified. NOTE: a $set that
            matches a document but changes nothing returns False
            (modified_count semantics) — callers relying on "found" should
            not infer absence from a False here.
        """
        try:
            result = await self.files_collection.update_one(
                {"unique_id": unique_id},
                {"$set": updates}
            )
            return result.modified_count > 0
        except Exception as e:
            logger.error(f"Failed to update metadata: {str(e)}")
            return False

    async def delete_file_metadata(self, unique_id: str) -> bool:
        """Delete one document by unique_id; True if something was removed."""
        try:
            result = await self.files_collection.delete_one(
                {"unique_id": unique_id}
            )
            logger.info(f"Deleted metadata: unique_id={unique_id}")
            return result.deleted_count > 0
        except Exception as e:
            logger.error(f"Failed to delete metadata: {str(e)}")
            return False

    async def list_files(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List[FileMetadata]:
        """List files, newest first, with skip/limit pagination.

        Returns an empty list on any error (best-effort read).
        """
        try:
            cursor = self.files_collection.find().skip(skip).limit(limit)
            cursor = cursor.sort("uploaded_at", -1)
            files = []
            async for doc in cursor:
                doc.pop("_id", None)
                files.append(FileMetadata.from_dict(doc))
            return files
        except Exception as e:
            logger.error(f"Failed to list files: {str(e)}")
            return []

    async def get_total_storage(self) -> int:
        """Sum total_size across all documents; 0 on error or empty collection."""
        try:
            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_size": {"$sum": "$total_size"}
                    }
                }
            ]
            result = await self.files_collection.aggregate(pipeline).to_list(1)
            if result:
                return result[0]["total_size"]
            return 0
        except Exception as e:
            logger.error(f"Failed to get total storage: {str(e)}")
            return 0

    async def cleanup_old_files(self, days: int = 30) -> int:
        """Delete metadata older than ``days`` days; return deleted count.

        uploaded_at is stored as a naive-UTC ISO string, so the string $lt
        comparison against another naive-UTC ISO string is order-correct.
        """
        try:
            from datetime import timedelta
            # utcnow matches the naive-UTC format written by FileMetadata;
            # a tz-aware cutoff would break the lexicographic comparison.
            cutoff_date = datetime.utcnow() - timedelta(days=days)
            result = await self.files_collection.delete_many(
                {"uploaded_at": {"$lt": cutoff_date.isoformat()}}
            )
            deleted = result.deleted_count
            logger.info(f"Cleaned up {deleted} old files (older than {days} days)")
            return deleted
        except Exception as e:
            logger.error(f"Failed to cleanup old files: {str(e)}")
            return 0

    async def get_stats(self) -> dict:
        """Return aggregate stats: counts, storage, and the largest file.

        Returns an empty dict on error.
        """
        try:
            total_files = await self.files_collection.count_documents({})
            total_storage = await self.get_total_storage()
            # Largest single file by total_size (None if collection is empty).
            largest = await self.files_collection.find_one(
                {},
                sort=[("total_size", -1)]
            )
            return {
                "total_files": total_files,
                "total_storage": total_storage,
                "total_storage_gb": f"{total_storage / (1024**3):.2f}",
                "largest_file": {
                    "unique_id": largest.get("unique_id"),
                    "filename": largest.get("filename"),
                    "size": largest.get("total_size")
                } if largest else None
            }
        except Exception as e:
            logger.error(f"Failed to get stats: {str(e)}")
            return {}