File size: 5,145 Bytes
ed147e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""

Authentication API endpoints for user signup and login.

"""

from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr, Field
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from src.db.database import get_session
from src.db.models import User
from src.auth.security import hash_password, verify_password, create_access_token
from src.utils.logger import setup_logger
from src.utils.config import settings

logger = setup_logger(__name__)

router = APIRouter(prefix="/auth", tags=["Authentication"])


# Request/Response Models
class SignupRequest(BaseModel):
    """Request body accepted by the signup endpoint.

    All three fields are required. Length limits are enforced by the
    ``Field`` constraints below, and ``EmailStr`` validates the email syntax.
    """

    # Address used to identify the account; must be a syntactically valid email.
    email: EmailStr = Field(..., description="User email address")
    # Display/login name, 3 to 50 characters.
    username: str = Field(..., min_length=3, max_length=50, description="Username")
    # Plain-text password as submitted by the client, at least 6 characters.
    password: str = Field(..., min_length=6, description="Password (min 6 characters)")

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2 and emits a deprecation warning; ``model_config`` is the supported
    # replacement and accepts a plain dict.
    model_config = {
        "json_schema_extra": {
            "example": {
                "email": "student@example.com",
                "username": "Student123",
                "password": "secure_password",
            }
        }
    }


class UserResponse(BaseModel):
    """Response body describing a user account (never includes the password)."""

    # Identifier of the user record.
    id: int
    # Email address the account was registered with.
    email: str
    # Username the account was registered with.
    username: str
    # Role name assigned to the account (e.g. "user").
    role: str
    # Creation timestamp rendered as text; see the example below for the format.
    created_at: str

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2 and emits a deprecation warning; ``model_config`` is the supported
    # replacement and accepts a plain dict.
    model_config = {
        "json_schema_extra": {
            "example": {
                "id": 1,
                "email": "student@example.com",
                "username": "Student123",
                "role": "user",
                "created_at": "2024-01-27T05:00:00",
            }
        }
    }


class TokenResponse(BaseModel):
    """Response body returned after a successful login."""

    # Encoded JWT the client presents on subsequent requests.
    access_token: str = Field(..., description="JWT access token")
    # Token scheme; defaults to "bearer".
    token_type: str = Field(default="bearer", description="Token type")
    # Token lifetime, expressed in minutes.
    # NOTE(review): OAuth2 (RFC 6749, section 5.1) defines ``expires_in`` in
    # seconds; this API reports minutes. Confirm clients expect minutes before
    # changing the unit.
    expires_in: int = Field(..., description="Token expiration time in minutes")

    # Pydantic v2 configuration. The nested ``class Config`` form is deprecated
    # in v2 and emits a deprecation warning; ``model_config`` is the supported
    # replacement and accepts a plain dict.
    model_config = {
        "json_schema_extra": {
            "example": {
                "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "token_type": "bearer",
                "expires_in": 60,
            }
        }
    }


@router.post("/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def signup(
    signup_data: SignupRequest,
    session: AsyncSession = Depends(get_session)
):
    """
    Register a new user.

    Rejects the request when the email or the username is already taken;
    otherwise stores a new user with a hashed password and the "user" role.

    Args:
        signup_data: Validated signup payload (email, username, password).
        session: Async database session supplied by ``get_session``.

    Returns:
        UserResponse: The created user's public fields (no password hash).

    Raises:
        HTTPException: 409 Conflict if a user with the same email or
            username already exists.
    """
    # Check if email or username already exists
    # NOTE(review): this check-then-insert sequence is not atomic. If the
    # table has unique constraints, two concurrent signups could still fail
    # at commit time instead of returning 409 -- confirm how that is handled.
    statement = select(User).where(
        (User.email == signup_data.email) | (User.username == signup_data.username)
    )
    result = await session.exec(statement)
    existing_user = result.first()

    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email or Username already registered"
        )

    # Only the hash is persisted; the plain-text password is never stored.
    new_user = User(
        email=signup_data.email,
        username=signup_data.username,
        password_hash=hash_password(signup_data.password),
        role="user"
    )

    session.add(new_user)
    await session.commit()
    # Reload the row after commit so the fields read below are populated.
    await session.refresh(new_user)

    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info("New user registered: %s", new_user.email)

    # str() on a datetime yields "YYYY-MM-DD HH:MM:SS" (space separator), which
    # does not match the ISO 8601 form shown in the response example
    # ("2024-01-27T05:00:00"). Use isoformat() when the value supports it and
    # fall back to str() for anything else.
    created_at = new_user.created_at
    if hasattr(created_at, "isoformat"):
        created_at_text = created_at.isoformat()
    else:
        created_at_text = str(created_at)

    return UserResponse(
        id=new_user.id,
        email=new_user.email,
        username=new_user.username,
        role=new_user.role,
        created_at=created_at_text
    )


@router.post("/login", response_model=TokenResponse)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    session: AsyncSession = Depends(get_session),
):
    """
    Authenticate a user and issue a JWT access token.

    The submitted ``username`` form field is matched against the username
    column first and, only if that finds nothing, against the email column.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        session: Async database session supplied by ``get_session``.

    Returns:
        TokenResponse: The access token, the "bearer" token type and the
        configured token lifetime (``settings.access_token_expire_minutes``).

    Raises:
        HTTPException: 401 Unauthorized when no matching user exists or the
            password does not verify.
    """
    identifier = form_data.username

    # Username takes precedence; the email lookup runs only as a fallback.
    account = None
    for column in (User.username, User.email):
        matches = await session.exec(select(User).where(column == identifier))
        account = matches.first()
        if account:
            break

    # One generic error for both "unknown user" and "wrong password", so the
    # response does not reveal which of the two failed.
    authenticated = bool(account) and verify_password(
        form_data.password, account.password_hash
    )
    if not authenticated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The token subject is the username, even when the user logged in by email.
    ttl_minutes = settings.access_token_expire_minutes
    token = create_access_token(
        data={"sub": account.username},
        expires_delta=timedelta(minutes=ttl_minutes),
    )

    logger.info(f"User logged in: {account.username}")

    return TokenResponse(
        access_token=token,
        token_type="bearer",
        expires_in=ttl_minutes,
    )