Spaces:
Configuration error
Configuration error
| import os | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| from pymongo import MongoClient, ReturnDocument | |
| from pymongo.errors import DuplicateKeyError, ConnectionFailure | |
| import bcrypt | |
| from dotenv import load_dotenv | |
| import logging | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| load_dotenv() | |
| class Database: | |
| def __init__(self): | |
| """Initialize database connection and ensure indexes""" | |
| try: | |
| mongo_uri = os.getenv("MONGO_URI") | |
| if not mongo_uri: | |
| raise ValueError("MONGO_URI environment variable is not set") | |
| self.client = MongoClient( | |
| mongo_uri, | |
| serverSelectionTimeoutMS=5000, # 5 second timeout | |
| connectTimeoutMS=30000, # 30 second connection timeout | |
| socketTimeoutMS=45000, # 45 second socket timeout | |
| connect=False # Lazy connection | |
| ) | |
| # Test the connection | |
| self.client.admin.command('ping') | |
| self.db = self.client[os.getenv("DB_NAME", "rag_system")] | |
| self.users = self.db["users"] | |
| self._create_indexes() | |
| logger.info("Successfully connected to MongoDB") | |
| except Exception as e: | |
| logger.error(f"Failed to connect to MongoDB: {str(e)}") | |
| raise | |
| def _create_indexes(self): | |
| """Create necessary database indexes""" | |
| try: | |
| # Create unique index on username | |
| self.users.create_index("username", unique=True) | |
| logger.info("Created database indexes") | |
| except Exception as e: | |
| logger.error(f"Error creating indexes: {str(e)}") | |
| raise | |
| def add_user(self, username: str, password: str, role: str) -> bool: | |
| """Add a new user to the database""" | |
| if not username or not password or not role: | |
| logger.warning("Missing required fields for user creation") | |
| return False | |
| try: | |
| hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) | |
| user_data = { | |
| "username": username, | |
| "password": hashed.decode('utf-8'), | |
| "role": role.lower(), | |
| "created_at": datetime.utcnow(), | |
| "last_login": None | |
| } | |
| result = self.users.insert_one(user_data) | |
| if result.inserted_id: | |
| logger.info(f"Created new user: {username}") | |
| return True | |
| return False | |
| except DuplicateKeyError: | |
| logger.warning(f"Username already exists: {username}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error adding user {username}: {str(e)}") | |
| return False | |
| def verify_user(self, username: str, password: str) -> Optional[Dict[str, Any]]: | |
| """Verify user credentials""" | |
| try: | |
| user = self.users.find_one({"username": username}) | |
| if not user: | |
| logger.warning(f"Login attempt for non-existent user: {username}") | |
| return None | |
| if bcrypt.checkpw(password.encode('utf-8'), user["password"].encode('utf-8')): | |
| # Update last login time | |
| self.users.update_one( | |
| {"_id": user["_id"]}, | |
| {"$set": {"last_login": datetime.utcnow()}} | |
| ) | |
| logger.info(f"Successful login for user: {username}") | |
| return { | |
| "username": user["username"], | |
| "role": user["role"], | |
| "last_login": user.get("last_login") | |
| } | |
| logger.warning(f"Failed login attempt for user: {username}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error verifying user {username}: {str(e)}") | |
| return None | |
| def get_user(self, username: str) -> Optional[Dict[str, Any]]: | |
| """Get user by username (without sensitive data)""" | |
| try: | |
| user = self.users.find_one( | |
| {"username": username}, | |
| {"password": 0} # Exclude password from results | |
| ) | |
| return user | |
| except Exception as e: | |
| logger.error(f"Error fetching user {username}: {str(e)}") | |
| return None | |
| # Initialize database connection | |
| db = Database() | |
| def initialize_users(): | |
| """ | |
| Initialize default users if they don't exist. | |
| Returns tuple of (success_count, total_users, errors) | |
| """ | |
| from datetime import datetime | |
| default_users = [ | |
| {"username": "Tony", "password": "password123", "role": "engineering"}, | |
| {"username": "Bruce", "password": "securepass", "role": "marketing"}, | |
| {"username": "Sam", "password": "financepass", "role": "finance"}, | |
| {"username": "Peter", "password": "pete123", "role": "engineering"}, | |
| {"username": "Sid", "password": "sidpass123", "role": "marketing"}, | |
| {"username": "Natasha", "password": "hrpass123", "role": "hr"} | |
| ] | |
| success_count = 0 | |
| errors = [] | |
| for user in default_users: | |
| try: | |
| if db.add_user(user["username"], user["password"], user["role"]): | |
| success_count += 1 | |
| logger.info(f"Initialized user: {user['username']}") | |
| else: | |
| errors.append(f"Failed to add user: {user['username']}") | |
| except Exception as e: | |
| error_msg = f"Error initializing user {user['username']}: {str(e)}" | |
| logger.error(error_msg) | |
| errors.append(error_msg) | |
| logger.info(f"User initialization complete. Success: {success_count}/{len(default_users)}") | |
| if errors: | |
| logger.warning(f"Encountered {len(errors)} errors during user initialization") | |
| return success_count, len(default_users), errors | |