Codette-Reasoning / reasoning_forge /unified_memory.py
Jonathan Harrison
Full Codette codebase sync — transparency release
74f2af5
"""
Codette Unified Memory — SQLite + FTS5 Backed Cocoon Store
===========================================================
Consolidates three previously separate memory systems:
1. CognitionCocooner (JSON files on disk)
2. LivingMemoryKernel (in-memory MemoryCocoons)
3. CodetteSession (SQLite conversation state)
Into ONE system with:
- SQLite backing for persistence + ACID guarantees
- FTS5 full-text search for fast relevance matching (replaces O(n) file scan)
- In-memory LRU cache for hot cocoons
- Unified API for store/recall/search
- Migration from legacy JSON cocoons on first load
Schema:
cocoons(id, query, response, adapter, domain, complexity, emotion,
importance, timestamp, metadata_json)
cocoons_fts(query, response) -- FTS5 virtual table
Author: Jonathan Harrison (Raiff's Bits LLC)
"""
import json
import math
import sqlite3
import time
import hashlib
import os
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from collections import OrderedDict
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory two levels up from this file; both storage locations hang off it.
_BASE_DIR = Path(__file__).parent.parent

# Default on-disk locations: the SQLite database and the legacy cocoon files.
DB_DIR = _BASE_DIR.joinpath("data")
DB_PATH = DB_DIR.joinpath("codette_memory.db")
LEGACY_COCOON_DIR = _BASE_DIR.joinpath("cocoons")

# Maximum number of cocoons kept in the in-memory LRU cache.
CACHE_MAX = 200
class UnifiedMemory:
    """
    Single source of truth for all Codette memory.

    Replaces CognitionCocooner + LivingMemoryKernel + session memory
    with one SQLite-backed store using FTS5 for fast relevance search.

    A "cocoon" is one stored query/response exchange plus its tags
    (adapter, domain, complexity, emotion, importance) and a free-form
    metadata dict that is persisted as JSON.

    The instance owns one SQLite connection. Call close() when done, or
    use the instance as a context manager.
    """

    # Columns returned by every plain recall query, in schema order.
    _COLUMNS = (
        "id, query, response, adapter, domain, complexity, "
        "emotion, importance, timestamp, metadata_json"
    )

    # Punctuation trimmed from both ends of each query word.
    _PUNCTUATION = ".,!?;:\"'()[]{}"

    # Words too common to be useful as search terms.
    _STOP_WORDS = frozenset({
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "can", "to", "of", "in", "for", "on",
        "with", "at", "by", "from", "as", "and", "but", "or", "if",
        "it", "its", "this", "that", "i", "me", "my", "we", "you",
        "what", "how", "why", "when", "where", "who", "about", "just",
        "not", "no", "so", "very", "really", "also", "too", "up",
    })

    def __init__(self, db_path: Optional[Path] = None,
                 legacy_dir: Optional[Path] = None):
        """
        Open (or create) the database and migrate legacy cocoons if empty.

        Args:
            db_path: Database file; defaults to the module-level DB_PATH.
                A plain string is accepted as well as a Path.
            legacy_dir: Directory scanned for legacy cocoon files when the
                database holds no cocoons yet; defaults to
                LEGACY_COCOON_DIR.
        """
        self.db_path = Path(db_path) if db_path else DB_PATH
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.legacy_dir = Path(legacy_dir) if legacy_dir else LEGACY_COCOON_DIR

        # In-memory LRU cache (id -> cocoon dict)
        self._cache: OrderedDict = OrderedDict()

        # Initialize database
        self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False)
        self._conn.row_factory = sqlite3.Row
        self._init_schema()

        # Stats
        self._total_stored = self._count()
        self._cache_hits = 0
        self._cache_misses = 0

        # Migrate legacy cocoons on first use
        if self._total_stored == 0 and self.legacy_dir.exists():
            self._migrate_legacy()

        logger.info(f"UnifiedMemory: {self._total_stored} cocoons in {self.db_path}")

    def __enter__(self):
        """Return self so the store can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the database connection when the ``with`` block ends."""
        self.close()

    def _init_schema(self):
        """Create tables and FTS5 index if they don't exist."""
        cur = self._conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS cocoons (
                id TEXT PRIMARY KEY,
                query TEXT NOT NULL,
                response TEXT NOT NULL,
                adapter TEXT DEFAULT 'unknown',
                domain TEXT DEFAULT 'general',
                complexity TEXT DEFAULT 'MEDIUM',
                emotion TEXT DEFAULT 'neutral',
                importance INTEGER DEFAULT 7,
                timestamp REAL NOT NULL,
                metadata_json TEXT DEFAULT '{}'
            )
        """)
        # FTS5 virtual table for fast full-text search. It is an
        # external-content table: the text lives in `cocoons`, and the
        # triggers below keep the index in step with it.
        cur.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS cocoons_fts
            USING fts5(query, response, content='cocoons', content_rowid='rowid')
        """)
        # Triggers to keep FTS in sync
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS cocoons_ai AFTER INSERT ON cocoons BEGIN
                INSERT INTO cocoons_fts(rowid, query, response)
                VALUES (new.rowid, new.query, new.response);
            END
        """)
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS cocoons_ad AFTER DELETE ON cocoons BEGIN
                INSERT INTO cocoons_fts(cocoons_fts, rowid, query, response)
                VALUES ('delete', old.rowid, old.query, old.response);
            END
        """)
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS cocoons_au AFTER UPDATE ON cocoons BEGIN
                INSERT INTO cocoons_fts(cocoons_fts, rowid, query, response)
                VALUES ('delete', old.rowid, old.query, old.response);
                INSERT INTO cocoons_fts(rowid, query, response)
                VALUES (new.rowid, new.query, new.response);
            END
        """)
        # Index on timestamp for recency queries
        cur.execute("""
            CREATE INDEX IF NOT EXISTS idx_cocoons_timestamp
            ON cocoons(timestamp DESC)
        """)
        # Index on adapter for dominance analysis
        cur.execute("""
            CREATE INDEX IF NOT EXISTS idx_cocoons_adapter
            ON cocoons(adapter)
        """)
        self._conn.commit()

    def _count(self) -> int:
        """Count total cocoons in database."""
        cur = self._conn.cursor()
        cur.execute("SELECT COUNT(*) FROM cocoons")
        return cur.fetchone()[0]

    @staticmethod
    def _row_to_cocoon(row, keep_raw: bool = False) -> Dict:
        """
        Convert a database row into a cocoon dict with parsed metadata.

        The ``metadata_json`` column is decoded into a ``metadata`` dict.
        Missing, empty, malformed or non-object JSON decodes to ``{}`` so
        that one bad row cannot break a whole recall.

        Args:
            row: A sqlite3.Row (or mapping) that includes ``metadata_json``.
            keep_raw: When True the raw ``metadata_json`` string stays in
                the result next to ``metadata``; otherwise it is removed.
        """
        cocoon = dict(row)
        if keep_raw:
            raw = cocoon.get("metadata_json")
        else:
            raw = cocoon.pop("metadata_json", None)
        try:
            meta = json.loads(raw) if raw else {}
        except (TypeError, ValueError):
            meta = {}
        cocoon["metadata"] = meta if isinstance(meta, dict) else {}
        return cocoon

    def _select(self, where: str, order_by: str, params: Tuple,
                keep_raw: bool = False,
                failure_label: str = "Recall") -> List[Dict]:
        """
        Run a ``SELECT ... FROM cocoons`` and return cocoon dicts.

        ``where`` and ``order_by`` are fixed SQL fragments written in this
        class (never caller input); all values travel in ``params``, whose
        last element is the LIMIT. Returns ``[]`` on any database error.
        """
        sql = (
            f"SELECT {self._COLUMNS} FROM cocoons "
            f"{where} ORDER BY {order_by} LIMIT ?"
        )
        try:
            cur = self._conn.cursor()
            cur.execute(sql, params)
            return [self._row_to_cocoon(r, keep_raw) for r in cur.fetchall()]
        except Exception as e:
            logger.debug(f"{failure_label} failed: {e}")
            return []

    # ─────────────────────────────────────────────────────────
    # STORE
    # ─────────────────────────────────────────────────────────
    def store(self, query: str, response: str, adapter: str = "unknown",
              domain: str = "general", complexity: str = "MEDIUM",
              emotion: str = "neutral", importance: int = 7,
              metadata: Optional[Dict] = None) -> str:
        """
        Store a reasoning exchange as a cocoon.

        This is the unified replacement for:
        - CognitionCocooner.wrap_reasoning()
        - LivingMemoryKernel.store()
        - CodetteSession.add_message()

        The query is capped at 500 characters and the response at 2000.
        The ID is built from the whole-second timestamp and a hash of the
        query, so storing the same query twice within one second yields
        the same ID and the later call overwrites the earlier cocoon.

        Returns:
            The cocoon ID, or "" if the database write failed.
        """
        timestamp = time.time()
        digest = hashlib.md5(query.encode()).hexdigest()[:6]
        cocoon_id = f"cocoon_{int(timestamp)}_{digest}"
        meta_json = json.dumps(metadata or {})
        capped_query = query[:500]        # Cap query length
        capped_response = response[:2000]  # Cap response length

        try:
            cur = self._conn.cursor()
            cur.execute("SELECT 1 FROM cocoons WHERE id = ?", (cocoon_id,))
            is_new = cur.fetchone() is None
            if is_new:
                cur.execute("""
                    INSERT INTO cocoons
                    (id, query, response, adapter, domain, complexity, emotion,
                     importance, timestamp, metadata_json)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    cocoon_id, capped_query, capped_response, adapter,
                    domain, complexity, emotion, importance, timestamp,
                    meta_json,
                ))
            else:
                # Overwrite via UPDATE rather than INSERT OR REPLACE.
                # REPLACE removes the old row without firing the delete
                # trigger (recursive triggers are off by default), which
                # leaves stale tokens in the FTS index; UPDATE fires
                # cocoons_au and keeps the index correct.
                cur.execute("""
                    UPDATE cocoons
                    SET query = ?, response = ?, adapter = ?, domain = ?,
                        complexity = ?, emotion = ?, importance = ?,
                        timestamp = ?, metadata_json = ?
                    WHERE id = ?
                """, (
                    capped_query, capped_response, adapter, domain,
                    complexity, emotion, importance, timestamp, meta_json,
                    cocoon_id,
                ))
            self._conn.commit()
            if is_new:
                # Only genuinely new rows grow the total.
                self._total_stored += 1

            # Cache it
            self._cache_put(cocoon_id, {
                "id": cocoon_id, "query": capped_query,
                "response": capped_response,
                "adapter": adapter, "domain": domain,
                "complexity": complexity,
                "emotion": emotion, "importance": importance,
                "timestamp": timestamp, "metadata": metadata or {},
            })
            return cocoon_id
        except Exception as e:
            logger.error(f"Failed to store cocoon: {e}")
            return ""

    # ─────────────────────────────────────────────────────────
    # RECALL — FTS5 powered relevance search
    # ─────────────────────────────────────────────────────────
    @classmethod
    def _build_fts_query(cls, query: str) -> str:
        """
        Turn free text into an FTS5 MATCH expression.

        Each whitespace-separated word is stripped of surrounding
        punctuation and lower-cased; words of two characters or fewer and
        stop words are dropped. At most eight terms are kept, each emitted
        as a quoted FTS5 string (embedded double quotes doubled) and
        OR-joined. Returns "" when no significant word remains.
        """
        terms = []
        for raw in query.split():
            word = raw.strip(cls._PUNCTUATION).lower()
            if len(word) > 2 and word not in cls._STOP_WORDS:
                terms.append('"' + word.replace('"', '""') + '"')
        return " OR ".join(terms[:8])  # Cap at 8 terms

    def recall_relevant(self, query: str, max_results: int = 3,
                        min_importance: int = 0,
                        identity_id: str = "",
                        recency_weight: float = 0.3,
                        success_weight: float = 0.2,
                        identity_weight: float = 0.2) -> List[Dict]:
        """
        Find cocoons relevant to a query using FTS5 + multi-signal ranking.

        Ranking combines four signals:
        1. FTS5 relevance (text match quality) — base signal
        2. Recency — newer cocoons rank higher (exponential decay)
        3. Success — cocoons marked as successful rank higher
        4. Identity — cocoons linked to the current user rank higher

        Weight params control the balance (0.0 = disabled, 1.0 = dominant).
        Text relevance receives whatever weight is left after the other
        three, never less than zero.

        Falls back to recall_recent(max_results) when the query has no
        significant words, nothing matches, or the search fails.

        Returns:
            Up to max_results cocoon dicts, best first, each carrying a
            ``_rank_score`` key with the combined score.
        """
        if not query.strip():
            return self.recall_recent(max_results)

        fts_query = self._build_fts_query(query)
        if not fts_query:
            return self.recall_recent(max_results)

        # Fetch more candidates than needed for re-ranking
        fetch_limit = max(max_results * 4, 12)

        # Same for every row, so computed once. Clamped so that weights
        # summing above 1.0 cannot turn text relevance into a penalty.
        relevance_weight = max(
            0.0, 1.0 - recency_weight - success_weight - identity_weight
        )

        try:
            cur = self._conn.cursor()
            sql = """
                SELECT c.id, c.query, c.response, c.adapter, c.domain,
                       c.complexity, c.emotion, c.importance, c.timestamp,
                       c.metadata_json,
                       rank
                FROM cocoons_fts
                JOIN cocoons c ON cocoons_fts.rowid = c.rowid
                WHERE cocoons_fts MATCH ?
                AND c.importance >= ?
                ORDER BY rank
                LIMIT ?
            """
            cur.execute(sql, (fts_query, min_importance, fetch_limit))
            rows = cur.fetchall()
            if not rows:
                return self.recall_recent(max_results)

            # Multi-signal re-ranking
            now = time.time()
            scored = []
            for row in rows:
                cocoon = self._row_to_cocoon(row)
                meta = cocoon["metadata"]

                # Base: FTS5 rank is more negative for better matches, so
                # its magnitude grows with match quality. m / (1 + m) maps
                # that magnitude into [0, 1) with better matches higher.
                strength = abs(cocoon.pop("rank", 0) or 0)
                fts_score = strength / (1.0 + strength)

                # Recency: exponential decay with a 1-hour time constant
                # (score falls to 1/e after an hour). Timestamps in the
                # future count as "now".
                age_seconds = max(0.0, now - cocoon.get("timestamp", now))
                recency_score = math.exp(-age_seconds / 3600.0)

                # Success: unmarked cocoons count as successful
                success_score = 1.0 if meta.get("success", True) else 0.3

                # Identity: boost if cocoon is linked to current user
                identity_score = 0.5  # neutral
                if identity_id:
                    cocoon_identity = meta.get("identity_id", "")
                    if cocoon_identity == identity_id:
                        identity_score = 1.0
                    elif cocoon_identity:
                        identity_score = 0.2  # different user's cocoon

                # Combined score (weighted)
                combined = (
                    relevance_weight * fts_score +
                    recency_weight * recency_score +
                    success_weight * success_score +
                    identity_weight * identity_score
                )
                cocoon["_rank_score"] = round(combined, 4)
                scored.append(cocoon)

            # Sort by combined score (descending)
            scored.sort(key=lambda c: c["_rank_score"], reverse=True)
            results = scored[:max_results]
            # NOTE(review): these rows come from the database, not the
            # LRU cache, yet they are counted as cache hits. Kept as-is
            # because get_stats() consumers may rely on it — confirm.
            self._cache_hits += len(results)
            return results
        except Exception as e:
            logger.debug(f"FTS5 ranked search failed: {e}")
            return self.recall_recent(max_results)

    def recall_recent(self, limit: int = 5) -> List[Dict]:
        """Get N most recent cocoons, newest first."""
        return self._select(
            "", "timestamp DESC", (limit,), failure_label="Recent recall"
        )

    def recall_by_emotion(self, emotion: str, limit: int = 5) -> List[Dict]:
        """
        Recall cocoons with specific emotional tag, newest first.

        Results carry the parsed ``metadata`` dict; the raw
        ``metadata_json`` string is kept too for backward compatibility.
        """
        return self._select(
            "WHERE emotion = ?", "timestamp DESC", (emotion, limit),
            keep_raw=True, failure_label="Emotion recall",
        )

    def recall_by_domain(self, domain: str, limit: int = 5) -> List[Dict]:
        """Recall cocoons from a specific domain, most important first."""
        return self._select(
            "WHERE domain = ?", "importance DESC, timestamp DESC",
            (domain, limit), failure_label="Domain recall",
        )

    def recall_multi_domain(self, domains: List[str], limit_per: int = 3) -> List[Dict]:
        """
        Recall cocoons across multiple domains, limit_per each.

        Exact domain matches come first; full-text matches on each domain
        name are appended when their ID is not already in the results.
        """
        results = []
        for domain in domains:
            results.extend(self.recall_by_domain(domain, limit_per))

        # Also search by FTS for domain keywords not captured by exact match
        seen = {c.get("id") for c in results}
        for domain in domains:
            for hit in self.recall_relevant(domain, max_results=limit_per):
                hit_id = hit.get("id")
                if hit_id not in seen:
                    seen.add(hit_id)
                    results.append(hit)
        return results

    def recall_by_adapter(self, adapter: str, limit: int = 5) -> List[Dict]:
        """
        Recall cocoons generated by specific adapter, newest first.

        Results carry the parsed ``metadata`` dict; the raw
        ``metadata_json`` string is kept too for backward compatibility.
        """
        return self._select(
            "WHERE adapter = ?", "timestamp DESC", (adapter, limit),
            keep_raw=True, failure_label="Adapter recall",
        )

    def recall_important(self, min_importance: int = 7, limit: int = 10) -> List[Dict]:
        """
        Recall high-importance cocoons (replaces LivingMemoryKernel.recall_important).

        Results carry the parsed ``metadata`` dict; the raw
        ``metadata_json`` string is kept too for backward compatibility.
        """
        return self._select(
            "WHERE importance >= ?", "importance DESC, timestamp DESC",
            (min_importance, limit),
            keep_raw=True, failure_label="Important recall",
        )

    # ─────────────────────────────────────────────────────────
    # SUCCESS MARKING — for ranked recall feedback loop
    # ─────────────────────────────────────────────────────────
    def mark_success(self, cocoon_id: str, success: bool = True,
                     identity_id: str = ""):
        """
        Mark a cocoon as successful or unsuccessful.

        This feeds back into ranked recall — successful cocoons
        get boosted in future searches, unsuccessful ones get demoted.
        When identity_id is given it is recorded in the metadata as well.
        Unknown IDs and database errors are ignored (errors are logged at
        debug level).
        """
        try:
            cur = self._conn.cursor()
            cur.execute(
                "SELECT metadata_json FROM cocoons WHERE id = ?",
                (cocoon_id,)
            )
            row = cur.fetchone()
            if not row:
                return
            meta = json.loads(row["metadata_json"] or "{}")
            meta["success"] = success
            if identity_id:
                meta["identity_id"] = identity_id
            cur.execute(
                "UPDATE cocoons SET metadata_json = ? WHERE id = ?",
                (json.dumps(meta), cocoon_id)
            )
            self._conn.commit()

            # Keep any cached copy in step with the database. The dict is
            # replaced, not mutated, because the cached metadata may be
            # the caller's own object from store().
            cached = self._cache.get(cocoon_id)
            if cached is not None:
                cached["metadata"] = meta
        except Exception as e:
            logger.debug(f"mark_success failed: {e}")

    # ─────────────────────────────────────────────────────────
    # INTROSPECTION — adapter dominance, domain clusters, trends
    # ─────────────────────────────────────────────────────────
    def adapter_dominance(self) -> Dict:
        """
        Analyze adapter usage distribution.

        Returns a dict with ``total_responses``, the ``dominant`` adapter,
        its ``ratio`` of all cocoons, ``balanced`` (True when that ratio
        is at most 0.4) and, when any cocoons exist, the per-adapter
        ``distribution``.
        """
        empty = {"total_responses": 0, "dominant": None, "ratio": 0, "balanced": True}
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT adapter, COUNT(*) as cnt
                FROM cocoons
                GROUP BY adapter
                ORDER BY cnt DESC
            """)
            rows = cur.fetchall()
        except Exception as e:
            logger.debug(f"adapter_dominance failed: {e}")
            return empty

        total = sum(r["cnt"] for r in rows)
        if not total:
            return empty

        ratio = rows[0]["cnt"] / total
        return {
            "total_responses": total,
            "dominant": rows[0]["adapter"],
            "ratio": round(ratio, 3),
            "balanced": ratio <= 0.4,
            "distribution": {r["adapter"]: r["cnt"] for r in rows},
        }

    def _group_counts(self, column: str) -> Dict:
        """
        Count cocoons per distinct value of ``column``, most common first.

        ``column`` is always a fixed column name supplied by this class.
        Returns ``{}`` on any database error.
        """
        try:
            cur = self._conn.cursor()
            cur.execute(
                f"SELECT {column}, COUNT(*) as cnt FROM cocoons "
                f"GROUP BY {column} ORDER BY cnt DESC"
            )
            return {r[column]: r["cnt"] for r in cur.fetchall()}
        except Exception as e:
            logger.debug(f"{column} distribution failed: {e}")
            return {}

    def domain_distribution(self) -> Dict:
        """Analyze domain distribution (domain -> cocoon count)."""
        return self._group_counts("domain")

    def complexity_distribution(self) -> Dict:
        """Analyze query complexity distribution (complexity -> cocoon count)."""
        return self._group_counts("complexity")

    def response_length_trend(self, window: int = 20) -> List[int]:
        """Get response lengths of the last N cocoons, oldest first."""
        try:
            cur = self._conn.cursor()
            cur.execute("""
                SELECT LENGTH(response) as len
                FROM cocoons
                ORDER BY timestamp DESC
                LIMIT ?
            """, (window,))
            newest_first = [r["len"] for r in cur.fetchall()]
        except Exception as e:
            logger.debug(f"response_length_trend failed: {e}")
            return []
        return newest_first[::-1]  # Chronological order

    def full_introspection(self) -> Dict:
        """
        Full statistical self-analysis (replaces CocoonIntrospectionEngine).

        Combines the adapter, domain, complexity and response-length
        statistics and adds a list of first-person ``observations``
        summarising them.
        """
        adapter = self.adapter_dominance()
        domains = self.domain_distribution()
        complexities = self.complexity_distribution()
        lengths = self.response_length_trend(20)
        avg_len = sum(lengths) / len(lengths) if lengths else 0

        total = adapter.get("total_responses", 0)
        observations = [f"I've processed {total} reasoning exchanges."]

        if adapter.get("dominant"):
            ratio = adapter.get("ratio", 0)
            if ratio > 0.4:
                observations.append(
                    f"My {adapter['dominant']} adapter handles {ratio:.0%} of queries "
                    f"— that's dominant. I should diversify."
                )
            else:
                observations.append(
                    f"My adapter usage is balanced (most-used: {adapter['dominant']} at {ratio:.0%})."
                )

        if domains:
            top_domain = max(domains, key=domains.get)
            observations.append(f"Most common domain: {top_domain} ({domains[top_domain]} queries).")

        observations.append(f"Average response length: {avg_len:.0f} characters.")

        return {
            "total_cocoons": total,
            "adapter_dominance": adapter,
            "domain_distribution": domains,
            "complexity_distribution": complexities,
            "avg_response_length": round(avg_len),
            "response_length_trend": lengths,
            "observations": observations,
        }

    # ─────────────────────────────────────────────────────────
    # LEGACY MIGRATION
    # ─────────────────────────────────────────────────────────
    def _migrate_legacy(self):
        """
        Migrate legacy JSON cocoons and .cocoon files into SQLite.

        Files that cannot be read or parsed are skipped and logged at
        debug level. Only cocoons that store() actually accepted are
        counted; the stored total is re-read from the database afterwards.
        """
        migrated = 0

        # Migrate JSON reasoning cocoons
        for f in sorted(self.legacy_dir.glob("cocoon_*.json")):
            try:
                with open(f, "r", encoding="utf-8") as fh:
                    data = json.load(fh)

                stored_id = ""
                if data.get("type") == "reasoning":
                    # "or {}" also covers keys present with a null value.
                    wrapped = data.get("wrapped") or {}
                    meta = wrapped.get("metadata") or {}
                    stored_id = self.store(
                        query=wrapped.get("query", ""),
                        response=wrapped.get("response", ""),
                        adapter=wrapped.get("adapter", "unknown"),
                        domain=meta.get("domain", "general"),
                        complexity=meta.get("complexity", "MEDIUM"),
                        importance=7,
                        metadata=meta,
                    )
                elif "summary" in data or "quote" in data:
                    # Foundational memory cocoons
                    stored_id = self.store(
                        query=data.get("title", f.stem),
                        response=data.get("summary", data.get("quote", "")),
                        adapter="memory_kernel",
                        emotion=data.get("emotion", "neutral"),
                        importance=8,
                    )
                if stored_id:
                    migrated += 1
            except Exception as e:
                logger.debug(f"Migration skip {f.name}: {e}")

        # Migrate .cocoon files (EMG format)
        for f in sorted(self.legacy_dir.glob("*.cocoon")):
            try:
                with open(f, "r", encoding="utf-8") as fh:
                    data = json.load(fh)
                meta = data.get("metadata") or {}
                stored_id = self.store(
                    query=meta.get("context", data.get("cocoon_id", f.stem))[:200],
                    response=meta.get("context", ""),
                    adapter="consciousness_stack",
                    emotion=data.get("emotional_classification", "neutral").lower(),
                    importance=data.get("importance_rating", 7),
                )
                if stored_id:
                    migrated += 1
            except Exception as e:
                logger.debug(f"Migration skip {f.name}: {e}")

        if migrated > 0:
            logger.info(f"Migrated {migrated} legacy cocoons to SQLite")
        self._total_stored = self._count()

    # ─────────────────────────────────────────────────────────
    # CACHE
    # ─────────────────────────────────────────────────────────
    def _cache_put(self, key: str, value: Dict):
        """Add to LRU cache, evicting the oldest entries beyond CACHE_MAX."""
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = value
        while len(self._cache) > CACHE_MAX:
            self._cache.popitem(last=False)

    def _cache_get(self, key: str) -> Optional[Dict]:
        """Get from LRU cache, recording a hit or miss; None when absent."""
        if key in self._cache:
            self._cache.move_to_end(key)
            self._cache_hits += 1
            return self._cache[key]
        self._cache_misses += 1
        return None

    # ─────────────────────────────────────────────────────────
    # DIAGNOSTICS
    # ─────────────────────────────────────────────────────────
    def get_stats(self) -> Dict:
        """Memory system stats for health checks."""
        lookups = self._cache_hits + self._cache_misses
        if self.db_path.exists():
            db_size_kb = round(self.db_path.stat().st_size / 1024, 1)
        else:
            db_size_kb = 0
        return {
            "total_cocoons": self._total_stored,
            "cache_size": len(self._cache),
            "cache_max": CACHE_MAX,
            "cache_hits": self._cache_hits,
            "cache_misses": self._cache_misses,
            # max(1, ...) avoids division by zero before any lookup
            "cache_hit_rate": self._cache_hits / max(1, lookups),
            "db_path": str(self.db_path),
            "db_size_kb": db_size_kb,
        }

    def close(self):
        """Close database connection."""
        if self._conn:
            self._conn.close()