# main.py - Updated Version with All Fixes
from fastapi import FastAPI, Depends, HTTPException, status, UploadFile, File, Form, Header, BackgroundTasks
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime, timedelta, timezone
import jwt
import bcrypt
import sqlite3
import os
import uuid
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import sys
from dotenv import load_dotenv
import asyncio
from concurrent.futures import ThreadPoolExecutor
import secrets
import httpx
import requests
# In-memory storage for password-reset OTPs (for MVP), keyed by e-mail address.
# Each value is a dict holding the code and the time it was issued. Being a
# plain module-level dict, it is lost on restart and is not shared between
# worker processes.
otp_storage = {}
# Project-local PDF ingestion entry point, called by the lecture upload endpoint.
from process_pdf import process_new_pdf
# Thread pool used to run the blocking process_new_pdf() call off the event
# loop (presumably where the Qdrant operations happen - confirm against
# process_pdf).
executor = ThreadPoolExecutor(max_workers=3)
# Force UTF-8 on stdout; the log messages in this file contain non-ASCII
# characters (presumably this avoids encoding errors on narrow consoles).
sys.stdout.reconfigure(encoding='utf-8')
# Load environment variables before the configuration constants below read them.
load_dotenv()
# =======================
# Configuration
# =======================
# Key used to sign and verify JWTs. The fallback is only a placeholder.
# NOTE(review): make sure SECRET_KEY is always set in the environment in
# production; anyone who knows the fallback can forge tokens.
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 24 * 60  # 24 hours
# Storage locations, all relative to the working directory.
UPLOAD_DIR = "uploads"
LECTURES_DIR = "lectures"
DB_PATH = "university_chatbot.db"
# Email Configuration
BREVO_API_KEY = os.environ.get("BREVO_API_KEY")
SENDER_EMAIL = "universityai.com@gmail.com"
app = FastAPI(title="University AI Chatbot API with Courses")

# OAuth2 scheme. auto_error=False: a request without a token still reaches
# get_current_user, which raises its own 401.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login", auto_error=False)

# CORS
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive policy - confirm this is intended before exposing the API publicly.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
# Serve only the chat image securely instead of the whole backend folder
@app.get("/static/ch.png")
async def get_chat_image():
    """Serve the chat image ``ch.png`` (path relative to the working directory)."""
    image_response = FileResponse("ch.png")
    return image_response
# =======================
# Database Setup with Auto-Migration
# =======================
def _create_tables(c):
    """Create every application table that does not exist yet."""
    # Users table
    c.execute('''CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        email TEXT NOT NULL UNIQUE,
        password_hash TEXT NOT NULL,
        role TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )''')
    # Courses table
    c.execute('''CREATE TABLE IF NOT EXISTS courses (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL,
        description TEXT,
        admin_id INTEGER NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (admin_id) REFERENCES users(id)
    )''')
    # Conversations table
    c.execute('''CREATE TABLE IF NOT EXISTS conversations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        is_deleted INTEGER DEFAULT 0,
        FOREIGN KEY (user_id) REFERENCES users(id)
    )''')
    # Messages table - stores both the legacy `sender` and the newer `role`
    c.execute('''CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        conversation_id INTEGER NOT NULL,
        sender TEXT NOT NULL,
        role TEXT NOT NULL,
        content TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (conversation_id) REFERENCES conversations(id)
    )''')
    # Feedbacks table - at most one feedback per (message, user)
    c.execute('''CREATE TABLE IF NOT EXISTS feedbacks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        message_id INTEGER NOT NULL,
        user_id INTEGER NOT NULL,
        feedback_type TEXT NOT NULL,
        comment TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE,
        FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
        UNIQUE(message_id, user_id)
    )''')
    # Files table
    c.execute('''CREATE TABLE IF NOT EXISTS files (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        filename TEXT NOT NULL,
        filepath TEXT NOT NULL,
        file_type TEXT NOT NULL,
        subject TEXT,
        uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (user_id) REFERENCES users(id)
    )''')
    # Lectures table
    c.execute('''CREATE TABLE IF NOT EXISTS lectures (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        admin_id INTEGER NOT NULL,
        filename TEXT NOT NULL,
        filepath TEXT NOT NULL,
        subject TEXT,
        uploaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        processing_status TEXT DEFAULT 'pending',
        total_chunks INTEGER DEFAULT 0,
        total_characters INTEGER DEFAULT 0,
        error_message TEXT,
        FOREIGN KEY (admin_id) REFERENCES users(id)
    )''')
    # Password reset tokens table
    c.execute('''CREATE TABLE IF NOT EXISTS password_reset_tokens (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        token TEXT UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        expires_at TEXT NOT NULL,
        used INTEGER DEFAULT 0,
        FOREIGN KEY (user_id) REFERENCES users(id)
    )''')


def _migrate_schema(conn, c):
    """Add the columns that a database created by an older version is missing."""
    print("🔍 Checking for missing columns...")
    # messages.role: added after `sender`; back-fill it from `sender`.
    c.execute("PRAGMA table_info(messages)")
    message_columns = {col[1] for col in c.fetchall()}
    if 'role' not in message_columns:
        try:
            print(" 🔧 Adding 'role' column to messages table...")
            c.execute("ALTER TABLE messages ADD COLUMN role TEXT DEFAULT 'user'")
            # Migrate existing data: map sender -> role
            c.execute("UPDATE messages SET role = CASE WHEN sender = 'ai' THEN 'assistant' ELSE 'user' END")
            conn.commit()
            print(" ✅ Added column: role to messages (migrated existing data)")
        except sqlite3.OperationalError as e:
            if "duplicate column" not in str(e).lower():
                print(f" ⚠️ Error adding role: {e}")
    else:
        # Column exists: only repair rows whose role is missing or empty.
        try:
            c.execute("UPDATE messages SET role = CASE WHEN sender = 'ai' THEN 'assistant' ELSE 'user' END WHERE role IS NULL OR role = ''")
            conn.commit()
            print(" ✅ Role column exists and data verified")
        except Exception as e:
            print(f" ⚠️ Error verifying role data: {e}")

    # lectures: processing bookkeeping columns
    c.execute("PRAGMA table_info(lectures)")
    existing_columns = {col[1] for col in c.fetchall()}
    required_columns = {
        'processing_status': "TEXT DEFAULT 'pending'",
        'total_chunks': "INTEGER DEFAULT 0",
        'total_characters': "INTEGER DEFAULT 0",
        'error_message': "TEXT",
    }
    for col_name, col_type in required_columns.items():
        if col_name in existing_columns:
            continue
        try:
            # Names and types come from the literal dict above, never from input.
            c.execute(f"ALTER TABLE lectures ADD COLUMN {col_name} {col_type}")
            conn.commit()
            print(f" ✅ Added column: {col_name}")
        except sqlite3.OperationalError as e:
            if "duplicate column" not in str(e).lower():
                print(f" ⚠️ Error adding {col_name}: {e}")

    # conversations.is_deleted: soft-delete flag
    c.execute("PRAGMA table_info(conversations)")
    conv_columns = {col[1] for col in c.fetchall()}
    if 'is_deleted' not in conv_columns:
        try:
            c.execute("ALTER TABLE conversations ADD COLUMN is_deleted INTEGER DEFAULT 0")
            conn.commit()
            print(" ✅ Added column: is_deleted to conversations")
        except Exception as e:
            print(f" ⚠️ Error adding is_deleted: {e}")


def _seed_admin_and_courses(conn, c):
    """Insert the default admin account and the fixed course list if absent.

    Any failure is reported on stdout and swallowed, so seeding never aborts
    start-up.
    """
    # NOTE(review): hard-coded default credentials that are also printed to
    # stdout; change the password after first login or move it to configuration.
    admin_email = "admin@university.edu"
    admin_password = "Admin123"
    try:
        c.execute("SELECT id FROM users WHERE email = ?", (admin_email,))
        existing_admin = c.fetchone()
        if not existing_admin:
            # Hash only when the account really has to be created; bcrypt is
            # deliberately slow and this runs on every start-up.
            hashed = bcrypt.hashpw(admin_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
            c.execute("INSERT INTO users (email, password_hash, role) VALUES (?, ?, ?)",
                      (admin_email, hashed, 'admin'))
            conn.commit()
            admin_id = c.lastrowid
            print(f"\n✅ Admin user created: {admin_email}")
            print(f" Password: {admin_password}")
        else:
            admin_id = existing_admin[0]
            print(f"\nℹ️ Admin user already exists: {admin_email}")

        # Seed the fixed courses (11 entries).
        # NOTE(review): several descriptions do not seem to match their course
        # name; left untouched because they are seeded data.
        fixed_courses = [
            ("Android Development", "Basics of CS and programming"),
            ("Computer Networks", "Fundamental data structures and algorithms"),
            ("Information Security", "SQL, NoSQL, and database design"),
            ("Operating Systems", "Process management, memory, and concurrency"),
            ("Theory of Computation", "OSI model, TCP/IP, and network security"),
            ("Algorithms Design and Analysis", "SDLC, agile, and design patterns"),
            ("Computer Architecture", "Search, logic, and probabilistic reasoning"),
            ("Machine Learning", "Supervised and unsupervised learning"),
            ("Compiler Design", "HTML, CSS, JavaScript, and backend frameworks"),
            ("Computer Graphics", "Network security, cryptography, and ethical hacking"),
            ("Human Computer Interaction", "AWS, Azure, and cloud architecture"),
        ]
        print("\n🌱 Seeding fixed courses...")
        for name, desc in fixed_courses:
            c.execute("SELECT id FROM courses WHERE name = ?", (name,))
            if not c.fetchone():
                c.execute("INSERT INTO courses (name, description, admin_id) VALUES (?, ?, ?)",
                          (name, desc, admin_id))
                print(f" ✅ Added course: {name}")
        conn.commit()
    except Exception as e:
        print(f"❌ Error seeding data: {e}")


def init_db():
    """Create the schema, apply column migrations and seed the default data.

    Safe to run on every start: tables are created with ``IF NOT EXISTS``,
    columns are added only when missing and seed rows only when absent. Also
    creates the upload and lecture directories. Progress is printed to stdout.
    """
    # Ensure the database directory exists so sqlite3.connect cannot fail on it.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        print("\n" + "="*60)
        print("🚀 Initializing Database...")
        print("="*60 + "\n")
        _create_tables(c)
        conn.commit()
        _migrate_schema(conn, c)
        _seed_admin_and_courses(conn, c)
    finally:
        # Close the connection even when table creation or migration raises.
        conn.close()

    os.makedirs(UPLOAD_DIR, exist_ok=True)
    os.makedirs(LECTURES_DIR, exist_ok=True)
    print("\n" + "="*60)
    print("✅ Database initialization completed!")
    print("="*60 + "\n")
# Runs at import time, so the schema exists before any request is served.
init_db()
# =======================
# Helper Functions
# =======================
def get_db():
    """Open a new SQLite connection to ``DB_PATH`` that yields ``sqlite3.Row`` rows.

    Rows can be read by column name or by index. The connection is created
    with ``check_same_thread=False``. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def create_access_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    The token expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` after the current UTC
    time. ``data`` itself is not modified.
    """
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str):
    """Decode ``token`` and return its payload.

    Raises:
        HTTPException: 401 "Token expired" for an expired signature, 401
            "Invalid token" for any other JWT validation failure.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError as exc:
        # ExpiredSignatureError is a subclass of InvalidTokenError, so a single
        # handler can tell the two cases apart.
        expired = isinstance(exc, jwt.ExpiredSignatureError)
        reason = "Token expired" if expired else "Invalid token"
        raise HTTPException(status_code=401, detail=reason)
def get_current_user(
    authorization: Optional[str] = Header(None),
    token: Optional[str] = Depends(oauth2_scheme)
):
    """Resolve the caller's identity from the bearer token of the request.

    An ``Authorization: Bearer <token>`` header takes precedence; otherwise
    the token supplied by the OAuth2 scheme is used.

    Returns:
        ``{"user_id": ..., "role": ...}`` exactly as stored in the token.

    Raises:
        HTTPException: 401 when no token is present, the token fails
            verification, or it carries no ``user_id``.
    """
    if authorization and authorization.startswith("Bearer "):
        raw_token = authorization.split(" ")[1]
    else:
        raw_token = token

    if not raw_token:
        raise HTTPException(status_code=401, detail="Authentication required")

    claims = verify_token(raw_token)
    user_id = claims.get("user_id")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token")
    return {"user_id": user_id, "role": claims.get("role")}
def send_email(to_email: str, subject: str, html_content: str):
    """Send one HTML e-mail through the Brevo transactional e-mail HTTP API.

    Best-effort: nothing is returned and nothing is raised. A missing or
    malformed ``BREVO_API_KEY``, an API status other than 200/201 and any
    exception raised while posting are only reported on stdout.

    This call blocks for up to 10 seconds (``requests`` timeout), so run it
    off the event loop.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        html_content: Message body, sent as ``htmlContent``.
    """
    if not BREVO_API_KEY:
        print("⚠️ Email configuration missing (BREVO_API_KEY). Skipping email.")
        return
    if not BREVO_API_KEY.startswith("xkeysib-"):
        print("⚠️ Error: The provided BREVO_API_KEY looks like an SMTP password (starts with 'xsmtpsib').")
        print("👉 Please generate a new API Key from Brevo Dashboard -> SMTP & API -> API Keys (it should start with 'xkeysib').")
        return

    endpoint = "https://api.brevo.com/v3/smtp/email"
    headers = {
        "api-key": BREVO_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "sender": {"name": "University AI", "email": SENDER_EMAIL},
        "to": [{"email": to_email}],
        "subject": subject,
        "htmlContent": html_content,
    }
    try:
        response = requests.post(endpoint, json=payload, headers=headers, timeout=10)
        if response.status_code in (200, 201):
            print(f"✅ Transaction Successful: Email queued for {to_email}")
        else:
            print(f"❌ API Error {response.status_code}: {response.text}")
    except Exception as e:
        # Deliberately swallowed: a mail failure must not break the caller.
        print(f"⚠️ Infrastructure Failure: {str(e)}")
# =======================
# Pydantic Models
# =======================
class UserRegister(BaseModel):
    """E-mail/password credentials; used as the JSON body of ``/auth/login``."""
    email: EmailStr
    password: str
class ForgotPasswordRequest(BaseModel):
    """JSON body of ``/auth/forgot-password``: the account to send a code to."""
    email: EmailStr
class ResetPasswordRequest(BaseModel):
    """JSON body of ``/auth/reset-password``."""
    email: EmailStr
    # The one-time code that was e-mailed to the user (compared with otp_storage).
    token: str
    new_password: str
class Token(BaseModel):
    """Response model of ``/auth/login``."""
    access_token: str
    token_type: str
    # Role of the authenticated user, as stored in the users table.
    role: str
class ChatMessage(BaseModel):
    """JSON body of ``/student/chat``."""
    # None means "start a new conversation".
    conversation_id: Optional[int] = None
    message: str
class FeedbackRequest(BaseModel):
    """JSON body of ``/feedback``: a user's rating of one message."""
    message_id: int
    # Free text here; the admin statistics count 'positive' and 'negative'.
    feedback_type: str
    comment: Optional[str] = None
class CourseCreate(BaseModel):
    """JSON body of ``/admin/create-course``."""
    # Must be unique: courses.name has a UNIQUE constraint.
    name: str
    description: Optional[str] = None
# =======================
# Root Endpoint
# =======================
# Serve the index page by default
@app.get("/")
async def root():
    """Serve ``index.html`` at the site root."""
    landing_page = FileResponse("index.html")
    return landing_page
# Serve HTML Pages
@app.get("/index.html")
async def index_page():
    """Serve ``index.html`` under its explicit file name."""
    page = FileResponse("index.html")
    return page
@app.get("/login.html")
async def login_page():
    """Serve the login page."""
    page = FileResponse("login.html")
    return page
@app.get("/chat.html")
async def chat_page():
    """Serve the chat page."""
    page = FileResponse("chat.html")
    return page
@app.get("/Admin-Dashboard.html")
async def admin_page():
    """Serve the admin dashboard page (the HTML itself is not access-controlled)."""
    page = FileResponse("Admin-Dashboard.html")
    return page
@app.get("/register.html")
async def register_page():
    """Serve the registration page."""
    page = FileResponse("register.html")
    return page
@app.get("/forgot-password.html")
async def forgot_password_page():
    """Serve the forgot-password page."""
    page = FileResponse("forgot-password.html")
    return page
@app.get("/reset-password.html")
async def reset_password_page():
    """Serve the reset-password page."""
    page = FileResponse("reset-password.html")
    return page
@app.get("/verify-email.html")
async def verify_email_page():
    """Serve the verify-email page."""
    page = FileResponse("verify-email.html")
    return page
# =======================
# Auth Endpoints
# =======================
@app.post("/auth/register")
async def register(background_tasks: BackgroundTasks, email: str = Form(...), password: str = Form(...)):
    """Create a student account and queue a welcome e-mail.

    Args:
        background_tasks: Used to send the welcome e-mail after the response.
        email: Form field; must not belong to an existing user.
        password: Form field; stored only as a bcrypt hash.

    Raises:
        HTTPException: 400 when the e-mail is already registered.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM users WHERE email = ?", (email,))
        if c.fetchone():
            raise HTTPException(status_code=400, detail="Email already registered")
        hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
        try:
            c.execute("INSERT INTO users (email, password_hash, role) VALUES (?, ?, ?)",
                      (email, hashed_pw, "student"))
            conn.commit()
        except sqlite3.IntegrityError:
            # A concurrent request registered the same e-mail between the
            # SELECT and the INSERT; report it like the ordinary duplicate.
            raise HTTPException(status_code=400, detail="Email already registered") from None
    finally:
        conn.close()

    # Send Welcome Email
    subject = "Welcome to University AI! 🎉"
    html_content = f"""
Welcome to University AI! 🎉
Hi there,
Thank you for joining University AI. We are excited to have you on board!
You can now log in and start chatting with our AI assistant.
University AI Team
Your Learning Partner
"""
    background_tasks.add_task(send_email, email, subject, html_content)
    return {
        "message": "Account created successfully. Redirecting to login...",
        "email": email
    }
@app.post("/auth/login", response_model=Token)
async def login(data: UserRegister):
    """Check the credentials and return a bearer token together with the role.

    Raises:
        HTTPException: 401 with the same message for an unknown e-mail and
            for a wrong password.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT * FROM users WHERE email = ?", (data.email,))
        user = c.fetchone()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    if not user:
        raise HTTPException(status_code=401, detail="Invalid email or password")
    if not bcrypt.checkpw(data.password.encode(), user["password_hash"].encode()):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    access = create_access_token({
        "user_id": user["id"],
        "role": user["role"]
    })
    print(f"✅ User logged in: {data.email} ({user['role']})")
    return {
        "access_token": access,
        "token_type": "bearer",
        "role": user["role"]
    }
@app.post("/auth/forgot-password")
async def forgot_password(request: ForgotPasswordRequest):
    """Issue a 6-digit reset code for the account, if it exists, and e-mail it.

    The response is identical whether or not the account exists. The e-mail
    is handed to a worker thread and not awaited: the blocking HTTP call
    neither stalls the event loop nor makes the response time depend on the
    account's existence.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM users WHERE email = ?", (request.email,))
        user = c.fetchone()
    finally:
        conn.close()

    if user:
        otp = f"{secrets.randbelow(1000000):06d}"
        # A new request replaces any earlier code for the same address.
        otp_storage[request.email] = {
            "code": otp,
            "timestamp": datetime.now(timezone.utc)
        }
        subject = "Reset Your Password - University AI 🔐"
        html_content = f"""
Password Reset Request
Hi,
We received a request to reset your password. Use this code:
{otp}
Expires in 5 minutes
If you didn't request this, ignore this email.
University AI Security Team
"""
        # Fire and forget: send_email reports its own failures and never raises.
        asyncio.get_running_loop().run_in_executor(
            None, send_email, request.email, subject, html_content
        )
        print(f"🔑 Password reset code sent to: {request.email}")
    return {"message": "If account exists, password reset code has been sent."}
@app.post("/auth/reset-password")
async def reset_password(request: ResetPasswordRequest):
    """Set a new password after validating the e-mailed one-time code.

    The code is single use, valid for 5 minutes, and discarded after 5 wrong
    guesses so a 6-digit code cannot be brute-forced inside its lifetime.

    Raises:
        HTTPException: 400 for a wrong, unknown or expired code; 404 when the
            account no longer exists.
    """
    max_failed_attempts = 5

    # Verify OTP (constant-time comparison; bytes so non-ASCII input is accepted)
    stored_otp = otp_storage.get(request.email)
    code_matches = stored_otp is not None and secrets.compare_digest(
        stored_otp["code"].encode(), request.token.encode()
    )
    if not code_matches:
        if stored_otp is not None:
            stored_otp["attempts"] = stored_otp.get("attempts", 0) + 1
            if stored_otp["attempts"] >= max_failed_attempts:
                otp_storage.pop(request.email, None)
        raise HTTPException(status_code=400, detail="Invalid or incorrect code.")

    # Check TTL
    if datetime.now(timezone.utc) - stored_otp["timestamp"] > timedelta(minutes=5):
        otp_storage.pop(request.email, None)
        raise HTTPException(status_code=400, detail="Code has expired.")

    # Burn OTP before touching the database so it can never be replayed.
    otp_storage.pop(request.email, None)

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT id FROM users WHERE email = ?", (request.email,))
        user = c.fetchone()
        if not user:
            raise HTTPException(status_code=404, detail="User not found.")
        new_hashed_pw = bcrypt.hashpw(request.new_password.encode(), bcrypt.gensalt()).decode()
        c.execute("UPDATE users SET password_hash = ? WHERE id = ?", (new_hashed_pw, user["id"]))
        conn.commit()
    finally:
        conn.close()

    print(f"🔐 Password reset successful for: {request.email}")
    return {"message": "Password reset successful. You can now login with your new password."}
# =======================
# Student Endpoints - FIXED
# =======================
@app.get("/student/conversations")
async def get_conversations(current_user: dict = Depends(get_current_user)):
    """List the caller's non-deleted conversations, newest first.

    Each entry carries ``id``, ``title``, ``created_at`` and ``first_message``
    (the earliest user message, or ``None``), which clients can use to build
    a title.

    Raises:
        HTTPException: 403 for non-students, 500 on a database error.
    """
    if current_user['role'] != 'student':
        raise HTTPException(status_code=403, detail="Access denied")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT
                c.id,
                c.title,
                c.created_at,
                (SELECT content
                 FROM messages m
                 WHERE m.conversation_id = c.id
                 AND (m.role = 'user' OR m.sender = 'user')
                 ORDER BY m.created_at ASC
                 LIMIT 1) as first_message
            FROM conversations c
            WHERE c.user_id = ? AND c.is_deleted = 0
            ORDER BY c.created_at DESC
        """, (current_user['user_id'],))
        conversations = [dict(row) for row in c.fetchall()]
        return {"conversations": conversations}
    except Exception as e:
        print(f"❌ Error loading conversations: {e}")
        raise HTTPException(status_code=500, detail=f"Error loading conversations: {str(e)}")
    finally:
        conn.close()
@app.get("/student/conversation/{conversation_id}")
async def get_conversation(conversation_id: int, current_user: dict = Depends(get_current_user)):
    """Return one of the caller's conversations with its messages, oldest first.

    Rows written before the ``role`` column existed get ``role`` derived from
    ``sender`` ('ai' -> 'assistant', anything else -> 'user').

    Raises:
        HTTPException: 403 for non-students, 404 when the conversation does
            not exist, is deleted or belongs to someone else, 500 on a
            database error.
    """
    if current_user['role'] != 'student':
        raise HTTPException(status_code=403, detail="Access denied")

    conn = get_db()
    try:
        c = conn.cursor()
        # Verify conversation belongs to user
        c.execute("SELECT * FROM conversations WHERE id = ? AND user_id = ? AND is_deleted = 0",
                  (conversation_id, current_user['user_id']))
        conversation = c.fetchone()
        if not conversation:
            raise HTTPException(status_code=404, detail="Conversation not found")

        # Get all messages with both sender and role
        c.execute("""
            SELECT id, conversation_id, sender, role, content, created_at
            FROM messages
            WHERE conversation_id = ?
            ORDER BY created_at ASC
        """, (conversation_id,))
        messages = []
        for row in c.fetchall():
            msg = dict(row)
            # Ensure role is set correctly (backward compatibility)
            if not msg.get('role'):
                msg['role'] = 'assistant' if msg['sender'] == 'ai' else 'user'
            messages.append(msg)

        print(f"✅ Loaded conversation {conversation_id} with {len(messages)} messages")
        return {
            "conversation": dict(conversation),
            "messages": messages
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Error loading conversation: {e}")
        raise HTTPException(status_code=500, detail=f"Error loading conversation: {str(e)}")
    finally:
        conn.close()
@app.delete("/student/conversations/{conversation_id}")
async def delete_conversation(
    conversation_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Soft-delete one of the caller's conversations.

    Only ``is_deleted`` is set; the conversation and its messages stay in the
    database, so admin and feedback views keep working.

    Raises:
        HTTPException: 403 for non-students, 404 when the conversation is not
            the caller's, 500 on a database error.
    """
    if current_user['role'] != 'student':
        raise HTTPException(status_code=403, detail="Access denied")

    conn = get_db()
    try:
        c = conn.cursor()
        # Verify ownership
        c.execute("SELECT id FROM conversations WHERE id = ? AND user_id = ?",
                  (conversation_id, current_user['user_id']))
        if not c.fetchone():
            raise HTTPException(status_code=404, detail="Conversation not found")

        # Soft delete conversation (Hide from user, keep for admin/feedback)
        c.execute("UPDATE conversations SET is_deleted = 1 WHERE id = ?", (conversation_id,))
        conn.commit()
        print(f"🗑️ Deleted conversation {conversation_id}")
        return {
            "success": True,
            "message": "Conversation deleted successfully"
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Error deleting conversation: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
@app.post("/student/chat")
async def chat(data: ChatMessage, current_user: dict = Depends(get_current_user)):
    """Store the user's message and stream the AI answer back.

    With no ``conversation_id`` a conversation is created, titled with the
    first three words of the message. An empty assistant row is inserted up
    front so its id can be returned in the ``X-Message-Id`` header; its
    content is filled in when the stream ends, including when the client
    disconnects early.

    Raises:
        HTTPException: 403 for non-students, 404 when ``conversation_id`` is
            not one of the caller's conversations, 500 on a database error.
    """
    if current_user['role'] != 'student':
        raise HTTPException(status_code=403, detail="Access denied")

    conn = get_db()
    try:
        c = conn.cursor()
        if data.conversation_id is None:
            # Create new conversation; title = first 3 words of the message
            words = data.message.split()
            title = " ".join(words[:3])
            if len(words) > 3:
                title += "..."
            c.execute("INSERT INTO conversations (user_id, title) VALUES (?, ?)",
                      (current_user['user_id'], title))
            conversation_id = c.lastrowid
            print(f"✅ Created new conversation: {conversation_id} - '{title}'")
        else:
            # Never let a student write into somebody else's conversation.
            c.execute("SELECT id FROM conversations WHERE id = ? AND user_id = ?",
                      (data.conversation_id, current_user['user_id']))
            if c.fetchone() is None:
                raise HTTPException(status_code=404, detail="Conversation not found")
            conversation_id = data.conversation_id

        # 🔥 CRITICAL: Save user message with BOTH sender AND role
        c.execute(
            "INSERT INTO messages (conversation_id, sender, role, content) VALUES (?, ?, ?, ?)",
            (conversation_id, 'user', 'user', data.message)
        )
        user_message_id = c.lastrowid
        conn.commit()
        print(f"💬 User message saved (ID: {user_message_id})")

        # 1. Pre-create AI message with empty content to get an ID
        c.execute(
            "INSERT INTO messages (conversation_id, sender, role, content) VALUES (?, ?, ?, ?)",
            (conversation_id, 'ai', 'assistant', '')
        )
        ai_message_id = c.lastrowid
        conn.commit()
    except HTTPException:
        conn.rollback()
        raise
    except Exception as e:
        conn.rollback()
        print(f"❌ Error in chat endpoint: {e}")
        raise HTTPException(status_code=500, detail=f"Chat error: {str(e)}")
    finally:
        # The stream below opens its own connection when it has finished.
        conn.close()

    # 2. Define Generator for Streaming
    async def response_generator():
        collected = []
        RAG_URL = "http://127.0.0.1:8001/ask_stream"
        try:
            try:
                async with httpx.AsyncClient(timeout=60.0) as client:
                    async with client.stream("POST", RAG_URL, json={"question": data.message, "conversation_id": conversation_id}) as r:
                        async for chunk in r.aiter_text():
                            collected.append(chunk)
                            yield chunk
            except Exception as e:
                error_msg = f"Error: {str(e)}"
                collected.append(error_msg)
                yield error_msg
        finally:
            # 3. Update DB with whatever was produced. In `finally` so a
            # cancelled stream (client went away) still stores the partial
            # answer instead of leaving the assistant row empty.
            try:
                update_conn = sqlite3.connect(DB_PATH)
                try:
                    update_conn.execute("UPDATE messages SET content = ? WHERE id = ?",
                                        ("".join(collected), ai_message_id))
                    update_conn.commit()
                finally:
                    update_conn.close()
                print(f"🤖 AI message updated (ID: {ai_message_id})")
            except Exception as e:
                print(f"❌ Error updating DB: {e}")

    # 4. Return Streaming Response with IDs in headers
    return StreamingResponse(
        response_generator(),
        media_type="text/plain",
        headers={
            "X-Conversation-Id": str(conversation_id),
            "X-Message-Id": str(ai_message_id)
        }
    )
# =======================
# Feedback Endpoints
# =======================
@app.post("/feedback")
async def submit_feedback(
    feedback: FeedbackRequest,
    current_user: dict = Depends(get_current_user)
):
    """Create or replace the caller's feedback on one message.

    There is at most one feedback row per (message, user); a second
    submission overwrites the type and comment and refreshes ``created_at``.

    Raises:
        HTTPException: 404 when the message does not exist, 500 on a
            database error.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        # get_db() does not turn on foreign-key enforcement, so check the
        # target explicitly instead of storing orphaned feedback.
        c.execute("SELECT 1 FROM messages WHERE id = ?", (feedback.message_id,))
        if c.fetchone() is None:
            raise HTTPException(status_code=404, detail="Message not found")

        # Check if feedback exists
        c.execute(
            "SELECT id FROM feedbacks WHERE message_id = ? AND user_id = ?",
            (feedback.message_id, current_user['user_id'])
        )
        existing = c.fetchone()
        if existing:
            # Update existing
            c.execute(
                """UPDATE feedbacks
                   SET feedback_type = ?, comment = ?, created_at = CURRENT_TIMESTAMP
                   WHERE id = ?""",
                (feedback.feedback_type, feedback.comment, existing['id'])
            )
            print(f"✅ Updated feedback for message {feedback.message_id}")
        else:
            # Create new
            c.execute(
                """INSERT INTO feedbacks (message_id, user_id, feedback_type, comment)
                   VALUES (?, ?, ?, ?)""",
                (feedback.message_id, current_user['user_id'],
                 feedback.feedback_type, feedback.comment)
            )
            print(f"✅ Created feedback for message {feedback.message_id}")
        conn.commit()
        return {
            "success": True,
            "message": "Feedback submitted successfully"
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Error submitting feedback: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
@app.get("/admin/get-feedbacks")
async def get_all_feedbacks(current_user: dict = Depends(get_current_user)):
    """Return every feedback, newest first, joined with its message, author
    and conversation.

    Soft-deleted conversations are included, because the join does not
    filter on ``is_deleted``.

    Raises:
        HTTPException: 403 for non-admins, 500 on a database error.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT
                f.id,
                f.message_id,
                f.feedback_type,
                f.comment,
                f.created_at,
                m.content as message_content,
                m.sender as message_sender,
                m.role as message_role,
                u.email as user_email,
                c.title as conversation_title,
                c.id as conversation_id
            FROM feedbacks f
            JOIN messages m ON f.message_id = m.id
            JOIN users u ON f.user_id = u.id
            JOIN conversations c ON m.conversation_id = c.id
            ORDER BY f.created_at DESC
        """)
        feedbacks = [dict(row) for row in c.fetchall()]
        return {"feedbacks": feedbacks}
    except Exception as e:
        print(f"❌ Error fetching feedbacks: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
# =======================
# Admin Endpoints - Courses
# =======================
@app.post("/admin/create-course")
async def create_course(
    course: CourseCreate,
    current_user: dict = Depends(get_current_user)
):
    """Create a course owned by the calling admin.

    Raises:
        HTTPException: 403 for non-admins, 400 when a course with that name
            already exists, 500 on any other database error.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        # Check if exists
        c.execute("SELECT id FROM courses WHERE name = ?", (course.name,))
        if c.fetchone():
            raise HTTPException(status_code=400, detail="Course already exists")

        # Insert
        try:
            c.execute(
                "INSERT INTO courses (name, description, admin_id) VALUES (?, ?, ?)",
                (course.name, course.description, current_user['user_id'])
            )
        except sqlite3.IntegrityError:
            # Lost a race against a concurrent insert of the same name.
            raise HTTPException(status_code=400, detail="Course already exists") from None
        course_id = c.lastrowid
        conn.commit()
        print(f"✅ Course created: {course.name} (ID: {course_id})")
        return {
            "success": True,
            "message": "Course created successfully",
            "course_id": course_id,
            "name": course.name
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Error creating course: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
@app.get("/admin/get-courses")
async def get_courses(current_user: dict = Depends(get_current_user)):
    """Return all courses, newest first, each with its lecture count.

    Lectures are matched to a course by name (``lectures.subject`` equals
    ``courses.name``), not by id.

    Raises:
        HTTPException: 403 for non-admins, 500 on a database error.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT
                c.id,
                c.name,
                c.description,
                c.created_at,
                COUNT(l.id) as lecture_count
            FROM courses c
            LEFT JOIN lectures l ON c.name = l.subject
            GROUP BY c.id
            ORDER BY c.created_at DESC
        """)
        courses = [
            {
                "id": row[0],
                "name": row[1],
                "description": row[2],
                "created_at": row[3],
                "lecture_count": row[4],
            }
            for row in c.fetchall()
        ]
        return {"courses": courses}
    except Exception as e:
        print(f"❌ Error fetching courses: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
@app.get("/admin/course/{course_name}/lectures")
async def get_course_lectures(
    course_name: str,
    current_user: dict = Depends(get_current_user)
):
    """Return the lectures whose ``subject`` equals ``course_name``, newest first.

    The course itself is not looked up; an unknown name yields an empty list.

    Raises:
        HTTPException: 403 for non-admins, 500 on a database error.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT id, filename, filepath, subject, uploaded_at,
                   processing_status, total_chunks, total_characters
            FROM lectures
            WHERE subject = ?
            ORDER BY uploaded_at DESC
        """, (course_name,))
        lectures = [dict(row) for row in c.fetchall()]
        return {
            "course_name": course_name,
            "lectures": lectures
        }
    except Exception as e:
        print(f"❌ Error fetching course lectures: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
@app.delete("/admin/delete-course/{course_id}")
async def delete_course(
    course_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Delete a course row.

    Only the ``courses`` row is removed; lectures whose ``subject`` names the
    course are left untouched.

    Raises:
        HTTPException: 403 for non-admins, 404 for an unknown id, 500 on a
            database error.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT name FROM courses WHERE id = ?", (course_id,))
        course = c.fetchone()
        if not course:
            raise HTTPException(status_code=404, detail="Course not found")
        course_name = course[0]

        c.execute("DELETE FROM courses WHERE id = ?", (course_id,))
        conn.commit()
        print(f"🗑️ Course deleted: {course_name}")
        return {
            "success": True,
            "message": f"Course '{course_name}' deleted",
            "course_id": course_id
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Error deleting course: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
# =======================
# Admin Endpoints - Lectures & Stats
# =======================
@app.get("/admin/get-users")
async def get_users(current_user: dict = Depends(get_current_user)):
    """Return id, e-mail, role and creation time of every user, newest first.

    Password hashes are never selected.

    Raises:
        HTTPException: 403 for non-admins.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT id, email, role, created_at FROM users ORDER BY created_at DESC")
        users = [dict(row) for row in c.fetchall()]
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return {"users": users}
@app.get("/admin/get-lectures")
async def get_lectures(current_user: dict = Depends(get_current_user)):
    """Return every lecture with its processing status, newest first.

    Raises:
        HTTPException: 403 for non-admins, 500 on a database error.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT id, filename, subject, uploaded_at,
                   processing_status, total_chunks, total_characters, error_message
            FROM lectures
            ORDER BY uploaded_at DESC
        """)
        lectures = [dict(row) for row in c.fetchall()]
        return {"lectures": lectures}
    except Exception as e:
        print(f"❌ Error in get_lectures: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        conn.close()
@app.post("/admin/upload-lecture")
async def upload_lecture(
    file: UploadFile = File(...),
    subject: str = Form(...),
    current_user: dict = Depends(get_current_user)
):
    """Store an uploaded PDF lecture, process it and record the outcome.

    The file is saved under ``LECTURES_DIR`` with a random name, a
    ``lectures`` row is created with status 'processing', and
    ``process_new_pdf`` runs in the thread pool. On success the row becomes
    'completed' with the chunk/character totals. On failure it becomes
    'failed' with the error text and the saved file is removed.

    Raises:
        HTTPException: 403 for non-admins, 400 for a non-PDF file or a blank
            course name, 500 when saving or processing fails.
    """
    print(f"\n{'='*60}")
    print(f"📤 Upload Request")
    print(f" File: {file.filename}")
    print(f" Course: {subject}")
    print(f" User: {current_user['user_id']}")
    print(f"{'='*60}\n")

    if current_user['role'] != 'admin':
        raise HTTPException(status_code=403, detail="Access denied")

    # filename can be None for some clients; treat that as "no extension".
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext != '.pdf':
        raise HTTPException(status_code=400, detail="Only PDF files allowed")
    if not subject or subject.strip() == "":
        raise HTTPException(status_code=400, detail="Course name required")
    course_name = subject.strip()

    unique_filename = f"{uuid.uuid4()}{file_ext}"
    filepath = os.path.join(LECTURES_DIR, unique_filename)
    try:
        # Read first so a failed read does not leave an empty file behind.
        content = await file.read()
        with open(filepath, "wb") as f:
            f.write(content)
        print(f"✅ File saved: {filepath}")
    except Exception as e:
        print(f"❌ Failed to save: {e}")
        raise HTTPException(status_code=500, detail=f"Save error: {str(e)}")

    lecture_id = None
    try:
        conn = get_db()
        try:
            c = conn.cursor()
            c.execute(
                """INSERT INTO lectures
                   (admin_id, filename, filepath, subject, uploaded_at, processing_status)
                   VALUES (?, ?, ?, ?, datetime('now'), ?)""",
                (current_user['user_id'], file.filename, filepath, course_name, 'processing')
            )
            lecture_id = c.lastrowid
            conn.commit()
        finally:
            conn.close()
        print(f"✅ Lecture saved to DB: {lecture_id}")

        # Process the PDF in the thread pool so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            executor,
            process_new_pdf,
            filepath,
            course_name
        )
        if not result['success']:
            raise Exception(result.get('error', 'Processing failed'))

        conn = get_db()
        try:
            c = conn.cursor()
            c.execute(
                """UPDATE lectures
                   SET processing_status = 'completed',
                       total_chunks = ?,
                       total_characters = ?
                   WHERE id = ?""",
                (result['total_chunks'], result['total_characters'], lecture_id)
            )
            conn.commit()
        finally:
            conn.close()
        print(f"✅ Processing completed")
        return {
            "success": True,
            "message": "Lecture uploaded successfully",
            "lecture_id": lecture_id,
            "filename": file.filename,
            "subject": subject,
            "status": "completed",
            "stats": {
                "total_chunks": result['total_chunks'],
                "total_characters": result['total_characters']
            }
        }
    except Exception as e:
        error_msg = str(e)
        print(f"❌ Error: {error_msg}")
        if lecture_id:
            # Best effort: remember why the lecture failed.
            try:
                conn = get_db()
                try:
                    c = conn.cursor()
                    c.execute(
                        """UPDATE lectures
                           SET processing_status = 'failed', error_message = ?
                           WHERE id = ?""",
                        (error_msg, lecture_id)
                    )
                    conn.commit()
                finally:
                    conn.close()
            except Exception as db_error:
                print(f"⚠️ Failed to update error: {db_error}")
        if os.path.exists(filepath):
            try:
                os.remove(filepath)
            except OSError as cleanup_error:
                # Still best effort, but no longer silent.
                print(f"⚠️ Could not delete file: {cleanup_error}")
        raise HTTPException(status_code=500, detail=f"Processing error: {error_msg}")
@app.delete("/admin/delete-lecture/{lecture_id}")
async def delete_lecture(
    lecture_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Delete a lecture row and, best-effort, the file it points to.

    Args:
        lecture_id: Value of ``lectures.id`` for the row to remove.
        current_user: Dict resolved by ``get_current_user``; its ``role``
            must be ``"admin"``.

    Returns:
        A dict with ``success``, ``message`` and the deleted ``lecture_id``.

    Raises:
        HTTPException: 403 when the caller is not an admin, 404 when no
            lecture with ``lecture_id`` exists.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT filepath FROM lectures WHERE id = ?", (lecture_id,))
        lecture = c.fetchone()
        if not lecture:
            raise HTTPException(status_code=404, detail="Lecture not found")
        filepath = lecture[0]
        c.execute("DELETE FROM lectures WHERE id = ?", (lecture_id,))
        conn.commit()
    finally:
        # Release the connection on every path: success, 404, or a failing query.
        conn.close()

    # File cleanup is best-effort: the row is already gone, so a problem on
    # disk is logged rather than turned into a failed request.
    if filepath:
        try:
            # Attempt the removal directly instead of exists()+remove(), which
            # races with concurrent deletions of the same file.
            os.remove(filepath)
            print(f"šļø Deleted file: {filepath}")
        except FileNotFoundError:
            # Nothing on disk to clean up.
            pass
        except (OSError, ValueError) as e:
            print(f"ā ļø Could not delete file: {e}")

    return {
        "success": True,
        "message": "Lecture deleted",
        "lecture_id": lecture_id
    }
@app.get("/admin/get-stats")
async def get_stats(current_user: dict = Depends(get_current_user)):
    """Collect aggregate counts for the admin dashboard.

    Runs a fixed series of COUNT/SUM queries over the users, lectures,
    courses, conversations, messages and feedbacks tables and returns them
    grouped under a single ``stats`` key.

    Raises:
        HTTPException: 403 when the caller is not an admin, 500 (with the
            underlying error text as detail) when any query fails.
    """
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    db = get_db()
    cursor = db.cursor()

    def scalar(sql):
        # Execute a single-value query and hand back its first column.
        cursor.execute(sql)
        return cursor.fetchone()[0]

    try:
        # Users
        student_count = scalar("SELECT COUNT(*) FROM users WHERE role = 'student'")

        # Lectures
        lecture_count = scalar("SELECT COUNT(*) FROM lectures")
        lectures_done = scalar("SELECT COUNT(*) FROM lectures WHERE processing_status = 'completed'")
        lectures_failed = scalar("SELECT COUNT(*) FROM lectures WHERE processing_status = 'failed'")

        # Courses
        course_count = scalar("SELECT COUNT(*) FROM courses")

        # Activity
        conversation_count = scalar("SELECT COUNT(*) FROM conversations WHERE is_deleted = 0")
        message_count = scalar("SELECT COUNT(*) FROM messages")

        # Feedback
        thumbs_up = scalar("SELECT COUNT(*) FROM feedbacks WHERE feedback_type = 'positive'")
        thumbs_down = scalar("SELECT COUNT(*) FROM feedbacks WHERE feedback_type = 'negative'")

        # Content stats; a falsy SUM result (e.g. no completed lectures) becomes 0
        chunk_sum = scalar("SELECT SUM(total_chunks) FROM lectures WHERE processing_status = 'completed'") or 0
        char_sum = scalar("SELECT SUM(total_characters) FROM lectures WHERE processing_status = 'completed'") or 0

        db.close()

        lecture_section = {
            "total": lecture_count,
            "completed": lectures_done,
            "failed": lectures_failed,
            # Anything neither completed nor failed is reported as processing.
            "processing": lecture_count - lectures_done - lectures_failed
        }
        feedback_section = {
            "positive": thumbs_up,
            "negative": thumbs_down,
            "total": thumbs_up + thumbs_down
        }
        return {
            "stats": {
                "users": {"total_students": student_count},
                "courses": {"total": course_count},
                "lectures": lecture_section,
                "content": {
                    "total_chunks": chunk_sum,
                    "total_characters": char_sum
                },
                "activity": {
                    "total_conversations": conversation_count,
                    "total_messages": message_count
                },
                "feedback": feedback_section
            }
        }
    except Exception as err:
        db.close()
        print(f"ā Error in get_stats: {err}")
        raise HTTPException(status_code=500, detail=str(err))
# =======================
# User Info
# =======================
@app.get("/user/me")
def get_me(current_user: dict = Depends(get_current_user)):
    """Return the current user's info with the stored email attached.

    Args:
        current_user: Dict resolved by ``get_current_user``; its ``user_id``
            is used to look up the row in ``users``.

    Returns:
        The same ``current_user`` dict, with an ``email`` key set when a
        matching ``users`` row exists.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        c.execute("SELECT email FROM users WHERE id = ?", (current_user['user_id'],))
        user = c.fetchone()
    finally:
        # Close the connection even if the lookup raises, so it is not leaked.
        conn.close()
    if user:
        current_user['email'] = user['email']
    return current_user
# =======================
# Health Check
# =======================
@app.get("/health")
async def health_check():
    """Report service liveness with a UTC timestamp and version string.

    The ``database`` field is a fixed value; no connection is probed here.
    """
    now_utc = datetime.now(timezone.utc)
    payload = dict(status="healthy", database="connected")
    payload["timestamp"] = now_utc.isoformat()
    payload["version"] = "3.1.0"
    return payload
# =======================
# Run Server
# =======================
if __name__ == "__main__":
    import uvicorn

    # One source of truth for the port: the banner previously advertised 8080
    # while the server was actually bound to 7860.
    port = 7860
    rule = "=" * 60

    print("\n" + rule)
    print("š Starting University AI Chatbot API")
    print(rule)
    print(f"š API URL: http://localhost:{port}")
    print(f"š Docs: http://localhost:{port}/docs")
    # NOTE(review): this writes default admin credentials to stdout/logs;
    # consider removing it outside local development.
    print("š¤ Admin: admin@university.edu / Admin123")
    print(rule + "\n")
    uvicorn.run(app, host="0.0.0.0", port=port)