Spaces:
Sleeping
Sleeping
File size: 5,145 Bytes
ed147e2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | """
Authentication API endpoints for user signup and login.
"""
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr, Field
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from src.db.database import get_session
from src.db.models import User
from src.auth.security import hash_password, verify_password, create_access_token
from src.utils.logger import setup_logger
from src.utils.config import settings
logger = setup_logger(__name__)
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Request/Response Models
class SignupRequest(BaseModel):
"""Request model for user signup."""
email: EmailStr = Field(..., description="User email address")
username: str = Field(..., min_length=3, max_length=50, description="Username")
password: str = Field(..., min_length=6, description="Password (min 6 characters)")
class Config:
json_schema_extra = {
"example": {
"email": "student@example.com",
"username": "Student123",
"password": "secure_password"
}
}
class UserResponse(BaseModel):
"""Response model for user data (without password)."""
id: int
email: str
username: str
role: str
created_at: str
class Config:
json_schema_extra = {
"example": {
"id": 1,
"email": "student@example.com",
"username": "Student123",
"role": "user",
"created_at": "2024-01-27T05:00:00"
}
}
class TokenResponse(BaseModel):
"""Response model for login token."""
access_token: str = Field(..., description="JWT access token")
token_type: str = Field(default="bearer", description="Token type")
expires_in: int = Field(..., description="Token expiration time in minutes")
class Config:
json_schema_extra = {
"example": {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 60
}
}
@router.post("/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def signup(
signup_data: SignupRequest,
session: AsyncSession = Depends(get_session)
):
"""
Register a new user.
"""
# Check if email or username already exists
statement = select(User).where(
(User.email == signup_data.email) | (User.username == signup_data.username)
)
result = await session.exec(statement)
existing_user = result.first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email or Username already registered"
)
# Create new user with hashed password
hashed_password_value = hash_password(signup_data.password)
new_user = User(
email=signup_data.email,
username=signup_data.username,
password_hash=hashed_password_value,
role="user"
)
session.add(new_user)
await session.commit()
await session.refresh(new_user)
logger.info(f"New user registered: {new_user.email}")
return UserResponse(
id=new_user.id,
email=new_user.email,
username=new_user.username,
role=new_user.role,
created_at=str(new_user.created_at)
)
@router.post("/login", response_model=TokenResponse)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
session: AsyncSession = Depends(get_session)
):
"""
Authenticate user and return JWT access token.
"""
# Find user by username
statement = select(User).where(User.username == form_data.username)
result = await session.exec(statement)
user = result.first()
# If not found by username, try finding by email
if not user:
statement = select(User).where(User.email == form_data.username)
result = await session.exec(statement)
user = result.first()
# Verify user exists and password is correct
if not user or not verify_password(form_data.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
logger.info(f"User logged in: {user.username}")
return TokenResponse(
access_token=access_token,
token_type="bearer",
expires_in=settings.access_token_expire_minutes
) |