| |
| """Codette Phase 6 Inference Bridge — ForgeEngine integration for web server |
| |
| This module provides a bridge between codette_server.py and ForgeEngine, |
| enabling Phase 6 capabilities (query complexity routing, semantic tension, |
| specialization tracking, pre-flight prediction) without breaking the web UI. |
| |
| Usage: |
| from codette_forge_bridge import CodetteForgeBridge |
| |
| bridge = CodetteForgeBridge(orchestrator=orch, use_phase6=True) |
| result = bridge.generate(query, adapter=None, max_adapters=2) |
| |
| The bridge falls back to lightweight orchestrator if Phase 6 disabled or heavy. |
| """ |
|
|
| import re |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Dict, Optional |
|
|
| |
| try: |
| from substrate_awareness import SubstrateMonitor, HealthAwareRouter, CocoonStateEnricher |
| SUBSTRATE_AVAILABLE = True |
| except ImportError: |
| SUBSTRATE_AVAILABLE = False |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| try: |
| from reasoning_forge.forge_engine import ForgeEngine |
| from reasoning_forge.query_classifier import QueryClassifier, QueryComplexity |
| from reasoning_forge.executive_controller import ExecutiveController, ComponentDecision |
| PHASE6_AVAILABLE = True |
| PHASE7_AVAILABLE = True |
| except ImportError as e: |
| PHASE6_AVAILABLE = False |
| PHASE7_AVAILABLE = False |
| print(f"[WARNING] ForgeEngine not available - Phase 6/7 disabled: {e}") |
|
|
|
|
| class CodetteForgeBridge: |
| """Bridge between web server (lightweight) and ForgeEngine (Phase 6).""" |
|
|
| def __init__(self, orchestrator, use_phase6: bool = True, use_phase7: bool = True, verbose: bool = False, health_check_fn=None): |
| """ |
| Args: |
| orchestrator: CodetteOrchestrator instance for fallback |
| use_phase6: Enable Phase 6 (requires ForgeEngine) |
| use_phase7: Enable Phase 7 (Executive Controller routing) |
| verbose: Log decisions |
| health_check_fn: Callable that returns real system health dict |
| """ |
| self.orchestrator = orchestrator |
| self.verbose = verbose |
| self._health_check_fn = health_check_fn |
| self.use_phase6 = use_phase6 and PHASE6_AVAILABLE |
| self.use_phase7 = use_phase7 and PHASE7_AVAILABLE |
|
|
| |
| self.substrate_monitor = None |
| self.health_router = None |
| self.cocoon_enricher = None |
| if SUBSTRATE_AVAILABLE: |
| try: |
| self.substrate_monitor = SubstrateMonitor() |
| self.health_router = HealthAwareRouter(self.substrate_monitor) |
| self.cocoon_enricher = CocoonStateEnricher(self.substrate_monitor) |
| if self.verbose: |
| print("[SUBSTRATE] Substrate-aware cognition initialized") |
| except Exception as e: |
| print(f"[WARNING] Substrate awareness init failed: {e}") |
|
|
| self.forge = None |
| self.classifier = None |
| self.executive_controller = None |
|
|
| if self.use_phase6: |
| try: |
| self._init_phase6() |
| except Exception as e: |
| print(f"[WARNING] Phase 6 initialization failed: {e}") |
| self.use_phase6 = False |
|
|
| if self.use_phase7 and self.use_phase6: |
| try: |
| self.executive_controller = ExecutiveController(verbose=verbose) |
| if self.verbose: |
| print("[PHASE7] Executive Controller initialized - intelligent routing enabled") |
| except Exception as e: |
| print(f"[WARNING] Phase 7 initialization failed: {e}") |
| self.use_phase7 = False |
|
|
| def _init_phase6(self): |
| """Initialize ForgeEngine with Phase 6 components.""" |
| if self.verbose: |
| print("[PHASE6] Initializing ForgeEngine...") |
|
|
| self.forge = ForgeEngine(orchestrator=self.orchestrator) |
| self.classifier = QueryClassifier() |
|
|
| |
| if hasattr(self.forge, 'memory_kernel') and self.forge.memory_kernel: |
| self.orchestrator.set_memory_kernel(self.forge.memory_kernel) |
| if self.verbose: |
| print(f"[PHASE6] Memory kernel wired to orchestrator ({len(self.forge.memory_kernel)} cocoon memories)") |
|
|
| if self.verbose: |
| print(f"[PHASE6] ForgeEngine ready with {len(self.forge.analysis_agents)} agents") |
|
|
| def generate(self, query: str, adapter: Optional[str] = None, |
| max_adapters: int = 2, memory_budget: int = 3, |
| max_response_tokens: int = 512) -> Dict: |
| """Generate response with optional Phase 6 routing. |
| |
| Args: |
| query: User query |
| adapter: Force specific adapter (bypasses routing) |
| max_adapters: Max adapters for multi-perspective |
| memory_budget: Max cocoons for forge memory injection (from BehaviorGovernor) |
| max_response_tokens: Response length budget (from BehaviorGovernor) |
| |
| Returns: |
| { |
| "response": str, |
| "adapter": str or list, |
| "phase6_used": bool, |
| "complexity": str, # if Phase 6 |
| "conflicts_prevented": int, # if Phase 6 |
| "reasoning": str, |
| ...rest from orchestrator... |
| } |
| """ |
| start_time = time.time() |
|
|
| |
| _health_patterns = [ |
| r'self[\s-]*(?:system|health|diagnostic|check)', |
| r'system[\s-]*health[\s-]*check', |
| r'run[\s-]*(?:a\s+)?diagnostic', |
| r'check\s+(?:your|all)\s+systems', |
| r'health[\s-]*report', |
| r'are\s+(?:all\s+)?(?:your\s+)?systems?\s+(?:ok|working|online|running)', |
| ] |
| if any(re.search(p, query, re.I) for p in _health_patterns) and self._health_check_fn: |
| try: |
| health = self._health_check_fn() |
|
|
| |
| lines = [f"**Self-System Health Check — {health['overall']}** ({health['score']} checks passed)\n"] |
|
|
| for sys_name, sys_data in health.get("systems", {}).items(): |
| status = sys_data.get("status", "?") if isinstance(sys_data, dict) else str(sys_data) |
| icon = "✅" if status == "OK" else ("⚠️" if status in ("DISABLED", "MISSING", "DEGRADED") else "❌") |
| label = sys_name.replace("_", " ").title() |
| lines.append(f"{icon} **{label}**: {status}") |
|
|
| |
| if isinstance(sys_data, dict): |
| if "adapters_loaded" in sys_data: |
| lines.append(f" └ {sys_data['adapters_loaded']} adapters: {', '.join(sys_data.get('adapters', []))}") |
| if "components" in sys_data: |
| for comp, cdata in sys_data["components"].items(): |
| cstatus = cdata.get("status", "?") if isinstance(cdata, dict) else str(cdata) |
| cicon = "✅" if cstatus == "OK" else "❌" |
| comp_label = comp.replace("_", " ").title() |
| detail = "" |
| if isinstance(cdata, dict): |
| if "memories" in cdata: |
| detail = f" ({cdata['memories']} memories)" |
| elif "audit_entries" in cdata: |
| detail = f" ({cdata['audit_entries']} audit entries)" |
| elif "stored_cocoons" in cdata: |
| detail = f" ({cdata['stored_cocoons']} cocoons)" |
| lines.append(f" {cicon} {comp_label}{detail}") |
| if "subsystems" in sys_data: |
| for sub, sstatus in sys_data["subsystems"].items(): |
| sicon = "✅" if sstatus == "OK" else "❌" |
| lines.append(f" {sicon} {sub}") |
| if "spiderweb_metrics" in sys_data: |
| sm = sys_data["spiderweb_metrics"] |
| lines.append(f" └ Coherence: {sm.get('phase_coherence', 0):.4f}, Entropy: {sm.get('entropy', 0):.4f}, Nodes: {sm.get('node_count', 0)}, Attractors: {sm.get('attractor_count', 0)}, Glyphs: {sm.get('glyph_count', 0)}") |
| if "behavior_lessons" in sys_data: |
| lines.append(f" └ {sys_data['behavior_lessons']} learned behaviors, {sys_data['permanent_locks']} permanent locks") |
| if "alive" in sys_data: |
| lines.append(f" └ {sys_data['alive']}/{sys_data['total']} alive, {sys_data.get('pending_requests', 0)} pending") |
|
|
| if health.get("warnings"): |
| lines.append(f"\n⚠️ **Warnings**: {', '.join(health['warnings'])}") |
| if health.get("errors"): |
| lines.append(f"\n❌ **Errors**: {', '.join(health['errors'])}") |
|
|
| return { |
| "response": "\n".join(lines), |
| "adapter": "self_diagnostic", |
| "tokens": 0, |
| "time": round(time.time() - start_time, 2), |
| "phase6_used": True, |
| "reasoning": "Real self-diagnostic (not LLM-generated)", |
| "health": health, |
| } |
| except Exception as e: |
| pass |
|
|
| |
| if self.forge and hasattr(self.forge, 'ethical_governance') and self.forge.ethical_governance: |
| try: |
| qv = self.forge.ethical_governance.validate_query(query) |
| if not qv["valid"]: |
| return { |
| "response": "I can't help with that request. " + "; ".join(qv.get("suggestions", [])), |
| "adapter": "ethical_block", |
| "tokens": 0, |
| "phase6_used": True, |
| "reasoning": "Blocked by EthicalAIGovernance", |
| } |
| except Exception: |
| pass |
|
|
| |
| if adapter or not self.use_phase6: |
| result = self.orchestrator.route_and_generate( |
| query, |
| max_adapters=max_adapters, |
| strategy="keyword", |
| force_adapter=adapter, |
| ) |
| result["phase6_used"] = False |
| return result |
|
|
| |
| self._memory_budget = memory_budget |
| self._max_response_tokens = max_response_tokens |
|
|
| |
| try: |
| return self._generate_with_phase6(query, max_adapters) |
| except Exception as e: |
| if self.verbose: |
| print(f"[PHASE6] Error: {e} - falling back to orchestrator") |
|
|
| |
| result = self.orchestrator.route_and_generate( |
| query, |
| max_adapters=max_adapters, |
| strategy="keyword", |
| force_adapter=None, |
| ) |
| result["phase6_used"] = False |
| result["phase6_fallback_reason"] = str(e) |
| return result |
|
|
| def _generate_with_phase6(self, query: str, max_adapters: int) -> Dict: |
| """Generate using orchestrator LLM with Phase 6/7 routing and classification. |
| |
| All complexity levels use the orchestrator for actual LLM inference. |
| Phase 6 adds query classification and domain routing. |
| Phase 7 adds executive routing metadata. |
| """ |
| start_time = time.time() |
|
|
| |
| complexity = self.classifier.classify(query) |
| if self.verbose: |
| print(f"[PHASE6] Query complexity: {complexity}", flush=True) |
|
|
| |
| route_decision = None |
| if self.use_phase7 and self.executive_controller: |
| route_decision = self.executive_controller.route_query(query, complexity) |
| if self.verbose: |
| print(f"[PHASE7] Route: {','.join([k for k, v in route_decision.component_activation.items() if v])}", flush=True) |
|
|
| |
| domain = self._classify_domain(query) |
|
|
| |
| if complexity == QueryComplexity.SIMPLE: |
| effective_max_adapters = 1 |
| elif complexity == QueryComplexity.MEDIUM: |
| effective_max_adapters = min(max_adapters, 2) |
| else: |
| effective_max_adapters = max_adapters |
|
|
| |
| substrate_adjustments = [] |
| if self.health_router: |
| original_complexity = complexity |
| original_max = effective_max_adapters |
| complexity, effective_max_adapters, substrate_adjustments = \ |
| self.health_router.adjust_routing(complexity, effective_max_adapters) |
| if substrate_adjustments: |
| for adj in substrate_adjustments: |
| print(f" [SUBSTRATE] {adj}", flush=True) |
|
|
| if self.verbose: |
| print(f"[PHASE6] Domain: {domain}, max_adapters: {effective_max_adapters}", flush=True) |
|
|
| |
| result = self.orchestrator.route_and_generate( |
| query, |
| max_adapters=effective_max_adapters, |
| strategy="keyword", |
| force_adapter=None, |
| ) |
|
|
| elapsed = time.time() - start_time |
|
|
| |
| result["phase6_used"] = True |
| result["phase7_used"] = self.use_phase7 and self.executive_controller is not None |
| result["complexity"] = str(complexity) |
| result["domain"] = domain |
|
|
| if route_decision: |
| try: |
| route_metadata = ExecutiveController.create_route_metadata( |
| route_decision, |
| actual_latency_ms=elapsed * 1000, |
| actual_conflicts=0, |
| gamma=0.95 |
| ) |
| result.update(route_metadata) |
| except Exception as e: |
| if self.verbose: |
| print(f"[PHASE7] Metadata error: {e}", flush=True) |
|
|
| result["reasoning"] = f"Phase 6: {complexity.name} complexity, {domain} domain" |
|
|
| |
| if not result.get("response", "").strip() and result.get("perspectives"): |
| perspectives = result["perspectives"] |
| if isinstance(perspectives, dict) and perspectives: |
| |
| best = max(perspectives.values(), key=lambda v: len(str(v))) |
| result["response"] = str(best) |
| result["reasoning"] += " | fallback: used best perspective (synthesis was empty)" |
| print(f" [FALLBACK] Synthesis empty — using best perspective ({len(result['response'])} chars)", flush=True) |
| elif isinstance(perspectives, str) and perspectives.strip(): |
| result["response"] = perspectives |
| result["reasoning"] += " | fallback: used raw perspectives" |
|
|
| |
| |
| |
| response_text = result.get("response", "") |
| if response_text and self.forge and hasattr(self.forge, 'cocooner') and self.forge.cocooner: |
| try: |
| cocoon_meta = {"complexity": str(complexity), "domain": domain} |
| if substrate_adjustments: |
| cocoon_meta["substrate_adjustments"] = substrate_adjustments |
| |
| if self.cocoon_enricher: |
| cocoon_meta = self.cocoon_enricher.enrich(cocoon_meta) |
| self.forge.cocooner.wrap_reasoning( |
| query=query, |
| response=response_text, |
| adapter=str(result.get("adapter", "unknown")), |
| metadata=cocoon_meta |
| ) |
| except Exception: |
| pass |
|
|
| |
| if self.substrate_monitor: |
| self.substrate_monitor.record_inference(elapsed * 1000) |
|
|
| |
| response_text = result.get("response", "") |
| if response_text: |
| result["response"] = self._apply_directness(response_text, query) |
|
|
| |
| try: |
| from codette_orchestrator import extract_constraints, enforce_constraints |
| constraints = extract_constraints(query) |
| if constraints: |
| result["response"] = enforce_constraints(result["response"], constraints) |
| result["constraints_applied"] = constraints |
| except ImportError: |
| pass |
|
|
| |
| try: |
| from self_correction import universal_self_check |
| result["response"], lock_issues = universal_self_check(result["response"]) |
| if lock_issues: |
| result["lock_fixes"] = lock_issues |
| except ImportError: |
| pass |
|
|
| if self.verbose: |
| resp_len = len(result.get("response", "")) |
| print(f"[PHASE6] Done: {resp_len} chars, {result.get('tokens', 0)} tokens", flush=True) |
|
|
| return result |
|
|
| def _apply_directness(self, response: str, query: str) -> str: |
| """Self-critique loop: trim filler, cut abstraction padding, anchor to user intent. |
| |
| Rules: |
| 1. Strip preamble phrases ("That's a great question!", "Let me explain...", etc.) |
| 2. Remove trailing abstraction filler ("In conclusion", "Overall", vague wrap-ups) |
| 3. Collapse excessive whitespace |
| """ |
| |
| preamble_patterns = [ |
| r"^(?:That(?:'s| is) (?:a |an )?(?:great|good|interesting|excellent|fantastic|wonderful|fascinating) question[.!]?\s*)", |
| r"^(?:What a (?:great|good|interesting|excellent|fascinating) question[.!]?\s*)", |
| r"^(?:I(?:'d| would) (?:be happy|love) to (?:help|explain|answer)[.!]?\s*)", |
| r"^(?:Let me (?:explain|break (?:this|that) down|think about (?:this|that))[.!]?\s*)", |
| r"^(?:Great question[.!]?\s*)", |
| r"^(?:Thank you for (?:asking|your question)[.!]?\s*)", |
| r"^(?:Absolutely[.!]?\s*)", |
| r"^(?:Of course[.!]?\s*)", |
| r"^(?:Sure(?:thing)?[.!]?\s*)", |
| ] |
| for pat in preamble_patterns: |
| response = re.sub(pat, "", response, count=1, flags=re.IGNORECASE) |
|
|
| |
| trailing_patterns = [ |
| r"\n\n(?:In (?:conclusion|summary|the end),?\s+.{0,200})$", |
| r"\n\n(?:Overall,?\s+.{0,150})$", |
| r"\n\n(?:(?:I )?hope (?:this|that) helps[.!]?\s*)$", |
| r"\n\n(?:Let me know if (?:you (?:have|need|want)|there(?:'s| is)) .{0,100})$", |
| r"\n\n(?:Feel free to .{0,100})$", |
| ] |
| for pat in trailing_patterns: |
| response = re.sub(pat, "", response, count=1, flags=re.IGNORECASE) |
|
|
| |
| response = re.sub(r'\n{3,}', '\n\n', response) |
|
|
| return response.strip() |
|
|
| def _classify_domain(self, query: str) -> str: |
| """Classify query domain (physics, ethics, consciousness, creativity, systems).""" |
| query_lower = query.lower() |
|
|
| |
| domains = { |
| "physics": ["force", "energy", "velocity", "gravity", "motion", "light", "speed", |
| "particle", "entropy", "time arrow", "quantum", "physics"], |
| "ethics": ["moral", "right", "wrong", "should", "ethical", "justice", "fair", |
| "duty", "consequence", "utilitarian", "virtue", "ethics", "lie", "save"], |
| "consciousness": ["conscious", "awareness", "qualia", "mind", "experience", |
| "subjective", "hard problem", "zombie", "consciousness"], |
| "creativity": ["creative", "creative", "art", "invention", "novel", "design", |
| "imagination", "innovation", "beautiful"], |
| "systems": ["system", "emerge", "feedback", "loop", "complex", "agent", "adapt", |
| "network", "evolution", "architecture", "free will"], |
| } |
|
|
| for domain, keywords in domains.items(): |
| if any(kw in query_lower for kw in keywords): |
| return domain |
|
|
| return "general" |
|
|