| """ |
Codette Unified Memory — SQLite + FTS5 Backed Cocoon Store
| =========================================================== |
| |
| Consolidates three previously separate memory systems: |
| 1. CognitionCocooner (JSON files on disk) |
| 2. LivingMemoryKernel (in-memory MemoryCocoons) |
| 3. CodetteSession (SQLite conversation state) |
| |
| Into ONE system with: |
| - SQLite backing for persistence + ACID guarantees |
| - FTS5 full-text search for fast relevance matching (replaces O(n) file scan) |
| - In-memory LRU cache for hot cocoons |
| - Unified API for store/recall/search |
| - Migration from legacy JSON cocoons on first load |
| |
| Schema: |
| cocoons(id, query, response, adapter, domain, complexity, emotion, |
| importance, timestamp, metadata_json) |
| cocoons_fts(query, response) -- FTS5 virtual table |
| |
| Author: Jonathan Harrison (Raiff's Bits LLC) |
| """ |
|
|
| import json |
| import math |
| import sqlite3 |
| import time |
| import hashlib |
| import os |
| import logging |
| from pathlib import Path |
| from typing import Dict, List, Optional, Any, Tuple |
| from collections import OrderedDict |
|
|
logger = logging.getLogger(__name__)


# Default on-disk locations, resolved relative to the package root
# (two levels up from this file).
DB_DIR = Path(__file__).parent.parent / "data"
DB_PATH = DB_DIR / "codette_memory.db"
# Directory scanned for legacy JSON / .cocoon files on first load (migration source).
LEGACY_COCOON_DIR = Path(__file__).parent.parent / "cocoons"


# Maximum number of cocoons held in the in-memory LRU cache.
CACHE_MAX = 200
|
|
|
|
class UnifiedMemory:
    """
    Single source of truth for all Codette memory.

    Replaces CognitionCocooner + LivingMemoryKernel + session memory
    with one SQLite-backed store using FTS5 for fast relevance search.
    """

    def __init__(self, db_path: Optional[Path] = None,
                 legacy_dir: Optional[Path] = None):
        """
        Open (or create) the SQLite store, ensure the schema exists, and
        migrate legacy JSON cocoons when the database starts out empty.

        Args:
            db_path: Database file location (defaults to module DB_PATH).
            legacy_dir: Legacy cocoon directory (defaults to LEGACY_COCOON_DIR).
        """
        self.db_path = db_path or DB_PATH
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.legacy_dir = legacy_dir or LEGACY_COCOON_DIR

        # In-memory LRU cache of hot cocoons (oldest entry evicted first).
        self._cache: OrderedDict = OrderedDict()

        # NOTE(review): check_same_thread=False does not make the connection
        # thread-safe by itself; callers must serialize access.
        self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        # FIX: INSERT OR REPLACE resolves a conflict with an implicit DELETE
        # that only fires the cocoons_ad trigger when recursive triggers are
        # enabled; without this pragma a replaced row leaves a stale entry
        # in the external-content FTS index.
        self._conn.execute("PRAGMA recursive_triggers = ON")
        self._init_schema()

        self._total_stored = self._count()
        self._cache_hits = 0
        self._cache_misses = 0

        # One-time legacy migration, only when the store is empty.
        if self._total_stored == 0 and self.legacy_dir.exists():
            self._migrate_legacy()

        logger.info(f"UnifiedMemory: {self._total_stored} cocoons in {self.db_path}")

    def _init_schema(self):
        """Create tables and FTS5 index if they don't exist."""
        cur = self._conn.cursor()

        # Main cocoon store; metadata_json holds free-form JSON such as
        # success flags and identity ids.
        cur.execute("""
            CREATE TABLE IF NOT EXISTS cocoons (
                id TEXT PRIMARY KEY,
                query TEXT NOT NULL,
                response TEXT NOT NULL,
                adapter TEXT DEFAULT 'unknown',
                domain TEXT DEFAULT 'general',
                complexity TEXT DEFAULT 'MEDIUM',
                emotion TEXT DEFAULT 'neutral',
                importance INTEGER DEFAULT 7,
                timestamp REAL NOT NULL,
                metadata_json TEXT DEFAULT '{}'
            )
        """)

        # External-content FTS5 index over query + response; rows are kept
        # in sync with the cocoons table by the triggers below.
        cur.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS cocoons_fts
            USING fts5(query, response, content='cocoons', content_rowid='rowid')
        """)

        # Sync triggers: insert mirrors the row, delete issues the FTS5
        # 'delete' command, update does delete-then-insert.
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS cocoons_ai AFTER INSERT ON cocoons BEGIN
                INSERT INTO cocoons_fts(rowid, query, response)
                VALUES (new.rowid, new.query, new.response);
            END
        """)
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS cocoons_ad AFTER DELETE ON cocoons BEGIN
                INSERT INTO cocoons_fts(cocoons_fts, rowid, query, response)
                VALUES ('delete', old.rowid, old.query, old.response);
            END
        """)
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS cocoons_au AFTER UPDATE ON cocoons BEGIN
                INSERT INTO cocoons_fts(cocoons_fts, rowid, query, response)
                VALUES ('delete', old.rowid, old.query, old.response);
                INSERT INTO cocoons_fts(rowid, query, response)
                VALUES (new.rowid, new.query, new.response);
            END
        """)

        # Recency scans (recall_recent, length trend) order by timestamp.
        cur.execute("""
            CREATE INDEX IF NOT EXISTS idx_cocoons_timestamp
            ON cocoons(timestamp DESC)
        """)

        # Adapter filter used by recall_by_adapter and dominance stats.
        cur.execute("""
            CREATE INDEX IF NOT EXISTS idx_cocoons_adapter
            ON cocoons(adapter)
        """)

        self._conn.commit()

    def _count(self) -> int:
        """Count total cocoons in database."""
        cur = self._conn.cursor()
        cur.execute("SELECT COUNT(*) FROM cocoons")
        return cur.fetchone()[0]

    @staticmethod
    def _row_to_cocoon(row: sqlite3.Row) -> Dict:
        """Convert a DB row into a cocoon dict with parsed `metadata`."""
        cocoon = dict(row)
        # `or "{}"` guards against a NULL metadata_json column.
        cocoon["metadata"] = json.loads(cocoon.pop("metadata_json", "{}") or "{}")
        return cocoon

    def store(self, query: str, response: str, adapter: str = "unknown",
              domain: str = "general", complexity: str = "MEDIUM",
              emotion: str = "neutral", importance: int = 7,
              metadata: Optional[Dict] = None) -> str:
        """
        Store a reasoning exchange as a cocoon.

        This is the unified replacement for:
        - CognitionCocooner.wrap_reasoning()
        - LivingMemoryKernel.store()
        - CodetteSession.add_message()

        The query is truncated to 500 chars and the response to 2000 before
        persisting. Returns the cocoon ID, or "" on failure.
        """
        cocoon_id = f"cocoon_{int(time.time())}_{hashlib.md5(query.encode()).hexdigest()[:6]}"
        timestamp = time.time()
        meta_json = json.dumps(metadata or {})

        try:
            cur = self._conn.cursor()
            # FIX: only bump the stored-count for genuinely new ids --
            # INSERT OR REPLACE on a colliding id (same second, same query
            # hash) previously over-counted _total_stored.
            cur.execute("SELECT 1 FROM cocoons WHERE id = ?", (cocoon_id,))
            is_new = cur.fetchone() is None
            cur.execute("""
                INSERT OR REPLACE INTO cocoons
                (id, query, response, adapter, domain, complexity, emotion,
                 importance, timestamp, metadata_json)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                cocoon_id,
                query[:500],
                response[:2000],
                adapter,
                domain,
                complexity,
                emotion,
                importance,
                timestamp,
                meta_json,
            ))
            self._conn.commit()
            if is_new:
                self._total_stored += 1

            # Mirror into the LRU cache so hot cocoons skip the DB.
            cocoon = {
                "id": cocoon_id, "query": query[:500], "response": response[:2000],
                "adapter": adapter, "domain": domain, "complexity": complexity,
                "emotion": emotion, "importance": importance,
                "timestamp": timestamp, "metadata": metadata or {},
            }
            self._cache_put(cocoon_id, cocoon)

            return cocoon_id
        except Exception as e:
            logger.error(f"Failed to store cocoon: {e}")
            return ""

    def recall_relevant(self, query: str, max_results: int = 3,
                        min_importance: int = 0,
                        identity_id: str = "",
                        recency_weight: float = 0.3,
                        success_weight: float = 0.2,
                        identity_weight: float = 0.2) -> List[Dict]:
        """
        Find cocoons relevant to a query using FTS5 + multi-signal ranking.

        Ranking combines four signals:
        1. FTS5 relevance (text match quality) -- base signal
        2. Recency -- newer cocoons rank higher (exponential decay)
        3. Success -- cocoons marked as successful rank higher
        4. Identity -- cocoons linked to the current user rank higher

        Weight params control the balance (0.0 = disabled, 1.0 = dominant).
        Falls back to recall_recent() when the query is empty, has no
        searchable terms, or the FTS search fails.
        """
        if not query.strip():
            return self.recall_recent(max_results)

        # Strip punctuation and drop stop words so FTS matches only on
        # content-bearing terms.
        stop_words = {
            "the", "a", "an", "is", "are", "was", "were", "be", "been",
            "have", "has", "had", "do", "does", "did", "will", "would",
            "could", "should", "can", "to", "of", "in", "for", "on",
            "with", "at", "by", "from", "as", "and", "but", "or", "if",
            "it", "its", "this", "that", "i", "me", "my", "we", "you",
            "what", "how", "why", "when", "where", "who", "about", "just",
            "not", "no", "so", "very", "really", "also", "too", "up",
        }
        words = [
            w.strip(".,!?;:\"'()[]{}").lower()
            for w in query.split()
            if len(w) > 2 and w.lower().strip(".,!?;:\"'()[]{}") not in stop_words
        ]

        if not words:
            return self.recall_recent(max_results)

        # OR-join the first 8 terms; quoting keeps FTS5 from treating
        # tokens as query syntax.
        fts_query = " OR ".join(f'"{w}"' for w in words[:8])

        # Over-fetch so the multi-signal re-ranking has candidates to reorder.
        fetch_limit = max(max_results * 4, 12)

        try:
            cur = self._conn.cursor()
            sql = """
                SELECT c.id, c.query, c.response, c.adapter, c.domain,
                       c.complexity, c.emotion, c.importance, c.timestamp,
                       c.metadata_json,
                       rank
                FROM cocoons_fts
                JOIN cocoons c ON cocoons_fts.rowid = c.rowid
                WHERE cocoons_fts MATCH ?
                  AND c.importance >= ?
                ORDER BY rank
                LIMIT ?
            """
            cur.execute(sql, (fts_query, min_importance, fetch_limit))
            rows = cur.fetchall()

            if not rows:
                return self.recall_recent(max_results)

            now = time.time()
            scored = []
            for row in rows:
                cocoon = self._row_to_cocoon(row)

                # Signal 1: FTS5 relevance. bm25() rank is negative and MORE
                # negative means a BETTER match, so map |rank| into (0, 1)
                # with better matches scoring closer to 1.
                # (FIX: the previous 1/(1+|rank|) inverted this, giving the
                # best text matches the lowest relevance score.)
                rank_mag = abs(cocoon.get("rank", 0) or 0)
                fts_score = rank_mag / (1.0 + rank_mag)

                # Signal 2: recency -- exponential decay on a 1-hour scale.
                age_seconds = now - cocoon.get("timestamp", now)
                recency_score = math.exp(-age_seconds / 3600.0)

                # Signal 3: success feedback from mark_success(); unmarked
                # cocoons are treated as successful.
                meta = cocoon.get("metadata", {})
                success_score = 1.0 if meta.get("success", True) else 0.3

                # Signal 4: identity affinity -- full boost for the same
                # user, demotion for a different known user, neutral when
                # the cocoon carries no identity.
                identity_score = 0.5
                if identity_id:
                    cocoon_identity = meta.get("identity_id", "")
                    if cocoon_identity == identity_id:
                        identity_score = 1.0
                    elif cocoon_identity:
                        identity_score = 0.2

                # FIX: clamp so oversized weight params cannot flip the
                # text-relevance contribution negative.
                relevance_weight = max(
                    0.0, 1.0 - recency_weight - success_weight - identity_weight
                )
                combined = (
                    relevance_weight * fts_score +
                    recency_weight * recency_score +
                    success_weight * success_score +
                    identity_weight * identity_score
                )

                cocoon["_rank_score"] = round(combined, 4)
                cocoon.pop("rank", None)
                scored.append(cocoon)

            scored.sort(key=lambda c: c["_rank_score"], reverse=True)

            # FIX: search results are no longer counted as cache hits --
            # doing so made get_stats()'s cache_hit_rate meaningless.
            return scored[:max_results]

        except Exception as e:
            logger.debug(f"FTS5 ranked search failed: {e}")
            return self.recall_recent(max_results)

    def recall_recent(self, limit: int = 5) -> List[Dict]:
        """Get N most recent cocoons (newest first)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT id, query, response, adapter, domain, complexity,
                       emotion, importance, timestamp, metadata_json
                FROM cocoons
                ORDER BY timestamp DESC
                LIMIT ?
            """, (limit,))
            return [self._row_to_cocoon(r) for r in cur.fetchall()]
        except Exception as e:
            logger.debug(f"Recent recall failed: {e}")
            return []

    def recall_by_emotion(self, emotion: str, limit: int = 5) -> List[Dict]:
        """Recall cocoons with specific emotional tag (newest first)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT id, query, response, adapter, domain, complexity,
                       emotion, importance, timestamp, metadata_json
                FROM cocoons
                WHERE emotion = ?
                ORDER BY timestamp DESC
                LIMIT ?
            """, (emotion, limit))
            # FIX: parse metadata_json like the other recall_* methods
            # instead of leaking the raw JSON column to callers.
            return [self._row_to_cocoon(r) for r in cur.fetchall()]
        except Exception:
            return []

    def recall_by_domain(self, domain: str, limit: int = 5) -> List[Dict]:
        """Recall cocoons from a specific domain (important + recent first)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT id, query, response, adapter, domain, complexity,
                       emotion, importance, timestamp, metadata_json
                FROM cocoons
                WHERE domain = ?
                ORDER BY importance DESC, timestamp DESC
                LIMIT ?
            """, (domain, limit))
            return [self._row_to_cocoon(r) for r in cur.fetchall()]
        except Exception:
            return []

    def recall_multi_domain(self, domains: List[str], limit_per: int = 3) -> List[Dict]:
        """
        Recall cocoons across multiple domains, limit_per each.

        Combines exact domain matches with FTS relevance hits on each
        domain name, de-duplicated by cocoon id.
        """
        results: List[Dict] = []
        # FIX: maintain one seen-id set instead of rebuilding a set per
        # candidate (was O(n^2)).
        seen: set = set()
        for domain in domains:
            for cocoon in self.recall_by_domain(domain, limit_per):
                if cocoon.get("id") not in seen:
                    seen.add(cocoon.get("id"))
                    results.append(cocoon)
        for domain in domains:
            for cocoon in self.recall_relevant(domain, max_results=limit_per):
                if cocoon.get("id") not in seen:
                    seen.add(cocoon.get("id"))
                    results.append(cocoon)
        return results

    def recall_by_adapter(self, adapter: str, limit: int = 5) -> List[Dict]:
        """Recall cocoons generated by specific adapter (newest first)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT id, query, response, adapter, domain, complexity,
                       emotion, importance, timestamp, metadata_json
                FROM cocoons
                WHERE adapter = ?
                ORDER BY timestamp DESC
                LIMIT ?
            """, (adapter, limit))
            # FIX: parse metadata_json for consistency with recall_recent.
            return [self._row_to_cocoon(r) for r in cur.fetchall()]
        except Exception:
            return []

    def recall_important(self, min_importance: int = 7, limit: int = 10) -> List[Dict]:
        """Recall high-importance cocoons (replaces LivingMemoryKernel.recall_important)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT id, query, response, adapter, domain, complexity,
                       emotion, importance, timestamp, metadata_json
                FROM cocoons
                WHERE importance >= ?
                ORDER BY importance DESC, timestamp DESC
                LIMIT ?
            """, (min_importance, limit))
            # FIX: parse metadata_json for consistency with recall_recent.
            return [self._row_to_cocoon(r) for r in cur.fetchall()]
        except Exception:
            return []

    def mark_success(self, cocoon_id: str, success: bool = True,
                     identity_id: str = ""):
        """
        Mark a cocoon as successful or unsuccessful.

        This feeds back into ranked recall -- successful cocoons
        get boosted in future searches, unsuccessful ones get demoted.
        Optionally tags the cocoon with the acting identity.
        """
        try:
            cur = self._conn.cursor()
            cur.execute(
                "SELECT metadata_json FROM cocoons WHERE id = ?",
                (cocoon_id,)
            )
            row = cur.fetchone()
            if row:
                meta = json.loads(row["metadata_json"] or "{}")
                meta["success"] = success
                if identity_id:
                    meta["identity_id"] = identity_id
                cur.execute(
                    "UPDATE cocoons SET metadata_json = ? WHERE id = ?",
                    (json.dumps(meta), cocoon_id)
                )
                self._conn.commit()
        except Exception as e:
            logger.debug(f"mark_success failed: {e}")

    def adapter_dominance(self) -> Dict:
        """
        Analyze adapter usage distribution.

        Returns totals, the dominant adapter, its share, and whether usage
        is considered balanced (dominant share <= 40%).
        """
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT adapter, COUNT(*) as cnt
                FROM cocoons
                GROUP BY adapter
                ORDER BY cnt DESC
            """)
            rows = cur.fetchall()
            total = sum(r["cnt"] for r in rows)
            if not total:
                return {"total_responses": 0, "dominant": None, "ratio": 0, "balanced": True}

            distribution = {r["adapter"]: r["cnt"] for r in rows}
            # Rows are ordered by count DESC, so the first is the dominant one.
            dominant = rows[0]["adapter"]
            ratio = rows[0]["cnt"] / total

            return {
                "total_responses": total,
                "dominant": dominant,
                "ratio": round(ratio, 3),
                "balanced": ratio <= 0.4,
                "distribution": distribution,
            }
        except Exception:
            return {"total_responses": 0, "dominant": None, "ratio": 0, "balanced": True}

    def domain_distribution(self) -> Dict:
        """Analyze domain distribution (domain -> count, most common first)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT domain, COUNT(*) as cnt
                FROM cocoons
                GROUP BY domain
                ORDER BY cnt DESC
            """)
            return {r["domain"]: r["cnt"] for r in cur.fetchall()}
        except Exception:
            return {}

    def complexity_distribution(self) -> Dict:
        """Analyze query complexity distribution (complexity -> count)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT complexity, COUNT(*) as cnt
                FROM cocoons
                GROUP BY complexity
                ORDER BY cnt DESC
            """)
            return {r["complexity"]: r["cnt"] for r in cur.fetchall()}
        except Exception:
            return {}

    def response_length_trend(self, window: int = 20) -> List[int]:
        """Get response length trend (last N cocoons, oldest first)."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT LENGTH(response) as len
                FROM cocoons
                ORDER BY timestamp DESC
                LIMIT ?
            """, (window,))
            # Query fetches newest-first; reverse for chronological order.
            return [r["len"] for r in cur.fetchall()][::-1]
        except Exception:
            return []

    def full_introspection(self) -> Dict:
        """Full statistical self-analysis (replaces CocoonIntrospectionEngine)."""
        adapter = self.adapter_dominance()
        domains = self.domain_distribution()
        complexities = self.complexity_distribution()
        lengths = self.response_length_trend(20)
        avg_len = sum(lengths) / len(lengths) if lengths else 0

        # Human-readable first-person observations derived from the stats.
        observations = []
        total = adapter.get("total_responses", 0)
        observations.append(f"I've processed {total} reasoning exchanges.")

        if adapter.get("dominant"):
            ratio = adapter.get("ratio", 0)
            if ratio > 0.4:
                # FIX: repaired mojibake ("β") in the user-facing message.
                observations.append(
                    f"My {adapter['dominant']} adapter handles {ratio:.0%} of queries "
                    f"— that's dominant. I should diversify."
                )
            else:
                observations.append(
                    f"My adapter usage is balanced (most-used: {adapter['dominant']} at {ratio:.0%})."
                )

        if domains:
            top_domain = max(domains, key=domains.get)
            observations.append(f"Most common domain: {top_domain} ({domains[top_domain]} queries).")

        observations.append(f"Average response length: {avg_len:.0f} characters.")

        return {
            "total_cocoons": total,
            "adapter_dominance": adapter,
            "domain_distribution": domains,
            "complexity_distribution": complexities,
            "avg_response_length": round(avg_len),
            "response_length_trend": lengths,
            "observations": observations,
        }

    def _migrate_legacy(self):
        """
        Migrate legacy JSON cocoons and .cocoon files into SQLite.

        Unreadable or unrecognized files are skipped (best-effort import);
        recognized shapes are re-stored through store().
        """
        migrated = 0

        # Legacy format 1: cocoon_*.json written by CognitionCocooner or
        # the old memory kernel.
        for f in sorted(self.legacy_dir.glob("cocoon_*.json")):
            try:
                with open(f, "r", encoding="utf-8") as fh:
                    data = json.load(fh)

                if data.get("type") == "reasoning":
                    wrapped = data.get("wrapped", {})
                    self.store(
                        query=wrapped.get("query", ""),
                        response=wrapped.get("response", ""),
                        adapter=wrapped.get("adapter", "unknown"),
                        domain=wrapped.get("metadata", {}).get("domain", "general"),
                        complexity=wrapped.get("metadata", {}).get("complexity", "MEDIUM"),
                        importance=7,
                        metadata=wrapped.get("metadata"),
                    )
                    migrated += 1
                elif "summary" in data or "quote" in data:
                    # Kernel-style memory entry (summary or quote cocoon).
                    self.store(
                        query=data.get("title", f.stem),
                        response=data.get("summary", data.get("quote", "")),
                        adapter="memory_kernel",
                        emotion=data.get("emotion", "neutral"),
                        importance=8,
                    )
                    migrated += 1
            except Exception as e:
                logger.debug(f"Migration skip {f.name}: {e}")

        # Legacy format 2: *.cocoon files from the consciousness stack.
        for f in sorted(self.legacy_dir.glob("*.cocoon")):
            try:
                with open(f, "r", encoding="utf-8") as fh:
                    data = json.load(fh)
                meta = data.get("metadata", {})
                self.store(
                    query=meta.get("context", data.get("cocoon_id", f.stem))[:200],
                    response=meta.get("context", ""),
                    adapter="consciousness_stack",
                    emotion=data.get("emotional_classification", "neutral").lower(),
                    importance=data.get("importance_rating", 7),
                )
                migrated += 1
            except Exception:
                continue

        if migrated > 0:
            logger.info(f"Migrated {migrated} legacy cocoons to SQLite")
            # Recount from the DB so duplicates/replacements don't skew it.
            self._total_stored = self._count()

    def _cache_put(self, key: str, value: Dict):
        """Add to LRU cache, evicting oldest entries past CACHE_MAX."""
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = value
        while len(self._cache) > CACHE_MAX:
            self._cache.popitem(last=False)

    def _cache_get(self, key: str) -> Optional[Dict]:
        """Get from LRU cache (refreshes recency); updates hit/miss stats."""
        if key in self._cache:
            self._cache.move_to_end(key)
            self._cache_hits += 1
            return self._cache[key]
        self._cache_misses += 1
        return None

    def get_stats(self) -> Dict:
        """Memory system stats for health checks."""
        return {
            "total_cocoons": self._total_stored,
            "cache_size": len(self._cache),
            "cache_max": CACHE_MAX,
            "cache_hits": self._cache_hits,
            "cache_misses": self._cache_misses,
            "cache_hit_rate": (
                self._cache_hits / max(1, self._cache_hits + self._cache_misses)
            ),
            "db_path": str(self.db_path),
            "db_size_kb": round(self.db_path.stat().st_size / 1024, 1) if self.db_path.exists() else 0,
        }

    def close(self):
        """Close the database connection (safe to call more than once)."""
        if self._conn:
            self._conn.close()
|