Spaces:
Sleeping
Sleeping
File size: 3,032 Bytes
ed147e2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 | """
FastAPI dependencies for authentication and authorization.
"""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlmodel import Session, select
from src.db.database import get_session
from src.db.models import User
from src.auth.security import decode_access_token
# OAuth2 scheme for extracting bearer tokens from Authorization header
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
session: Session = Depends(get_session)
) -> User:
"""
Get the currently authenticated user from JWT token.
This dependency extracts the JWT token from the Authorization header,
validates it, and retrieves the corresponding user from the database.
Args:
token: JWT token from Authorization header
session: Database session
Returns:
User object if authentication is successful
Raises:
HTTPException: 401 Unauthorized if token is invalid or user not found
Usage:
@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_user)):
return {"message": f"Hello {current_user.username}"}
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Decode the token
payload = decode_access_token(token)
if payload is None:
raise credentials_exception
# Extract user email from token
email: Optional[str] = payload.get("sub")
if email is None:
raise credentials_exception
# Retrieve user from database
statement = select(User).where(User.email == email)
user = session.exec(statement).first()
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""
Get the current active user (for future soft-delete support).
Currently returns the user as-is, but can be extended to check
for account status, email verification, banned users, etc.
Args:
current_user: User from get_current_user dependency
Returns:
User object if user is active
Raises:
HTTPException: 400 Bad Request if user is inactive
Usage:
@app.get("/protected")
async def protected_route(user: User = Depends(get_current_active_user)):
return {"message": f"Hello active user {user.username}"}
"""
# Future: Check if user.is_active, user.is_verified, etc.
# if not current_user.is_active:
# raise HTTPException(status_code=400, detail="Inactive user")
return current_user
|