| import logging |
| import uuid |
| from typing import Optional |
|
|
| from open_webui.apps.webui.internal.db import Base, get_db |
| from open_webui.apps.webui.models.users import UserModel, Users |
| from open_webui.env import SRC_LOG_LEVELS |
| from pydantic import BaseModel |
| from sqlalchemy import Boolean, Column, String, Text |
| from open_webui.utils.utils import verify_password |
|
|
# Module-scoped logger; its threshold is taken from the "MODELS" entry of
# SRC_LOG_LEVELS so this module's verbosity is configured with the other models.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
|
|
| |
| |
| |
|
|
|
|
class Auth(Base):
    """ORM mapping for the ``auth`` table (id, email, password, active flag)."""

    __tablename__ = "auth"

    # Primary key. AuthsTable.insert_new_auth fills it with a UUID4 string and
    # passes the same value to Users.insert_new_user.
    id = Column(String, primary_key=True)
    # Email used by the AuthsTable lookups to find the row.
    email = Column(String)
    # Stored exactly as handed to AuthsTable; checked with verify_password().
    password = Column(Text)
    # Only rows with active=True are matched by the authenticate_* queries.
    active = Column(Boolean)
|
|
|
|
class AuthModel(BaseModel):
    # Pydantic counterpart of the Auth row. AuthsTable.insert_new_auth builds
    # one of these and feeds its model_dump() into the Auth constructor.
    id: str
    email: str
    password: str
    # New records are active unless the caller says otherwise.
    active: bool = True
|
|
|
|
| |
| |
| |
|
|
|
|
class Token(BaseModel):
    # A token string together with a label describing what type of token it is.
    token: str
    token_type: str
|
|
|
|
class ApiKey(BaseModel):
    # Wrapper around a single API key value; None when no key is present.
    api_key: Optional[str] = None
|
|
|
|
class UserResponse(BaseModel):
    # User fields carried in a response payload; all five are required strings.
    id: str
    email: str
    name: str
    role: str
    profile_image_url: str
|
|
|
|
class SigninResponse(Token, UserResponse):
    # Combines the Token fields with the UserResponse fields; adds nothing new.
    pass
|
|
|
|
class SigninForm(BaseModel):
    # Credentials submitted for sign-in: an email and its password.
    email: str
    password: str
|
|
|
|
class ProfileImageUrlForm(BaseModel):
    # Payload carrying only a profile image URL.
    profile_image_url: str
|
|
|
|
class UpdateProfileForm(BaseModel):
    # Payload for a profile update: image URL and display name, both required.
    profile_image_url: str
    name: str
|
|
|
|
class UpdatePasswordForm(BaseModel):
    # Payload for a password change: the current password and its replacement.
    password: str
    new_password: str
|
|
|
|
class SignupForm(BaseModel):
    # Registration payload: name, email and password are required.
    name: str
    email: str
    password: str
    # Falls back to the stock avatar path when the client sends none.
    profile_image_url: Optional[str] = "/user.png"
|
|
|
|
class AddUserForm(SignupForm):
    # SignupForm plus an explicit role; defaults to "pending", the same default
    # AuthsTable.insert_new_auth uses.
    role: Optional[str] = "pending"
|
|
|
|
class AuthsTable:
    """Data-access helper for rows of the ``auth`` table.

    Every method opens its own session through ``get_db()``. The lookup,
    update and delete methods are best-effort: an exception is logged and
    reported through the return value (``None`` or ``False``) rather than
    propagated. ``insert_new_auth`` is the exception and lets errors propagate.
    """

    def insert_new_auth(
        self,
        email: str,
        password: str,
        name: str,
        profile_image_url: str = "/user.png",
        role: str = "pending",
        oauth_sub: Optional[str] = None,
    ) -> Optional[UserModel]:
        """Create an auth row and the matching user record.

        Args:
            email: Email stored on the auth row and passed to the user record.
            password: Stored as given; no hashing is performed here.
            name: Passed through to ``Users.insert_new_user``.
            profile_image_url: Passed through to ``Users.insert_new_user``.
            role: Passed through to ``Users.insert_new_user``.
            oauth_sub: Passed through to ``Users.insert_new_user``.

        Returns:
            The user returned by ``Users.insert_new_user``, or ``None`` when
            that call yields a falsy value.
        """
        with get_db() as db:
            log.info("insert_new_auth")

            # One freshly generated UUID is shared by the auth row and the user.
            auth_id = str(uuid.uuid4())

            auth = AuthModel(id=auth_id, email=email, password=password, active=True)
            result = Auth(**auth.model_dump())
            db.add(result)

            user = Users.insert_new_user(
                auth_id, name, email, profile_image_url, role, oauth_sub
            )

            # NOTE(review): the auth row is committed even when the user insert
            # returned a falsy value - confirm whether a rollback is wanted.
            db.commit()
            db.refresh(result)

            return user if user else None

    def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
        """Return the user for an active auth row whose password verifies.

        Returns ``None`` when no active row matches ``email``, when
        ``verify_password`` rejects ``password``, or when any step raises.
        """
        log.info("authenticate_user: %s", email)
        try:
            with get_db() as db:
                auth = db.query(Auth).filter_by(email=email, active=True).first()
                if auth and verify_password(password, auth.password):
                    return Users.get_user_by_id(auth.id)
                return None
        except Exception:
            log.exception("authenticate_user failed")
            return None

    def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
        """Return the user owning ``api_key``, or ``None``.

        ``None`` is returned for an empty key, for an unknown key, and when the
        lookup raises.
        """
        # The key is a credential, so it must never be written to the log.
        log.info("authenticate_user_by_api_key")

        if not api_key:
            return None

        try:
            user = Users.get_user_by_api_key(api_key)
            return user if user else None
        except Exception as exc:
            # Log only the exception type: database error messages may embed
            # the bound query parameters, which here would include the key.
            log.error("authenticate_user_by_api_key failed: %s", type(exc).__name__)
            return None

    def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
        """Return the user for an active auth row matching ``email``.

        No password is checked. Returns ``None`` when no active row matches or
        when the lookup raises.
        """
        log.info("authenticate_user_by_trusted_header: %s", email)
        try:
            with get_db() as db:
                auth = db.query(Auth).filter_by(email=email, active=True).first()
                if auth:
                    return Users.get_user_by_id(auth.id)
                return None
        except Exception:
            log.exception("authenticate_user_by_trusted_header failed")
            return None

    def update_user_password_by_id(self, id: str, new_password: str) -> bool:
        """Overwrite the stored password of the auth row with the given id.

        ``new_password`` is written as given. Returns ``True`` only when
        exactly one row was updated; ``False`` otherwise or on any exception.
        """
        try:
            with get_db() as db:
                result = (
                    db.query(Auth).filter_by(id=id).update({"password": new_password})
                )
                db.commit()
                return result == 1
        except Exception as exc:
            # Log only the exception type: database error messages may embed
            # the bound parameters, which here include the password value.
            log.error("update_user_password_by_id failed: %s", type(exc).__name__)
            return False

    def update_email_by_id(self, id: str, email: str) -> bool:
        """Change the email of the auth row with the given id.

        Returns ``True`` only when exactly one row was updated; ``False``
        otherwise or on any exception.
        """
        try:
            with get_db() as db:
                result = db.query(Auth).filter_by(id=id).update({"email": email})
                db.commit()
                return result == 1
        except Exception:
            log.exception("update_email_by_id failed")
            return False

    def delete_auth_by_id(self, id: str) -> bool:
        """Delete the user record and then the auth row with the given id.

        The auth row is removed only after ``Users.delete_user_by_id`` reports
        success. Returns ``False`` when that call fails or anything raises.
        """
        try:
            with get_db() as db:
                if not Users.delete_user_by_id(id):
                    return False

                db.query(Auth).filter_by(id=id).delete()
                db.commit()
                return True
        except Exception:
            log.exception("delete_auth_by_id failed")
            return False
|
|
|
|
# Shared module-level instance of AuthsTable.
Auths = AuthsTable()
|
|