| | from functools import wraps |
| |
|
| | from flask import current_app, g, has_request_context, request |
| | from flask_login import user_logged_in |
| | from flask_login.config import EXEMPT_METHODS |
| | from werkzeug.exceptions import Unauthorized |
| | from werkzeug.local import LocalProxy |
| |
|
| | from configs import dify_config |
| | from extensions.ext_database import db |
| | from models.account import Account, Tenant, TenantAccountJoin |
| |
|
| | |
| | |
| | current_user = LocalProxy(lambda: _get_user()) |
| |
|
| |
|
| | def login_required(func): |
| | """ |
| | If you decorate a view with this, it will ensure that the current user is |
| | logged in and authenticated before calling the actual view. (If they are |
| | not, it calls the :attr:`LoginManager.unauthorized` callback.) For |
| | example:: |
| | |
| | @app.route('/post') |
| | @login_required |
| | def post(): |
| | pass |
| | |
| | If there are only certain times you need to require that your user is |
| | logged in, you can do so with:: |
| | |
| | if not current_user.is_authenticated: |
| | return current_app.login_manager.unauthorized() |
| | |
| | ...which is essentially the code that this function adds to your views. |
| | |
| | It can be convenient to globally turn off authentication when unit testing. |
| | To enable this, if the application configuration variable `LOGIN_DISABLED` |
| | is set to `True`, this decorator will be ignored. |
| | |
| | .. Note :: |
| | |
| | Per `W3 guidelines for CORS preflight requests |
| | <http://www.w3.org/TR/cors/#cross-origin-request-with-preflight-0>`_, |
| | HTTP ``OPTIONS`` requests are exempt from login checks. |
| | |
| | :param func: The view function to decorate. |
| | :type func: function |
| | """ |
| |
|
| | @wraps(func) |
| | def decorated_view(*args, **kwargs): |
| | auth_header = request.headers.get("Authorization") |
| | if dify_config.ADMIN_API_KEY_ENABLE: |
| | if auth_header: |
| | if " " not in auth_header: |
| | raise Unauthorized("Invalid Authorization header format. Expected 'Bearer <api-key>' format.") |
| | auth_scheme, auth_token = auth_header.split(None, 1) |
| | auth_scheme = auth_scheme.lower() |
| | if auth_scheme != "bearer": |
| | raise Unauthorized("Invalid Authorization header format. Expected 'Bearer <api-key>' format.") |
| |
|
| | admin_api_key = dify_config.ADMIN_API_KEY |
| | if admin_api_key: |
| | if admin_api_key == auth_token: |
| | workspace_id = request.headers.get("X-WORKSPACE-ID") |
| | if workspace_id: |
| | tenant_account_join = ( |
| | db.session.query(Tenant, TenantAccountJoin) |
| | .filter(Tenant.id == workspace_id) |
| | .filter(TenantAccountJoin.tenant_id == Tenant.id) |
| | .filter(TenantAccountJoin.role == "owner") |
| | .one_or_none() |
| | ) |
| | if tenant_account_join: |
| | tenant, ta = tenant_account_join |
| | account = Account.query.filter_by(id=ta.account_id).first() |
| | |
| | if account: |
| | account.current_tenant = tenant |
| | current_app.login_manager._update_request_context_with_user(account) |
| | user_logged_in.send(current_app._get_current_object(), user=_get_user()) |
| | if request.method in EXEMPT_METHODS or dify_config.LOGIN_DISABLED: |
| | pass |
| | elif not current_user.is_authenticated: |
| | return current_app.login_manager.unauthorized() |
| |
|
| | |
| | |
| | if callable(getattr(current_app, "ensure_sync", None)): |
| | return current_app.ensure_sync(func)(*args, **kwargs) |
| | return func(*args, **kwargs) |
| |
|
| | return decorated_view |
| |
|
| |
|
| | def _get_user(): |
| | if has_request_context(): |
| | if "_login_user" not in g: |
| | current_app.login_manager._load_user() |
| |
|
| | return g._login_user |
| |
|
| | return None |
| |
|