| | import flask_restful |
| | from flask_login import current_user |
| | from flask_restful import Resource, fields, marshal_with |
| | from werkzeug.exceptions import Forbidden |
| |
|
| | from extensions.ext_database import db |
| | from libs.helper import TimestampField |
| | from libs.login import login_required |
| | from models.dataset import Dataset |
| | from models.model import ApiToken, App |
| |
|
| | from . import api |
| | from .wraps import account_initialization_required, setup_required |
| |
|
| | api_key_fields = { |
| | "id": fields.String, |
| | "type": fields.String, |
| | "token": fields.String, |
| | "last_used_at": TimestampField, |
| | "created_at": TimestampField, |
| | } |
| |
|
| | api_key_list = {"data": fields.List(fields.Nested(api_key_fields), attribute="items")} |
| |
|
| |
|
| | def _get_resource(resource_id, tenant_id, resource_model): |
| | resource = resource_model.query.filter_by(id=resource_id, tenant_id=tenant_id).first() |
| |
|
| | if resource is None: |
| | flask_restful.abort(404, message=f"{resource_model.__name__} not found.") |
| |
|
| | return resource |
| |
|
| |
|
| | class BaseApiKeyListResource(Resource): |
| | method_decorators = [account_initialization_required, login_required, setup_required] |
| |
|
| | resource_type = None |
| | resource_model = None |
| | resource_id_field = None |
| | token_prefix = None |
| | max_keys = 10 |
| |
|
| | @marshal_with(api_key_list) |
| | def get(self, resource_id): |
| | resource_id = str(resource_id) |
| | _get_resource(resource_id, current_user.current_tenant_id, self.resource_model) |
| | keys = ( |
| | db.session.query(ApiToken) |
| | .filter(ApiToken.type == self.resource_type, getattr(ApiToken, self.resource_id_field) == resource_id) |
| | .all() |
| | ) |
| | return {"items": keys} |
| |
|
| | @marshal_with(api_key_fields) |
| | def post(self, resource_id): |
| | resource_id = str(resource_id) |
| | _get_resource(resource_id, current_user.current_tenant_id, self.resource_model) |
| | if not current_user.is_editor: |
| | raise Forbidden() |
| |
|
| | current_key_count = ( |
| | db.session.query(ApiToken) |
| | .filter(ApiToken.type == self.resource_type, getattr(ApiToken, self.resource_id_field) == resource_id) |
| | .count() |
| | ) |
| |
|
| | if current_key_count >= self.max_keys: |
| | flask_restful.abort( |
| | 400, |
| | message=f"Cannot create more than {self.max_keys} API keys for this resource type.", |
| | code="max_keys_exceeded", |
| | ) |
| |
|
| | key = ApiToken.generate_api_key(self.token_prefix, 24) |
| | api_token = ApiToken() |
| | setattr(api_token, self.resource_id_field, resource_id) |
| | api_token.tenant_id = current_user.current_tenant_id |
| | api_token.token = key |
| | api_token.type = self.resource_type |
| | db.session.add(api_token) |
| | db.session.commit() |
| | return api_token, 201 |
| |
|
| |
|
| | class BaseApiKeyResource(Resource): |
| | method_decorators = [account_initialization_required, login_required, setup_required] |
| |
|
| | resource_type = None |
| | resource_model = None |
| | resource_id_field = None |
| |
|
| | def delete(self, resource_id, api_key_id): |
| | resource_id = str(resource_id) |
| | api_key_id = str(api_key_id) |
| | _get_resource(resource_id, current_user.current_tenant_id, self.resource_model) |
| |
|
| | |
| | if not current_user.is_admin_or_owner: |
| | raise Forbidden() |
| |
|
| | key = ( |
| | db.session.query(ApiToken) |
| | .filter( |
| | getattr(ApiToken, self.resource_id_field) == resource_id, |
| | ApiToken.type == self.resource_type, |
| | ApiToken.id == api_key_id, |
| | ) |
| | .first() |
| | ) |
| |
|
| | if key is None: |
| | flask_restful.abort(404, message="API key not found") |
| |
|
| | db.session.query(ApiToken).filter(ApiToken.id == api_key_id).delete() |
| | db.session.commit() |
| |
|
| | return {"result": "success"}, 204 |
| |
|
| |
|
| | class AppApiKeyListResource(BaseApiKeyListResource): |
| | def after_request(self, resp): |
| | resp.headers["Access-Control-Allow-Origin"] = "*" |
| | resp.headers["Access-Control-Allow-Credentials"] = "true" |
| | return resp |
| |
|
| | resource_type = "app" |
| | resource_model = App |
| | resource_id_field = "app_id" |
| | token_prefix = "app-" |
| |
|
| |
|
| | class AppApiKeyResource(BaseApiKeyResource): |
| | def after_request(self, resp): |
| | resp.headers["Access-Control-Allow-Origin"] = "*" |
| | resp.headers["Access-Control-Allow-Credentials"] = "true" |
| | return resp |
| |
|
| | resource_type = "app" |
| | resource_model = App |
| | resource_id_field = "app_id" |
| |
|
| |
|
| | class DatasetApiKeyListResource(BaseApiKeyListResource): |
| | def after_request(self, resp): |
| | resp.headers["Access-Control-Allow-Origin"] = "*" |
| | resp.headers["Access-Control-Allow-Credentials"] = "true" |
| | return resp |
| |
|
| | resource_type = "dataset" |
| | resource_model = Dataset |
| | resource_id_field = "dataset_id" |
| | token_prefix = "ds-" |
| |
|
| |
|
| | class DatasetApiKeyResource(BaseApiKeyResource): |
| | def after_request(self, resp): |
| | resp.headers["Access-Control-Allow-Origin"] = "*" |
| | resp.headers["Access-Control-Allow-Credentials"] = "true" |
| | return resp |
| |
|
| | resource_type = "dataset" |
| | resource_model = Dataset |
| | resource_id_field = "dataset_id" |
| |
|
| |
|
| | api.add_resource(AppApiKeyListResource, "/apps/<uuid:resource_id>/api-keys") |
| | api.add_resource(AppApiKeyResource, "/apps/<uuid:resource_id>/api-keys/<uuid:api_key_id>") |
| | api.add_resource(DatasetApiKeyListResource, "/datasets/<uuid:resource_id>/api-keys") |
| | api.add_resource(DatasetApiKeyResource, "/datasets/<uuid:resource_id>/api-keys/<uuid:api_key_id>") |
| |
|