OWASP-AIBOM-Generator / src /controllers /web_controller.py
e2hln's picture
Upload 44 files
6165ba9 verified
raw
history blame
6.45 kB
import os
import re
import json
import logging
import html
from urllib.parse import urlparse
from typing import Optional
from fastapi import APIRouter, Request, Form, HTTPException, Depends
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi
from huggingface_hub.utils import RepositoryNotFoundError
from ..models.service import AIBOMService
from ..models.scoring import calculate_completeness_score
from ..utils.analytics import log_sbom_generation, get_sbom_count
from ..utils.formatter import export_aibom
from ..config import TEMPLATES_DIR, OUTPUT_DIR
logger = logging.getLogger(__name__)
router = APIRouter()
templates = Jinja2Templates(directory=TEMPLATES_DIR)
# --- Helpers ---
HF_ID_REGEX = re.compile(r"^[a-zA-Z0-9\.\-\_]+/[a-zA-Z0-9\.\-\_]+$")
def is_valid_hf_input(input_str: str) -> bool:
if not input_str or len(input_str) > 200:
return False
if input_str.startswith(("http://", "https://")):
try:
parsed = urlparse(input_str)
if parsed.netloc == "huggingface.co":
parts = parsed.path.strip("/").split("/")
if len(parts) >= 2 and parts[0] and parts[1]:
if re.match(r"^[a-zA-Z0-9\.\-\_]+$", parts[0]) and \
re.match(r"^[a-zA-Z0-9\.\-\_]+$", parts[1]):
return True
return False
except Exception:
return False
else:
return bool(HF_ID_REGEX.match(input_str))
# --- Routes ---
@router.get("/", response_class=HTMLResponse)
async def root(request: Request):
return templates.TemplateResponse("index.html", {
"request": request,
"sbom_count": get_sbom_count()
})
@router.get("/status")
async def get_status():
return {"status": "operational", "version": "1.0.0", "generator_version": "2.0.0"}
@router.post("/generate", response_class=HTMLResponse)
async def generate_form(
request: Request,
model_id: str = Form(...),
include_inference: bool = Form(False),
use_best_practices: bool = Form(True)
):
# Security: Validate BEFORE sanitizing to prevent bypass attacks
# (e.g., <script>org/model</script> → &lt;script&gt;org/model&lt;/script&gt; could slip through)
if not is_valid_hf_input(model_id):
return templates.TemplateResponse("error.html", {
"request": request,
"error": "Invalid model ID format.",
"sbom_count": get_sbom_count(),
"model_id": html.escape(model_id)
})
# Sanitize after validation for safe display/storage
sanitized_model_id = html.escape(model_id)
# Use helper from Service to normalize
normalized_id = AIBOMService._normalise_model_id(sanitized_model_id)
# Check existence (non-blocking)
import asyncio
try:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, lambda: HfApi().model_info(normalized_id))
except RepositoryNotFoundError:
return templates.TemplateResponse("error.html", {
"request": request,
"error": f"Model {normalized_id} not found on Hugging Face.",
"sbom_count": get_sbom_count(),
"model_id": normalized_id
})
except Exception as e:
return templates.TemplateResponse("error.html", {
"request": request,
"error": f"Error verifying model: {e}",
"sbom_count": get_sbom_count(),
"model_id": normalized_id
})
# Generate (non-blocking)
try:
def _generate_task():
service = AIBOMService(use_best_practices=use_best_practices)
aibom = service.generate_aibom(sanitized_model_id, include_inference=include_inference)
report = service.get_enhancement_report()
return service, aibom, report
service, aibom, report = await loop.run_in_executor(None, _generate_task)
# Save file (non-blocking I/O)
filename = f"{normalized_id.replace('/', '_')}_ai_sbom_1_6.json"
filepath = os.path.join(OUTPUT_DIR, filename)
filepath_1_7 = os.path.join(OUTPUT_DIR, f"{normalized_id.replace('/', '_')}_ai_sbom_1_7.json")
def _save_task():
# Generate Formatted JSON strings
json_1_6 = export_aibom(aibom, bom_type="cyclonedx", spec_version="1.6")
json_1_7 = export_aibom(aibom, bom_type="cyclonedx", spec_version="1.7")
with open(filepath, "w") as f:
f.write(json_1_6)
with open(filepath_1_7, "w") as f:
f.write(json_1_7)
log_sbom_generation(sanitized_model_id)
return json_1_6, json_1_7
json_1_6, json_1_7 = await loop.run_in_executor(None, _save_task)
# Extract score
completeness_score = None
if report and "final_score" in report:
completeness_score = report["final_score"]
# Fallback score if needed
if not completeness_score:
completeness_score = calculate_completeness_score(aibom)
# Prepare context for template
context = {
"request": request,
"filename": filename,
"download_url": f"/output/{filename}",
"aibom": aibom,
"aibom_cdx_json_1_6": json_1_6,
"aibom_cdx_json_1_7": json_1_7,
"components_json": json.dumps(aibom.get("components", []), indent=2),
"model_id": normalized_id,
"sbom_count": get_sbom_count(),
"completeness_score": completeness_score,
"enhancement_report": report or {},
# Pass legacy variables for template compatibility if needed
"result_file": f"/output/{filename}"
}
return templates.TemplateResponse("result.html", context)
except Exception as e:
logger.error(f"Generation error: {e}", exc_info=True)
return templates.TemplateResponse("error.html", {
"request": request,
"error": f"Internal generation error: {e}",
"sbom_count": get_sbom_count(),
"model_id": normalized_id
})