echo-tutor / auth_module /database.py
Nanny7's picture
feat: replace auth module with updated Nova bridge and wire auth UI
dbfeb5e
"""
πŸ—„οΈ Auth Module β€” Database Layer (SQLite)
═══════════════════════════════════════════
Standalone users/sessions/OTP database.
"""
import sqlite3
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from .security import get_password_hash, verify_password_and_update
from .config import auth_settings
DB_PATH = auth_settings.DATABASE_PATH
logger = logging.getLogger("auth_module.database")
def _normalize_email(email: str) -> str:
return (email or "").strip().lower()
class AuthDatabase:
def __init__(self, db_path: str = None):
self.db_path = db_path or DB_PATH
self._init_db()
def _get_conn(self):
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def _init_db(self):
"""Initialize SQLite database with users, sessions, OTP tables."""
with self._get_conn() as conn:
c = conn.cursor()
# Users Table
c.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
full_name TEXT,
password_hash TEXT NOT NULL,
phone TEXT,
role TEXT DEFAULT 'user',
balance INTEGER DEFAULT 100,
daily_used INTEGER DEFAULT 0,
daily_reset_date TEXT,
subscription_tier TEXT DEFAULT 'free',
subscription_expires TEXT,
is_verified INTEGER DEFAULT 0,
telegram_chat_id TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
# Safe migrations for existing DBs
for col, defn in [
("role", "TEXT DEFAULT 'user'"),
("daily_used", "INTEGER DEFAULT 0"),
("daily_reset_date", "TEXT"),
("subscription_tier", "TEXT DEFAULT 'free'"),
("subscription_expires", "TEXT"),
("updated_at", "TIMESTAMP"),
("is_verified", "INTEGER DEFAULT 0"),
("telegram_chat_id", "TEXT"),
("phone", "TEXT"),
]:
try:
c.execute(f"ALTER TABLE users ADD COLUMN {col} {defn}")
except sqlite3.OperationalError:
pass
# Sessions Table
c.execute(
"""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
token TEXT UNIQUE,
expires_at TIMESTAMP,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
"""
)
# OTP Codes Table
c.execute(
"""
CREATE TABLE IF NOT EXISTS otp_codes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT,
code TEXT,
purpose TEXT,
expires_at TIMESTAMP,
used INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
# Indexes
for idx in [
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)",
"CREATE INDEX IF NOT EXISTS idx_sessions_token ON sessions(token)",
"CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id)",
"CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at)",
"CREATE INDEX IF NOT EXISTS idx_otp_user_purpose ON otp_codes(user_id, purpose, expires_at)",
]:
c.execute(idx)
conn.commit()
logger.info("βœ… Auth database initialized")
# ═══════════════════════════════════════════════════════════════
# πŸ‘€ USER CRUD
# ═══════════════════════════════════════════════════════════════
async def create_user(
self,
email: str,
password: str,
full_name: str,
phone: str = None,
role: str = "user",
) -> Optional[Dict[str, Any]]:
try:
email = _normalize_email(email)
password_hash = get_password_hash(password)
today = datetime.now().strftime("%Y-%m-%d")
with self._get_conn() as conn:
c = conn.cursor()
c.execute(
"""INSERT INTO users (email, password_hash, full_name, phone, role, balance, daily_used, daily_reset_date)
VALUES (?, ?, ?, ?, ?, 100, 0, ?)""",
(email, password_hash, full_name, phone, role, today),
)
conn.commit()
return {
"id": c.lastrowid,
"email": email,
"full_name": full_name,
"role": role,
"balance": 100,
}
except sqlite3.IntegrityError:
return None
except Exception as e:
logger.exception(f"create_user error: {e}")
raise
async def authenticate_user(
self, email: str, password: str
) -> Optional[Dict[str, Any]]:
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute(
"SELECT * FROM users WHERE lower(email) = lower(?)",
(_normalize_email(email),),
)
user = c.fetchone()
if not user:
return None
ok, new_hash = verify_password_and_update(password, user["password_hash"])
if not ok:
return None
if new_hash:
try:
c.execute(
"UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(new_hash, user["id"]),
)
conn.commit()
except Exception:
pass
return dict(user)
return None
async def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute(
"SELECT id, email, full_name, role, balance, daily_used, subscription_tier "
"FROM users WHERE lower(email) = lower(?)",
(_normalize_email(email),),
)
user = c.fetchone()
return dict(user) if user else None
async def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute(
"SELECT id, email, full_name, role, balance, daily_used, subscription_tier "
"FROM users WHERE id = ?",
(user_id,),
)
user = c.fetchone()
return dict(user) if user else None
async def get_user_by_email_unverified(
self, email: str
) -> Optional[Dict[str, Any]]:
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute(
"SELECT id, email, full_name, is_verified FROM users WHERE lower(email) = lower(?)",
(_normalize_email(email),),
)
user = c.fetchone()
return dict(user) if user else None
async def get_user_by_telegram_or_phone(
self, telegram_chat_id: str, phone: str
) -> Optional[Dict[str, Any]]:
"""Find user by Telegram chat ID or phone number."""
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute(
"SELECT id, email, full_name, is_verified FROM users WHERE telegram_chat_id = ?",
(telegram_chat_id,),
)
user = c.fetchone()
if user:
return dict(user)
# Try phone
phone_clean = phone.lstrip("+")
c.execute(
"SELECT id, email, full_name, is_verified FROM users "
"WHERE phone = ? OR phone = ? OR phone = ?",
(phone, "+" + phone_clean, phone_clean),
)
user = c.fetchone()
return dict(user) if user else None
async def delete_user_account(self, user_id: int) -> bool:
with self._get_conn() as conn:
c = conn.cursor()
uid = str(user_id)
c.execute("DELETE FROM sessions WHERE user_id = ?", (user_id,))
c.execute("DELETE FROM otp_codes WHERE user_id = ?", (uid,))
c.execute("DELETE FROM users WHERE id = ?", (user_id,))
conn.commit()
return c.rowcount > 0
# ═══════════════════════════════════════════════════════════════
# πŸ”‘ SESSION MANAGEMENT
# ═══════════════════════════════════════════════════════════════
async def create_session(self, user_id: int, token: str, expires_at: str):
with self._get_conn() as conn:
c = conn.cursor()
c.execute(
"INSERT INTO sessions (user_id, token, expires_at) VALUES (?, ?, ?)",
(user_id, token, expires_at),
)
conn.commit()
async def get_session(self, token: str) -> Optional[Dict[str, Any]]:
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute("SELECT * FROM sessions WHERE token = ?", (token,))
row = c.fetchone()
return dict(row) if row else None
async def delete_session(self, token: str):
with self._get_conn() as conn:
c = conn.cursor()
c.execute("DELETE FROM sessions WHERE token = ?", (token,))
conn.commit()
async def cleanup_expired_sessions(self):
with self._get_conn() as conn:
c = conn.cursor()
c.execute(
"DELETE FROM sessions WHERE expires_at < ?",
(datetime.now().isoformat(),),
)
conn.commit()
# ═══════════════════════════════════════════════════════════════
# πŸ“² OTP / VERIFICATION
# ═══════════════════════════════════════════════════════════════
async def set_user_verified(
self, user_id: int, telegram_chat_id: str = None
) -> bool:
with self._get_conn() as conn:
c = conn.cursor()
if telegram_chat_id:
c.execute(
"UPDATE users SET is_verified = 1, telegram_chat_id = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(telegram_chat_id, user_id),
)
else:
c.execute(
"UPDATE users SET is_verified = 1, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(user_id,),
)
conn.commit()
return c.rowcount > 0
async def store_otp(
self,
user_id: int,
code: str,
purpose: str = "telegram_verify",
minutes: int = 10,
):
expires_at = (datetime.now() + timedelta(minutes=minutes)).isoformat()
with self._get_conn() as conn:
c = conn.cursor()
c.execute(
"UPDATE otp_codes SET used = 1 WHERE user_id = ? AND purpose = ? AND used = 0",
(str(user_id), purpose),
)
c.execute(
"INSERT INTO otp_codes (user_id, code, purpose, expires_at) VALUES (?, ?, ?, ?)",
(str(user_id), code, purpose, expires_at),
)
conn.commit()
async def verify_otp(
self, user_id: int, code: str, purpose: str = "telegram_verify"
) -> bool:
now = datetime.now().isoformat()
with self._get_conn() as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute(
"SELECT id FROM otp_codes WHERE user_id = ? AND code = ? AND purpose = ? "
"AND used = 0 AND expires_at > ? ORDER BY id DESC LIMIT 1",
(str(user_id), code, purpose, now),
)
row = c.fetchone()
if not row:
return False
c.execute("UPDATE otp_codes SET used = 1 WHERE id = ?", (row["id"],))
conn.commit()
return True
async def cleanup_expired_otps(self):
with self._get_conn() as conn:
c = conn.cursor()
c.execute(
"DELETE FROM otp_codes WHERE used = 1 OR expires_at < ?",
(datetime.now().isoformat(),),
)
deleted = c.rowcount
conn.commit()
return deleted
# Singleton
auth_db = AuthDatabase()