# Codette-Reasoning / reasoning_forge / behavior_governor.py
# Author: Jonathan Harrison
# Full Codette codebase sync — transparency release (commit 74f2af5)
"""
Codette Behavior Governor v3 β€” Executive Controller with Self-Learning
======================================================================
Evolved from Phase 7 Executive Controller into a full behavior regulation layer.
The Governor sits above the consciousness stack and enforces four things:
1. IDENTITY RULES β€” validates identity claims with confidence decay/recovery
2. MEMORY vs TASK BALANCE β€” prevents memory from overwhelming the task
3. COGNITIVE LOAD MANAGEMENT β€” adaptive compression/expansion based on complexity
4. SELF-LEARNING β€” adjusts budgets based on success/failure feedback
Identity confidence model:
- Decays over time (half-life ~30 minutes of inactivity)
- Reinforced through positive interaction signals
- Contradiction detection resets to partial
- Prevents hallucinated familiarity while maintaining real relationships
Architecture position: Layer 0 (pre-stack) + Layer 7.5 (post-stack validation)
Author: Jonathan Harrison (Raiff's Bits LLC)
"""
import time
import math
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
# Module-level logger (standard per-module pattern).
logger = logging.getLogger(__name__)
# Identity confidence decay half-life in seconds (30 minutes of inactivity).
CONFIDENCE_HALF_LIFE = 1800.0
# Minimum confidence floor (never fully forgets a confirmed identity).
CONFIDENCE_FLOOR = 0.15
# Additive reinforcement boost per positive interaction (capped at 1.0 by callers).
CONFIDENCE_REINFORCE = 0.12
# Subtractive penalty applied when the user contradicts a stored identity.
CONFIDENCE_CONTRADICTION_PENALTY = 0.4
@dataclass
class GovernorDecision:
    """Output of a governor evaluation (pre-stack); consumed by post_validate."""
    action: str = "allow"                 # "allow", "constrain", "redirect", "block"
    confidence: float = 1.0               # Governor's confidence in this decision
    memory_budget: int = 3                # Max cocoons to inject (0 = none)
    identity_budget: str = "full"         # "full", "partial", "none"
    max_response_tokens: int = 512        # Adaptive response length budget
    compression_level: str = "normal"     # "compressed", "normal", "expanded"
    identity_confidence: float = 0.0      # Current identity confidence after decay
    warnings: List[str] = field(default_factory=list)  # advisory notes for the caller
    reasoning: str = ""                   # human-readable summary of the decision
class BehaviorGovernor:
"""
Executive behavior regulation layer with self-learning.
Runs BEFORE the consciousness stack (pre-routing) and AFTER
(post-generation validation). Enforces behavioral integrity.
Four regulation domains:
1. Identity governance β€” validates with confidence decay/recovery
2. Memory governance β€” balances memory injection vs task focus
3. Cognitive load governance β€” adaptive depth based on query complexity
4. Self-learning β€” adjusts budgets based on outcome feedback
"""
def __init__(self, identity_anchor=None, substrate_monitor=None):
self.identity_anchor = identity_anchor
self.substrate_monitor = substrate_monitor
# Tracking
self.decisions: List[Dict] = []
self.answer_detection_failures: int = 0
self.total_evaluations: int = 0
# Cognitive load state
self._recent_complexities: List[str] = [] # Last N query complexities
self._consecutive_complex: int = 0
# ── Identity Confidence State ──
# Maps identity_id -> {confidence, last_interaction, peak_confidence}
self._identity_state: Dict[str, Dict] = {}
# ── Self-Learning State ──
# Tracks what worked and what didn't to adapt budgets
self._domain_success: Dict[str, List[bool]] = {} # domain -> [success, success, ...]
self._complexity_token_history: Dict[str, List[int]] = {} # complexity -> [actual_tokens_used]
self._memory_budget_adjustments: Dict[str, float] = {} # domain -> adjustment factor
# ─────────────────────────────────────────────────────────
# IDENTITY CONFIDENCE DECAY/RECOVERY
# ─────────────────────────────────────────────────────────
def get_decayed_confidence(self, identity_id: str,
raw_confidence: float) -> float:
"""
Apply time-based decay to identity confidence.
Confidence decays with a half-life of 30 minutes of inactivity.
Each interaction reinforces confidence back up.
This prevents Codette from claiming familiarity with someone
she hasn't interacted with recently, while maintaining real
relationships through active reinforcement.
"""
now = time.time()
state = self._identity_state.get(identity_id)
if state is None:
# First time seeing this identity β€” initialize from raw signal
self._identity_state[identity_id] = {
"confidence": raw_confidence,
"last_interaction": now,
"peak_confidence": raw_confidence,
"interaction_count": 1,
}
return raw_confidence
# Apply time-based decay since last interaction
elapsed = now - state["last_interaction"]
if elapsed > 0:
decay_factor = math.pow(0.5, elapsed / CONFIDENCE_HALF_LIFE)
decayed = state["confidence"] * decay_factor
# Floor: never fully forget a confirmed identity
decayed = max(CONFIDENCE_FLOOR, decayed)
else:
decayed = state["confidence"]
# Reinforcement: raw_confidence > 0 means positive identity signal
if raw_confidence > 0.3:
decayed = min(1.0, decayed + CONFIDENCE_REINFORCE)
# Update state
state["confidence"] = decayed
state["last_interaction"] = now
state["peak_confidence"] = max(state["peak_confidence"], decayed)
state["interaction_count"] = state.get("interaction_count", 0) + 1
return decayed
def detect_identity_contradiction(self, identity_id: str,
query: str) -> bool:
"""
Detect contradictions in identity claims.
Returns True if the query contradicts a stored identity,
which triggers a confidence penalty.
"""
contradiction_signals = [
"i'm not ", "i am not ", "wrong person", "you don't know me",
"we haven't met", "first time here", "never talked",
"who do you think i am", "that's not me", "that wasn't me",
"you're confusing me", "different person",
]
query_lower = query.lower()
is_contradiction = any(s in query_lower for s in contradiction_signals)
if is_contradiction and identity_id in self._identity_state:
state = self._identity_state[identity_id]
state["confidence"] = max(
CONFIDENCE_FLOOR,
state["confidence"] - CONFIDENCE_CONTRADICTION_PENALTY
)
logger.info(
f"[GOVERNOR] Identity contradiction detected for {identity_id}, "
f"confidence reduced to {state['confidence']:.2f}"
)
return is_contradiction
# ─────────────────────────────────────────────────────────
# PRE-STACK: Evaluate before reasoning begins
# ─────────────────────────────────────────────────────────
def pre_evaluate(self, query: str, classification: Dict,
identity_confidence: float = 0.0,
identity_id: str = "unknown",
substrate_pressure: float = 0.0) -> GovernorDecision:
"""
Pre-stack evaluation. Determines:
- How much memory to inject
- Whether identity context should be included
- Response length budget
- Compression level
This is the internalized control loop β€” Codette self-regulates
before generating, not after.
"""
self.total_evaluations += 1
decision = GovernorDecision()
complexity = classification.get("complexity", "MEDIUM")
domain = classification.get("domain", "general")
# ── 0. Identity contradiction check ──
self.detect_identity_contradiction(identity_id, query)
# ── 1. Identity Governance (with decay/recovery) ──
effective_confidence = self.get_decayed_confidence(
identity_id, identity_confidence
)
decision.identity_confidence = effective_confidence
decision.identity_budget = self._evaluate_identity_budget(
effective_confidence, query
)
# ── 2. Memory Governance (with self-learning adjustments) ──
decision.memory_budget = self._evaluate_memory_budget(
complexity, domain, query, substrate_pressure
)
# ── 3. Cognitive Load Governance (with learned token sizing) ──
decision.max_response_tokens, decision.compression_level = \
self._evaluate_cognitive_load(complexity, substrate_pressure)
# ── 4. Track complexity pattern ──
self._recent_complexities.append(complexity)
if len(self._recent_complexities) > 10:
self._recent_complexities.pop(0)
if complexity == "COMPLEX":
self._consecutive_complex += 1
else:
self._consecutive_complex = 0
# Fatigue detection: too many complex queries in a row
if self._consecutive_complex >= 4:
decision.warnings.append(
"Cognitive fatigue risk: 4+ complex queries in sequence. "
"Consider simplifying responses to maintain quality."
)
decision.compression_level = "compressed"
decision.max_response_tokens = min(decision.max_response_tokens, 400)
decision.reasoning = (
f"identity={decision.identity_budget} "
f"(conf={effective_confidence:.2f}), "
f"memory={decision.memory_budget} cocoons, "
f"tokens={decision.max_response_tokens}, "
f"compression={decision.compression_level}"
)
# Log decision
self.decisions.append({
"timestamp": time.time(),
"phase": "pre",
"complexity": complexity,
"domain": domain,
"identity_confidence": round(effective_confidence, 3),
"decision": decision.reasoning,
})
if len(self.decisions) > 100:
self.decisions.pop(0)
logger.info(f"[GOVERNOR] Pre: {decision.reasoning}")
return decision
def _evaluate_identity_budget(self, confidence: float, query: str) -> str:
"""
Determine how much identity context to inject.
Rules:
- High confidence (>0.8): full context (name, relationship, history)
- Medium confidence (0.4-0.8): partial (name only, with caveat)
- Low confidence (<0.4): none β€” don't pretend to know someone
- If query contains identity denial: force none
"""
denial_patterns = [
"i'm not ", "i am not ", "wrong person", "you don't know me",
"first time", "we haven't met",
]
if any(p in query.lower() for p in denial_patterns):
return "none"
if confidence >= 0.8:
return "full"
elif confidence >= 0.4:
return "partial"
else:
return "none"
def _evaluate_memory_budget(self, complexity: str, domain: str,
query: str, pressure: float) -> int:
"""
Determine how many cocoons to inject.
Balance: memory provides continuity, but too much drowns the task.
Rules:
- SIMPLE queries: 1 cocoon max (don't overwhelm a simple question)
- MEDIUM queries: 2-3 cocoons
- COMPLEX queries: 3-5 cocoons (need context for depth)
- High substrate pressure: reduce by 1 (save context space)
- Short queries (<5 words): 0 (probably a greeting or command)
- Self-learning: adjust based on past success/failure per domain
"""
word_count = len(query.split())
if word_count < 5:
return 0 # Greetings, commands β€” no memory needed
base = {"SIMPLE": 1, "MEDIUM": 2, "COMPLEX": 4}.get(complexity, 2)
# Pressure reduction
if pressure > 0.7:
base = max(0, base - 1)
# Domain boost: music queries benefit more from memory
if domain == "music":
base = min(5, base + 1)
# Self-learning adjustment: if this domain has a learned offset, apply it
adj = self._memory_budget_adjustments.get(domain, 0.0)
if adj != 0:
base = max(0, min(5, round(base + adj)))
return base
def _evaluate_cognitive_load(self, complexity: str,
pressure: float) -> Tuple[int, str]:
"""
Adaptive response length and compression level.
This is the dynamic compression vs expansion from the review:
- Simple β†’ compressed (concise, 200 tokens)
- Medium β†’ normal (balanced, 400 tokens)
- Complex β†’ expanded (thorough, 600 tokens)
- High pressure β†’ compress regardless (save resources)
"""
settings = {
"SIMPLE": (200, "compressed"),
"MEDIUM": (400, "normal"),
"COMPLEX": (600, "expanded"),
}
tokens, compression = settings.get(complexity, (400, "normal"))
# Substrate pressure override
if pressure > 0.8:
tokens = min(tokens, 300)
compression = "compressed"
elif pressure > 0.6:
tokens = min(tokens, 400)
return tokens, compression
# ─────────────────────────────────────────────────────────
# POST-STACK: Validate after reasoning completes
# ─────────────────────────────────────────────────────────
def post_validate(self, query: str, response: str,
decision: GovernorDecision) -> Dict:
"""
Post-generation validation. Checks:
1. Did we actually answer the question? (answer detection)
2. Did we violate length constraints?
3. Did we leak identity information that shouldn't be there?
4. Should we stop or continue?
"""
result = {
"valid": True,
"warnings": [],
"corrections": [],
}
# ── Answer detection ──
if not self._did_answer_question(query, response):
result["warnings"].append("Response may not directly answer the question.")
self.answer_detection_failures += 1
# ── Length validation ──
# Rough token estimate: ~4 chars per token
est_tokens = len(response) / 4
if est_tokens > decision.max_response_tokens * 1.5:
result["warnings"].append(
f"Response exceeded token budget ({est_tokens:.0f} est vs {decision.max_response_tokens} budget)."
)
# ── Identity leak detection ──
if decision.identity_budget == "none":
# Check if response accidentally claims to know the user
identity_leak_patterns = [
"as you know", "we've discussed", "last time we talked",
"you mentioned before", "our previous conversation",
"remember when you", "as your partner",
]
for pattern in identity_leak_patterns:
if pattern in response.lower():
result["warnings"].append(
f"Potential identity leak: '{pattern}' in response "
f"but identity_budget was 'none'."
)
result["corrections"].append("identity_leak")
# ── Completeness check (Behavioral Lock 3) ──
if response.endswith(("...", "β€”", "-", ",")):
result["warnings"].append("Response appears incomplete (Lock 4 violation).")
# Log
self.decisions.append({
"timestamp": time.time(),
"phase": "post",
"valid": result["valid"],
"warnings": len(result["warnings"]),
})
return result
def _did_answer_question(self, query: str, response: str) -> bool:
"""
Heuristic answer detection.
Checks if the response likely addresses the query rather than
being off-topic philosophical padding (Lock 1 enforcement).
"""
if not query or not response:
return False
# Extract significant query words
stop = {"the", "a", "an", "is", "are", "was", "what", "how", "why",
"when", "where", "who", "do", "does", "can", "could", "would",
"should", "will", "to", "of", "in", "for", "on", "with", "at",
"by", "and", "or", "but", "if", "it", "i", "you", "my", "your",
"this", "that", "me", "about", "from"}
query_words = set(
w.lower().strip(".,!?;:\"'") for w in query.split()
if len(w) > 2 and w.lower() not in stop
)
if not query_words:
return True # Greeting or command β€” any response is fine
response_lower = response.lower()
overlap = sum(1 for w in query_words if w in response_lower)
overlap_ratio = overlap / len(query_words) if query_words else 0
# At least 30% of query keywords should appear in response
return overlap_ratio >= 0.3
# ─────────────────────────────────────────────────────────
# SELF-LEARNING: Feedback from post-validation
# ─────────────────────────────────────────────────────────
def record_outcome(self, domain: str, complexity: str,
success: bool, actual_tokens: int = 0,
memory_budget_used: int = 0):
"""
Record the outcome of a generation for self-learning.
Called after post_validate β€” tells the governor whether the
response was good so it can adapt future budgets.
Self-learning rules:
- If responses in a domain consistently fail answer detection,
increase memory budget (more context might help)
- If responses consistently succeed with fewer tokens,
reduce token budget to stay concise
- Track actual token usage to calibrate future estimates
"""
# Track domain success rate
if domain not in self._domain_success:
self._domain_success[domain] = []
self._domain_success[domain].append(success)
# Keep last 20 outcomes per domain
if len(self._domain_success[domain]) > 20:
self._domain_success[domain].pop(0)
# Track token usage per complexity
if actual_tokens > 0:
if complexity not in self._complexity_token_history:
self._complexity_token_history[complexity] = []
self._complexity_token_history[complexity].append(actual_tokens)
if len(self._complexity_token_history[complexity]) > 20:
self._complexity_token_history[complexity].pop(0)
# Adapt memory budget: if success rate < 60%, boost memory by 0.5
# If success rate > 85%, reduce memory by 0.3 (less context needed)
outcomes = self._domain_success[domain]
if len(outcomes) >= 5:
success_rate = sum(outcomes) / len(outcomes)
if success_rate < 0.6:
self._memory_budget_adjustments[domain] = min(
2.0,
self._memory_budget_adjustments.get(domain, 0) + 0.5
)
logger.info(
f"[GOVERNOR] Self-learning: {domain} success rate "
f"{success_rate:.0%}, boosting memory budget"
)
elif success_rate > 0.85:
self._memory_budget_adjustments[domain] = max(
-1.0,
self._memory_budget_adjustments.get(domain, 0) - 0.3
)
def get_learned_token_budget(self, complexity: str) -> Optional[int]:
"""
Get learned token budget from actual usage history.
If we have enough data, use the 75th percentile of actual usage
as the budget (covers most cases without over-allocating).
"""
history = self._complexity_token_history.get(complexity, [])
if len(history) < 5:
return None # Not enough data to learn from
sorted_history = sorted(history)
p75_idx = int(len(sorted_history) * 0.75)
return sorted_history[p75_idx]
# ─────────────────────────────────────────────────────────
# DIAGNOSTICS
# ─────────────────────────────────────────────────────────
def get_state(self) -> Dict:
"""Return governor state for health checks / debugging."""
# Compute domain success rates
domain_rates = {}
for domain, outcomes in self._domain_success.items():
if outcomes:
domain_rates[domain] = round(sum(outcomes) / len(outcomes), 3)
# Compute identity confidence snapshot
identity_snapshot = {}
for ident_id, state in self._identity_state.items():
# Show decayed value (without reinforcing)
elapsed = time.time() - state["last_interaction"]
decay = math.pow(0.5, elapsed / CONFIDENCE_HALF_LIFE)
current = max(CONFIDENCE_FLOOR, state["confidence"] * decay)
identity_snapshot[ident_id] = {
"confidence": round(current, 3),
"peak": round(state["peak_confidence"], 3),
"interactions": state.get("interaction_count", 0),
"seconds_since_last": round(elapsed),
}
return {
"total_evaluations": self.total_evaluations,
"answer_detection_failures": self.answer_detection_failures,
"answer_detection_rate": (
1 - (self.answer_detection_failures / max(1, self.total_evaluations))
),
"consecutive_complex": self._consecutive_complex,
"recent_complexities": self._recent_complexities[-5:],
"decisions_logged": len(self.decisions),
"identity_confidence": identity_snapshot,
"domain_success_rates": domain_rates,
"memory_budget_adjustments": dict(self._memory_budget_adjustments),
"learned_token_budgets": {
c: self.get_learned_token_budget(c)
for c in self._complexity_token_history
},
}