| """
|
| MangoMAS — Multi-Agent Cognitive Architecture
|
| ==============================================
|
|
|
| Interactive HuggingFace Space showcasing:
|
| - 10 Cognitive Cells with NN heads
|
| - MCTS Planning with policy/value networks
|
| - 7M-param MoE Neural Router
|
| - Multi-agent orchestration
|
|
|
| Author: MangoMAS Engineering (Ian Shanker)
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import hashlib
|
| import json
|
| import math
|
| import random
|
| import time
|
| import uuid
|
| from dataclasses import dataclass
|
| from typing import Any
|
|
|
| import gradio as gr
|
| import numpy as np
|
| import plotly.graph_objects as go
|
|
|
|
|
|
|
|
|
# Optional dependency: torch enables the real NN heads (MoE router,
# MCTS policy/value networks). When it is missing the app degrades to
# hash/keyword heuristics, so _TORCH gates every torch-dependent path.
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F

    _TORCH = True
except ImportError:
    _TORCH = False
|
|
|
|
|
|
|
|
|
|
|
|
|
def featurize64(text: str) -> list[float]:
    """Extract a deterministic 64-dimensional feature vector from text.

    The vector is L2-normalized and is composed of:
    - 32 hash-based sinusoidal features (content fingerprint)
    - 16 domain-tag signals (code, security, architecture, data, etc.)
    - 8 structural signals (length, punctuation, questions, etc.)
    - 4 sentiment polarity estimates
    - 4 novelty/complexity scores
    """
    feats: list[float] = []

    # --- 32 content-fingerprint features from the SHA-256 digest ---
    digest = hashlib.sha256(text.encode()).hexdigest()
    for idx in range(32):
        byte_frac = int(digest[idx * 2 : idx * 2 + 2], 16) / 255.0
        feats.append(math.sin(byte_frac * math.pi * (idx + 1)))

    # --- 16 binary domain-tag indicators ---
    lowered = text.lower()
    for tag in (
        "code", "function", "class", "api", "security", "threat",
        "architecture", "design", "data", "database", "test", "deploy",
        "optimize", "performance", "research", "analyze",
    ):
        feats.append(1.0 if tag in lowered else 0.0)

    # --- 8 structural signals ---
    char_count = max(len(text), 1)
    feats.append(min(len(text) / 500.0, 1.0))
    for punct in (".", "?", "!", ","):
        feats.append(text.count(punct) / char_count * 10)
    feats.append(len(text.split()) / 100.0)
    feats.append(1.0 if any(ch.isupper() for ch in text) else 0.0)
    feats.append(sum(1 for ch in text if ch.isdigit()) / char_count)

    # --- 4 sentiment polarity estimates ---
    positives = ["good", "great", "excellent", "improve", "best", "optimize"]
    negatives = ["bad", "fail", "error", "bug", "crash", "threat"]
    pos_score = sum(1 for word in positives if word in lowered) / len(positives)
    neg_score = sum(1 for word in negatives if word in lowered) / len(negatives)
    feats.append(pos_score)
    feats.append(neg_score)
    feats.append(0.5)  # fixed neutral baseline
    feats.append(abs(pos_score - neg_score))

    # --- 4 novelty/complexity scores ---
    word_list = text.split()
    feats.append(len(set(text.lower().split())) / max(len(word_list), 1))
    feats.append(min(len(text.split("\n")) / 10.0, 1.0))
    feats.append(text.count("(") / char_count * 20)
    feats.append(
        min(max(len(w) for w in word_list) / 20.0, 1.0) if text.strip() else 0.0
    )

    # L2-normalize; the epsilon avoids division by zero for all-zero vectors.
    norm = math.sqrt(sum(v * v for v in feats)) + 1e-8
    return [v / norm for v in feats[:64]]
|
|
|
|
|
def plot_features(features: list[float], title: str = "64-D Feature Vector") -> go.Figure:
    """Create a plotly bar chart of the 64-dim feature vector.

    Bars are color-coded by feature group (hash / tag / struct / sent / novel)
    to mirror the layout produced by featurize64().
    """
    tag_names = [
        "code", "func", "class", "api", "sec", "threat",
        "arch", "design", "data", "db", "test", "deploy",
        "opt", "perf", "research", "analyze",
    ]
    labels: list[str] = [f"hash_{i}" for i in range(32)]
    labels += [f"tag_{t}" for t in tag_names]
    labels += [f"struct_{i}" for i in range(8)]
    labels += [f"sent_{i}" for i in range(4)]
    labels += [f"novel_{i}" for i in range(4)]

    # One color per feature group, in group order.
    colors: list[str] = []
    for color, count in (
        ("#FF6B6B", 32),
        ("#4ECDC4", 16),
        ("#45B7D1", 8),
        ("#96CEB4", 4),
        ("#FFEAA7", 4),
    ):
        colors.extend([color] * count)

    layout = go.Layout(
        title=title,
        xaxis=dict(title="Feature Dimension", tickangle=-45, tickfont=dict(size=7)),
        yaxis=dict(title="Value"),
        height=350,
        template="plotly_dark",
        margin=dict(b=120),
    )
    return go.Figure(
        data=[go.Bar(x=labels, y=features, marker_color=colors)],
        layout=layout,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExpertTower(nn.Module if _TORCH else object):
    """Single expert tower MLP mapping 64 -> 512 -> 512 -> 256."""

    def __init__(self, d_in: int = 64, h1: int = 512, h2: int = 512, d_out: int = 256):
        """Build the three linear layers of the tower."""
        super().__init__()
        self.fc1 = nn.Linear(d_in, h1)
        self.fc2 = nn.Linear(h1, h2)
        self.fc3 = nn.Linear(h2, d_out)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Two ReLU-activated hidden layers, linear output."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)
|
|
|
|
|
class MixtureOfExperts7M(nn.Module if _TORCH else object):
    """~7M parameter Mixture-of-Experts model.

    Architecture:
        - Gating network: 64 -> 512 -> N_experts (softmax)
        - Expert towers (xN): 64 -> 512 -> 512 -> 256
        - Classifier head: 256 -> N_classes
    """

    def __init__(self, num_classes: int = 10, num_experts: int = 16):
        """Build the gate, the expert towers, and the classifier head."""
        super().__init__()
        self.num_experts = num_experts

        # Gating network producing per-expert mixture weights.
        self.gate_fc1 = nn.Linear(64, 512)
        self.gate_fc2 = nn.Linear(512, num_experts)

        # One independent tower per expert.
        self.experts = nn.ModuleList([ExpertTower() for _ in range(num_experts)])

        # Classification head over the gated expert mixture.
        self.classifier = nn.Linear(256, num_classes)

    @property
    def parameter_count(self) -> int:
        """Total number of learnable parameters in the model."""
        return sum(p.numel() for p in self.parameters())

    def forward(self, x64: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Return (class logits, softmax gate weights) for a 64-d input batch."""
        # Softmax gate over experts.
        gate_hidden = F.relu(self.gate_fc1(x64))
        gate_weights = torch.softmax(self.gate_fc2(gate_hidden), dim=-1)

        # Run every expert, stack along a new expert dimension.
        stacked = torch.stack([tower(x64) for tower in self.experts], dim=1)

        # Gate-weighted sum over the expert dimension.
        pooled = torch.sum(stacked * gate_weights.unsqueeze(-1), dim=1)

        return self.classifier(pooled), gate_weights
|
|
|
|
|
class RouterNet(nn.Module if _TORCH else object):
    """Neural routing gate MLP: 64 -> 128 -> 64 -> N_experts.

    Used for fast (~0.8ms) expert selection.
    """

    EXPERTS = [
        "code_expert", "test_expert", "design_expert", "research_expert",
        "architecture_expert", "security_expert", "performance_expert",
        "documentation_expert",
    ]

    def __init__(self, d_in: int = 64, d_h: int = 128, n_out: int = 8):
        """Assemble the gating MLP (dropout only on the first hidden layer)."""
        super().__init__()
        layers = [
            nn.Linear(d_in, d_h),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(d_h, d_h // 2),
            nn.ReLU(),
            nn.Linear(d_h // 2, n_out),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return a softmax distribution over experts."""
        logits = self.net(x)
        return torch.softmax(logits, dim=-1)
|
|
|
|
|
class PolicyNetwork(nn.Module if _TORCH else object):
    """MCTS policy network: 128 -> 256 -> 128 -> N_actions (softmax)."""

    def __init__(self, d_in: int = 128, n_actions: int = 32):
        """Build the policy MLP ending in a softmax over actions."""
        super().__init__()
        stack = [
            nn.Linear(d_in, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions),
            nn.Softmax(dim=-1),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return action-prior probabilities for a state embedding."""
        return self.net(x)
|
|
|
|
|
class ValueNetwork(nn.Module if _TORCH else object):
    """MCTS value network: 192 -> 256 -> 64 -> 1, tanh-bounded to [-1, 1]."""

    def __init__(self, d_in: int = 192):
        """Build the value MLP ending in a tanh scalar head."""
        super().__init__()
        stack = [
            nn.Linear(d_in, 256),
            nn.ReLU(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return a scalar state-value estimate in [-1, 1]."""
        return self.net(x)
|
|
|
|
|
|
|
|
|
|
|
|
|
# Registry of the 10 cognitive cell types exposed by the demo.
# Keys are the identifiers accepted by execute_cell()/compose_cells();
# each entry carries a display name, a short description, and the list
# of head implementations the cell supports.
CELL_TYPES = {
    "reasoning": {
        "name": "ReasoningCell",
        "description": "Structured reasoning with Rule or NN heads",
        "heads": ["rule", "nn"],
    },
    "memory": {
        "name": "MemoryCell",
        "description": "Privacy-preserving preference extraction",
        "heads": ["preference_extractor"],
    },
    "causal": {
        "name": "CausalCell",
        "description": "Pearl's do-calculus for causal inference",
        "heads": ["do_calculus"],
    },
    "ethics": {
        "name": "EthicsCell",
        "description": "Safety classification and PII detection",
        "heads": ["classifier", "pii_scanner"],
    },
    "empathy": {
        "name": "EmpathyCell",
        "description": "Emotional tone detection and empathetic responses",
        "heads": ["tone_detector"],
    },
    "curiosity": {
        "name": "CuriosityCell",
        "description": "Epistemic curiosity and hypothesis generation",
        "heads": ["hypothesis_generator"],
    },
    "figliteral": {
        "name": "FigLiteralCell",
        "description": "Figurative vs literal language classification",
        "heads": ["classifier"],
    },
    "r2p": {
        "name": "R2PCell",
        "description": "Requirements-to-Plan structured decomposition",
        "heads": ["planner"],
    },
    "telemetry": {
        "name": "TelemetryCell",
        "description": "Telemetry event capture and structuring",
        "heads": ["collector"],
    },
    "aggregator": {
        "name": "AggregatorCell",
        "description": "Multi-expert output aggregation",
        "heads": ["weighted_average", "max_confidence", "ensemble"],
    },
}
|
|
|
|
|
def _cell_reasoning(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Split text into ~3 word chunks with simulated boundary confidences."""
    words = text.split()
    chunk_size = max(len(words) // 3, 1)
    sections = []
    for i in range(0, len(words), chunk_size):
        sections.append({
            "text": " ".join(words[i : i + chunk_size]),
            "confidence": round(random.uniform(0.7, 0.99), 3),
            "boundary_type": random.choice(["topic_shift", "elaboration", "conclusion"]),
        })
    return {
        "head_type": config.get("head_type", "rule"),
        "sections": sections,
        "section_count": len(sections),
    }


def _cell_memory(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Extract explicit/implicit preference statements by keyword cue."""
    lower = text.lower()
    preferences = []
    if "prefer" in lower or "like" in lower:
        preferences.append({"type": "explicit", "value": text, "confidence": 0.95})
    if "always" in lower or "usually" in lower:
        preferences.append({"type": "implicit", "value": text, "confidence": 0.72})
    return {
        "preferences": preferences,
        # Honor an inline opt-out phrase as a privacy signal.
        "opt_out": "don't remember" in lower,
        "consent_status": "granted",
    }


def _cell_causal(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Mock do-calculus output: candidate variables plus a simulated effect."""
    effect = round(random.uniform(-0.5, 0.8), 3)
    return {
        "mode": config.get("mode", "do_calculus"),
        # Words longer than 3 chars stand in for candidate causal variables.
        "variables": [w for w in text.split() if len(w) > 3][:5],
        "causal_effect": effect,
        "confidence_interval": [round(effect - 0.15, 3), round(effect + 0.15, 3)],
    }


def _cell_ethics(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Scan for PII (email/phone/SSN) via regex and return a redacted copy."""
    import re
    pii_patterns = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    }
    pii_found = []
    redacted = text
    for pii_type, pattern in pii_patterns.items():
        for match in re.findall(pattern, text):
            # The matched value is intentionally NOT stored (privacy).
            pii_found.append({"type": pii_type, "value": "[REDACTED]"})
            redacted = redacted.replace(match, "[REDACTED]")
    return {
        "is_safe": not pii_found,
        "pii_detected": pii_found,
        "redacted_text": redacted,
        "risk_score": round(
            random.uniform(0.0, 0.3) if not pii_found else random.uniform(0.6, 0.9), 3
        ),
    }


def _cell_empathy(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Keyword-count emotion detection with a canned empathetic response."""
    lower = text.lower()
    emotion_keywords: dict[str, list[str]] = {
        "frustration": ["frustrat", "annoy", "angry", "upset", "fail", "broken", "stuck", "overwhelm"],
        "anxiety": ["worry", "anxious", "nervous", "afraid", "fear", "concern", "stress", "uncertain"],
        "excitement": ["excit", "amazing", "awesome", "great", "love", "fantastic", "thrilled", "happy"],
        "satisfaction": ["satisfied", "pleased", "good", "well", "success", "accomplish", "done", "complete"],
        "confusion": ["confus", "unclear", "don't understand", "what does", "how does", "lost", "puzzle"],
    }
    emotion_scores = {
        emotion: sum(1 for kw in keywords if kw in lower)
        for emotion, keywords in emotion_keywords.items()
    }
    # Ties resolve to the first-listed emotion (dict insertion order).
    best_emotion = max(emotion_scores, key=lambda e: emotion_scores[e])
    detected = best_emotion if emotion_scores[best_emotion] > 0 else "neutral"
    confidence = min(0.95, 0.6 + emotion_scores.get(detected, 0) * 0.1)
    responses = {
        "neutral": "I understand your message. How can I help further?",
        "frustration": "I can see this is frustrating. Let me help resolve this.",
        "excitement": "That's great news! Let's build on that momentum.",
        "confusion": "Let me clarify that for you step by step.",
        "satisfaction": "Glad to hear things are going well!",
        "anxiety": "I understand your concern. Let's work through this together.",
    }
    return {
        "detected_emotion": detected,
        "confidence": round(confidence, 3),
        "empathetic_response": responses[detected],
    }


def _cell_curiosity(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Generate probing questions from the first few distinct long words."""
    words = [w for w in text.split() if len(w) > 3]
    topics = list(dict.fromkeys(words[:5]))  # de-dupe, keep order
    topic_str = ", ".join(topics[:3]) if topics else "this topic"
    questions = [
        f"What are the underlying assumptions behind {topic_str}?",
        f"How would the outcome differ if we changed the approach to {topics[0] if topics else 'this'}?",
        "What related problems have been solved in adjacent domains?",
        f"What are the second-order effects of {topics[1] if len(topics) > 1 else 'this decision'}?",
        f"What evidence would disprove our current hypothesis about {topic_str}?",
    ]
    unique_ratio = len(set(text.lower().split())) / max(len(text.split()), 1)
    return {
        "questions": questions[: config.get("max_questions", 3)],
        "novelty_score": round(min(unique_ratio + 0.3, 0.95), 3),
    }


def _cell_figliteral(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Classify figurative vs literal text using a small idiom lexicon."""
    figurative_map: dict[str, str] = {
        "raining cats and dogs": "raining very heavily",
        "piece of cake": "something very easy to do",
        "break a leg": "good luck; perform well",
        "time flies": "time appears to pass quickly",
        "hit the nail on the head": "to be exactly right",
        "spill the beans": "to reveal a secret",
        "under the weather": "feeling sick or unwell",
        "bite the bullet": "to endure a painful situation with courage",
    }
    lower = text.lower()
    figurative_markers = ["like a", "as if"] + list(figurative_map.keys())
    is_figurative = any(m in lower for m in figurative_markers)
    out: dict[str, Any] = {
        "classification": "figurative" if is_figurative else "literal",
        "confidence": round(0.9 if is_figurative else 0.85, 3),
    }
    if is_figurative:
        literal_parts = [
            f"'{idiom}' = {meaning}"
            for idiom, meaning in figurative_map.items()
            if idiom in lower
        ]
        if not literal_parts and ("like a" in lower or "as if" in lower):
            literal_parts.append("Contains simile/metaphor — direct comparison without figurative intent")
        out["literal_interpretation"] = "; ".join(literal_parts) if literal_parts else "No specific idiom decomposition available"
        out["figurative_elements"] = literal_parts
    return out


def _cell_r2p(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Return a fixed five-step requirements-to-plan decomposition."""
    steps = [
        {"step": 1, "action": "Analyze requirements", "estimated_effort": "2h"},
        {"step": 2, "action": "Design solution architecture", "estimated_effort": "4h"},
        {"step": 3, "action": "Implement core logic", "estimated_effort": "8h"},
        {"step": 4, "action": "Write tests", "estimated_effort": "4h"},
        {"step": 5, "action": "Deploy and validate", "estimated_effort": "2h"},
    ]
    return {
        "plan": steps,
        "total_effort": "20h",
        "success_criteria": ["All tests pass", "Performance targets met", "Code reviewed"],
    }


def _cell_telemetry(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Parse action/duration/page/element attributes out of an event string."""
    import re as _re
    lower = text.lower()
    attrs: dict[str, Any] = {"source": "cognitive_cell", "cell_type": "telemetry"}

    # First matching verb wins.
    action_map = {"click": "click", "submit": "submit", "scroll": "scroll",
                  "navigate": "navigate", "hover": "hover", "type": "input",
                  "select": "select", "drag": "drag", "drop": "drop", "open": "open"}
    for verb, action in action_map.items():
        if verb in lower:
            attrs["action"] = action
            break

    dur_match = _re.search(r"(\d+)\s*(?:second|sec|ms|minute|min)", lower)
    if dur_match:
        attrs["duration_value"] = int(dur_match.group(1))
        # Strip the numeric part from the full match to recover the unit.
        attrs["duration_unit"] = dur_match.group(0).replace(dur_match.group(1), "").strip()

    page_match = _re.search(r"(?:on|at|in)\s+(?:the\s+)?(\w+)\s+page", lower)
    if page_match:
        attrs["page"] = page_match.group(1)
    elem_match = _re.search(r"(?:click|clicked|press|pressed|hit)\s+(?:the\s+)?(\w+)", lower)
    if elem_match:
        attrs["element"] = elem_match.group(1)

    return {
        "event_recorded": True,
        "trace_id": f"trace-{uuid.uuid4().hex[:8]}",
        "timestamp": time.time(),
        "metadata": attrs,
        "parsed_attributes": {k: v for k, v in attrs.items() if k not in ("source", "cell_type")},
    }


def _cell_aggregator(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Run configured sub-cells and aggregate their confidences."""
    strategy = config.get("strategy", "weighted_average")
    sub_cells = config.get("sub_cells", ["reasoning", "ethics", "causal"])
    sub_results = []
    for sc in sub_cells:
        # Skip unknown cells and guard against aggregator recursion.
        if sc in CELL_TYPES and sc != "aggregator":
            sr = execute_cell(sc, text)
            sub_results.append({
                "cell": sc,
                "status": sr.get("status", "ok"),
                # Fall back to risk_score (ethics) then a 0.8 default.
                "confidence": sr.get("confidence", sr.get("risk_score", 0.8)),
                "elapsed_ms": sr.get("elapsed_ms", 0),
            })

    if sub_results:
        confidences = [r["confidence"] for r in sub_results if isinstance(r["confidence"], (int, float))]
        if strategy == "max_confidence":
            agg_confidence = max(confidences) if confidences else 0.0
        elif strategy == "ensemble":
            agg_confidence = sum(confidences) / len(confidences) if confidences else 0.0
        else:
            # weighted_average: harmonic-style 1/(i+1) positional weights.
            weights = [1.0 / (i + 1) for i in range(len(confidences))]
            w_sum = sum(weights)
            agg_confidence = sum(c * w for c, w in zip(confidences, weights)) / w_sum if w_sum else 0.0
    else:
        agg_confidence = 0.0

    return {
        "strategy": strategy,
        "sub_cell_results": sub_results,
        "cells_aggregated": len(sub_results),
        "aggregated_output": f"Aggregated {len(sub_results)} cells via {strategy}",
        "confidence": round(agg_confidence, 3),
    }


# Dispatch table: cell_type -> handler(text, config) -> extra result fields.
_CELL_HANDLERS: dict[str, Any] = {
    "reasoning": _cell_reasoning,
    "memory": _cell_memory,
    "causal": _cell_causal,
    "ethics": _cell_ethics,
    "empathy": _cell_empathy,
    "curiosity": _cell_curiosity,
    "figliteral": _cell_figliteral,
    "r2p": _cell_r2p,
    "telemetry": _cell_telemetry,
    "aggregator": _cell_aggregator,
}


def execute_cell(cell_type: str, text: str, config_json: str = "{}") -> dict[str, Any]:
    """Execute a cognitive cell and return structured results.

    Args:
        cell_type: Key into the handler dispatch table (see CELL_TYPES).
        text: Input text to process; must be non-blank.
        config_json: Optional JSON object string with cell-specific options.

    Returns:
        A dict with at least ``cell_type``, ``status`` and ``elapsed_ms``;
        successful runs also carry ``request_id`` plus cell-specific fields.
        Errors are reported via ``status == "error"`` rather than raised.
    """
    start = time.monotonic()

    # Guard: empty/blank input is a user error, reported without raising.
    if not text or not text.strip():
        return {
            "cell_type": cell_type,
            "status": "error",
            "message": "Input text is required. Please provide some text to process.",
            "elapsed_ms": 0.0,
        }

    try:
        config = json.loads(config_json) if config_json.strip() else {}
    except json.JSONDecodeError as e:
        return {
            "cell_type": cell_type,
            "status": "error",
            "message": f"Invalid JSON config: {e}",
            "elapsed_ms": 0.0,
        }

    result: dict[str, Any] = {
        "cell_type": cell_type,
        "request_id": f"req-{uuid.uuid4().hex[:12]}",
        "status": "ok",
    }

    # Unknown cell types fall through with just the base fields (as before).
    handler = _CELL_HANDLERS.get(cell_type)
    if handler is not None:
        result.update(handler(text, config))

    result["elapsed_ms"] = round((time.monotonic() - start) * 1000, 2)
    return result
|
|
|
|
|
def compose_cells(pipeline_str: str, text: str) -> dict[str, Any]:
    """Execute a pipeline of cells sequentially.

    Args:
        pipeline_str: Comma-separated list of cell type names.
        text: Input text fed to every stage.

    Returns:
        Pipeline summary with per-stage activations, the last stage's full
        result, and the merged context keys accumulated along the way.
    """
    stages = [part.strip() for part in pipeline_str.split(",") if part.strip()]
    if not stages:
        return {"error": "No cell types specified"}

    activations: list[dict[str, Any]] = []
    shared_context: dict[str, Any] = {}
    last_result: dict[str, Any] = {}

    for stage in stages:
        if stage not in CELL_TYPES:
            activations.append({"cell_type": stage, "status": "error", "message": f"Unknown cell type: {stage}"})
            continue
        stage_result = execute_cell(stage, text)
        activations.append({
            "cell_type": stage,
            "status": stage_result.get("status", "ok"),
            "elapsed_ms": stage_result.get("elapsed_ms", 0),
        })
        # Merge everything except per-call bookkeeping into the shared context.
        for key, value in stage_result.items():
            if key not in ("request_id", "elapsed_ms"):
                shared_context[key] = value
        last_result = stage_result

    return {
        "pipeline": stages,
        "activations": activations,
        "final_output": last_result,
        "total_cells": len(stages),
        "context_keys": list(shared_context.keys()),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
# Candidate action sets for MCTS planning, keyed by task category.
# run_mcts() and benchmark_strategies() pick a category by keyword
# matching on the task text and then search over its five actions.
TASK_CATEGORIES = {
    "architecture": ["service_split", "api_gateway", "data_layer", "security_layer", "caching"],
    "implementation": ["requirements", "design", "code", "test", "deploy"],
    "optimization": ["profile", "identify_bottleneck", "optimize", "validate", "benchmark"],
    "security": ["asset_inventory", "threat_enumeration", "risk_scoring", "mitigations", "audit"],
    "research": ["literature_review", "comparison", "synthesis", "recommendations", "publish"],
}
|
|
|
|
|
@dataclass
class MCTSNode:
    """Node in the MCTS search tree.

    Tracks visit count, accumulated value, and the policy prior assigned
    at expansion time; exposes UCB1 and PUCT selection scores.
    """

    id: str
    action: str
    visits: int = 0
    total_value: float = 0.0
    policy_prior: float = 0.0
    children: list["MCTSNode"] | None = None

    def ucb1_score(self, parent_visits: int, c: float = 1.414) -> float:
        """UCB1 = mean value + c * sqrt(ln(parent visits) / visits).

        Unvisited nodes score +inf so they are always expanded first.
        """
        if not self.visits:
            return float("inf")
        mean_value = self.total_value / self.visits
        explore = c * math.sqrt(math.log(parent_visits) / self.visits)
        return mean_value + explore

    def puct_score(self, parent_visits: int, c: float = 1.0) -> float:
        """PUCT = mean value + c * prior * sqrt(parent visits) / (1 + visits).

        Unvisited nodes score +inf so they are always expanded first.
        """
        if not self.visits:
            return float("inf")
        mean_value = self.total_value / self.visits
        explore = c * self.policy_prior * math.sqrt(parent_visits) / (1 + self.visits)
        return mean_value + explore

    def to_dict(self, max_depth: int = 3) -> dict[str, Any]:
        """Serialize this subtree (top-5 children by visits, depth-limited)."""
        node_repr: dict[str, Any] = {
            "id": self.id,
            "action": self.action,
            "visits": self.visits,
            "value": round(self.total_value / max(self.visits, 1), 3),
            "policy_prior": round(self.policy_prior, 3),
        }
        if self.children and max_depth > 0:
            ranked = sorted(self.children, key=lambda n: n.visits, reverse=True)
            node_repr["children"] = [child.to_dict(max_depth - 1) for child in ranked[:5]]
        return node_repr
|
|
|
|
|
def run_mcts(
    task: str,
    max_simulations: int = 100,
    exploration_constant: float = 1.414,
    strategy: str = "ucb1",
) -> dict[str, Any]:
    """Run MCTS planning on a task and return the search tree.

    Args:
        task: Natural-language task description.
        max_simulations: Number of select/evaluate/backup iterations.
        exploration_constant: c in the UCB1/PUCT exploration term.
        strategy: "ucb1" or "puct" child-selection rule.

    Returns:
        Dict with the chosen action, per-action visit stats, a serialized
        search tree, and timing metadata.
    """
    start = time.monotonic()

    # Keyword-match the task into a category to pick its action set;
    # first matching category wins, defaulting to "implementation".
    lower = task.lower()
    category = "implementation"
    for cat, keywords in {
        "architecture": ["architect", "design", "micro", "system"],
        "security": ["security", "threat", "vulnerability", "attack"],
        "optimization": ["optimize", "performance", "latency", "speed"],
        "research": ["research", "survey", "study", "analyze"],
    }.items():
        if any(k in lower for k in keywords):
            category = cat
            break

    actions = TASK_CATEGORIES[category]

    # Root holds the (truncated) task text; children are candidate actions.
    root = MCTSNode(id="root", action=task[:50], children=[])

    # NN-guided priors/values when torch is present; random fallbacks otherwise.
    if _TORCH:
        policy_net = PolicyNetwork(d_in=128, n_actions=len(actions))
        value_net = ValueNetwork(d_in=192)
        policy_net.eval()
        value_net.eval()

    for sim in range(max_simulations):
        node = root

        # Expansion: children are created once, on the first simulation —
        # the tree is effectively one level deep (root + action layer).
        if not node.children:
            node.children = []
            for i, act in enumerate(actions):
                prior = random.uniform(0.1, 0.5)
                if _TORCH:
                    # NOTE(review): the embedding is random noise, so the
                    # policy prior is untrained output — demo-quality only.
                    embed = torch.randn(1, 128)
                    with torch.no_grad():
                        priors = policy_net(embed)[0]
                    prior = priors[i % len(priors)].item()
                node.children.append(
                    MCTSNode(
                        id=f"{act}-{sim}",
                        action=act,
                        policy_prior=prior,
                        children=[],
                    )
                )

        # Selection: score children by UCB1 or PUCT, pick the best.
        score_fn = (
            (lambda n: n.ucb1_score(root.visits + 1, exploration_constant))
            if strategy == "ucb1"
            else (lambda n: n.puct_score(root.visits + 1, exploration_constant))
        )
        best_child = max(node.children, key=score_fn)

        # Evaluation: value network on a random state vector when torch is
        # available, otherwise a random rollout estimate.
        if _TORCH:
            state = torch.randn(1, 192)
            with torch.no_grad():
                value = value_net(state).item()
        else:
            value = random.uniform(0.3, 0.9)

        # Backup: accumulate statistics on the chosen child and the root.
        best_child.visits += 1
        best_child.total_value += value
        root.visits += 1

    elapsed = (time.monotonic() - start) * 1000

    # Final choice: most-visited child (standard robust-child criterion).
    if root.children:
        best = max(root.children, key=lambda n: n.visits)
        best_action = best.action
        best_value = round(best.total_value / max(best.visits, 1), 3)
    else:
        best_action = "none"
        best_value = 0.0

    return {
        "task": task,
        "category": category,
        "strategy": strategy,
        "best_action": best_action,
        "best_value": best_value,
        "total_simulations": max_simulations,
        "exploration_constant": exploration_constant,
        "tree": root.to_dict(max_depth=2),
        "all_actions": [
            {
                "action": c.action,
                "visits": c.visits,
                "value": round(c.total_value / max(c.visits, 1), 3),
            }
            for c in sorted(root.children or [], key=lambda n: -n.visits)
        ],
        "elapsed_ms": round(elapsed, 2),
        "nn_enabled": _TORCH,
    }
|
|
|
|
|
def benchmark_strategies(task: str) -> dict[str, Any]:
    """Compare MCTS vs Greedy vs Random on the same task.

    Returns a dict with the detected task category and, for each strategy,
    its quality score, chosen action, and wall-clock time in milliseconds.
    """

    results = {}

    # Same keyword-based category detection as run_mcts(); first match wins.
    lower = task.lower()
    category = "implementation"
    for cat, keywords in {
        "architecture": ["architect", "design", "micro", "system"],
        "security": ["security", "threat", "vulnerability", "attack"],
        "optimization": ["optimize", "performance", "latency", "speed"],
        "research": ["research", "survey", "study", "analyze"],
    }.items():
        if any(k in lower for k in keywords):
            category = cat
            break
    actions = TASK_CATEGORIES[category]

    # --- MCTS: full search via run_mcts (100 simulations) ---
    start = time.monotonic()
    r = run_mcts(task, max_simulations=100)
    elapsed_mcts = (time.monotonic() - start) * 1000
    results["mcts"] = {
        "quality_score": r["best_value"],
        "best_action": r["best_action"],
        "elapsed_ms": round(elapsed_mcts, 2),
    }

    # --- Greedy: single policy-network forward pass, argmax prior ---
    start = time.monotonic()
    if _TORCH:
        policy_net = PolicyNetwork(d_in=128, n_actions=len(actions))
        policy_net.eval()
        # NOTE(review): str hash() is salted per process (PYTHONHASHSEED),
        # so this seed is only stable within one run — confirm if cross-run
        # reproducibility is intended.
        torch.manual_seed(hash(task) % (2**31))
        embed = torch.randn(1, 128)
        with torch.no_grad():
            priors = policy_net(embed)[0].numpy()
        best_idx = int(np.argmax(priors))
        greedy_action = actions[best_idx]
        greedy_quality = float(priors[best_idx])
    else:
        greedy_quality = max(random.uniform(0.1, 0.3) for _ in actions)
        greedy_action = random.choice(actions)
    elapsed_greedy = (time.monotonic() - start) * 1000
    results["greedy"] = {
        "quality_score": round(greedy_quality, 3),
        "best_action": greedy_action,
        "elapsed_ms": round(elapsed_greedy, 2),
    }

    # --- Random: pick an action at random, score it with the value net ---
    start = time.monotonic()
    random_action = random.choice(actions)

    if _TORCH:
        value_net = ValueNetwork(d_in=192)
        value_net.eval()
        # Same per-process hash-seed caveat as above.
        torch.manual_seed(hash(task + random_action) % (2**31))
        state = torch.randn(1, 192)
        with torch.no_grad():
            random_quality = value_net(state).item()
    else:
        random_quality = random.uniform(-0.5, 0.5)
    elapsed_random = (time.monotonic() - start) * 1000
    results["random"] = {
        "quality_score": round(random_quality, 3),
        "best_action": random_action,
        "elapsed_ms": round(elapsed_random, 2),
    }

    return {"task": task, "category": category, "results": results}
|
|
|
|
|
def plot_mcts_tree(tree_data: dict) -> go.Figure:
    """Create a sunburst visualization of the MCTS tree.

    Ring size encodes visit counts; color encodes mean node value.
    Expects the dict shape produced by MCTSNode.to_dict().
    """
    ids: list = []
    labels: list = []
    parents: list = []
    values: list = []
    colors: list = []

    # Pre-order walk: record each node, then recurse into its children.
    def _collect(node: dict, parent_id: str = "") -> None:
        node_id = node["id"]
        ids.append(node_id)
        labels.append(f"{node['action']}\n(v={node.get('value', 0)}, n={node.get('visits', 0)})")
        parents.append(parent_id)
        values.append(max(node.get("visits", 1), 1))
        colors.append(node.get("value", 0))
        for sub in node.get("children", []):
            _collect(sub, node_id)

    _collect(tree_data)

    sunburst = go.Sunburst(
        ids=ids, labels=labels, parents=parents, values=values,
        marker=dict(colors=colors, colorscale="Viridis", showscale=True),
        branchvalues="total",
    )
    fig = go.Figure(sunburst)
    fig.update_layout(
        title="MCTS Search Tree",
        height=500,
        template="plotly_dark",
        margin=dict(t=40, l=0, r=0, b=0),
    )
    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
# Display names for the 8 routing experts. Index-aligned with the
# RouterNet output vector and the keyword-boost table in route_task().
EXPERT_NAMES = [
    "Code Expert", "Test Expert", "Design Expert", "Research Expert",
    "Architecture Expert", "Security Expert", "Performance Expert", "Docs Expert",
]


# Fixed seed so the (untrained) router weights are reproducible across runs.
_ROUTER_SEED = 42
# Lazily-created singleton (see _get_router); stays None without torch.
_router_net_singleton: "RouterNet | None" = None
|
|
|
|
|
def _get_router() -> "RouterNet | None":
    """Get or create the singleton RouterNet with a fixed seed.

    Returns:
        The shared RouterNet instance, or None when torch is unavailable
        (the original annotation claimed ``RouterNet`` but the function
        returns None without torch — callers must check ``_TORCH`` first).
    """
    global _router_net_singleton
    if _router_net_singleton is None and _TORCH:
        # Seed before construction so every process builds identical weights.
        torch.manual_seed(_ROUTER_SEED)
        _router_net_singleton = RouterNet(d_in=64, n_out=len(EXPERT_NAMES))
        _router_net_singleton.eval()
    return _router_net_singleton
|
|
|
|
|
def route_task(task: str, top_k: int = 3) -> dict[str, Any]:
    """Route a task through the neural MoE gate.

    Args:
        task: Natural-language task description.
        top_k: Number of top-weighted experts to select.

    Returns:
        Dict with the 64-d feature vector, the full weight distribution,
        the ranked top-k experts, and timing metadata.
    """
    start = time.monotonic()

    features = featurize64(task)

    if _TORCH:
        # Forward pass through the seeded singleton router (no grad needed).
        router = _get_router()
        x = torch.tensor([features], dtype=torch.float32)
        with torch.no_grad():
            weights = router(x)[0].numpy()
    else:
        # Heuristic fallback: normalized |feature| mass per expert slot.
        weights = np.array([abs(f) for f in features[:len(EXPERT_NAMES)]])
        weights = weights / (weights.sum() + 1e-8)

    # Keyword boost: +0.15 per matched keyword, then renormalize so the
    # distribution still sums to ~1.
    lower_task = task.lower()
    expert_keywords: dict[int, list[str]] = {
        0: ["code", "implement", "function", "class", "program", "script", "module"],
        1: ["test", "unit test", "coverage", "qa", "assert", "mock", "fixture"],
        2: ["design", "ui", "ux", "layout", "wireframe", "mockup", "style"],
        3: ["research", "analyze", "study", "survey", "literature", "paper", "compare"],
        4: ["architect", "system", "microservice", "scale", "pattern", "infrastructure"],
        5: ["security", "auth", "encrypt", "threat", "vulnerab", "owasp", "pci", "compliance"],
        6: ["performance", "optimize", "latency", "throughput", "cache", "speed", "fast"],
        7: ["document", "readme", "docs", "comment", "explain", "write", "manual"],
    }
    boost = np.zeros(len(EXPERT_NAMES))
    for expert_idx, keyword_list in expert_keywords.items():
        for keyword in keyword_list:
            if keyword in lower_task:
                boost[expert_idx] += 0.15

    weights = weights + boost
    weights = weights / (weights.sum() + 1e-8)

    # Rank experts by weight (descending) and keep the top-k.
    top_indices = np.argsort(weights)[::-1][:top_k]
    selected = [
        {
            "expert": EXPERT_NAMES[i],
            "weight": round(float(weights[i]), 4),
            "rank": rank + 1,
        }
        for rank, i in enumerate(top_indices)
    ]

    elapsed = (time.monotonic() - start) * 1000

    return {
        "task": task,
        "features": features,
        "all_weights": {EXPERT_NAMES[i]: round(float(weights[i]), 4) for i in range(len(EXPERT_NAMES))},
        "selected_experts": selected,
        "top_k": top_k,
        "nn_enabled": _TORCH,
        "elapsed_ms": round(elapsed, 2),
    }
|
|
|
|
|
def plot_expert_weights(weights: dict[str, float]) -> go.Figure:
    """Create a bar chart of expert routing weights.

    Args:
        weights: Mapping of expert name -> softmax routing weight.

    Returns:
        A dark-themed plotly bar chart, one bar per expert.
    """
    names = list(weights.keys())
    vals = list(weights.values())
    colors = ["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#F0E68C", "#87CEEB"]
    # max() with a default guards the empty-dict case, which previously
    # raised ValueError; 1.0 keeps the axis sensible for an empty chart.
    y_top = max(vals, default=1.0) * 1.2
    fig = go.Figure(
        data=[go.Bar(x=names, y=vals, marker_color=colors[:len(names)])],
        layout=go.Layout(
            title="Expert Routing Weights",
            yaxis=dict(title="Weight (softmax)", range=[0, y_top]),
            height=350,
            template="plotly_dark",
            margin=dict(t=40),
        ),
    )
    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
# Static roster of demo agents shown in the orchestration tab.
# Each entry carries a display name, a one-line specialization, and a
# plain-text icon tag used in the UI.
AGENTS = [
    {"name": "SWE Agent", "specialization": "Code scaffold generation", "icon": "[SWE]"},
    {"name": "Architect Agent", "specialization": "System design and patterns", "icon": "[ARCH]"},
    {"name": "QA Agent", "specialization": "Test plan and case generation", "icon": "[QA]"},
    {"name": "Security Agent", "specialization": "Threat modeling (OWASP)", "icon": "[SEC]"},
    {"name": "DevOps Agent", "specialization": "Infrastructure planning", "icon": "[OPS]"},
    {"name": "Research Agent", "specialization": "Technical analysis", "icon": "[RES]"},
    {"name": "Performance Agent", "specialization": "Optimization analysis", "icon": "[PERF]"},
    {"name": "Documentation Agent", "specialization": "Technical writing", "icon": "[DOC]"},
]
|
|
|
|
|
|
|
# Maps each agent name to the cognitive cell type executed on its behalf
# by `orchestrate`. Agents missing from this map fall back to "reasoning"
# (see the `.get(..., "reasoning")` lookups at the call sites).
_AGENT_CELL_MAP: dict[str, str] = {
    "SWE Agent": "reasoning",
    "Architect Agent": "r2p",
    "QA Agent": "reasoning",
    "Security Agent": "ethics",
    "DevOps Agent": "telemetry",
    "Research Agent": "causal",
    "Performance Agent": "reasoning",
    "Documentation Agent": "reasoning",
}
|
|
|
|
|
def orchestrate(task: str, max_agents: int = 3, strategy: str = "moe_routing") -> dict[str, Any]:
    """Orchestrate multiple agents for a task using the given routing strategy.

    Args:
        task: Natural-language task description.
        max_agents: Upper bound on the number of agents to engage.
        strategy: "round_robin" (first N agents), "random" (uniform sample),
            or anything else for the default MoE neural-gate routing via
            ``route_task``.

    Returns:
        Dict with the task, strategy, per-agent results, and total latency
        in milliseconds.
    """
    start = time.monotonic()

    def _run_agent(agent: dict[str, Any], weight: float) -> dict[str, Any]:
        # Shared per-agent execution path (previously triplicated inline):
        # resolve the backing cognitive cell, run it, and package the result.
        cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning")
        cell_result = execute_cell(cell_type, task)
        return {
            "agent": agent["name"],
            "icon": agent["icon"],
            "specialization": agent["specialization"],
            "weight": round(weight, 4),
            "cell_used": cell_type,
            "output": cell_result,
            "confidence": cell_result.get("confidence", 0.8),
        }

    if strategy == "round_robin":
        chosen = AGENTS[:max_agents]
        # Uniform weight over the agents actually selected (not max_agents,
        # which may exceed the available agent count).
        agent_results = [_run_agent(a, 1.0 / len(chosen)) for a in chosen]
    elif strategy == "random":
        # `random` is already imported at module level; no local re-import.
        chosen = random.sample(AGENTS, min(max_agents, len(AGENTS)))
        agent_results = [_run_agent(a, 1.0 / len(chosen)) for a in chosen]
    else:  # default: MoE neural-gate routing
        routing = route_task(task, top_k=max_agents)
        agent_results = []
        for expert in routing["selected_experts"]:
            # Map "X Expert" -> "X Agent"; fall back to the first registered
            # agent when the expert name matches no agent.
            agent_name = expert["expert"].replace(" Expert", " Agent")
            agent = next((a for a in AGENTS if agent_name in a["name"]), AGENTS[0])
            agent_results.append(_run_agent(agent, expert["weight"]))

    elapsed = (time.monotonic() - start) * 1000

    return {
        "task": task,
        "strategy": strategy,
        "agents_selected": len(agent_results),
        "max_agents": max_agents,
        "results": agent_results,
        "total_elapsed_ms": round(elapsed, 2),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shared Gradio theme for the whole app: warm amber/orange palette on the
# Soft base theme, rendered with the Inter font.
THEME = gr.themes.Soft(
    primary_hue="amber",
    secondary_hue="orange",
    neutral_hue="stone",
    font=gr.themes.GoogleFont("Inter"),
)
|
|
|
| CSS = """
|
| .main-header { text-align: center; margin-bottom: 1rem; }
|
| .main-header h1 { background: linear-gradient(135deg, #FF6B6B, #FFEAA7, #4ECDC4);
|
| -webkit-background-clip: text; -webkit-text-fill-color: transparent;
|
| font-size: 2.5rem; font-weight: 800; }
|
| .stat-box { background: linear-gradient(135deg, #1a1a2e, #16213e);
|
| border: 1px solid #0f3460; border-radius: 12px; padding: 1rem;
|
| text-align: center; color: #e8e8e8; }
|
| .stat-box h3 { color: #FFEAA7; margin: 0; font-size: 1.8rem; }
|
| .stat-box p { color: #a8a8a8; margin: 0; font-size: 0.85rem; }
|
| footer { display: none !important; }
|
| .plotly .main-svg { overflow: visible !important; }
|
| """
|
|
|
|
|
def build_app() -> gr.Blocks:
    """Build the complete Gradio application.

    Assembles six tabs (Cognitive Cells, MCTS Planning, MoE Router, Agents,
    Architecture, Metrics) inside a single ``gr.Blocks`` layout and wires
    their event handlers. Component creation order defines the on-screen
    layout. Returns the un-launched Blocks app; the caller launches it.
    """
    with gr.Blocks(theme=THEME, css=CSS, title="MangoMAS — Multi-Agent Cognitive Architecture") as app:

        # ---- Header banner (styled by the .main-header rules in CSS) ----
        gr.HTML("""

        <div class="main-header">

            <h1>MangoMAS</h1>

            <p style="color: #a8a8a8; font-size: 1.1rem;">

                Multi-Agent Cognitive Architecture — Interactive Demo

            </p>

        </div>

        """)

        # ---- Headline stat cards (static values, styled by .stat-box) ----
        with gr.Row():
            for label, value in [
                ("Cognitive Cells", "10"), ("MoE Params", "~7M"),
                ("MCTS Strategies", "UCB1 + PUCT"), ("Expert Agents", "8"),
            ]:
                gr.HTML(f'<div class="stat-box"><h3>{value}</h3><p>{label}</p></div>')

        # ---- Tab 1: single-cell execution + cell composition pipeline ----
        with gr.Tab("Cognitive Cells", id="cells"):
            gr.Markdown("### Execute any of the 10 biologically-inspired cognitive cells")

            with gr.Row():
                cell_type = gr.Dropdown(
                    choices=list(CELL_TYPES.keys()),
                    value="reasoning",
                    label="Cell Type",
                    info="Select a cognitive cell to execute",
                )
                cell_info = gr.Textbox(
                    label="Description",
                    value=CELL_TYPES["reasoning"]["description"],
                    interactive=False,
                )

            cell_input = gr.Textbox(
                label="Input Text",
                placeholder="Enter text to process through the cell...",
                value="Design a scalable microservices architecture with event-driven communication",
                lines=3,
            )
            cell_config = gr.Textbox(
                label="Config (JSON, optional)",
                placeholder='{"head_type": "nn"}',
                value="{}",
                lines=1,
            )
            cell_btn = gr.Button("Execute Cell", variant="primary")
            cell_output = gr.JSON(label="Cell Output")

            gr.Markdown("---\n### Cell Composition Pipeline")
            pipeline_input = gr.Textbox(
                label="Pipeline (comma-separated cell types)",
                value="ethics, reasoning, aggregator",
                placeholder="ethics, reasoning, memory",
            )
            pipeline_text = gr.Textbox(
                label="Input Text",
                value="Analyze the security implications of this API design: user@example.com",
                lines=2,
            )
            pipeline_btn = gr.Button("Run Pipeline", variant="secondary")
            pipeline_output = gr.JSON(label="Pipeline Result")

            def on_cell_select(ct: str) -> str:
                # Keep the read-only description box in sync with the dropdown.
                return CELL_TYPES.get(ct, {}).get("description", "Unknown cell type")

            cell_type.change(on_cell_select, inputs=cell_type, outputs=cell_info)
            cell_btn.click(execute_cell, inputs=[cell_type, cell_input, cell_config], outputs=cell_output)
            pipeline_btn.click(compose_cells, inputs=[pipeline_input, pipeline_text], outputs=pipeline_output)

        # ---- Tab 2: MCTS planning + strategy benchmark ----
        with gr.Tab("MCTS Planning", id="mcts"):
            gr.Markdown("### Monte Carlo Tree Search with Policy/Value Neural Networks")

            with gr.Row():
                mcts_task = gr.Textbox(
                    label="Task to Plan",
                    value="Design a secure, scalable REST API with authentication",
                    lines=2,
                    scale=3,
                )
                with gr.Column(scale=1):
                    mcts_sims = gr.Slider(10, 500, value=100, step=10, label="Simulations")
                    mcts_c = gr.Slider(0.1, 3.0, value=1.414, step=0.1, label="Exploration Constant (C)")
                    mcts_strat = gr.Radio(["ucb1", "puct"], value="ucb1", label="Selection Strategy")

            mcts_btn = gr.Button("Run MCTS", variant="primary")

            with gr.Row():
                mcts_tree_plot = gr.Plot(label="Search Tree Visualization")
                mcts_json = gr.JSON(label="MCTS Result")

            gr.Markdown("---\n### Strategy Benchmark")
            bench_task = gr.Textbox(
                label="Benchmark Task",
                value="Optimize database query performance for high-throughput system",
            )
            bench_btn = gr.Button("Run Benchmark", variant="secondary")
            bench_output = gr.JSON(label="Benchmark Results (MCTS vs Greedy vs Random)")

            def run_and_plot(task, sims, c, strat):
                # Sliders deliver numbers that may be floats; int() normalizes
                # the simulation count before calling run_mcts (defined elsewhere
                # in this file).
                result = run_mcts(task, int(sims), c, strat)
                fig = plot_mcts_tree(result["tree"])
                return fig, result

            mcts_btn.click(run_and_plot, inputs=[mcts_task, mcts_sims, mcts_c, mcts_strat], outputs=[mcts_tree_plot, mcts_json])
            bench_btn.click(benchmark_strategies, inputs=bench_task, outputs=bench_output)

        # ---- Tab 3: MoE routing demo (features + expert weights) ----
        with gr.Tab("MoE Router", id="moe"):
            gr.Markdown("### Neural Mixture-of-Experts Routing Gate")
            gr.Markdown(
                "The RouterNet MLP extracts 64-dimensional features from text, "
                "then routes to the top-K most relevant expert agents."
            )

            with gr.Row():
                moe_task = gr.Textbox(
                    label="Task to Route",
                    value="Implement a threat detection system with real-time alerting",
                    lines=2,
                    scale=3,
                )
                moe_topk = gr.Slider(1, 8, value=3, step=1, label="Top-K Experts", scale=1)

            moe_btn = gr.Button("Route Task", variant="primary")

            with gr.Row():
                moe_features_plot = gr.Plot(label="64-D Feature Vector")
                moe_weights_plot = gr.Plot(label="Expert Routing Weights")

            moe_json = gr.JSON(label="Routing Result")

            def route_and_plot(task, top_k):
                # Route, then render both plots; the raw 64-d feature vector
                # is dropped from the JSON panel to keep it readable.
                result = route_task(task, int(top_k))
                feat_fig = plot_features(result["features"])
                weight_fig = plot_expert_weights(result["all_weights"])

                display = {k: v for k, v in result.items() if k != "features"}
                return feat_fig, weight_fig, display

            moe_btn.click(route_and_plot, inputs=[moe_task, moe_topk], outputs=[moe_features_plot, moe_weights_plot, moe_json])

        # ---- Tab 4: multi-agent orchestration ----
        with gr.Tab("Agents", id="agents"):
            gr.Markdown("### Multi-Agent Orchestration with MoE Routing")

            with gr.Row():
                orch_task = gr.Textbox(
                    label="Task",
                    value="Build a secure payment processing microservice with PCI compliance",
                    lines=2,
                    scale=3,
                )
                with gr.Column(scale=1):
                    orch_agents = gr.Slider(1, 8, value=3, step=1, label="Max Agents")
                    orch_strat = gr.Dropdown(
                        ["moe_routing", "round_robin", "random"],
                        value="moe_routing",
                        label="Routing Strategy",
                    )

            orch_btn = gr.Button("Orchestrate", variant="primary")
            orch_output = gr.JSON(label="Orchestration Result")

            gr.Markdown("---\n### Available Agents")
            # Static reference table built from the module-level AGENTS registry.
            agent_table = gr.Dataframe(
                value=[[a["icon"], a["name"], a["specialization"]] for a in AGENTS],
                headers=["", "Agent", "Specialization"],
                interactive=False,
            )

            orch_btn.click(orchestrate, inputs=[orch_task, orch_agents, orch_strat], outputs=orch_output)

        # ---- Tab 5: static architecture documentation ----
        with gr.Tab("Architecture", id="arch"):
            gr.Markdown("""

            ### MangoMAS System Architecture



            ```

            ┌──────────────────────────────────────────────────────────┐

            │                     FastAPI Gateway                      │

            │                (Auth / Tenant Middleware)                │

            ├──────────────────────────────────────────────────────────┤

            │                                                          │

            │   ┌──────────────┐     ┌───────────────────────────┐     │

            │   │  MoE Input   │────▶│  RouterNet (Neural Gate)  │     │

            │   │   Parser     │     │  64-dim → MLP → Softmax   │     │

            │   └──────────────┘     └─────────┬─────────────────┘     │

            │                                  │                       │

            │          ┌───────┬───────┬───────┼───────┬───────┐       │

            │          ▼       ▼       ▼       ▼       ▼       ▼       │

            │       Expert  Expert  Expert  Expert  Expert  Expert     │

            │          │       │       │       │       │       │       │

            │        Agent   Agent   Agent   Agent   Agent   Agent     │

            │          │       │       │       │       │       │       │

            │    ┌─────┴───────┴───────┴───────┴───────┴───────┘       │

            │    │          Cognitive Cell Layer                       │

            │    │  [Reasoning│Memory│Ethics│Causal│Empathy│...]       │

            │    └─────────────────────┬────────────────────────┘      │

            │                          ▼                               │

            │                   Aggregator Cell                        │

            │            (weighted / ensemble / ranking)               │

            │                          │                               │

            │             Feedback Loop → Router Update                │

            │                          │                               │

            │            Response + Metrics + Traces                   │

            └──────────────────────────────────────────────────────────┘

            ```



            ### Neural Network Components



            | Component | Architecture | Parameters | Latency |

            |-----------|-------------|------------|---------|

            | **MixtureOfExperts7M** | 16 Expert Towers (64→512→512→256) + Gate | ~7M | ~5ms |

            | **RouterNet** | MLP (64→128→64→8) + Softmax | ~17K | <1ms |

            | **PolicyNetwork** | MLP (128→256→128→32) + Softmax | ~70K | <1ms |

            | **ValueNetwork** | MLP (192→256→64→1) + Tanh | ~66K | <1ms |

            | **ReasoningCell NN Head** | Lightweight transformer | ~500K | ~50ms |



            ### Cognitive Cell Lifecycle



            ```

            preprocess() → infer() → postprocess() → publish()

                 │           │            │             │

              Validate   Core Logic    Format       Emit Event

              Normalize   NN/Rule      Filter       (Event Bus)

              Enrich      Inference    Enrich

            ```

            """)

        # ---- Tab 6: live latency benchmarks ----
        with gr.Tab("Metrics", id="metrics"):
            gr.Markdown("### Live Performance Benchmarks")

            metrics_btn = gr.Button("Run All Benchmarks", variant="primary")

            with gr.Row():
                metrics_routing = gr.Plot(label="Routing Latency by Expert Count")
                metrics_cells = gr.Plot(label="Cell Execution Latency")

            metrics_json = gr.JSON(label="Raw Metrics")

            def run_benchmarks():
                # Benchmark routing latency for each top-k and per-cell
                # execution latency; returns two figures plus a summary dict.

                # Warm-up calls so first-run overhead doesn't skew timings.
                route_task("warmup", top_k=1)
                execute_cell("reasoning", "warmup")

                # Routing latency: median of 10 runs for each k in 1..8.
                ks = list(range(1, 9))
                latencies = []
                for k in ks:
                    times = []
                    for _ in range(10):
                        r = route_task("Test routing benchmark task", top_k=k)
                        times.append(r["elapsed_ms"])

                    # Median via the middle element of the sorted sample.
                    times.sort()
                    latencies.append(times[len(times) // 2])

                fig_routing = go.Figure(
                    data=[go.Scatter(x=ks, y=latencies, mode="lines+markers", name="Routing Latency")],
                    layout=go.Layout(
                        title="Routing Latency vs Top-K",
                        xaxis_title="Top-K Experts",
                        yaxis_title="Latency (ms)",
                        height=350,
                        template="plotly_dark",
                    ),
                )

                # Cell latency: median of 5 runs per cell type.
                cell_times: dict[str, float] = {}
                for ct in CELL_TYPES:
                    times = []
                    for _ in range(5):
                        r = execute_cell(ct, "Benchmark test input for cell")
                        times.append(r["elapsed_ms"])
                    times.sort()
                    cell_times[ct] = times[len(times) // 2]

                fig_cells = go.Figure(
                    data=[go.Bar(
                        x=list(cell_times.keys()),
                        y=list(cell_times.values()),
                        marker_color=["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7",
                                      "#DDA0DD", "#F0E68C", "#87CEEB", "#FFA07A", "#98FB98"],
                    )],
                    layout=go.Layout(
                        title="Cell Execution Latency",
                        xaxis=dict(
                            title="Cell Type",
                            tickangle=-30,
                            tickfont=dict(size=9),
                        ),
                        yaxis_title="Latency (ms)",
                        height=400,
                        template="plotly_dark",
                        margin=dict(b=100),
                    ),
                )

                summary = {
                    "torch_available": _TORCH,
                    "routing_latency_p50_ms": round(sorted(latencies)[len(latencies) // 2], 3),
                    "cell_latency_avg_ms": round(sum(cell_times.values()) / len(cell_times), 3),
                    "total_nn_parameters": "~7.15M" if _TORCH else "N/A (CPU fallback)",
                }

                return fig_routing, fig_cells, summary

            metrics_btn.click(run_benchmarks, outputs=[metrics_routing, metrics_cells, metrics_json])

    return app
|
|
|
|
|
|
|
|
|
|
|
|
|
| if __name__ == "__main__":
|
| app = build_app()
|
| app.launch(
|
| server_name="0.0.0.0",
|
| server_port=7860,
|
| share=False,
|
| )
|
|
|