| | from collections.abc import Callable |
| | from functools import wraps |
| | from typing import Optional, Union |
| |
|
| | from controllers.console.app.error import AppNotFoundError |
| | from extensions.ext_database import db |
| | from libs.login import current_user |
| | from models import App |
| | from models.model import AppMode |
| |
|
| |
|
| | def get_app_model(view: Optional[Callable] = None, *, mode: Union[AppMode, list[AppMode]] = None): |
| | def decorator(view_func): |
| | @wraps(view_func) |
| | def decorated_view(*args, **kwargs): |
| | if not kwargs.get("app_id"): |
| | raise ValueError("missing app_id in path parameters") |
| |
|
| | app_id = kwargs.get("app_id") |
| | app_id = str(app_id) |
| |
|
| | del kwargs["app_id"] |
| |
|
| | app_model = ( |
| | db.session.query(App) |
| | .filter(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") |
| | .first() |
| | ) |
| |
|
| | if not app_model: |
| | raise AppNotFoundError() |
| |
|
| | app_mode = AppMode.value_of(app_model.mode) |
| | if app_mode == AppMode.CHANNEL: |
| | raise AppNotFoundError() |
| |
|
| | if mode is not None: |
| | if isinstance(mode, list): |
| | modes = mode |
| | else: |
| | modes = [mode] |
| |
|
| | if app_mode not in modes: |
| | mode_values = {m.value for m in modes} |
| | raise AppNotFoundError(f"App mode is not in the supported list: {mode_values}") |
| |
|
| | kwargs["app_model"] = app_model |
| |
|
| | return view_func(*args, **kwargs) |
| |
|
| | return decorated_view |
| |
|
| | if view is None: |
| | return decorator |
| | else: |
| | return decorator(view) |
| |
|