study-notes-api / src /api /auth_routes.py
ALI7ADEL's picture
Upload 51 files
ed147e2 verified
"""
Authentication API endpoints for user signup and login.
"""
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr, Field
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from src.db.database import get_session
from src.db.models import User
from src.auth.security import hash_password, verify_password, create_access_token
from src.utils.logger import setup_logger
from src.utils.config import settings
logger = setup_logger(__name__)
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Request/Response Models
class SignupRequest(BaseModel):
"""Request model for user signup."""
email: EmailStr = Field(..., description="User email address")
username: str = Field(..., min_length=3, max_length=50, description="Username")
password: str = Field(..., min_length=6, description="Password (min 6 characters)")
class Config:
json_schema_extra = {
"example": {
"email": "student@example.com",
"username": "Student123",
"password": "secure_password"
}
}
class UserResponse(BaseModel):
"""Response model for user data (without password)."""
id: int
email: str
username: str
role: str
created_at: str
class Config:
json_schema_extra = {
"example": {
"id": 1,
"email": "student@example.com",
"username": "Student123",
"role": "user",
"created_at": "2024-01-27T05:00:00"
}
}
class TokenResponse(BaseModel):
"""Response model for login token."""
access_token: str = Field(..., description="JWT access token")
token_type: str = Field(default="bearer", description="Token type")
expires_in: int = Field(..., description="Token expiration time in minutes")
class Config:
json_schema_extra = {
"example": {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 60
}
}
@router.post("/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def signup(
signup_data: SignupRequest,
session: AsyncSession = Depends(get_session)
):
"""
Register a new user.
"""
# Check if email or username already exists
statement = select(User).where(
(User.email == signup_data.email) | (User.username == signup_data.username)
)
result = await session.exec(statement)
existing_user = result.first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email or Username already registered"
)
# Create new user with hashed password
hashed_password_value = hash_password(signup_data.password)
new_user = User(
email=signup_data.email,
username=signup_data.username,
password_hash=hashed_password_value,
role="user"
)
session.add(new_user)
await session.commit()
await session.refresh(new_user)
logger.info(f"New user registered: {new_user.email}")
return UserResponse(
id=new_user.id,
email=new_user.email,
username=new_user.username,
role=new_user.role,
created_at=str(new_user.created_at)
)
@router.post("/login", response_model=TokenResponse)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
session: AsyncSession = Depends(get_session)
):
"""
Authenticate user and return JWT access token.
"""
# Find user by username
statement = select(User).where(User.username == form_data.username)
result = await session.exec(statement)
user = result.first()
# If not found by username, try finding by email
if not user:
statement = select(User).where(User.email == form_data.username)
result = await session.exec(statement)
user = result.first()
# Verify user exists and password is correct
if not user or not verify_password(form_data.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
logger.info(f"User logged in: {user.username}")
return TokenResponse(
access_token=access_token,
token_type="bearer",
expires_in=settings.access_token_expire_minutes
)