| """
|
| CycloneDX 1.6 Schema Validation for AIBOM Generator.
|
|
|
| This module provides validation of generated AIBOMs against the official
|
| CycloneDX 1.6 JSON schema to ensure compliance and interoperability.
|
| """
|
| import json
|
| import logging
|
| from pathlib import Path
|
| from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
|
| import requests
|
| import jsonschema
|
| from jsonschema import Draft7Validator, ValidationError
|
| from referencing import Registry, Resource
|
|
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| CYCLONEDX_1_6_SCHEMA_URL = "https://raw.githubusercontent.com/CycloneDX/specification/master/schema/bom-1.6.schema.json"
|
|
|
| SCHEMA_CACHE_DIR = Path(__file__).parent.parent / "schemas"
|
| SCHEMA_CACHE_FILE = SCHEMA_CACHE_DIR / "bom-1.6.schema.json"
|
|
|
|
|
| _cached_schema: Optional[Dict[str, Any]] = None
|
|
|
|
|
| def _ensure_cache_dir() -> None:
|
| """Ensure the schema cache directory exists."""
|
| SCHEMA_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| def _load_schema_from_cache() -> Optional[Dict[str, Any]]:
|
| """Load schema from local cache if available."""
|
| if SCHEMA_CACHE_FILE.exists():
|
| try:
|
| with open(SCHEMA_CACHE_FILE, "r", encoding="utf-8") as f:
|
| schema = json.load(f)
|
| logger.debug("Loaded CycloneDX 1.6 schema from cache")
|
| return schema
|
| except (json.JSONDecodeError, IOError) as e:
|
| logger.warning("Failed to load cached schema: %s", e)
|
| return None
|
|
|
|
|
| def _download_schema() -> Optional[Dict[str, Any]]:
|
| """Download the CycloneDX 1.6 schema from the official repository."""
|
| try:
|
| logger.info("Downloading CycloneDX 1.6 schema from %s", CYCLONEDX_1_6_SCHEMA_URL)
|
| response = requests.get(CYCLONEDX_1_6_SCHEMA_URL, timeout=30)
|
| response.raise_for_status()
|
| schema = response.json()
|
|
|
|
|
| _ensure_cache_dir()
|
| with open(SCHEMA_CACHE_FILE, "w", encoding="utf-8") as f:
|
| json.dump(schema, f, indent=2)
|
| logger.info("CycloneDX 1.6 schema downloaded and cached")
|
|
|
| return schema
|
| except requests.RequestException as e:
|
| logger.error("Failed to download CycloneDX schema: %s", e)
|
| return None
|
| except (json.JSONDecodeError, IOError) as e:
|
| logger.error("Failed to parse or cache schema: %s", e)
|
| return None
|
|
|
|
|
| def load_schema(force_download: bool = False) -> Optional[Dict[str, Any]]:
|
| """
|
| Load the CycloneDX 1.6 JSON schema.
|
|
|
| Uses in-memory cache first, then file cache, then downloads if needed.
|
|
|
| Args:
|
| force_download: If True, download fresh schema even if cached.
|
|
|
| Returns:
|
| The schema dictionary, or None if loading failed.
|
| """
|
| global _cached_schema
|
|
|
|
|
| if _cached_schema is not None and not force_download:
|
| return _cached_schema
|
|
|
|
|
| if not force_download:
|
| schema = _load_schema_from_cache()
|
| if schema:
|
| _cached_schema = schema
|
| return schema
|
|
|
|
|
| schema = _download_schema()
|
| if schema:
|
| _cached_schema = schema
|
|
|
| return schema
|
|
|
|
|
| def _format_validation_error(error: ValidationError) -> str:
|
| """Format a validation error into a readable message."""
|
| path = " -> ".join(str(p) for p in error.absolute_path) if error.absolute_path else "root"
|
| return f"[{path}] {error.message}"
|
|
|
|
|
| def validate_aibom(aibom: Dict[str, Any], strict: bool = False) -> Tuple[bool, List[str]]:
|
| """
|
| Validate an AIBOM against the CycloneDX 1.6 schema.
|
|
|
| Args:
|
| aibom: The AIBOM dictionary to validate.
|
| strict: If True, fail on any schema deviation. If False, collect all errors.
|
|
|
| Returns:
|
| Tuple of (is_valid, list of error messages).
|
| If valid, returns (True, []).
|
| If invalid, returns (False, [error1, error2, ...]).
|
| """
|
| schema = load_schema()
|
|
|
| if schema is None:
|
| logger.warning("Could not load CycloneDX schema - skipping validation")
|
| return True, ["Schema unavailable"]
|
|
|
|
|
| spdx_path = SCHEMA_CACHE_DIR / "spdx.schema.json"
|
| registry = Registry()
|
| if spdx_path.exists():
|
| try:
|
| with open(spdx_path, "r", encoding="utf-8") as f:
|
| spdx_schema = json.load(f)
|
| resource = Resource.from_contents(spdx_schema)
|
| registry = registry.with_resource(uri="spdx.schema.json", resource=resource)
|
| except Exception as e:
|
| logger.warning("Failed to load SPDX schema for validation: %s", e)
|
|
|
| validator = Draft7Validator(schema, registry=registry)
|
| errors = sorted(validator.iter_errors(aibom), key=lambda e: e.path)
|
|
|
| if not errors:
|
| return True, []
|
|
|
| error_messages = [_format_validation_error(e) for e in errors]
|
| return False, error_messages
|
|
|
| def get_validation_summary(aibom: Dict[str, Any]) -> Dict[str, Any]:
|
| """Get a summary of schema validation results."""
|
| is_valid, errors = validate_aibom(aibom)
|
| return {
|
| "valid": is_valid,
|
| "error_count": len(errors),
|
| "errors": errors[:10] if not is_valid else []
|
| }
|
|
|