| | from flask import Blueprint, request, jsonify, current_app |
| | from flask_jwt_extended import jwt_required, get_jwt_identity |
| | from email_validator import validate_email, EmailNotValidError |
| | from backend.services.auth_service import register_user, login_user, get_user_by_id, request_password_reset, reset_user_password |
| | from backend.models.user import User |
| | from backend.utils.country_language_data import COUNTRIES, LANGUAGES |
| |
|
| | auth_bp = Blueprint('auth', __name__) |
| |
|
def validate_email_format(email: str) -> tuple[bool, str]:
    """
    Validate email format using email-validator library.

    Args:
        email: Email value to validate. Values that are not ``str`` or
            ``bytes`` (for example a JSON number or null) are rejected
            without calling the library.

    Returns:
        Tuple of (is_valid, validated_email_or_error_message)
    """
    # JSON request bodies can carry any type. Reject unsupported types here so
    # callers receive a normal validation failure instead of whatever the
    # library raises for them.
    if not isinstance(email, (str, bytes)):
        return False, 'Email must be a string'

    try:
        validated = validate_email(email)
    except EmailNotValidError as e:
        return False, str(e)

    # Prefer the attribute exposed by newer email-validator releases; fall back
    # to the dict-style access the original code used for older releases.
    normalized = getattr(validated, 'normalized', None)
    if normalized is None:
        normalized = validated['email']
    return True, normalized
| |
|
@auth_bp.route('/', methods=['OPTIONS'])
def handle_options():
    """Answer a CORS preflight (OPTIONS) request on the blueprint root.

    Returns:
        An empty body with HTTP status 200.
    """
    empty_body, status_ok = '', 200
    return empty_body, status_ok
| |
|
@auth_bp.route('/register', methods=['OPTIONS'])
def handle_register_options():
    """Answer a CORS preflight (OPTIONS) request for the register route.

    Returns:
        An empty body with HTTP status 200.
    """
    empty_body, status_ok = '', 200
    return empty_body, status_ok
| |
|
def _is_two_letter_code(value) -> bool:
    """Return True when value is a string of exactly two ASCII letters."""
    return (
        isinstance(value, str)
        and len(value) == 2
        and value.isascii()
        and value.isalpha()
    )


@auth_bp.route('/register', methods=['POST'])
def register():
    """
    Register a new user.

    Request Body:
        email (str): User email
        password (str): User password
        country (str, optional): User country (ISO 3166-1 alpha-2 code)
        language (str, optional): User language (ISO 639-1 code)

    Returns:
        JSON: Registration result. 201 on success; 400 for a missing or
        malformed body, invalid fields, or a failed registration; 500 for an
        unexpected error.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so the client gets the 400 below rather than a 500 from
        # the blanket exception handler.
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or not all(k in data for k in ('email', 'password')):
            return jsonify({
                'success': False,
                'message': 'Email and password are required'
            }), 400

        email = data['email']
        password = data['password']
        country = data.get('country')
        language = data.get('language')

        # JSON can carry numbers, lists, null, ...; the validators below
        # operate on strings only.
        if not isinstance(email, str) or not isinstance(password, str):
            return jsonify({
                'success': False,
                'message': 'Email and password must be strings'
            }), 400

        is_valid_email, validated_email_or_error = validate_email_format(email)
        if not is_valid_email:
            return jsonify({
                'success': False,
                'message': f'Invalid email format: {validated_email_or_error}'
            }), 400

        # Continue with the validated form returned by the validator.
        email = validated_email_or_error

        password_validation = validate_password_strength(password)
        if not password_validation['valid']:
            return jsonify({
                'success': False,
                'message': password_validation['message']
            }), 400

        # Optional fields: only validated when a truthy value was supplied.
        if country and not _is_two_letter_code(country):
            return jsonify({
                'success': False,
                'message': 'Country must be a valid ISO 3166-1 alpha-2 code (2 alphabetic characters)'
            }), 400

        if language and not _is_two_letter_code(language):
            return jsonify({
                'success': False,
                'message': 'Language must be a valid ISO 639-1 code (2 alphabetic characters)'
            }), 400

        result = register_user(email, password, country, language)

        if result['success']:
            return jsonify(result), 201

        # Replace the service's duplicate-account message with a fixed one;
        # tolerate a missing or None message.
        if 'already exist' in str(result.get('message') or '').lower():
            return jsonify({
                'success': False,
                'message': 'Account with this email already exists'
            }), 400
        return jsonify(result), 400

    except Exception as e:
        # exc_info=True records the traceback, matching the login handler.
        current_app.logger.error(f"Registration error: {str(e)}", exc_info=True)
        return jsonify({
            'success': False,
            'message': 'An error occurred during registration'
        }), 500
| |
|
@auth_bp.route('/login', methods=['OPTIONS'])
def handle_login_options():
    """Handle OPTIONS requests for preflight CORS checks for login route.

    Logs the caller address and the request headers (with credential-bearing
    headers redacted), then returns an empty 200 response.
    """
    # current_app is already imported at module level; no local import needed.
    sensitive_headers = ('authorization', 'cookie', 'proxy-authorization')
    logged_headers = {
        name: ('[REDACTED]' if name.lower() in sensitive_headers else value)
        for name, value in request.headers.items()
    }
    current_app.logger.info(f"OPTIONS request for /login from {request.remote_addr}")
    current_app.logger.info(f"Request headers: {logged_headers}")
    return '', 200
| |
|
@auth_bp.route('/login', methods=['POST'])
def login():
    """
    Authenticate and login a user.

    Request Body:
        email (str): User email
        password (str): User password
        remember_me (bool): Remember me flag for extended session (optional)

    Returns:
        JSON: Login result with JWT token. 200 on success; 400 for a missing
        or malformed body; 401 for invalid credentials or an invalid email
        format; 500 for an unexpected error.
    """
    try:
        current_app.logger.info(f"Login request received from {request.remote_addr}")

        # Never write credential-bearing headers to the log verbatim.
        sensitive_headers = ('authorization', 'cookie', 'proxy-authorization')
        logged_headers = {
            name: ('[REDACTED]' if name.lower() in sensitive_headers else value)
            for name, value in request.headers.items()
        }
        current_app.logger.info(f"Request headers: {logged_headers}")

        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so the client gets the 400 below rather than a 500.
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or not all(k in data for k in ('email', 'password')):
            current_app.logger.warning("Login failed: Missing email or password")
            return jsonify({
                'success': False,
                'message': 'Email and password are required'
            }), 400

        email = data['email']
        password = data['password']
        remember_me = data.get('remember_me', False)

        # JSON can carry non-string values; the email validator and the
        # credential check operate on strings.
        if not isinstance(email, str) or not isinstance(password, str):
            current_app.logger.warning("Login failed: Email or password is not a string")
            return jsonify({
                'success': False,
                'message': 'Email and password must be strings'
            }), 400

        is_valid_email, validated_email_or_error = validate_email_format(email)
        if not is_valid_email:
            current_app.logger.warning(f"Login attempt with invalid email format: {email}")
            # Same generic message as a failed credential check.
            return jsonify({
                'success': False,
                'message': 'Invalid email or password'
            }), 401

        # Continue with the validated form returned by the validator.
        email = validated_email_or_error

        result = login_user(email, password, remember_me)

        if result['success']:
            response_data = jsonify(result)
            # NOTE(review): the allowed origin is hard-coded to the local dev
            # frontend; confirm how CORS is configured for other environments.
            response_data.headers.add('Access-Control-Allow-Origin', 'http://localhost:3000')
            response_data.headers.add('Access-Control-Allow-Credentials', 'true')
            current_app.logger.info(f"Login successful for user {email}")
            return response_data, 200

        current_app.logger.warning(f"Login failed for user {email}: {result.get('message', 'Unknown error')}")
        return jsonify(result), 401

    except Exception as e:
        current_app.logger.error(f"Login error: {str(e)}", exc_info=True)
        return jsonify({
            'success': False,
            'message': 'An error occurred during login'
        }), 500
| |
|
@auth_bp.route('/logout', methods=['OPTIONS'])
def handle_logout_options():
    """Answer a CORS preflight (OPTIONS) request for the logout route.

    Returns:
        An empty body with HTTP status 200.
    """
    empty_body, status_ok = '', 200
    return empty_body, status_ok
| |
|
@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
    """
    Log the logout request for the authenticated user and confirm it.

    The handler itself only writes a log entry and returns a confirmation
    payload.

    Returns:
        JSON: Logout result (200 on success, 500 on an unexpected error)
    """
    try:
        identity = get_jwt_identity()
        current_app.logger.info(f"Logout request for user: {identity}")
        payload = {
            'success': True,
            'message': 'Logged out successfully'
        }
        return jsonify(payload), 200
    except Exception as exc:
        current_app.logger.error(f"Logout error: {str(exc)}")
        failure = {
            'success': False,
            'message': 'An error occurred during logout'
        }
        return jsonify(failure), 500
| |
|
@auth_bp.route('/user', methods=['OPTIONS'])
def handle_user_options():
    """Answer a CORS preflight (OPTIONS) request for the user route.

    Returns:
        An empty body with HTTP status 200.
    """
    empty_body, status_ok = '', 200
    return empty_body, status_ok
| |
|
@auth_bp.route('/user', methods=['GET'])
@jwt_required()
def get_current_user():
    """
    Return the profile of the user identified by the JWT.

    Returns:
        JSON: Current user data with the 'password' and 'password_hash'
        fields removed (200), a not-found message (404), or a generic error
        message (500).
    """
    try:
        identity = get_jwt_identity()
        current_app.logger.info(f"Get user profile request for user: {identity}")
        record = get_user_by_id(identity)

        # Guard clause: nothing to return when the lookup came back empty.
        if not record:
            return jsonify({
                'success': False,
                'message': 'User not found'
            }), 404

        # Strip credential fields before the record leaves the server.
        hidden_fields = ('password', 'password_hash')
        public_record = {
            field: value
            for field, value in record.items()
            if field not in hidden_fields
        }
        return jsonify({
            'success': True,
            'user': public_record
        }), 200
    except Exception as exc:
        current_app.logger.error(f"Get user error: {str(exc)}")
        return jsonify({
            'success': False,
            'message': 'An error occurred while fetching user data'
        }), 500
| |
|
@auth_bp.route('/registration-options', methods=['GET'])
def get_registration_options():
    """
    Return the COUNTRIES and LANGUAGES data used by the registration form.

    Returns:
        JSON: Registration options (200), or a generic error message (500)
    """
    try:
        options = {
            'success': True,
            'countries': COUNTRIES,
            'languages': LANGUAGES
        }
        return jsonify(options), 200
    except Exception as exc:
        current_app.logger.error(f"Get registration options error: {str(exc)}")
        failure = {
            'success': False,
            'message': 'An error occurred while fetching registration options'
        }
        return jsonify(failure), 500
| |
|
@auth_bp.route('/forgot-password', methods=['OPTIONS'])
def handle_forgot_password_options():
    """Answer a CORS preflight (OPTIONS) request for the forgot password route.

    Returns:
        An empty body with HTTP status 200.
    """
    empty_body, status_ok = '', 200
    return empty_body, status_ok
| |
|
| |
|
@auth_bp.route('/forgot-password', methods=['POST'])
def forgot_password():
    """
    Request password reset for a user.

    Request Body:
        email (str): User email

    Returns:
        JSON: Password reset request result. 400 when the body is missing,
        malformed, or has no email; otherwise always the same generic 200
        response, whether or not the email is valid and whether or not the
        reset request succeeded.
    """
    # One payload for every outcome after the presence check, so the response
    # does not differ between valid, invalid, and failing requests.
    generic_payload = {
        'success': True,
        'message': 'If an account exists with this email, password reset instructions have been sent.'
    }

    try:
        # silent=True returns None for a missing or malformed JSON body, so it
        # is reported with the same 400 as a missing email.
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or 'email' not in data:
            return jsonify({
                'success': False,
                'message': 'Email is required'
            }), 400

        email = data['email']

        # Non-string values are treated like any other invalid email.
        is_valid_email = False
        validated_email_or_error = None
        if isinstance(email, str):
            is_valid_email, validated_email_or_error = validate_email_format(email)

        if not is_valid_email:
            current_app.logger.warning(f"Forgot password request with invalid email format: {email}")
            return jsonify(generic_payload), 200

        # The outcome is intentionally not reflected in the response.
        request_password_reset(current_app.supabase, validated_email_or_error)

        return jsonify(generic_payload), 200

    except Exception as e:
        # Deliberate best-effort: log with traceback, but keep the response
        # identical to the success case.
        current_app.logger.error(f"Forgot password error: {str(e)}", exc_info=True)
        return jsonify(generic_payload), 200
| |
|
| |
|
@auth_bp.route('/reset-password', methods=['OPTIONS'])
def handle_reset_password_options():
    """Answer a CORS preflight (OPTIONS) request for the reset password route.

    Returns:
        An empty body with HTTP status 200.
    """
    empty_body, status_ok = '', 200
    return empty_body, status_ok
| |
|
@auth_bp.route('/reset-password', methods=['GET'])
def show_reset_password_form():
    """
    Entry point for the link sent in the password reset email.

    The reset token arrives as a query parameter (e.g., ?token=abc123) and is
    meant to be read by the SPA frontend, which renders the form. This handler
    only logs the access and returns an empty 200 response.
    """
    access_message = "Password reset form page accessed."
    current_app.logger.info(access_message)

    empty_body, status_ok = '', 200
    return empty_body, status_ok
| |
|
@auth_bp.route('/reset-password', methods=['POST'])
def reset_password():
    """
    Reset user password with token.

    Request Body:
        token (str): Password reset token
        password (str): New password

    Returns:
        JSON: Password reset result. 200 on success; 400 for a missing or
        malformed body, a weak password, or a failed reset; 500 for an
        unexpected error.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so the client gets the 400 below rather than a 500.
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or not all(k in data for k in ('token', 'password')):
            return jsonify({
                'success': False,
                'message': 'Token and password are required'
            }), 400

        token = data['token']
        password = data['password']

        # JSON can carry non-string values; the strength check below operates
        # on a string.
        if not isinstance(token, str) or not isinstance(password, str):
            return jsonify({
                'success': False,
                'message': 'Token and password must be strings'
            }), 400

        password_validation = validate_password_strength(password)
        if not password_validation['valid']:
            return jsonify({
                'success': False,
                'message': password_validation['message']
            }), 400

        result = reset_user_password(current_app.supabase, token, password)

        status_code = 200 if result['success'] else 400
        return jsonify(result), status_code

    except Exception as e:
        current_app.logger.error(f"Reset password error: {str(e)}", exc_info=True)
        return jsonify({
            'success': False,
            'message': 'An error occurred while resetting your password'
        }), 500
| |
|
def validate_password_strength(password: str) -> dict:
    """
    Validates password strength based on security requirements.

    A valid password is a string of at least 8 characters containing at least
    one uppercase letter, one lowercase letter, one digit, and one ASCII
    punctuation character. Checks run in that order and the first failure is
    reported.

    Args:
        password: Password string to validate

    Returns:
        Dictionary with validation result ('valid') and message ('message')
    """
    import string

    def failure(message: str) -> dict:
        return {'valid': False, 'message': message}

    # Callers pass values taken straight from JSON; report a non-string as
    # invalid rather than raising.
    if not isinstance(password, str):
        return failure('Password must be a string')

    if len(password) < 8:
        return failure('Password must be at least 8 characters long')

    # Every ASCII punctuation character counts as special, so characters such
    # as '~', '/', or quotes are no longer rejected.
    special_characters = frozenset(string.punctuation)

    requirements = (
        (str.isupper, 'Password must contain at least one uppercase letter'),
        (str.islower, 'Password must contain at least one lowercase letter'),
        (str.isdigit, 'Password must contain at least one number'),
        (special_characters.__contains__, 'Password must contain at least one special character'),
    )

    for is_satisfied_by, message in requirements:
        if not any(is_satisfied_by(c) for c in password):
            return failure(message)

    return {
        'valid': True,
        'message': 'Password is valid'
    }