"""Substrate-Aware Cognition — Codette adjusts reasoning based on her own system state.

Like biological cognition: we don't reason the same way when exhausted.
Codette now modulates her behavior based on real-time resource state.

Three systems:
1. SubstrateMonitor — measures CPU, memory, inference load, adapter health
2. HealthAwareRouter — adjusts Phase 6/7 routing based on system pressure
3. CocoonStateEnricher — stamps system state onto every cocoon memory

Usage:
    monitor = SubstrateMonitor()
    state = monitor.snapshot()
    # state = {"pressure": 0.65, "level": "high", ...}

    router = HealthAwareRouter(monitor)
    complexity, max_adapters, adjustments = router.adjust_routing(complexity, max_adapters)
    # Under high pressure: COMPLEX -> MEDIUM, max_adapters 3 -> 2

    enricher = CocoonStateEnricher(monitor)
    metadata = enricher.enrich(existing_metadata)
    # Adds: {"substrate": {"pressure": 0.65, "memory_pct": 78, ...}}
"""
|
|
| import os |
| import time |
| import psutil |
| from typing import Dict, Optional, Tuple |
| from collections import deque |
|
|
|
|
class SubstrateMonitor:
    """Real-time system state measurement.

    Measures actual hardware/software state — not estimates.
    Returns a pressure score (0.0 = idle, 1.0 = maxed out).

    Snapshots are cached for ``_cache_ttl`` seconds, so callers may poll
    ``snapshot()`` freely without re-sampling the system on every call.
    """

    def __init__(self):
        # Rolling window of recent snapshots, read by trend().
        self._history = deque(maxlen=50)
        # Rolling window of recent inference durations (ms), used for the average.
        self._inference_times = deque(maxlen=20)
        # Lifetime violation count per adapter name.
        self._adapter_violations = {}
        # Lifetime inference count. The violation rate must be computed against
        # this, not the capped window above, otherwise the rate can only grow.
        self._total_inferences = 0
        self._last_snapshot = None
        self._last_snapshot_time = 0
        self._cache_ttl = 2.0

    def snapshot(self) -> Dict:
        """Take a full system state snapshot.

        A snapshot younger than the cache TTL is returned as-is (the same dict
        object); only freshly computed snapshots are appended to the history.

        Returns dict with:
            pressure: float 0-1 (overall system pressure)
            level: str (idle/low/moderate/high/critical)
            memory_pct: float (RAM usage percentage)
            memory_available_gb: float
            cpu_pct: float (CPU usage percentage)
            process_memory_gb: float (this Python process)
            inference_avg_ms: float (recent average inference time)
            adapter_violation_rate: float (0-1, how often adapters violate constraints)
            timestamp: float
        """
        now = time.time()

        # Serve the cached snapshot while it is fresh. A negative elapsed time
        # means the wall clock stepped backwards; treat that as a cache miss
        # rather than serving stale data until the clock catches up.
        elapsed = now - self._last_snapshot_time
        if self._last_snapshot is not None and 0 <= elapsed < self._cache_ttl:
            return self._last_snapshot

        # Each probe is best-effort: a failing probe falls back to a neutral
        # value instead of breaking the caller.
        try:
            vm = psutil.virtual_memory()
            memory_pct = vm.percent
            memory_available_gb = vm.available / (1024 ** 3)
        except Exception:
            memory_pct = 0.0
            memory_available_gb = 16.0

        try:
            cpu_pct = psutil.cpu_percent(interval=0.1)
        except Exception:
            cpu_pct = 0.0

        try:
            proc = psutil.Process(os.getpid())
            process_memory_gb = proc.memory_info().rss / (1024 ** 3)
        except Exception:
            process_memory_gb = 0.0

        if self._inference_times:
            inference_avg_ms = sum(self._inference_times) / len(self._inference_times)
        else:
            inference_avg_ms = 0.0

        # Violations are lifetime counts, so divide by the lifetime inference
        # count. The window length is kept as a floor in case durations were
        # appended without going through record_inference().
        total_violations = sum(self._adapter_violations.values())
        total_inferences = max(self._total_inferences, len(self._inference_times), 1)
        violation_rate = min(1.0, total_violations / (total_inferences * 2))

        pressure = self._compute_pressure(
            memory_pct, cpu_pct, process_memory_gb,
            inference_avg_ms, violation_rate,
        )

        snapshot = {
            "pressure": round(pressure, 3),
            "level": self._level_for(pressure),
            "memory_pct": round(memory_pct, 1),
            "memory_available_gb": round(memory_available_gb, 2),
            "cpu_pct": round(cpu_pct, 1),
            "process_memory_gb": round(process_memory_gb, 2),
            "inference_avg_ms": round(inference_avg_ms, 1),
            "adapter_violation_rate": round(violation_rate, 3),
            "timestamp": now,
        }

        self._history.append(snapshot)
        self._last_snapshot = snapshot
        self._last_snapshot_time = now
        return snapshot

    def record_inference(self, duration_ms: float):
        """Record an inference duration for trend tracking."""
        self._inference_times.append(duration_ms)
        self._total_inferences += 1

    def record_violation(self, adapter: str):
        """Record a constraint violation for an adapter."""
        self._adapter_violations[adapter] = self._adapter_violations.get(adapter, 0) + 1

    def get_adapter_health(self) -> Dict[str, float]:
        """Get per-adapter health scores based on violation history.

        Scores are relative to the worst offender: each adapter gets
        ``1 - count / (2 * max_count)``, so the adapter with the most
        violations scores 0.5. Adapters with no recorded violation are absent.

        Returns: {adapter_name: health_score}; empty dict if nothing recorded.
        """
        if not self._adapter_violations:
            return {}

        worst = max(self._adapter_violations.values())
        denominator = max(worst * 2, 1)
        return {
            adapter: round(1.0 - (count / denominator), 3)
            for adapter, count in self._adapter_violations.items()
        }

    def trend(self) -> str:
        """Return pressure trend: rising, falling, or stable.

        Compares the newest snapshot against the oldest of the last five;
        a change of more than 0.1 either way counts as a trend. With fewer
        than three snapshots in the history the answer is always "stable".
        """
        if len(self._history) < 3:
            return "stable"

        recent = [s["pressure"] for s in list(self._history)[-5:]]
        delta = recent[-1] - recent[0]
        if delta > 0.1:
            return "rising"
        if delta < -0.1:
            return "falling"
        return "stable"

    @staticmethod
    def _level_for(pressure: float) -> str:
        """Map a 0-1 pressure score onto its named level."""
        if pressure < 0.2:
            return "idle"
        if pressure < 0.4:
            return "low"
        if pressure < 0.6:
            return "moderate"
        if pressure < 0.8:
            return "high"
        return "critical"

    @staticmethod
    def _clamp01(value: float) -> float:
        """Clamp a value into the closed interval [0.0, 1.0]."""
        return min(1.0, max(0.0, value))

    def _compute_pressure(self, mem_pct, cpu_pct, proc_mem_gb,
                          inference_avg_ms, violation_rate) -> float:
        """Weighted composite pressure score.

        Weights reflect what actually impacts Codette's reasoning quality:
        - Memory is king (model + adapters live in RAM)
        - Inference time indicates GPU/CPU saturation
        - Violation rate indicates adapter instability
        """
        # System RAM: no pressure up to 40% used, full pressure at 95%.
        mem_p = self._clamp01((mem_pct - 40) / 55)
        # Process RSS: no pressure up to 3 GB, full pressure at 8 GB.
        proc_p = self._clamp01((proc_mem_gb - 3.0) / 5.0)
        # CPU: linear in the usage percentage.
        cpu_p = self._clamp01(cpu_pct / 100.0)
        # Inference latency: no pressure up to 5 s, full pressure at 60 s.
        inf_p = self._clamp01((inference_avg_ms - 5000) / 55000)

        pressure = (
            0.35 * mem_p +
            0.20 * proc_p +
            0.15 * cpu_p +
            0.15 * inf_p +
            0.15 * violation_rate
        )
        return self._clamp01(pressure)
|
|
|
|
class HealthAwareRouter:
    """Adjusts Phase 6/7 routing decisions based on system health.

    Under pressure:
    - Downgrade COMPLEX -> MEDIUM (fewer adapters, less memory)
    - Reduce max_adapters
    - Prefer adapters with better health scores
    - Skip debate rounds

    Under low load:
    - Allow full COMPLEX debate
    - Extra adapters welcome
    """

    def __init__(self, monitor: "SubstrateMonitor"):
        self.monitor = monitor

    def adjust_routing(self, complexity, max_adapters: int) -> Tuple:
        """Adjust routing based on current system pressure.

        The adapter cap is only ever lowered, never raised, and an adjustment
        message is recorded only when a value actually changed.

        Args:
            complexity: QueryComplexity enum
            max_adapters: Requested max adapters

        Returns:
            (adjusted_complexity, adjusted_max_adapters, adjustments_made: list)
            If QueryComplexity cannot be imported, the inputs are returned
            unchanged with an empty adjustments list.
        """
        state = self.monitor.snapshot()
        pressure = state["pressure"]
        level = state["level"]

        # Imported lazily so this module stays importable without the classifier.
        try:
            from reasoning_forge.query_classifier import QueryComplexity
        except ImportError:
            return complexity, max_adapters, []

        adjusted_complexity = complexity
        adjusted_max = max_adapters
        adjustments = []

        if level == "critical":
            # Everything collapses to the cheapest path.
            if complexity != QueryComplexity.SIMPLE:
                adjusted_complexity = QueryComplexity.SIMPLE
                original_name = getattr(complexity, "name", complexity)
                adjustments.append(
                    f"DOWNGRADED {original_name}->SIMPLE (pressure={pressure:.2f}, critical)"
                )
            adjusted_max = min(adjusted_max, 1)
            if adjusted_max < max_adapters:
                adjustments.append("max_adapters->1 (critical pressure)")

        elif level == "high":
            if complexity == QueryComplexity.COMPLEX:
                adjusted_complexity = QueryComplexity.MEDIUM
                adjustments.append(f"DOWNGRADED COMPLEX->MEDIUM (pressure={pressure:.2f})")
            adjusted_max = min(adjusted_max, 2)
            if adjusted_max < max_adapters:
                adjustments.append(f"max_adapters {max_adapters}->{adjusted_max} (high pressure)")

        elif level == "moderate":
            # Complexity is kept; only COMPLEX queries get their adapter cap trimmed.
            if complexity == QueryComplexity.COMPLEX:
                adjusted_max = min(adjusted_max, 2)
                if adjusted_max < max_adapters:
                    adjustments.append(
                        f"max_adapters {max_adapters}->{adjusted_max} (moderate pressure)"
                    )

        # "idle" and "low" leave the request untouched.
        return adjusted_complexity, adjusted_max, adjustments

    def rank_adapters(self, candidates: list) -> list:
        """Re-rank adapter candidates based on health scores.

        Adapters with more constraint violations get ranked lower. Candidates
        with no recorded health score count as perfectly healthy (1.0). The
        sort is stable, so equally healthy adapters keep their input order.
        When no health data exists the input list is returned unchanged.
        """
        health = self.monitor.get_adapter_health()
        if not health:
            return candidates

        return sorted(candidates, key=lambda name: health.get(name, 1.0), reverse=True)

    def should_skip_debate(self) -> bool:
        """Under high pressure, skip multi-round debate entirely."""
        return self.monitor.snapshot()["level"] in ("high", "critical")
|
|
|
|
class CocoonStateEnricher:
    """Stamps system state onto cocoon memories.

    Every cocoon now knows the conditions under which it was created:
    - pressure level, memory usage, inference speed
    - pressure trend at time of storage

    Future sessions can weight cocoons by reliability:
    - Stressed cocoons (high pressure) get less trust
    - Stable cocoons (low pressure, no violations) get more trust
    """

    def __init__(self, monitor: "SubstrateMonitor"):
        self.monitor = monitor

    def enrich(self, metadata: Optional[Dict] = None) -> Dict:
        """Add substrate state to cocoon metadata.

        Args:
            metadata: Existing metadata dict (will be extended in place);
                a new dict is created when None is passed.

        Returns:
            Enriched metadata with 'substrate' key (the same dict object
            that was passed in, when one was given)
        """
        if metadata is None:
            metadata = {}

        state = self.monitor.snapshot()

        # Only a subset of the snapshot is persisted, plus the current trend.
        metadata["substrate"] = {
            "pressure": state["pressure"],
            "level": state["level"],
            "memory_pct": state["memory_pct"],
            "process_memory_gb": state["process_memory_gb"],
            "inference_avg_ms": state["inference_avg_ms"],
            "trend": self.monitor.trend(),
            "timestamp": state["timestamp"],
        }
        return metadata

    @staticmethod
    def cocoon_reliability(cocoon_metadata: Dict) -> float:
        """Score a cocoon's reliability based on substrate conditions when created.

        Cocoons without usable substrate data get a neutral 0.7. A missing or
        non-numeric pressure is treated as 0.5. The result is always clamped
        to the documented range, even if the stored pressure is out of bounds.

        Returns:
            0.0-1.0 reliability score
            1.0 = created under ideal conditions (low pressure, stable)
            0.0 = created under critical pressure (unreliable)
        """
        substrate = (cocoon_metadata or {}).get("substrate")
        if not isinstance(substrate, dict) or not substrate:
            return 0.7

        try:
            pressure = float(substrate.get("pressure", 0.5))
        except (TypeError, ValueError):
            pressure = 0.5
        trend = substrate.get("trend", "stable")

        # Base score is the inverse of pressure, nudged by the trend:
        # a stable system earns a small bonus, a rising one a penalty.
        reliability = 1.0 - pressure
        if trend == "stable":
            reliability += 0.05
        elif trend == "rising":
            reliability -= 0.1

        return round(min(1.0, max(0.0, reliability)), 3)
|
|