Spaces:
Sleeping
Sleeping
| """ | |
| Authentication API endpoints for user signup and login. | |
| """ | |
| from datetime import timedelta | |
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from pydantic import BaseModel, EmailStr, Field | |
| from sqlmodel import select | |
| from sqlmodel.ext.asyncio.session import AsyncSession | |
| from src.db.database import get_session | |
| from src.db.models import User | |
| from src.auth.security import hash_password, verify_password, create_access_token | |
| from src.utils.logger import setup_logger | |
| from src.utils.config import settings | |
| logger = setup_logger(__name__) | |
| router = APIRouter(prefix="/auth", tags=["Authentication"]) | |
| # Request/Response Models | |
| class SignupRequest(BaseModel): | |
| """Request model for user signup.""" | |
| email: EmailStr = Field(..., description="User email address") | |
| username: str = Field(..., min_length=3, max_length=50, description="Username") | |
| password: str = Field(..., min_length=6, description="Password (min 6 characters)") | |
| class Config: | |
| json_schema_extra = { | |
| "example": { | |
| "email": "student@example.com", | |
| "username": "Student123", | |
| "password": "secure_password" | |
| } | |
| } | |
| class UserResponse(BaseModel): | |
| """Response model for user data (without password).""" | |
| id: int | |
| email: str | |
| username: str | |
| role: str | |
| created_at: str | |
| class Config: | |
| json_schema_extra = { | |
| "example": { | |
| "id": 1, | |
| "email": "student@example.com", | |
| "username": "Student123", | |
| "role": "user", | |
| "created_at": "2024-01-27T05:00:00" | |
| } | |
| } | |
| class TokenResponse(BaseModel): | |
| """Response model for login token.""" | |
| access_token: str = Field(..., description="JWT access token") | |
| token_type: str = Field(default="bearer", description="Token type") | |
| expires_in: int = Field(..., description="Token expiration time in minutes") | |
| class Config: | |
| json_schema_extra = { | |
| "example": { | |
| "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", | |
| "token_type": "bearer", | |
| "expires_in": 60 | |
| } | |
| } | |
| async def signup( | |
| signup_data: SignupRequest, | |
| session: AsyncSession = Depends(get_session) | |
| ): | |
| """ | |
| Register a new user. | |
| """ | |
| # Check if email or username already exists | |
| statement = select(User).where( | |
| (User.email == signup_data.email) | (User.username == signup_data.username) | |
| ) | |
| result = await session.exec(statement) | |
| existing_user = result.first() | |
| if existing_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="Email or Username already registered" | |
| ) | |
| # Create new user with hashed password | |
| hashed_password_value = hash_password(signup_data.password) | |
| new_user = User( | |
| email=signup_data.email, | |
| username=signup_data.username, | |
| password_hash=hashed_password_value, | |
| role="user" | |
| ) | |
| session.add(new_user) | |
| await session.commit() | |
| await session.refresh(new_user) | |
| logger.info(f"New user registered: {new_user.email}") | |
| return UserResponse( | |
| id=new_user.id, | |
| email=new_user.email, | |
| username=new_user.username, | |
| role=new_user.role, | |
| created_at=str(new_user.created_at) | |
| ) | |
| async def login( | |
| form_data: OAuth2PasswordRequestForm = Depends(), | |
| session: AsyncSession = Depends(get_session) | |
| ): | |
| """ | |
| Authenticate user and return JWT access token. | |
| """ | |
| # Find user by username | |
| statement = select(User).where(User.username == form_data.username) | |
| result = await session.exec(statement) | |
| user = result.first() | |
| # If not found by username, try finding by email | |
| if not user: | |
| statement = select(User).where(User.email == form_data.username) | |
| result = await session.exec(statement) | |
| user = result.first() | |
| # Verify user exists and password is correct | |
| if not user or not verify_password(form_data.password, user.password_hash): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Incorrect username or password", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| # Create access token | |
| access_token_expires = timedelta(minutes=settings.access_token_expire_minutes) | |
| access_token = create_access_token( | |
| data={"sub": user.username}, | |
| expires_delta=access_token_expires | |
| ) | |
| logger.info(f"User logged in: {user.username}") | |
| return TokenResponse( | |
| access_token=access_token, | |
| token_type="bearer", | |
| expires_in=settings.access_token_expire_minutes | |
| ) |