File size: 3,032 Bytes
ed147e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""

FastAPI dependencies for authentication and authorization.

"""

from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlmodel import Session, select

from src.db.database import get_session
from src.db.models import User
from src.auth.security import decode_access_token

# OAuth2 bearer-token scheme. It is injected into get_current_user via
# Depends() to obtain the token string from the Authorization header;
# tokenUrl names the login endpoint ("/auth/login") where tokens are issued.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    session: Session = Depends(get_session)
) -> User:
    """
    Resolve the bearer token of the current request to a User row.

    The token is decoded with decode_access_token; its "sub" claim is
    treated as the user's email address and looked up in the database.
    Every failure path raises the same 401 response, so the caller cannot
    tell from the error which of the checks failed.

    Args:
        token: Bearer token supplied by the oauth2_scheme dependency
        session: Database session supplied by the get_session dependency

    Returns:
        The first User whose email equals the token's "sub" claim

    Raises:
        HTTPException: 401 Unauthorized (with a "WWW-Authenticate: Bearer"
            header) if the token does not decode, carries no "sub" claim,
            or matches no user

    Usage:
        @app.get("/protected")
        async def protected_route(current_user: User = Depends(get_current_user)):
            return {"message": f"Hello {current_user.username}"}
    """
    # One shared error instance: all rejections look identical to the client.
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    claims = decode_access_token(token)

    # An undecodable token and a token without a "sub" claim are both
    # rejected by the single check below.
    subject: Optional[str] = None if claims is None else claims.get("sub")
    if subject is None:
        raise unauthorized

    # The subject claim holds the email used as the lookup key.
    lookup = select(User).where(User.email == subject)
    account = session.exec(lookup).first()
    if account is not None:
        return account

    raise unauthorized


async def get_current_active_user(

    current_user: User = Depends(get_current_user)

) -> User:
    """
    Return the authenticated user; placeholder for account-status checks.

    This dependency is currently a pass-through: it performs no check of
    its own and returns the user resolved by get_current_user unchanged.
    It exists so that routes can already depend on it, and status checks
    (active flag, email verification, bans, soft-delete) can be added here
    later without touching the routes.

    Args:
        current_user: User from get_current_user dependency

    Returns:
        The same User object that was passed in

    Raises:
        Nothing itself at present. Authentication failures (401) come from
        the get_current_user dependency before this function runs.

    Usage:
        @app.get("/protected")
        async def protected_route(user: User = Depends(get_current_active_user)):
            return {"message": f"Hello active user {user.username}"}
    """
    # TODO: check account status here (e.g. user.is_active, user.is_verified)
    # and raise an HTTPException for users that fail the check; the earlier
    # draft planned a 400 response with detail "Inactive user".
    
    return current_user