# Codette-Reasoning / inference / codette_forge_bridge.py
# Author: Jonathan Harrison
# Full Codette codebase sync — transparency release (commit 74f2af5)
#!/usr/bin/env python3
"""Codette Phase 6 Inference Bridge — ForgeEngine integration for web server
This module provides a bridge between codette_server.py and ForgeEngine,
enabling Phase 6 capabilities (query complexity routing, semantic tension,
specialization tracking, pre-flight prediction) without breaking the web UI.
Usage:
from codette_forge_bridge import CodetteForgeBridge
bridge = CodetteForgeBridge(orchestrator=orch, use_phase6=True)
result = bridge.generate(query, adapter=None, max_adapters=2)
The bridge falls back to lightweight orchestrator if Phase 6 disabled or heavy.
"""
import re
import sys
import time
from pathlib import Path
from typing import Dict, Optional

# Substrate-aware cognition is an optional dependency: the bridge degrades
# gracefully when it is absent (SUBSTRATE_AVAILABLE gates all use).
try:
    from substrate_awareness import SubstrateMonitor, HealthAwareRouter, CocoonStateEnricher
    SUBSTRATE_AVAILABLE = True
except ImportError:
    SUBSTRATE_AVAILABLE = False

# Add repo root to path so the reasoning_forge package resolves.
# NOTE(review): substrate_awareness is imported BEFORE this insert — confirm
# it is importable from the default sys.path, or move this line above it.
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from reasoning_forge.forge_engine import ForgeEngine
    from reasoning_forge.query_classifier import QueryClassifier, QueryComplexity
    from reasoning_forge.executive_controller import ExecutiveController, ComponentDecision
    PHASE6_AVAILABLE = True
    # Phase 7 rides on the same import (ExecutiveController), so the flags move together.
    PHASE7_AVAILABLE = True
except ImportError as e:
    PHASE6_AVAILABLE = False
    PHASE7_AVAILABLE = False
    print(f"[WARNING] ForgeEngine not available - Phase 6/7 disabled: {e}")
class CodetteForgeBridge:
"""Bridge between web server (lightweight) and ForgeEngine (Phase 6)."""
    def __init__(self, orchestrator, use_phase6: bool = True, use_phase7: bool = True, verbose: bool = False, health_check_fn=None):
        """Wire the bridge to an orchestrator and optionally enable Phase 6/7.

        Args:
            orchestrator: CodetteOrchestrator instance used for fallback generation
            use_phase6: Enable Phase 6 (requires the ForgeEngine import to have succeeded)
            use_phase7: Enable Phase 7 (Executive Controller routing); only active on top of Phase 6
            verbose: Log routing/initialization decisions to stdout
            health_check_fn: Callable returning a real system-health dict, used to
                answer self-diagnostic queries without invoking the LLM
        """
        self.orchestrator = orchestrator
        self.verbose = verbose
        self._health_check_fn = health_check_fn
        # Requested flags are gated by whether the imports actually succeeded.
        self.use_phase6 = use_phase6 and PHASE6_AVAILABLE
        self.use_phase7 = use_phase7 and PHASE7_AVAILABLE
        # Substrate-aware cognition (optional; best-effort init, never blocks startup)
        self.substrate_monitor = None
        self.health_router = None
        self.cocoon_enricher = None
        if SUBSTRATE_AVAILABLE:
            try:
                self.substrate_monitor = SubstrateMonitor()
                self.health_router = HealthAwareRouter(self.substrate_monitor)
                self.cocoon_enricher = CocoonStateEnricher(self.substrate_monitor)
                if self.verbose:
                    print("[SUBSTRATE] Substrate-aware cognition initialized")
            except Exception as e:
                print(f"[WARNING] Substrate awareness init failed: {e}")
        self.forge = None
        self.classifier = None
        self.executive_controller = None
        if self.use_phase6:
            try:
                self._init_phase6()
            except Exception as e:
                # Any forge failure drops the bridge back to orchestrator-only mode.
                print(f"[WARNING] Phase 6 initialization failed: {e}")
                self.use_phase6 = False
        # Phase 7 routing only makes sense on top of a working Phase 6 stack.
        if self.use_phase7 and self.use_phase6:
            try:
                self.executive_controller = ExecutiveController(verbose=verbose)
                if self.verbose:
                    print("[PHASE7] Executive Controller initialized - intelligent routing enabled")
            except Exception as e:
                print(f"[WARNING] Phase 7 initialization failed: {e}")
                self.use_phase7 = False
    def _init_phase6(self):
        """Initialize ForgeEngine and the query classifier (Phase 6 components).

        Also wires the forge's cocoon memory kernel into the orchestrator so
        stored memories can enrich LLM prompts. Raises whatever ForgeEngine
        raises; the caller (__init__) catches the error and disables Phase 6.
        """
        if self.verbose:
            print("[PHASE6] Initializing ForgeEngine...")
        self.forge = ForgeEngine(orchestrator=self.orchestrator)
        self.classifier = QueryClassifier()
        # Wire cocoon memories into orchestrator so they enrich LLM prompts
        if hasattr(self.forge, 'memory_kernel') and self.forge.memory_kernel:
            self.orchestrator.set_memory_kernel(self.forge.memory_kernel)
            if self.verbose:
                print(f"[PHASE6] Memory kernel wired to orchestrator ({len(self.forge.memory_kernel)} cocoon memories)")
        if self.verbose:
            print(f"[PHASE6] ForgeEngine ready with {len(self.forge.analysis_agents)} agents")
def generate(self, query: str, adapter: Optional[str] = None,
max_adapters: int = 2, memory_budget: int = 3,
max_response_tokens: int = 512) -> Dict:
"""Generate response with optional Phase 6 routing.
Args:
query: User query
adapter: Force specific adapter (bypasses routing)
max_adapters: Max adapters for multi-perspective
memory_budget: Max cocoons for forge memory injection (from BehaviorGovernor)
max_response_tokens: Response length budget (from BehaviorGovernor)
Returns:
{
"response": str,
"adapter": str or list,
"phase6_used": bool,
"complexity": str, # if Phase 6
"conflicts_prevented": int, # if Phase 6
"reasoning": str,
...rest from orchestrator...
}
"""
start_time = time.time()
# Self-diagnostic: intercept health check queries before LLM
_health_patterns = [
r'self[\s-]*(?:system|health|diagnostic|check)',
r'system[\s-]*health[\s-]*check',
r'run[\s-]*(?:a\s+)?diagnostic',
r'check\s+(?:your|all)\s+systems',
r'health[\s-]*report',
r'are\s+(?:all\s+)?(?:your\s+)?systems?\s+(?:ok|working|online|running)',
]
if any(re.search(p, query, re.I) for p in _health_patterns) and self._health_check_fn:
try:
health = self._health_check_fn()
# Format as a natural response with real data
lines = [f"**Self-System Health Check — {health['overall']}** ({health['score']} checks passed)\n"]
for sys_name, sys_data in health.get("systems", {}).items():
status = sys_data.get("status", "?") if isinstance(sys_data, dict) else str(sys_data)
icon = "✅" if status == "OK" else ("⚠️" if status in ("DISABLED", "MISSING", "DEGRADED") else "❌")
label = sys_name.replace("_", " ").title()
lines.append(f"{icon} **{label}**: {status}")
# Add sub-details for key systems
if isinstance(sys_data, dict):
if "adapters_loaded" in sys_data:
lines.append(f" └ {sys_data['adapters_loaded']} adapters: {', '.join(sys_data.get('adapters', []))}")
if "components" in sys_data:
for comp, cdata in sys_data["components"].items():
cstatus = cdata.get("status", "?") if isinstance(cdata, dict) else str(cdata)
cicon = "✅" if cstatus == "OK" else "❌"
comp_label = comp.replace("_", " ").title()
detail = ""
if isinstance(cdata, dict):
if "memories" in cdata:
detail = f" ({cdata['memories']} memories)"
elif "audit_entries" in cdata:
detail = f" ({cdata['audit_entries']} audit entries)"
elif "stored_cocoons" in cdata:
detail = f" ({cdata['stored_cocoons']} cocoons)"
lines.append(f" {cicon} {comp_label}{detail}")
if "subsystems" in sys_data:
for sub, sstatus in sys_data["subsystems"].items():
sicon = "✅" if sstatus == "OK" else "❌"
lines.append(f" {sicon} {sub}")
if "spiderweb_metrics" in sys_data:
sm = sys_data["spiderweb_metrics"]
lines.append(f" └ Coherence: {sm.get('phase_coherence', 0):.4f}, Entropy: {sm.get('entropy', 0):.4f}, Nodes: {sm.get('node_count', 0)}, Attractors: {sm.get('attractor_count', 0)}, Glyphs: {sm.get('glyph_count', 0)}")
if "behavior_lessons" in sys_data:
lines.append(f" └ {sys_data['behavior_lessons']} learned behaviors, {sys_data['permanent_locks']} permanent locks")
if "alive" in sys_data:
lines.append(f" └ {sys_data['alive']}/{sys_data['total']} alive, {sys_data.get('pending_requests', 0)} pending")
if health.get("warnings"):
lines.append(f"\n⚠️ **Warnings**: {', '.join(health['warnings'])}")
if health.get("errors"):
lines.append(f"\n❌ **Errors**: {', '.join(health['errors'])}")
return {
"response": "\n".join(lines),
"adapter": "self_diagnostic",
"tokens": 0,
"time": round(time.time() - start_time, 2),
"phase6_used": True,
"reasoning": "Real self-diagnostic (not LLM-generated)",
"health": health,
}
except Exception as e:
pass # Fall through to normal LLM generation
# Ethical query validation (from original framework)
if self.forge and hasattr(self.forge, 'ethical_governance') and self.forge.ethical_governance:
try:
qv = self.forge.ethical_governance.validate_query(query)
if not qv["valid"]:
return {
"response": "I can't help with that request. " + "; ".join(qv.get("suggestions", [])),
"adapter": "ethical_block",
"tokens": 0,
"phase6_used": True,
"reasoning": "Blocked by EthicalAIGovernance",
}
except Exception:
pass # Non-critical, continue
# If adapter forced or Phase 6 disabled, use orchestrator directly
if adapter or not self.use_phase6:
result = self.orchestrator.route_and_generate(
query,
max_adapters=max_adapters,
strategy="keyword",
force_adapter=adapter,
)
result["phase6_used"] = False
return result
# Store governor budgets for forge access
self._memory_budget = memory_budget
self._max_response_tokens = max_response_tokens
# Try Phase 6 route first
try:
return self._generate_with_phase6(query, max_adapters)
except Exception as e:
if self.verbose:
print(f"[PHASE6] Error: {e} - falling back to orchestrator")
# Fallback to orchestrator
result = self.orchestrator.route_and_generate(
query,
max_adapters=max_adapters,
strategy="keyword",
force_adapter=None,
)
result["phase6_used"] = False
result["phase6_fallback_reason"] = str(e)
return result
def _generate_with_phase6(self, query: str, max_adapters: int) -> Dict:
"""Generate using orchestrator LLM with Phase 6/7 routing and classification.
All complexity levels use the orchestrator for actual LLM inference.
Phase 6 adds query classification and domain routing.
Phase 7 adds executive routing metadata.
"""
start_time = time.time()
# 1. Classify query complexity (Phase 6)
complexity = self.classifier.classify(query)
if self.verbose:
print(f"[PHASE6] Query complexity: {complexity}", flush=True)
# 2. Route with Phase 7 Executive Controller
route_decision = None
if self.use_phase7 and self.executive_controller:
route_decision = self.executive_controller.route_query(query, complexity)
if self.verbose:
print(f"[PHASE7] Route: {','.join([k for k, v in route_decision.component_activation.items() if v])}", flush=True)
# 3. Domain classification for adapter routing
domain = self._classify_domain(query)
# 4. Determine adapter count based on complexity
if complexity == QueryComplexity.SIMPLE:
effective_max_adapters = 1
elif complexity == QueryComplexity.MEDIUM:
effective_max_adapters = min(max_adapters, 2)
else:
effective_max_adapters = max_adapters
# 4.5 SUBSTRATE-AWARE ROUTING — adjust based on system pressure
substrate_adjustments = []
if self.health_router:
original_complexity = complexity
original_max = effective_max_adapters
complexity, effective_max_adapters, substrate_adjustments = \
self.health_router.adjust_routing(complexity, effective_max_adapters)
if substrate_adjustments:
for adj in substrate_adjustments:
print(f" [SUBSTRATE] {adj}", flush=True)
if self.verbose:
print(f"[PHASE6] Domain: {domain}, max_adapters: {effective_max_adapters}", flush=True)
# 5. Generate via orchestrator (actual LLM inference)
result = self.orchestrator.route_and_generate(
query,
max_adapters=effective_max_adapters,
strategy="keyword",
force_adapter=None,
)
elapsed = time.time() - start_time
# 6. Add Phase 6/7 metadata
result["phase6_used"] = True
result["phase7_used"] = self.use_phase7 and self.executive_controller is not None
result["complexity"] = str(complexity)
result["domain"] = domain
if route_decision:
try:
route_metadata = ExecutiveController.create_route_metadata(
route_decision,
actual_latency_ms=elapsed * 1000,
actual_conflicts=0,
gamma=0.95
)
result.update(route_metadata)
except Exception as e:
if self.verbose:
print(f"[PHASE7] Metadata error: {e}", flush=True)
result["reasoning"] = f"Phase 6: {complexity.name} complexity, {domain} domain"
# EMPTY RESPONSE FALLBACK: If synthesis returned nothing, use best perspective
if not result.get("response", "").strip() and result.get("perspectives"):
perspectives = result["perspectives"]
if isinstance(perspectives, dict) and perspectives:
# Pick the longest perspective as fallback
best = max(perspectives.values(), key=lambda v: len(str(v)))
result["response"] = str(best)
result["reasoning"] += " | fallback: used best perspective (synthesis was empty)"
print(f" [FALLBACK] Synthesis empty — using best perspective ({len(result['response'])} chars)", flush=True)
elif isinstance(perspectives, str) and perspectives.strip():
result["response"] = perspectives
result["reasoning"] += " | fallback: used raw perspectives"
# Store reasoning exchange in CognitionCocooner (from original framework)
# Now enriched with substrate state — every cocoon knows the conditions
# under which it was created (pressure, memory, trend)
response_text = result.get("response", "")
if response_text and self.forge and hasattr(self.forge, 'cocooner') and self.forge.cocooner:
try:
cocoon_meta = {"complexity": str(complexity), "domain": domain}
if substrate_adjustments:
cocoon_meta["substrate_adjustments"] = substrate_adjustments
# Enrich with real-time system state
if self.cocoon_enricher:
cocoon_meta = self.cocoon_enricher.enrich(cocoon_meta)
self.forge.cocooner.wrap_reasoning(
query=query,
response=response_text,
adapter=str(result.get("adapter", "unknown")),
metadata=cocoon_meta
)
except Exception:
pass # Non-critical
# Record inference timing for substrate monitor
if self.substrate_monitor:
self.substrate_monitor.record_inference(elapsed * 1000)
# 8. Apply directness discipline — trim filler, enforce intent anchoring
response_text = result.get("response", "")
if response_text:
result["response"] = self._apply_directness(response_text, query)
# 9. Enforce user constraints (word limits, sentence limits, etc.)
try:
from codette_orchestrator import extract_constraints, enforce_constraints
constraints = extract_constraints(query)
if constraints:
result["response"] = enforce_constraints(result["response"], constraints)
result["constraints_applied"] = constraints
except ImportError:
pass
# 10. PERMANENT LOCKS: Universal self-check on EVERY response
try:
from self_correction import universal_self_check
result["response"], lock_issues = universal_self_check(result["response"])
if lock_issues:
result["lock_fixes"] = lock_issues
except ImportError:
pass
if self.verbose:
resp_len = len(result.get("response", ""))
print(f"[PHASE6] Done: {resp_len} chars, {result.get('tokens', 0)} tokens", flush=True)
return result
def _apply_directness(self, response: str, query: str) -> str:
"""Self-critique loop: trim filler, cut abstraction padding, anchor to user intent.
Rules:
1. Strip preamble phrases ("That's a great question!", "Let me explain...", etc.)
2. Remove trailing abstraction filler ("In conclusion", "Overall", vague wrap-ups)
3. Collapse excessive whitespace
"""
# Strip common LLM preamble patterns
preamble_patterns = [
r"^(?:That(?:'s| is) (?:a |an )?(?:great|good|interesting|excellent|fantastic|wonderful|fascinating) question[.!]?\s*)",
r"^(?:What a (?:great|good|interesting|excellent|fascinating) question[.!]?\s*)",
r"^(?:I(?:'d| would) (?:be happy|love) to (?:help|explain|answer)[.!]?\s*)",
r"^(?:Let me (?:explain|break (?:this|that) down|think about (?:this|that))[.!]?\s*)",
r"^(?:Great question[.!]?\s*)",
r"^(?:Thank you for (?:asking|your question)[.!]?\s*)",
r"^(?:Absolutely[.!]?\s*)",
r"^(?:Of course[.!]?\s*)",
r"^(?:Sure(?:thing)?[.!]?\s*)",
]
for pat in preamble_patterns:
response = re.sub(pat, "", response, count=1, flags=re.IGNORECASE)
# Strip trailing abstraction filler (vague concluding paragraphs)
trailing_patterns = [
r"\n\n(?:In (?:conclusion|summary|the end),?\s+.{0,200})$",
r"\n\n(?:Overall,?\s+.{0,150})$",
r"\n\n(?:(?:I )?hope (?:this|that) helps[.!]?\s*)$",
r"\n\n(?:Let me know if (?:you (?:have|need|want)|there(?:'s| is)) .{0,100})$",
r"\n\n(?:Feel free to .{0,100})$",
]
for pat in trailing_patterns:
response = re.sub(pat, "", response, count=1, flags=re.IGNORECASE)
# Collapse excessive whitespace (more than 2 newlines)
response = re.sub(r'\n{3,}', '\n\n', response)
return response.strip()
def _classify_domain(self, query: str) -> str:
"""Classify query domain (physics, ethics, consciousness, creativity, systems)."""
query_lower = query.lower()
# Domain keywords
domains = {
"physics": ["force", "energy", "velocity", "gravity", "motion", "light", "speed",
"particle", "entropy", "time arrow", "quantum", "physics"],
"ethics": ["moral", "right", "wrong", "should", "ethical", "justice", "fair",
"duty", "consequence", "utilitarian", "virtue", "ethics", "lie", "save"],
"consciousness": ["conscious", "awareness", "qualia", "mind", "experience",
"subjective", "hard problem", "zombie", "consciousness"],
"creativity": ["creative", "creative", "art", "invention", "novel", "design",
"imagination", "innovation", "beautiful"],
"systems": ["system", "emerge", "feedback", "loop", "complex", "agent", "adapt",
"network", "evolution", "architecture", "free will"],
}
for domain, keywords in domains.items():
if any(kw in query_lower for kw in keywords):
return domain
return "general"