| import os |
| import sys |
| import logging |
| import traceback |
| import json |
| import time |
| from datetime import datetime |
| import platform |
|
|
| |
| try: |
| import psutil |
| PSUTIL_AVAILABLE = True |
| except ImportError: |
| PSUTIL_AVAILABLE = False |
| logging.warning("psutil module not available. System monitoring features will be limited.") |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
| class DebugInfo: |
| """Class containing debug information""" |
| |
| @staticmethod |
| def get_system_info(): |
| """Get system information""" |
| try: |
| info = { |
| "os": platform.system(), |
| "os_version": platform.version(), |
| "python_version": platform.python_version(), |
| "cpu_count": os.cpu_count(), |
| "timestamp": datetime.now().isoformat() |
| } |
| |
| |
| if PSUTIL_AVAILABLE: |
| info.update({ |
| "total_memory": round(psutil.virtual_memory().total / (1024 * 1024 * 1024), 2), |
| "available_memory": round(psutil.virtual_memory().available / (1024 * 1024 * 1024), 2), |
| "cpu_usage": psutil.cpu_percent(interval=0.1), |
| "memory_usage": psutil.virtual_memory().percent, |
| "disk_usage": psutil.disk_usage('/').percent, |
| }) |
| else: |
| info.update({ |
| "total_memory": "psutil not available", |
| "available_memory": "psutil not available", |
| "cpu_usage": "psutil not available", |
| "memory_usage": "psutil not available", |
| "disk_usage": "psutil not available", |
| }) |
| |
| return info |
| except Exception as e: |
| logger.error(f"Error getting system info: {e}") |
| return {"error": str(e)} |
| |
| @staticmethod |
| def get_env_info(): |
| """Get environment variable information (masking sensitive information)""" |
| try: |
| |
| sensitive_vars = [ |
| "API_KEY", "SECRET", "PASSWORD", "TOKEN", "AUTH", "MONGODB_URL", |
| "AIVEN_DB_URL", "PINECONE_API_KEY", "GOOGLE_API_KEY" |
| ] |
| |
| env_vars = {} |
| for key, value in os.environ.items(): |
| |
| is_sensitive = any(s in key.upper() for s in sensitive_vars) |
| |
| if is_sensitive and value: |
| |
| masked_value = value[:4] + "****" if len(value) > 4 else "****" |
| env_vars[key] = masked_value |
| else: |
| env_vars[key] = value |
| |
| return env_vars |
| except Exception as e: |
| logger.error(f"Error getting environment info: {e}") |
| return {"error": str(e)} |
| |
| @staticmethod |
| def get_database_status(): |
| """Get database connection status""" |
| try: |
| from app.database.postgresql import check_db_connection as check_postgresql |
| from app.database.mongodb import check_db_connection as check_mongodb |
| from app.database.pinecone import check_db_connection as check_pinecone |
| |
| return { |
| "postgresql": check_postgresql(), |
| "mongodb": check_mongodb(), |
| "pinecone": check_pinecone(), |
| "timestamp": datetime.now().isoformat() |
| } |
| except Exception as e: |
| logger.error(f"Error getting database status: {e}") |
| return {"error": str(e)} |
|
|
| class PerformanceMonitor: |
| """Performance monitoring class""" |
| |
| def __init__(self): |
| self.start_time = time.time() |
| self.checkpoints = [] |
| |
| def checkpoint(self, name): |
| """Mark a checkpoint and record the time""" |
| current_time = time.time() |
| elapsed = current_time - self.start_time |
| self.checkpoints.append({ |
| "name": name, |
| "time": current_time, |
| "elapsed": elapsed |
| }) |
| logger.debug(f"Checkpoint '{name}' at {elapsed:.4f}s") |
| return elapsed |
| |
| def get_report(self): |
| """Generate performance report""" |
| if not self.checkpoints: |
| return {"error": "No checkpoints recorded"} |
| |
| total_time = time.time() - self.start_time |
| |
| |
| intervals = [] |
| prev_time = self.start_time |
| |
| for checkpoint in self.checkpoints: |
| interval = checkpoint["time"] - prev_time |
| intervals.append({ |
| "name": checkpoint["name"], |
| "interval": interval, |
| "elapsed": checkpoint["elapsed"] |
| }) |
| prev_time = checkpoint["time"] |
| |
| return { |
| "total_time": total_time, |
| "checkpoint_count": len(self.checkpoints), |
| "intervals": intervals |
| } |
|
|
| class ErrorTracker: |
| """Class to track and record errors""" |
| |
| def __init__(self, max_errors=100): |
| self.errors = [] |
| self.max_errors = max_errors |
| |
| def track_error(self, error, context=None): |
| """Record error information""" |
| error_info = { |
| "error_type": type(error).__name__, |
| "error_message": str(error), |
| "traceback": traceback.format_exc(), |
| "timestamp": datetime.now().isoformat(), |
| "context": context or {} |
| } |
| |
| |
| self.errors.append(error_info) |
| |
| |
| if len(self.errors) > self.max_errors: |
| self.errors.pop(0) |
| |
| return error_info |
| |
| def get_errors(self, limit=None): |
| """Get list of recorded errors""" |
| if limit is None or limit >= len(self.errors): |
| return self.errors |
| return self.errors[-limit:] |
|
|
| |
| error_tracker = ErrorTracker() |
| performance_monitor = PerformanceMonitor() |
|
|
| def debug_view(request=None): |
| """Create a full debug report""" |
| debug_data = { |
| "system_info": DebugInfo.get_system_info(), |
| "database_status": DebugInfo.get_database_status(), |
| "performance": performance_monitor.get_report(), |
| "recent_errors": error_tracker.get_errors(limit=10), |
| "timestamp": datetime.now().isoformat() |
| } |
| |
| |
| if request: |
| debug_data["request"] = { |
| "method": request.method, |
| "url": str(request.url), |
| "headers": dict(request.headers), |
| "client": { |
| "host": request.client.host if request.client else "unknown", |
| "port": request.client.port if request.client else "unknown" |
| } |
| } |
| |
| return debug_data |