from __future__ import annotations import ast import json from copy import deepcopy from typing import Any from prefab_ui.themes import Basic PAGE_CSS_CLASS = "w-full max-w-6xl mx-auto p-4 md:p-6 lg:px-8" WIDE_PAGE_CSS_CLASS = "w-full max-w-[90rem] mx-auto p-4 md:p-6 lg:px-8" DEFAULT_THEME: dict[str, Any] = Basic(accent="blue").to_json() _COMPONENT_KEY_ALIASES: dict[str, str] = { "bar_radius": "barRadius", "button_type": "buttonType", "close_delay": "closeDelay", "col_span": "colSpan", "column_template": "columnTemplate", "css_class": "cssClass", "data_key": "dataKey", "default_value": "defaultValue", "end_angle": "endAngle", "inner_radius": "innerRadius", "name_key": "nameKey", "min_column_width": "minColumnWidth", "on_change": "onChange", "on_click": "onClick", "on_submit": "onSubmit", "open_delay": "openDelay", "padding_angle": "paddingAngle", "row_height": "rowHeight", "row_span": "rowSpan", "show_dots": "showDots", "show_grid": "showGrid", "show_label": "showLabel", "show_legend": "showLegend", "show_tooltip": "showTooltip", "show_y_axis": "showYAxis", "start_angle": "startAngle", "trend_sentiment": "trendSentiment", "x_axis": "xAxis", "y_axis_format": "yAxisFormat", } _DATATABLE_KEY_ALIASES: dict[str, str] = { "on_row_click": "onRowClick", "page_size": "pageSize", "searchable": "search", } _DATATABLE_COLUMN_KEY_ALIASES: dict[str, str] = { "cell_class": "cellClass", "header_class": "headerClass", "max_width": "maxWidth", "min_width": "minWidth", } _PREFERRED_COLUMN_ORDER: tuple[str, ...] = ( "repo_id", "title", "name", "username", "handle", "owner", "author", "repo_type", "type", "role", "status", "likes", "downloads", "followers_count", "following_count", "item_count", "count", "created_at", "published_at", "timestamp", "last_modified", ) _PREFERRED_METRIC_KEYS: tuple[str, ...] = ( "count", "total", "returned", "matched", "scanned", "likes", "downloads", "followers_count", "following_count", "members_count", "models_count", "datasets_count", "spaces_count", "item_count", "api_calls", "elapsed_ms", "pro_likers", "normal_likers", ) _PREFERRED_LABEL_KEYS: tuple[str, ...] = ( "label", "name", "title", "repo_type", "status", "task", "pipeline_tag", "kind", "owner", "username", ) _URL_KEYS: tuple[str, ...] = ( "repo_url", "url", "html_url", "website_url", "project_page_url", "github_repo_url", ) _FILTERABLE_COLUMN_KEYS: tuple[str, ...] = ( "repo_type", "pipeline_tag", "pipeline_tags", "tags", "status", "license", "author", "owner", "username", "user", "users", "handle", "organization", "organizations", ) _FILTERABLE_COLUMN_SUFFIXES: tuple[str, ...] = ( "_type", "_tag", "_tags", "_status", "_license", "_author", "_owner", "_username", "_user", "_users", "_handle", "_organization", "_organizations", ) _USER_NAME_KEYS: tuple[str, ...] = ( "full_name", "display_name", "name", "username", "handle", ) _USER_AVATAR_KEYS: tuple[str, ...] = ( "avatar_url", "avatar", "image_url", ) _USER_SOCIAL_LINK_KEYS: tuple[tuple[str, str], ...] = ( ("hf_url", "Hugging Face"), ("profile_url", "Profile"), ("website_url", "Website"), ("blog_url", "Blog"), ("github_url", "GitHub"), ("twitter_url", "Twitter"), ) def _copy_default_theme() -> dict[str, Any]: return deepcopy(DEFAULT_THEME) def _rename_keys(value: dict[str, Any], aliases: dict[str, str]) -> dict[str, Any]: renamed = dict(value) for old_key, new_key in aliases.items(): if old_key in renamed and new_key not in renamed: renamed[new_key] = renamed.pop(old_key) return renamed def normalize_prefab_wire(value: Any) -> Any: if isinstance(value, list): return [normalize_prefab_wire(item) for item in value] if not isinstance(value, dict): return value normalized = {key: normalize_prefab_wire(item) for key, item in value.items()} component_type = normalized.get("type") if isinstance(component_type, str): normalized = _rename_keys(normalized, _COMPONENT_KEY_ALIASES) if component_type == "Metric" and "title" in normalized and "label" not in normalized: normalized["label"] = normalized.pop("title") if component_type == "DataTable": normalized = _rename_keys(normalized, _DATATABLE_KEY_ALIASES) if "data" in normalized and "rows" not in normalized: normalized["rows"] = normalized.pop("data") if isinstance(normalized.get("columns"), list): fixed_columns: list[Any] = [] for column in normalized["columns"]: if isinstance(column, dict): fixed_column = _rename_keys(dict(column), _DATATABLE_COLUMN_KEY_ALIASES) if "accessor" in fixed_column and "key" not in fixed_column: fixed_column["key"] = fixed_column.pop("accessor") if "title" in fixed_column and "header" not in fixed_column: fixed_column["header"] = fixed_column.pop("title") if "sortable" not in fixed_column: fixed_column["sortable"] = fixed_column.get("key") not in {"description"} fixed_columns.append(fixed_column) else: fixed_columns.append(column) normalized["columns"] = fixed_columns if "view" in normalized and isinstance(normalized["view"], list): normalized["view"] = { "type": "Column", "gap": 6, "cssClass": PAGE_CSS_CLASS, "children": normalized["view"], } return normalized def error_wire(message: str) -> dict[str, Any]: return { "version": "0.2", "theme": _copy_default_theme(), "view": { "type": "Alert", "variant": "destructive", "icon": "circle-x", "children": [ {"type": "AlertTitle", "content": "Prefab demo error"}, {"type": "AlertDescription", "content": message}, ], }, } def extract_prefab_body_parts( raw_value: Any, ) -> tuple[dict[str, Any], dict[str, Any] | None, dict[str, Any] | None, dict[str, Any] | None]: normalized = normalize_prefab_wire(raw_value) if isinstance(normalized, dict) and isinstance(normalized.get("view"), dict): return ( normalized["view"], normalized.get("state") if isinstance(normalized.get("state"), dict) else None, normalized.get("defs") if isinstance(normalized.get("defs"), dict) else None, normalized.get("theme") if isinstance(normalized.get("theme"), dict) else None, ) if isinstance(normalized, dict) and normalized.get("type"): return normalized, None, None, None if isinstance(normalized, list): return {"type": "Column", "gap": 6, "children": normalized}, None, None, None raise ValueError("payload did not contain a Prefab body or wire object") def build_runtime_sections(meta: dict[str, Any] | None) -> list[dict[str, Any]]: if not isinstance(meta, dict) or not meta: return [] sections: list[dict[str, Any]] = [] if meta.get("limits_reached"): sections.append( { "type": "Alert", "variant": "warning", "icon": "circle-alert", "children": [ {"type": "AlertTitle", "content": "Runtime limits reached"}, { "type": "AlertDescription", "content": "The result was produced under runtime limits. See execution metadata for details.", }, ], } ) summary_children: list[dict[str, Any]] = [{"type": "Small", "content": "Execution metadata"}] if meta.get("api_calls") is not None: summary_children.append( { "type": "Badge", "label": f'{meta["api_calls"]} calls', "variant": "outline", } ) if meta.get("elapsed_ms") is not None: summary_children.append( { "type": "Badge", "label": f'{meta["elapsed_ms"]} ms', "variant": "outline", } ) if meta.get("limits_reached"): summary_children.append( { "type": "Badge", "label": "limits reached", "variant": "secondary", } ) else: summary_children.append( { "type": "Muted", "content": "No runtime limits reached", } ) if len(summary_children) > 1: sections.append( { "type": "Column", "gap": 2, "children": [ {"type": "Separator", "orientation": "horizontal"}, { "type": "Row", "gap": 2, "align": "center", "cssClass": "flex-wrap", "children": summary_children, }, ], } ) limit_summary = meta.get("limit_summary") if isinstance(limit_summary, list) and limit_summary: parts: list[str] = [] for item in limit_summary: if not isinstance(item, dict): continue helper = str(item.get("helper") or item.get("source") or "unknown") returned = item.get("returned") total = item.get("total") truncated_by = str(item.get("truncated_by") or "").strip() hint = str(item.get("next_request_hint") or "").strip() detail_bits: list[str] = [] if returned is not None and total is not None: detail_bits.append(f"{returned}/{total}") elif returned is not None: detail_bits.append(str(returned)) if truncated_by: detail_bits.append(truncated_by.replace("_", " ")) detail = ", ".join(detail_bits) part = helper if not detail else f"{helper} ({detail})" if hint: part = f"{part}: {hint}" parts.append(part) if parts: sections.append( { "type": "Alert", "variant": "warning", "icon": "circle-alert", "children": [ {"type": "AlertTitle", "content": "Compact limit summary"}, {"type": "AlertDescription", "content": " • ".join(parts[:3])}, ], } ) return sections def _looks_like_page_root(component: dict[str, Any]) -> bool: if component.get("type") != "Column": return False if not isinstance(component.get("children"), list): return False css_class = str(component.get("cssClass") or "") return "max-w-" in css_class and "mx-auto" in css_class def wrap_prefab_body( body_view: dict[str, Any], *, meta: dict[str, Any] | None = None, state: dict[str, Any] | None = None, defs: dict[str, Any] | None = None, theme: dict[str, Any] | None = None, ) -> dict[str, Any]: runtime_sections = build_runtime_sections(meta) if _looks_like_page_root(body_view): children = list(body_view.get("children") or []) view = dict(body_view) view.setdefault("gap", 6) view.setdefault("cssClass", PAGE_CSS_CLASS) view["children"] = [*children, *runtime_sections] else: view = { "type": "Column", "gap": 6, "cssClass": PAGE_CSS_CLASS, "children": [body_view, *runtime_sections], } wire: dict[str, Any] = { "version": "0.2", "theme": theme if theme is not None else _copy_default_theme(), "view": view, } if state: wire["state"] = state if defs: wire["defs"] = defs return wire def coerce_wire(raw_value: Any) -> dict[str, Any]: normalized = normalize_prefab_wire(raw_value) if isinstance(normalized, dict) and isinstance(normalized.get("view"), dict): wire = dict(normalized) wire.setdefault("version", "0.2") if not isinstance(wire.get("theme"), dict): wire["theme"] = _copy_default_theme() return wire body_view, state, defs, theme = extract_prefab_body_parts(normalized) return wrap_prefab_body( body_view, state=state, defs=defs, theme=theme, ) def _strip_code_fence(text: str) -> str: stripped = text.strip() if not stripped.startswith("```"): return stripped lines = stripped.splitlines() if not lines: return stripped if lines[0].startswith("```"): lines = lines[1:] if lines and lines[-1].strip() == "```": lines = lines[:-1] return "\n".join(lines).strip() def parse_jsonish_value(text: str) -> Any: candidate = _strip_code_fence(text) if not candidate: raise ValueError("empty payload") try: return json.loads(candidate) except json.JSONDecodeError: try: return ast.literal_eval(candidate) except (SyntaxError, ValueError) as exc: raise ValueError("payload was not valid JSON or Python literal data") from exc def parse_runtime_payload(text: str) -> dict[str, Any]: payload = parse_jsonish_value(text) if not isinstance(payload, dict): raise ValueError("runtime payload was not an object") return payload def _titleize(key: str) -> str: text = key.replace("_", " ").replace("-", " ").strip() return " ".join(part.capitalize() for part in text.split()) or key def _truncate(text: str, limit: int = 160) -> str: if len(text) <= limit: return text return f"{text[: limit - 1]}…" def _compact_text(value: Any, limit: int = 160) -> str: if value is None: return "None" if isinstance(value, str): collapsed = " ".join(value.split()) return _truncate(collapsed, limit=limit) if isinstance(value, (int, float, bool)): return str(value) if isinstance(value, list): if value and all(isinstance(item, (str, int, float, bool)) or item is None for item in value): collapsed = ", ".join(_compact_text(item, limit=40) for item in value[:6]) if len(value) > 6: collapsed = f"{collapsed}, …" return _truncate(collapsed, limit=limit) return _truncate(json.dumps(value, ensure_ascii=False, sort_keys=True), limit=limit) def _is_scalar(value: Any) -> bool: if value is None or isinstance(value, (str, int, float, bool)): return True if isinstance(value, list): return all(item is None or isinstance(item, (str, int, float, bool)) for item in value) return False def _normalize_cell(value: Any, *, key: str) -> Any: if value is None or isinstance(value, (str, int, float, bool)): return value if isinstance(value, list): if value and all(isinstance(item, str) for item in value): return [_compact_text(item, limit=40) for item in value[:8]] return _compact_text(value) def _normalize_row(row: dict[str, Any]) -> dict[str, Any]: return { str(key): _normalize_cell(value, key=str(key)) for key, value in row.items() } def _is_badge_friendly_key(key: str) -> bool: return key in _FILTERABLE_COLUMN_KEYS or key.endswith(_FILTERABLE_COLUMN_SUFFIXES) def _should_make_filterable(key: str, rows: list[dict[str, Any]]) -> bool: if not _is_badge_friendly_key(key): return False values = [row.get(key) for row in rows] if any(isinstance(value, list) for value in values): return True scalar_values = [ value for value in values if isinstance(value, (str, int, float, bool)) ] if not scalar_values: return False if any(isinstance(value, (int, float)) and not isinstance(value, bool) for value in scalar_values): return False return 0 < len({str(value) for value in scalar_values}) <= 12 def _column_rank(key: str) -> tuple[int, str]: try: return (_PREFERRED_COLUMN_ORDER.index(key), key) except ValueError: return (len(_PREFERRED_COLUMN_ORDER), key) def _metric_rank(key: str) -> tuple[int, str]: try: return (_PREFERRED_METRIC_KEYS.index(key), key) except ValueError: return (len(_PREFERRED_METRIC_KEYS), key) def _label_rank(key: str) -> tuple[int, str]: try: return (_PREFERRED_LABEL_KEYS.index(key), key) except ValueError: return (len(_PREFERRED_LABEL_KEYS), key) def _build_row_click(rows: list[dict[str, Any]]) -> dict[str, Any] | None: for key in _URL_KEYS: if any(isinstance(row.get(key), str) and row.get(key) for row in rows): return { "action": "openLink", "url": f"{{{{ $event.{key} }}}}", } return None def _select_distribution_fields( rows: list[dict[str, Any]], ) -> tuple[str, str] | None: if not 2 <= len(rows) <= 8: return None shared_keys = set(rows[0]) for row in rows[1:]: shared_keys &= set(row) if not shared_keys: return None numeric_keys = [ key for key in shared_keys if all(isinstance(row.get(key), (int, float)) for row in rows) ] if not numeric_keys: return None count_key = sorted(numeric_keys, key=_metric_rank)[0] label_candidates = [ key for key in shared_keys if key != count_key and all(isinstance(row.get(key), str) and row.get(key).strip() for row in rows) ] if not label_candidates: return None label_key = sorted(label_candidates, key=_label_rank)[0] return label_key, count_key def _build_distribution_card( title: str, rows: list[dict[str, Any]], *, label_key: str, count_key: str, ) -> dict[str, Any]: return { "type": "Card", "children": [ { "type": "CardHeader", "children": [ {"type": "CardTitle", "content": f"{title} distribution"}, { "type": "CardDescription", "content": f'{_titleize(count_key)} by {_titleize(label_key).lower()}', }, ], }, { "type": "CardContent", "children": [ { "type": "PieChart", "data": rows, "dataKey": count_key, "nameKey": label_key, "innerRadius": 60, "paddingAngle": 2, "showLegend": True, "showTooltip": True, "showLabel": False, "height": 260, } ], }, ], } def _looks_like_user_profile(values: dict[str, Any]) -> bool: return any(key in values for key in ("username", "handle", "avatar_url", "hf_url", "profile_url")) def _first_present(values: dict[str, Any], keys: tuple[str, ...]) -> str | None: for key in keys: value = values.get(key) if isinstance(value, str) and value.strip(): return value.strip() return None def _user_profile_links(values: dict[str, Any]) -> list[tuple[str, str]]: links: list[tuple[str, str]] = [] for key, label in _USER_SOCIAL_LINK_KEYS: value = values.get(key) if isinstance(value, str) and value.strip(): links.append((label, value.strip())) username = _first_present(values, ("username", "handle")) if username and not any(label == "Hugging Face" for label, _ in links): links.insert(0, ("Hugging Face", f"https://huggingface.co/{username.lstrip('@')}")) github = values.get("github") if isinstance(github, str) and github.strip() and not any(label == "GitHub" for label, _ in links): links.append(("GitHub", f"https://github.com/{github.strip().lstrip('@')}")) twitter = values.get("twitter") if isinstance(twitter, str) and twitter.strip() and not any(label == "Twitter" for label, _ in links): links.append(("Twitter", f"https://x.com/{twitter.strip().lstrip('@')}")) deduped: list[tuple[str, str]] = [] seen_urls: set[str] = set() for label, url in links: if url in seen_urls: continue seen_urls.add(url) deduped.append((label, url)) return deduped[:4] def _build_user_profile_card(title: str, values: dict[str, Any]) -> dict[str, Any] | None: name = _first_present(values, _USER_NAME_KEYS) if not name: return None username = _first_present(values, ("username", "handle")) subtitle = f"@{username.lstrip('@')}" if username else title avatar = _first_present(values, _USER_AVATAR_KEYS) bio = _first_present(values, ("bio", "description", "headline")) links = _user_profile_links(values) row_children: list[dict[str, Any]] = [] if avatar: row_children.append( { "type": "Image", "src": avatar, "alt": name, "width": "64px", "height": "64px", "cssClass": "rounded-full border object-cover", } ) body_children: list[dict[str, Any]] = [ {"type": "H3", "content": name}, {"type": "Muted", "content": subtitle}, ] if bio: body_children.append({"type": "Text", "content": bio}) if links: body_children.append( { "type": "Row", "gap": 2, "cssClass": "flex-wrap", "children": [ { "type": "Button", "label": "View profile" if index == 0 else label, "variant": "default" if index == 0 else "outline", "buttonType": "button", "onClick": {"action": "openLink", "url": url}, } for index, (label, url) in enumerate(links) ], } ) row_children.append({"type": "Column", "gap": 2, "children": body_children}) return { "type": "Card", "children": [ { "type": "CardContent", "cssClass": "p-6", "children": [{"type": "Row", "gap": 4, "align": "center", "children": row_children}], } ], } def _prefers_wide_layout(value: Any) -> bool: if isinstance(value, list): return bool(value) and all(isinstance(item, dict) for item in value) if isinstance(value, dict): items = value.get("items") if isinstance(items, list) and items and all(isinstance(item, dict) for item in items): return True results = value.get("results") if isinstance(results, list) and results and all(isinstance(item, dict) for item in results): return True return False def _build_table_card( title: str, rows: list[dict[str, Any]], *, description: str | None = None, empty_message: str = "No rows returned.", ) -> dict[str, Any]: if not rows: return { "type": "Card", "children": [ { "type": "CardHeader", "children": [ {"type": "CardTitle", "content": title}, *( [{"type": "CardDescription", "content": description}] if description else [] ), ], }, { "type": "CardContent", "children": [ { "type": "Alert", "variant": "default", "children": [ {"type": "AlertTitle", "content": title}, {"type": "AlertDescription", "content": empty_message}, ], } ], }, ], } normalized_rows = [_normalize_row(row) for row in rows] all_keys = {key for row in normalized_rows for key in row} row_click = _build_row_click(rows) visible_keys = sorted(all_keys, key=_column_rank) if row_click is not None: non_url_keys = [key for key in visible_keys if key not in _URL_KEYS] if non_url_keys: visible_keys = non_url_keys visible_keys = visible_keys[:8] columns: list[dict[str, Any]] = [] for key in visible_keys: column: dict[str, Any] = { "key": key, "header": _titleize(key), "sortable": key not in {"description"}, } if _should_make_filterable(key, normalized_rows): column["filterable"] = True if any(isinstance(row.get(key), (int, float)) for row in normalized_rows): column["headerClass"] = "text-right" column["cellClass"] = "text-right" column["format"] = "number" if key in {"description"}: column["maxWidth"] = "28rem" column["sortable"] = False columns.append(column) data_table: dict[str, Any] = { "type": "DataTable", "columns": columns, "rows": normalized_rows, "search": len(normalized_rows) > 8, "paginated": len(normalized_rows) > 10, "pageSize": 10, } if row_click is not None: data_table["onRowClick"] = row_click return { "type": "Card", "children": [ { "type": "CardHeader", "children": [ {"type": "CardTitle", "content": title}, *( [{"type": "CardDescription", "content": description}] if description else [] ), ], }, { "type": "CardContent", "children": [data_table], }, ], } def _build_key_value_card( title: str, values: dict[str, Any], *, description: str | None = None, ) -> dict[str, Any]: rows = [ {"field": _titleize(key), "value": _normalize_cell(value, key=str(key))} for key, value in values.items() ] return _build_table_card( title, rows, description=description, empty_message="No fields available.", ) def _select_metric_keys(values: dict[str, Any]) -> list[str]: numeric_or_bool = [ key for key, value in values.items() if isinstance(value, (int, float, bool)) and not isinstance(value, str) ] prioritized = sorted(numeric_or_bool, key=_metric_rank) return prioritized[:4] def _build_metric_grid(title: str, values: dict[str, Any]) -> dict[str, Any] | None: metric_keys = _select_metric_keys(values) if not metric_keys: return None tiles = [ { "type": "Card", "children": [ { "type": "CardContent", "cssClass": "p-6", "children": [ { "type": "Metric", "label": _titleize(key), "value": values[key], } ], } ], } for key in metric_keys ] return { "type": "Card", "children": [ { "type": "CardHeader", "children": [{"type": "CardTitle", "content": title}], }, { "type": "CardContent", "children": [ { "type": "Grid", "gap": 4, "minColumnWidth": "12rem", "children": tiles, } ], }, ], } def _looks_like_helper_envelope(value: Any) -> bool: return isinstance(value, dict) and {"ok", "items", "item", "meta", "error"}.issubset(value.keys()) def build_helper_sections(meta: dict[str, Any] | None) -> list[dict[str, Any]]: if not isinstance(meta, dict) or not meta: return [] sections: list[dict[str, Any]] = [] warning_bits: list[str] = [] if meta.get("error"): warning_bits.append(str(meta["error"])) if meta.get("truncated"): truncated_by = str(meta.get("truncated_by") or "limit").replace("_", " ") warning_bits.append(f"Helper output was truncated by {truncated_by}.") if meta.get("lower_bound"): warning_bits.append("Counts may be lower bounds rather than exact totals.") if meta.get("more_available") is True: warning_bits.append("More matching rows are available than were returned.") if meta.get("more_available") == "unknown": warning_bits.append("More matching rows may be available.") if meta.get("next_request_hint"): warning_bits.append(str(meta["next_request_hint"])) if warning_bits: sections.append( { "type": "Alert", "variant": "warning", "icon": "circle-alert", "children": [ {"type": "AlertTitle", "content": "Helper coverage notes"}, {"type": "AlertDescription", "content": " ".join(warning_bits[:3])}, ], } ) badges: list[dict[str, Any]] = [{"type": "Small", "content": "Helper metadata"}] for key in ("returned", "matched", "total", "scanned"): value = meta.get(key) if value is not None: badges.append( { "type": "Badge", "label": f"{key}: {value}", "variant": "outline", } ) source = meta.get("source") if isinstance(source, str) and source: badges.append({"type": "Muted", "content": source}) if len(badges) > 1: sections.append( { "type": "Row", "gap": 2, "align": "center", "cssClass": "flex-wrap", "children": badges, } ) return sections def _render_list( title: str, value: list[Any], *, description: str | None = None, depth: int = 0, ) -> list[dict[str, Any]]: if not value: return [ { "type": "Alert", "variant": "default", "children": [ {"type": "AlertTitle", "content": title}, {"type": "AlertDescription", "content": "No results returned."}, ], } ] if all(isinstance(item, dict) for item in value): rows = [item for item in value if isinstance(item, dict)] table_card = _build_table_card(title, rows, description=description) distribution_fields = _select_distribution_fields(rows) if distribution_fields is None: return [table_card] label_key, count_key = distribution_fields return [ { "type": "Column", "gap": 4, "children": [ _build_distribution_card( title, rows, label_key=label_key, count_key=count_key, ), table_card, ], } ] rows = [ { "index": index + 1, "value": _normalize_cell(item, key="value"), } for index, item in enumerate(value) ] return [_build_table_card(title, rows, description=description)] def _render_dict( title: str, value: dict[str, Any], *, description: str | None = None, depth: int = 0, ) -> list[dict[str, Any]]: if depth > 2: return [_build_key_value_card(title, value, description=description)] if depth <= 1 and _looks_like_user_profile(value): sections: list[dict[str, Any]] = [] user_card = _build_user_profile_card(title, value) if user_card is not None: sections.append(user_card) remaining = { key: item for key, item in value.items() if key not in { *_USER_NAME_KEYS, *_USER_AVATAR_KEYS, "bio", "description", "headline", "hf_url", "profile_url", "website_url", "blog_url", "github_url", "twitter_url", "github", "twitter", } } if remaining: sections.extend(_render_dict(title, remaining, description=description, depth=depth + 1)) return sections if "results" in value or "coverage" in value: sections: list[dict[str, Any]] = [] results = value.get("results") coverage = value.get("coverage") if results is not None: sections.extend(_render_value("Results", results, depth=depth + 1)) if isinstance(coverage, dict): sections.append(_build_key_value_card("Coverage", coverage)) remaining = { key: item for key, item in value.items() if key not in {"results", "coverage"} } if remaining: sections.extend(_render_dict(title, remaining, description=description, depth=depth + 1)) return sections scalar_items = {key: item for key, item in value.items() if _is_scalar(item)} nested_items = {key: item for key, item in value.items() if key not in scalar_items} sections: list[dict[str, Any]] = [] metric_grid = _build_metric_grid(title, scalar_items) metric_keys = set(_select_metric_keys(scalar_items)) if metric_grid is not None: sections.append(metric_grid) remaining_scalars = { key: item for key, item in scalar_items.items() if key not in metric_keys } if remaining_scalars: sections.append(_build_key_value_card(title, remaining_scalars, description=description)) for key, item in nested_items.items(): sections.extend(_render_value(_titleize(key), item, depth=depth + 1)) if not sections: sections.append(_build_key_value_card(title, value, description=description)) return sections def _render_scalar( title: str, value: Any, *, description: str | None = None, ) -> list[dict[str, Any]]: return [ { "type": "Card", "children": [ { "type": "CardHeader", "children": [ {"type": "CardTitle", "content": title}, *( [{"type": "CardDescription", "content": description}] if description else [] ), ], }, { "type": "CardContent", "children": [{"type": "Text", "content": _compact_text(value, limit=500)}], }, ], } ] def _render_value( title: str, value: Any, *, description: str | None = None, depth: int = 0, ) -> list[dict[str, Any]]: if value is None: return _render_scalar(title, "No result returned.", description=description) if isinstance(value, dict): return _render_dict(title, value, description=description, depth=depth) if isinstance(value, list): return _render_list(title, value, description=description, depth=depth) return _render_scalar(title, value, description=description) def _result_count(value: Any) -> int | None: if isinstance(value, list): return len(value) if _looks_like_helper_envelope(value): items = value.get("items") if isinstance(items, list): return len(items) if isinstance(value, dict) and isinstance(value.get("results"), list): return len(value["results"]) return None def _build_summary_card( query: str, *, runtime_meta: dict[str, Any] | None, helper_meta: dict[str, Any] | None, result: Any, ) -> dict[str, Any]: badges: list[dict[str, Any]] = [] row_count = _result_count(result) if row_count is not None: badges.append({"type": "Badge", "label": f"{row_count} rows", "variant": "outline"}) if isinstance(helper_meta, dict): for key in ("total", "matched", "returned"): if helper_meta.get(key) is not None: badges.append( { "type": "Badge", "label": f"{key}: {helper_meta[key]}", "variant": "outline", } ) summary_children: list[dict[str, Any]] = [ { "type": "CardHeader", "children": [ {"type": "CardTitle", "content": "Hub search results"}, {"type": "CardDescription", "content": query}, ], } ] if badges: summary_children.append( { "type": "CardContent", "children": [ { "type": "Row", "gap": 2, "align": "center", "cssClass": "flex-wrap", "children": badges, } ], } ) if isinstance(runtime_meta, dict) and runtime_meta.get("elapsed_ms") is not None: summary_children.append( { "type": "CardFooter", "children": [ { "type": "Muted", "content": f'Runtime: {runtime_meta["elapsed_ms"]} ms', } ], } ) return {"type": "Card", "children": summary_children} def build_runtime_wire(query: str, payload: dict[str, Any]) -> dict[str, Any]: runtime_meta = payload.get("meta") if isinstance(payload.get("meta"), dict) else None if isinstance(runtime_meta, dict) and runtime_meta.get("ok") is False: return wrap_prefab_body( extract_prefab_body_parts(error_wire(str(runtime_meta.get("error") or "Hub query failed")))[0], meta=runtime_meta, ) result = payload.get("result") helper_meta: dict[str, Any] | None = None body_children: list[dict[str, Any]] = [] if isinstance(result, dict) and _looks_like_helper_envelope(result): helper_meta = result.get("meta") if isinstance(result.get("meta"), dict) else None if result.get("ok") is False: message = str(result.get("error") or "Helper query failed") body_children.append( { "type": "Alert", "variant": "destructive", "icon": "circle-x", "children": [ {"type": "AlertTitle", "content": "Hub helper error"}, {"type": "AlertDescription", "content": message}, ], } ) else: helper_result: Any items = result.get("items") item = result.get("item") if isinstance(items, list): helper_result = items elif item is not None: helper_result = item else: helper_result = result body_children.extend(_render_value("Results", helper_result)) else: body_children.extend(_render_value("Results", result)) page_css_class = WIDE_PAGE_CSS_CLASS if _prefers_wide_layout(result) else PAGE_CSS_CLASS body_view = { "type": "Column", "gap": 6, "cssClass": page_css_class, "children": [ _build_summary_card( query, runtime_meta=runtime_meta, helper_meta=helper_meta, result=result, ), *build_helper_sections(helper_meta), *body_children, ], } return wrap_prefab_body(body_view, meta=runtime_meta) def parse_direct_wire(text: str) -> dict[str, Any]: return coerce_wire(parse_jsonish_value(text)) def parse_passthrough_wire(text: str) -> dict[str, Any]: payload = parse_runtime_payload(text) runtime_meta = payload.get("meta") raw_wire = payload.get("result") if raw_wire is None and isinstance(runtime_meta, dict): error = runtime_meta.get("error") if error: body_view, state, defs, theme = extract_prefab_body_parts(error_wire(str(error))) return wrap_prefab_body( body_view, meta=runtime_meta, state=state, defs=defs, theme=theme, ) if isinstance(raw_wire, (dict, list)): try: body_view, state, defs, theme = extract_prefab_body_parts(raw_wire) except ValueError: return build_runtime_wire("Hub query", payload) return wrap_prefab_body( body_view, meta=runtime_meta if isinstance(runtime_meta, dict) else None, state=state, defs=defs, theme=theme, ) return build_runtime_wire("Hub query", payload)