| """ |
| Agent 2: Token Normalizer & Structurer |
| Design System Extractor v2 |
| |
| Persona: Design System Librarian |
| |
| Responsibilities: |
| - Clean noisy extraction data |
| - Deduplicate similar tokens (colors within threshold, similar spacing) |
| - Infer naming patterns from class names and contexts |
| - Tag tokens as: detected | inferred | low-confidence |
| - Group colors by role (primary, secondary, neutral, etc.) |
| """ |
|
|
| import re |
| from typing import Optional |
| from collections import defaultdict |
|
|
| from core.token_schema import ( |
| ColorToken, |
| TypographyToken, |
| SpacingToken, |
| ExtractedTokens, |
| NormalizedTokens, |
| Confidence, |
| TokenSource, |
| ) |
| from core.color_utils import ( |
| parse_color, |
| normalize_hex, |
| categorize_color, |
| ) |
|
|
|
|
class TokenNormalizer:
    """
    Normalizes and structures extracted tokens.

    This is Agent 2's job — taking raw extraction data and
    organizing it into a clean, deduplicated structure:

    - exact and near-duplicate tokens are merged (frequencies summed,
      contexts/elements/css_properties unioned),
    - semantic names are inferred from usage contexts,
    - every token gets a frequency-based confidence tag.
    """

    def __init__(self):
        # Maximum Euclidean RGB distance for two colors to be considered
        # the same token and merged.
        self.color_similarity_threshold = 10
        # Maximum difference in pixels for two spacing values to be merged.
        self.spacing_merge_threshold = 2

        # Keyword → semantic role lookup for colors. Roles are checked in
        # insertion order and the first keyword hit wins, so specific roles
        # (primary, secondary, state colors) are listed before the generic
        # ones (neutral, background, text, border).
        self.color_role_keywords = {
            "primary": ["primary", "brand", "main", "accent"],
            "secondary": ["secondary", "alt", "alternate"],
            "success": ["success", "green", "positive", "valid"],
            "warning": ["warning", "yellow", "caution", "alert"],
            "error": ["error", "red", "danger", "invalid", "negative"],
            "info": ["info", "blue", "notice"],
            "neutral": ["gray", "grey", "neutral", "muted", "subtle"],
            "background": ["bg", "background", "surface"],
            "text": ["text", "foreground", "content", "body"],
            "border": ["border", "divider", "separator", "line"],
        }

    def normalize(self, extracted: ExtractedTokens) -> NormalizedTokens:
        """
        Normalize extracted tokens.

        Args:
            extracted: Raw extraction results from Agent 1

        Returns:
            NormalizedTokens with cleaned, deduplicated data, keyed by
            suggested (or value-derived) token names.
        """
        colors_list = self._normalize_colors(extracted.colors)
        typography_list = self._normalize_typography(extracted.typography)
        spacing_list = self._normalize_spacing(extracted.spacing)

        # Key each token by its suggested name, falling back to a
        # value-derived key. _unique_key disambiguates collisions;
        # previously a second token with the same name silently
        # replaced the first.
        colors_dict: dict[str, ColorToken] = {}
        for color in colors_list:
            key = self._unique_key(color.suggested_name or color.value, colors_dict)
            colors_dict[key] = color

        typography_dict: dict[str, TypographyToken] = {}
        for typo in typography_list:
            base = typo.suggested_name or f"{typo.font_family}-{typo.font_size}"
            typography_dict[self._unique_key(base, typography_dict)] = typo

        spacing_dict: dict[str, SpacingToken] = {}
        for space in spacing_list:
            base = space.suggested_name or space.value
            spacing_dict[self._unique_key(base, spacing_dict)] = space

        # Radius tokens are keyed by value, so exact duplicates collapse
        # (last one wins — they describe the same radius).
        radius_dict = {f"radius-{r.value}": r for r in extracted.radius}

        # BUGFIX: shadow keys were previously hash(value) % 1000, which is
        # unstable across interpreter runs (string-hash randomization via
        # PYTHONHASHSEED) and could silently collide. A positional index is
        # deterministic and collision-free.
        shadows_dict = {f"shadow-{i}": s for i, s in enumerate(extracted.shadows)}

        return NormalizedTokens(
            viewport=extracted.viewport,
            source_url=extracted.source_url,
            colors=colors_dict,
            typography=typography_dict,
            spacing=spacing_dict,
            radius=radius_dict,
            shadows=shadows_dict,
            font_families=extracted.font_families,
            detected_spacing_base=extracted.spacing_base,
            detected_naming_convention=extracted.naming_convention,
        )

    @staticmethod
    def _unique_key(base: str, existing: dict) -> str:
        """Return *base*, or ``base-2`` / ``base-3`` / ... if already taken."""
        if base not in existing:
            return base
        suffix = 2
        while f"{base}-{suffix}" in existing:
            suffix += 1
        return f"{base}-{suffix}"

    def _normalize_colors(self, colors: list[ColorToken]) -> list[ColorToken]:
        """
        Normalize color tokens:
        - Deduplicate identical colors (after hex normalization)
        - Merge visually similar colors
        - Infer color roles and assign suggested names
        - Calculate confidence from usage frequency
        """
        if not colors:
            return []

        # Exact dedup on the normalized hex value. Duplicates contribute
        # their frequency and usage metadata to the surviving token.
        unique_colors: dict[str, ColorToken] = {}
        for color in colors:
            hex_val = normalize_hex(color.value)
            if hex_val in unique_colors:
                existing = unique_colors[hex_val]
                existing.frequency += color.frequency
                existing.contexts = list(set(existing.contexts + color.contexts))
                existing.elements = list(set(existing.elements + color.elements))
                existing.css_properties = list(set(existing.css_properties + color.css_properties))
            else:
                color.value = hex_val
                unique_colors[hex_val] = color

        merged_colors = self._merge_similar_colors(list(unique_colors.values()))

        for color in merged_colors:
            # Prefer a semantic role name; fall back to a value-based name.
            role = self._infer_color_role(color)
            if role:
                color.suggested_name = self._generate_color_name(color, role)
            else:
                color.suggested_name = self._generate_color_name_from_value(color)
            color.confidence = self._calculate_confidence(color.frequency)

        # Most-used colors first.
        merged_colors.sort(key=lambda c: -c.frequency)
        return merged_colors

    def _merge_similar_colors(self, colors: list[ColorToken]) -> list[ColorToken]:
        """Merge colors that are visually very similar.

        Greedy single pass: each unmerged color collects every later color
        within the similarity threshold of itself; the most frequent member
        of each group survives and absorbs the others' usage data.
        """
        if len(colors) <= 1:
            return colors

        merged = []
        used: set[int] = set()

        for i, color1 in enumerate(colors):
            if i in used:
                continue

            similar_group = [color1]
            for j, color2 in enumerate(colors[i + 1:], i + 1):
                if j in used:
                    continue
                if self._colors_are_similar(color1.value, color2.value):
                    similar_group.append(color2)
                    used.add(j)

            # Most frequent member of the group becomes the canonical token.
            similar_group.sort(key=lambda c: -c.frequency)
            primary = similar_group[0]
            for other in similar_group[1:]:
                primary.frequency += other.frequency
                primary.contexts = list(set(primary.contexts + other.contexts))
                primary.elements = list(set(primary.elements + other.elements))
                # BUGFIX: css_properties were dropped here even though the
                # exact-duplicate merge preserves them — keep merges symmetric.
                primary.css_properties = list(set(primary.css_properties + other.css_properties))

            merged.append(primary)

        return merged

    def _colors_are_similar(self, hex1: str, hex2: str) -> bool:
        """Check if two colors are visually similar.

        Uses Euclidean distance in RGB space against
        ``color_similarity_threshold``. Unparseable colors (or any parsing
        error) are conservatively treated as NOT similar.
        """
        try:
            parsed1 = parse_color(hex1)
            parsed2 = parse_color(hex2)
            if parsed1 is None or parsed2 is None:
                return False
            if parsed1.rgb is None or parsed2.rgb is None:
                return False

            rgb1 = parsed1.rgb
            rgb2 = parsed2.rgb

            distance = sum((a - b) ** 2 for a, b in zip(rgb1, rgb2)) ** 0.5
            return distance < self.color_similarity_threshold
        except Exception:
            # Defensive: never let a malformed color break normalization.
            return False

    def _infer_color_role(self, color: ColorToken) -> Optional[str]:
        """Infer the semantic role of a color from its contexts.

        Substring-matches role keywords against the concatenated contexts
        and element names (first match wins, in dict order); falls back to
        "neutral" for grayscale colors, else None.
        """
        all_context = " ".join(color.contexts + color.elements).lower()

        for role, keywords in self.color_role_keywords.items():
            for keyword in keywords:
                if keyword in all_context:
                    return role

        # No contextual hint — grayscale colors still count as neutral.
        category = categorize_color(color.value)
        if category in ["gray", "white", "black"]:
            return "neutral"

        return None

    @staticmethod
    def _luminance(rgb) -> float:
        """Perceived luminance of an (r, g, b) triple, normalized to 0..1.

        Uses the ITU-R BT.601 weights (0.299, 0.587, 0.114).
        """
        return (0.299 * rgb[0] + 0.587 * rgb[1] + 0.114 * rgb[2]) / 255

    def _generate_color_name(self, color: ColorToken, role: str) -> str:
        """Generate a semantic name like ``color.primary.500``.

        The shade is picked from luminance on a Tailwind-style scale
        (50 = lightest .. 900 = darkest); unparseable colors get "500".
        """
        parsed = parse_color(color.value)
        if parsed and parsed.rgb:
            luminance = self._luminance(parsed.rgb)
            if luminance > 0.8:
                shade = "50"
            elif luminance > 0.6:
                shade = "200"
            elif luminance > 0.4:
                shade = "500"
            elif luminance > 0.2:
                shade = "700"
            else:
                shade = "900"
        else:
            shade = "500"

        return f"color.{role}.{shade}"

    def _generate_color_name_from_value(self, color: ColorToken) -> str:
        """Generate a name based on the color value itself.

        Produces ``color.<category>.<light|base|dark>`` where the category
        comes from categorize_color and the shade from luminance.
        """
        category = categorize_color(color.value)
        parsed = parse_color(color.value)

        if parsed and parsed.rgb:
            luminance = self._luminance(parsed.rgb)
            if luminance > 0.6:
                shade = "light"
            elif luminance > 0.3:
                shade = "base"
            else:
                shade = "dark"
        else:
            shade = "base"

        return f"color.{category}.{shade}"

    def _normalize_typography(self, typography: list[TypographyToken]) -> list[TypographyToken]:
        """
        Normalize typography tokens:
        - Deduplicate identical styles (family/size/weight/line-height)
        - Assign suggested names from element usage and size
        - Calculate confidence from usage frequency
        """
        if not typography:
            return []

        # Exact dedup on the full style tuple; duplicates contribute
        # frequency and element usage.
        unique_typo: dict[str, TypographyToken] = {}
        for typo in typography:
            key = f"{typo.font_family}|{typo.font_size}|{typo.font_weight}|{typo.line_height}"
            if key in unique_typo:
                existing = unique_typo[key]
                existing.frequency += typo.frequency
                existing.elements = list(set(existing.elements + typo.elements))
            else:
                unique_typo[key] = typo

        result = list(unique_typo.values())

        for typo in result:
            typo.suggested_name = self._generate_typography_name(typo)
            typo.confidence = self._calculate_confidence(typo.frequency)

        # Largest font size first.
        result.sort(key=lambda t: -self._parse_font_size(t.font_size))

        return result

    def _generate_typography_name(self, typo: TypographyToken) -> str:
        """Generate a semantic name like ``font.heading.lg``.

        The category is inferred from the elements the style appears on
        (most specific first), the size tier from the pixel font size.
        """
        size_px = self._parse_font_size(typo.font_size)
        elements = " ".join(typo.elements).lower()

        if any(h in elements for h in ["h1", "hero", "display"]):
            category = "display"
        elif any(h in elements for h in ["h2", "h3", "h4", "h5", "h6", "heading", "title"]):
            category = "heading"
        elif any(h in elements for h in ["label", "caption", "small", "meta"]):
            category = "label"
        elif any(h in elements for h in ["body", "p", "paragraph", "text"]):
            category = "body"
        else:
            category = "text"

        if size_px >= 32:
            size_tier = "xl"
        elif size_px >= 24:
            size_tier = "lg"
        elif size_px >= 18:
            size_tier = "md"
        elif size_px >= 14:
            size_tier = "sm"
        else:
            size_tier = "xs"

        return f"font.{category}.{size_tier}"

    def _parse_font_size(self, size: str) -> float:
        """Parse a CSS font-size string to pixels.

        Supports px, rem, and em (rem/em assume a 16px root); bare numbers
        are treated as px. Anything unparseable falls back to 16.
        """
        if not size:
            return 16

        size = size.lower().strip()

        # NOTE: "rem" must be checked before "em" (substring overlap).
        if "px" in size:
            try:
                return float(size.replace("px", ""))
            except ValueError:
                return 16

        if "rem" in size:
            try:
                return float(size.replace("rem", "")) * 16
            except ValueError:
                return 16

        if "em" in size:
            try:
                return float(size.replace("em", "")) * 16
            except ValueError:
                return 16

        try:
            return float(size)
        except ValueError:
            return 16

    def _normalize_spacing(self, spacing: list[SpacingToken]) -> list[SpacingToken]:
        """
        Normalize spacing tokens:
        - Deduplicate identical values
        - Merge values within the merge threshold
        - Assign suggested names and confidence
        """
        if not spacing:
            return []

        # Exact dedup on the raw value string.
        unique_spacing: dict = {}
        for space in spacing:
            key = space.value
            if key in unique_spacing:
                existing = unique_spacing[key]
                existing.frequency += space.frequency
                existing.contexts = list(set(existing.contexts + space.contexts))
            else:
                unique_spacing[key] = space

        result = self._merge_similar_spacing(list(unique_spacing.values()))

        for space in result:
            space.suggested_name = self._generate_spacing_name(space)
            space.confidence = self._calculate_confidence(space.frequency)

        # Smallest spacing first.
        result.sort(key=lambda s: s.value_px)

        return result

    def _merge_similar_spacing(self, spacing: list[SpacingToken]) -> list[SpacingToken]:
        """Merge spacing values that are very close.

        Works on a sorted copy so the caller's list is not reordered in
        place. Each group is anchored at its smallest value: consecutive
        values within ``spacing_merge_threshold`` of the anchor are merged
        into it, preferring base-8-aligned then most-frequent survivors.
        """
        if len(spacing) <= 1:
            return spacing

        spacing = sorted(spacing, key=lambda s: s.value_px)

        merged = []
        i = 0

        while i < len(spacing):
            current = spacing[i]
            group = [current]

            # Collect the run of values close to the group anchor.
            j = i + 1
            while j < len(spacing):
                if abs(spacing[j].value_px - current.value_px) <= self.spacing_merge_threshold:
                    group.append(spacing[j])
                    j += 1
                else:
                    break

            # Prefer a base-8-aligned value, then the most frequent one.
            group.sort(key=lambda s: (-s.fits_base_8, -s.frequency))
            primary = group[0]

            for other in group[1:]:
                primary.frequency += other.frequency
                primary.contexts = list(set(primary.contexts + other.contexts))

            merged.append(primary)
            i = j

        return merged

    def _generate_spacing_name(self, space: SpacingToken) -> str:
        """Generate a semantic name like ``space.4``.

        Buckets pixel values onto a Tailwind-style 4px scale; values above
        96px fall through to ``px // 4``.
        """
        px = space.value_px

        if px <= 2:
            size = "px"
        elif px <= 4:
            size = "0.5"
        elif px <= 8:
            size = "1"
        elif px <= 12:
            size = "1.5"
        elif px <= 16:
            size = "2"
        elif px <= 20:
            size = "2.5"
        elif px <= 24:
            size = "3"
        elif px <= 32:
            size = "4"
        elif px <= 40:
            size = "5"
        elif px <= 48:
            size = "6"
        elif px <= 64:
            size = "8"
        elif px <= 80:
            size = "10"
        elif px <= 96:
            size = "12"
        else:
            size = str(int(px / 4))

        return f"space.{size}"

    def _calculate_confidence(self, frequency: int) -> Confidence:
        """Map usage frequency to a confidence tier (10+ high, 3+ medium)."""
        if frequency >= 10:
            return Confidence.HIGH
        elif frequency >= 3:
            return Confidence.MEDIUM
        else:
            return Confidence.LOW
|
|
|
|
def normalize_tokens(extracted: ExtractedTokens) -> NormalizedTokens:
    """Normalize *extracted* tokens using a fresh TokenNormalizer."""
    return TokenNormalizer().normalize(extracted)
|
|