| import os
|
| import re
|
| import json
|
| import logging
|
| import html
|
| from urllib.parse import urlparse
|
| from typing import Optional
|
|
|
| from fastapi import APIRouter, Request, Form, HTTPException, Depends
|
| from fastapi.responses import HTMLResponse, JSONResponse
|
| from fastapi.templating import Jinja2Templates
|
| from huggingface_hub import HfApi
|
| from huggingface_hub.utils import RepositoryNotFoundError
|
|
|
| from ..models.service import AIBOMService
|
| from ..models.scoring import calculate_completeness_score
|
| from ..utils.analytics import log_sbom_generation, get_sbom_count
|
| from ..utils.formatter import export_aibom
|
| from ..config import TEMPLATES_DIR, OUTPUT_DIR
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| router = APIRouter()
|
| templates = Jinja2Templates(directory=TEMPLATES_DIR)
|
|
|
|
|
| HF_ID_REGEX = re.compile(r"^[a-zA-Z0-9\.\-\_]+/[a-zA-Z0-9\.\-\_]+$")
|
|
|
| def is_valid_hf_input(input_str: str) -> bool:
|
| if not input_str or len(input_str) > 200:
|
| return False
|
| if input_str.startswith(("http://", "https://")):
|
| try:
|
| parsed = urlparse(input_str)
|
| if parsed.netloc == "huggingface.co":
|
| parts = parsed.path.strip("/").split("/")
|
| if len(parts) >= 2 and parts[0] and parts[1]:
|
| if re.match(r"^[a-zA-Z0-9\.\-\_]+$", parts[0]) and \
|
| re.match(r"^[a-zA-Z0-9\.\-\_]+$", parts[1]):
|
| return True
|
| return False
|
| except Exception:
|
| return False
|
| else:
|
| return bool(HF_ID_REGEX.match(input_str))
|
|
|
|
|
|
|
| @router.get("/", response_class=HTMLResponse)
|
| async def root(request: Request):
|
| return templates.TemplateResponse("index.html", {
|
| "request": request,
|
| "sbom_count": get_sbom_count()
|
| })
|
|
|
| @router.get("/status")
|
| async def get_status():
|
| return {"status": "operational", "version": "1.0.0", "generator_version": "2.0.0"}
|
|
|
| @router.post("/generate", response_class=HTMLResponse)
|
| async def generate_form(
|
| request: Request,
|
| model_id: str = Form(...),
|
| include_inference: bool = Form(False),
|
| use_best_practices: bool = Form(True)
|
| ):
|
|
|
|
|
| if not is_valid_hf_input(model_id):
|
| return templates.TemplateResponse("error.html", {
|
| "request": request,
|
| "error": "Invalid model ID format.",
|
| "sbom_count": get_sbom_count(),
|
| "model_id": html.escape(model_id)
|
| })
|
|
|
|
|
| sanitized_model_id = html.escape(model_id)
|
|
|
|
|
| normalized_id = AIBOMService._normalise_model_id(sanitized_model_id)
|
|
|
|
|
| import asyncio
|
| try:
|
| loop = asyncio.get_running_loop()
|
| await loop.run_in_executor(None, lambda: HfApi().model_info(normalized_id))
|
| except RepositoryNotFoundError:
|
| return templates.TemplateResponse("error.html", {
|
| "request": request,
|
| "error": f"Model {normalized_id} not found on Hugging Face.",
|
| "sbom_count": get_sbom_count(),
|
| "model_id": normalized_id
|
| })
|
| except Exception as e:
|
| return templates.TemplateResponse("error.html", {
|
| "request": request,
|
| "error": f"Error verifying model: {e}",
|
| "sbom_count": get_sbom_count(),
|
| "model_id": normalized_id
|
| })
|
|
|
|
|
| try:
|
| def _generate_task():
|
| service = AIBOMService(use_best_practices=use_best_practices)
|
| aibom = service.generate_aibom(sanitized_model_id, include_inference=include_inference)
|
| report = service.get_enhancement_report()
|
| return service, aibom, report
|
|
|
| service, aibom, report = await loop.run_in_executor(None, _generate_task)
|
|
|
|
|
| filename = f"{normalized_id.replace('/', '_')}_ai_sbom_1_6.json"
|
| filepath = os.path.join(OUTPUT_DIR, filename)
|
| filepath_1_7 = os.path.join(OUTPUT_DIR, f"{normalized_id.replace('/', '_')}_ai_sbom_1_7.json")
|
|
|
| def _save_task():
|
|
|
| json_1_6 = export_aibom(aibom, bom_type="cyclonedx", spec_version="1.6")
|
| json_1_7 = export_aibom(aibom, bom_type="cyclonedx", spec_version="1.7")
|
|
|
| with open(filepath, "w") as f:
|
| f.write(json_1_6)
|
| with open(filepath_1_7, "w") as f:
|
| f.write(json_1_7)
|
| log_sbom_generation(sanitized_model_id)
|
| return json_1_6, json_1_7
|
|
|
| json_1_6, json_1_7 = await loop.run_in_executor(None, _save_task)
|
|
|
|
|
| completeness_score = None
|
| if report and "final_score" in report:
|
| completeness_score = report["final_score"]
|
|
|
|
|
| if not completeness_score:
|
| completeness_score = calculate_completeness_score(aibom)
|
|
|
|
|
| context = {
|
| "request": request,
|
| "filename": filename,
|
| "download_url": f"/output/{filename}",
|
| "aibom": aibom,
|
| "aibom_cdx_json_1_6": json_1_6,
|
| "aibom_cdx_json_1_7": json_1_7,
|
| "components_json": json.dumps(aibom.get("components", []), indent=2),
|
| "model_id": normalized_id,
|
| "sbom_count": get_sbom_count(),
|
| "completeness_score": completeness_score,
|
| "enhancement_report": report or {},
|
|
|
| "result_file": f"/output/{filename}"
|
| }
|
|
|
| return templates.TemplateResponse("result.html", context)
|
|
|
| except Exception as e:
|
| logger.error(f"Generation error: {e}", exc_info=True)
|
| return templates.TemplateResponse("error.html", {
|
| "request": request,
|
| "error": f"Internal generation error: {e}",
|
| "sbom_count": get_sbom_count(),
|
| "model_id": normalized_id
|
| })
|
|
|