OWASP-AIBOM-Generator / src /utils /validation.py
e2hln's picture
Upload 44 files
6165ba9 verified
raw
history blame
5.6 kB
"""
CycloneDX 1.6 Schema Validation for AIBOM Generator.
This module provides validation of generated AIBOMs against the official
CycloneDX 1.6 JSON schema to ensure compliance and interoperability.
"""
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# requests is a core dependency of this project, so it is imported unconditionally.
import requests
import jsonschema
from jsonschema import Draft7Validator, ValidationError
from referencing import Registry, Resource
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# CycloneDX schema configuration.
# NOTE: the URL points at the "master" branch of the specification repository,
# so the downloaded schema is not pinned to a fixed revision.
CYCLONEDX_1_6_SCHEMA_URL = "https://raw.githubusercontent.com/CycloneDX/specification/master/schema/bom-1.6.schema.json"
# Cache directory: "schemas" under the parent of this file's directory
# (for src/utils/validation.py this resolves to src/schemas).
SCHEMA_CACHE_DIR = Path(__file__).parent.parent / "schemas"
# Local copy of the CycloneDX 1.6 schema inside the cache directory.
SCHEMA_CACHE_FILE = SCHEMA_CACHE_DIR / "bom-1.6.schema.json"
# In-memory schema cache; stays None until load_schema() stores a schema here.
_cached_schema: Optional[Dict[str, Any]] = None
def _ensure_cache_dir() -> None:
    """Create the schema cache directory if it is missing.

    Missing parent directories are created as well; a directory that
    already exists is left as it is.
    """
    cache_dir = SCHEMA_CACHE_DIR
    cache_dir.mkdir(exist_ok=True, parents=True)
def _load_schema_from_cache() -> Optional[Dict[str, Any]]:
    """Load the CycloneDX 1.6 schema from the local cache file.

    Returns:
        The parsed schema, or None when the cache file is absent, cannot
        be read, or does not contain valid UTF-8 encoded JSON.
    """
    # Open directly instead of checking exists() first: this avoids the race
    # between the check and the open, and a missing file is reported through
    # FileNotFoundError anyway.
    try:
        with open(SCHEMA_CACHE_FILE, "r", encoding="utf-8") as f:
            schema = json.load(f)
    except FileNotFoundError:
        # No cache file yet; this is the normal state before the first
        # download, so it is not worth a warning.
        return None
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError (malformed JSON) and
        # UnicodeDecodeError (file is not valid UTF-8); a corrupt cache must
        # not crash the caller, which can fall back to downloading.
        logger.warning("Failed to load cached schema: %s", e)
        return None

    logger.debug("Loaded CycloneDX 1.6 schema from cache")
    return schema
def _download_schema() -> Optional[Dict[str, Any]]:
    """Download the CycloneDX 1.6 schema and cache it on disk.

    The schema is fetched from CYCLONEDX_1_6_SCHEMA_URL. Writing the local
    cache file is best-effort: if the cache directory or file cannot be
    written, the downloaded schema is still returned.

    Returns:
        The parsed schema, or None when the download fails or the response
        body is not valid JSON.
    """
    # Step 1: fetch and parse. Only failures here make the schema unavailable.
    try:
        logger.info("Downloading CycloneDX 1.6 schema from %s", CYCLONEDX_1_6_SCHEMA_URL)
        response = requests.get(CYCLONEDX_1_6_SCHEMA_URL, timeout=30)
        response.raise_for_status()
        schema = response.json()
    except requests.RequestException as e:
        logger.error("Failed to download CycloneDX schema: %s", e)
        return None
    except ValueError as e:
        # Older requests releases raise a plain ValueError/JSONDecodeError
        # (not a RequestException) when the body is not valid JSON.
        logger.error("Failed to parse CycloneDX schema: %s", e)
        return None

    # Step 2: cache locally. A failure here (e.g. read-only filesystem) must
    # not throw away the schema that was just downloaded successfully.
    try:
        _ensure_cache_dir()
        with open(SCHEMA_CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(schema, f, indent=2)
    except OSError as e:
        logger.warning("Downloaded CycloneDX schema could not be cached: %s", e)
    else:
        logger.info("CycloneDX 1.6 schema downloaded and cached")

    return schema
def load_schema(force_download: bool = False) -> Optional[Dict[str, Any]]:
    """
    Return the CycloneDX 1.6 JSON schema, loading it if necessary.

    Lookup order: the in-memory cache, then the file cache, then a fresh
    download. Whichever source yields a schema also refreshes the
    in-memory cache.

    Args:
        force_download: If True, bypass both caches and download again.

    Returns:
        The schema dictionary, or None if loading failed.
    """
    global _cached_schema

    if not force_download:
        # Cheapest source first: a schema already held in memory.
        if _cached_schema is not None:
            return _cached_schema

        # Next: the copy stored on disk by an earlier download.
        from_disk = _load_schema_from_cache()
        if from_disk:
            _cached_schema = from_disk
            return from_disk

    # Either a download was forced or neither cache produced a schema.
    fresh = _download_schema()
    if fresh:
        _cached_schema = fresh
    return fresh
def _format_validation_error(error: ValidationError) -> str:
    """Render a validation error as ``[location] message``.

    The location is the error's absolute path with its elements joined by
    " -> ", or "root" when that path is empty.
    """
    segments = error.absolute_path
    if segments:
        location = " -> ".join(map(str, segments))
    else:
        location = "root"
    return f"[{location}] {error.message}"
def validate_aibom(aibom: Dict[str, Any], strict: bool = False) -> Tuple[bool, List[str]]:
    """
    Check an AIBOM document against the CycloneDX 1.6 schema.

    Args:
        aibom: The AIBOM dictionary to validate.
        strict: Accepted for API compatibility.
            NOTE(review): this flag is not consulted anywhere in the body;
            every call collects all errors regardless of its value.

    Returns:
        Tuple of (is_valid, list of messages).
        Valid document: (True, []).
        Invalid document: (False, [error1, error2, ...]), ordered by the
        location of each error in the document.
        Schema could not be loaded: validation is skipped and
        (True, ["Schema unavailable"]) is returned.
    """
    schema = load_schema()
    if schema is None:
        logger.warning("Could not load CycloneDX schema - skipping validation")
        return True, ["Schema unavailable"]

    # Optionally register a locally stored SPDX schema so that references to
    # "spdx.schema.json" can be resolved. This is best-effort: any failure is
    # logged and validation continues with an empty registry.
    registry = Registry()
    spdx_path = SCHEMA_CACHE_DIR / "spdx.schema.json"
    if spdx_path.exists():
        try:
            with open(spdx_path, "r", encoding="utf-8") as f:
                spdx_schema = json.load(f)
            registry = registry.with_resource(
                uri="spdx.schema.json",
                resource=Resource.from_contents(spdx_schema),
            )
        except Exception as e:
            logger.warning("Failed to load SPDX schema for validation: %s", e)

    validator = Draft7Validator(schema, registry=registry)

    # Gather every violation, then order them by their path in the document.
    violations = list(validator.iter_errors(aibom))
    violations.sort(key=lambda err: err.path)

    if violations:
        return False, [_format_validation_error(err) for err in violations]
    return True, []
def get_validation_summary(aibom: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize the schema validation result for an AIBOM.

    Returns:
        A dictionary with three keys:
        "valid": the validity flag reported by validate_aibom().
        "error_count": the number of messages validate_aibom() returned.
        "errors": at most the first 10 messages when the document is
        invalid, otherwise an empty list.
    """
    is_valid, errors = validate_aibom(aibom)

    # Messages are only surfaced for invalid documents, capped at ten.
    reported = [] if is_valid else errors[:10]

    summary = {
        "valid": is_valid,
        "error_count": len(errors),
        "errors": reported,
    }
    return summary