| """ |
| Federated Cocoon Synchronization Protocol — Encrypted state packaging, |
| HMAC signing, and attractor merger for distributed RC+xi nodes. |
| |
| Implements: |
| - Cocoon packaging with full RC+xi metrics |
| - Fernet symmetric encryption (AES-128-CBC + HMAC-SHA256) |
| - Attractor merger via weighted mean-field coupling (Eq. 12) |
| - Phase coherence consensus (Gamma >= 0.98 target) |
| - Secure sync protocol: package -> encrypt -> sign -> transmit -> verify -> decrypt -> merge |
| |
| This module enables Codette Pods (edge nodes on RPi 5) to synchronize |
| their reasoning state without exposing raw data. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import hmac |
| import json |
| import os |
| import time |
| import uuid |
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| |
| try: |
| from cryptography.fernet import Fernet |
| HAS_CRYPTO = True |
| except ImportError: |
| HAS_CRYPTO = False |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class CocoonPackage:
    """Container for a single cocoon as it moves through the sync pipeline.

    The plain fields describe the packaged state; ``raw_payload`` carries the
    serialized (and, once encrypted, ciphertext) form of that state, and
    ``signature`` carries the HMAC added by the signing step.
    """

    # --- identity / provenance -------------------------------------------
    cocoon_id: str       # unique id of this cocoon
    node_id: str         # id of the node that produced the cocoon
    timestamp: float     # creation time, seconds since the epoch

    # --- packaged content -------------------------------------------------
    state_snapshot: Dict[str, Any]   # serialized spiderweb state
    attractors: List[Dict]           # attractor descriptions
    glyphs: List[Dict]               # identity glyphs
    metrics: Dict[str, float]        # coherence / tension / alignment values

    # --- integrity / transport --------------------------------------------
    payload_hash: str                     # hex digest of the serialized payload
    encrypted: bool = False               # True once raw_payload is ciphertext
    raw_payload: Optional[bytes] = None   # serialized payload bytes, if any
    signature: Optional[str] = None       # hex HMAC, set by the signing step
|
|
|
|
@dataclass
class SyncResult:
    """Outcome report for one cocoon synchronization attempt."""

    success: bool              # False when the sync was rejected
    merged_attractors: int     # attractor count reported by the merge step
    new_glyphs: int            # number of glyphs adopted from the remote node

    coherence_before: float    # local coherence supplied to the sync
    coherence_after: float     # coherence value after the sync
    tension_delta: float       # tension difference reported by the sync

    # Human-readable problems; a fresh list is created per instance.
    errors: List[str] = field(default_factory=list)
|
|
|
|
| |
| |
| |
|
|
class CocoonKeyManager:
    """Owns the symmetric key used for cocoon encryption and signing."""

    def __init__(self, key: Optional[bytes] = None):
        """Store ``key``, or generate one when none (or an empty one) is given.

        A generated key comes from ``Fernet.generate_key()`` when the
        ``cryptography`` package is importable, otherwise from 32 random
        bytes supplied by the operating system.
        """
        if not key:
            key = Fernet.generate_key() if HAS_CRYPTO else os.urandom(32)
        self._key = key

    @property
    def key(self) -> bytes:
        """The symmetric key held by this manager."""
        return self._key

    def derive_hmac_key(self) -> bytes:
        """Return the signing key: SHA-256 of the key plus a fixed salt."""
        salted = self._key + b"hmac_salt_cocoon"
        return hashlib.sha256(salted).digest()
|
|
|
|
| |
| |
| |
|
|
class CocoonSync:
    """Federated cocoon synchronization protocol.

    Outbound pipeline: ``package_cocoon`` -> ``encrypt_cocoon`` ->
    ``sign_cocoon``. Inbound pipeline: ``sync_with_remote`` (verify ->
    decrypt -> merge -> report).

    Instance state:
        node_id: Identifier stamped on every cocoon this node packages.
        key_manager: Provides the encryption key and the derived HMAC key.
        coherence_target / tension_target / ethical_target: Thresholds used
            by ``check_consensus`` (and, for the ethical target, by
            ``sync_with_remote``).
    """

    def __init__(
        self,
        node_id: str,
        key_manager: Optional[CocoonKeyManager] = None,
        coherence_target: float = 0.98,
        tension_target: float = 0.05,
        ethical_target: float = 0.90,
    ):
        self.node_id = node_id
        self.key_manager = key_manager or CocoonKeyManager()
        self.coherence_target = coherence_target
        self.tension_target = tension_target
        self.ethical_target = ethical_target

        # State accumulated across syncs.
        self._local_attractors: List[Dict] = []
        self._local_glyphs: List[Dict] = []
        self._sync_history: List[Dict] = []

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _serialize_payload(
        cocoon_id: str,
        node_id: str,
        state: Dict[str, Any],
        attractors: List[Dict],
        glyphs: List[Dict],
        metrics: Dict[str, float],
    ) -> bytes:
        """Serialize cocoon content to canonical (sorted-key) JSON bytes."""
        payload = {
            "cocoon_id": cocoon_id,
            "node_id": node_id,
            "state": state,
            "attractors": attractors,
            "glyphs": glyphs,
            "metrics": metrics,
        }
        # default=str stringifies anything json cannot encode natively.
        return json.dumps(payload, sort_keys=True, default=str).encode()

    def _xor_with_key(self, data: bytes) -> bytes:
        """XOR ``data`` with the repeating key (fallback obfuscation only).

        The operation is its own inverse, so it serves both the encrypt and
        the decrypt fallback. Empty data yields empty bytes.

        Raises:
            ValueError: If the key is empty.
        """
        key = self.key_manager.key
        if not key:
            raise ValueError("encryption key must not be empty")
        key_len = len(key)
        return bytes(byte ^ key[i % key_len] for i, byte in enumerate(data))

    def _compute_signature(self, package: CocoonPackage) -> str:
        """Return the hex HMAC-SHA256 over the package's signed bytes.

        The signed bytes are ``raw_payload`` when it is non-empty, otherwise
        the encoded ``payload_hash``.
        """
        hmac_key = self.key_manager.derive_hmac_key()
        signed_bytes = package.raw_payload or package.payload_hash.encode()
        return hmac.new(hmac_key, signed_bytes, hashlib.sha256).hexdigest()

    def _read_verified_payload(self, package: CocoonPackage) -> Dict[str, Any]:
        """Return the payload of a package whose signature already verified.

        The signature covers ``raw_payload`` (when non-empty), not the plain
        dataclass fields. For an unencrypted package the signed bytes are
        therefore parsed directly, so that fields altered after signing
        cannot influence the merge.

        NOTE(review): when ``raw_payload`` is missing, only ``payload_hash``
        is signed and the plain fields returned by ``decrypt_cocoon`` are not
        checked against it.
        """
        if package.raw_payload and not package.encrypted:
            return json.loads(package.raw_payload.decode())
        return self.decrypt_cocoon(package)

    @staticmethod
    def _failed_sync(local_coherence: float, message: str) -> SyncResult:
        """Build the SyncResult for a sync rejected before merging."""
        return SyncResult(
            success=False, merged_attractors=0, new_glyphs=0,
            coherence_before=local_coherence, coherence_after=local_coherence,
            tension_delta=0.0, errors=[message],
        )

    # ------------------------------------------------------------------
    # Outbound pipeline
    # ------------------------------------------------------------------

    def package_cocoon(
        self,
        spiderweb_state: Dict[str, Any],
        phase_coherence: float,
        epistemic_tension: float,
        ethical_alignment: float,
        attractors: Optional[List[Dict]] = None,
        glyphs: Optional[List[Dict]] = None,
    ) -> CocoonPackage:
        """Package current state into a cocoon for transmission.

        Args:
            spiderweb_state: Serialized QuantumSpiderweb state.
            phase_coherence: Current Gamma value.
            epistemic_tension: Current xi value.
            ethical_alignment: Current AEGIS eta value.
            attractors: Detected attractor manifolds.
            glyphs: Identity glyphs formed.

        Returns:
            An unencrypted, unsigned CocoonPackage whose ``raw_payload`` is
            the canonical JSON of the content and whose ``payload_hash`` is
            the SHA-256 hex digest of those bytes.
        """
        cocoon_id = f"cocoon_{uuid.uuid4().hex[:12]}"
        # Single clock read so the package and its metrics agree on the time.
        created_at = time.time()
        attractor_list = attractors or []
        glyph_list = glyphs or []

        metrics = {
            "phase_coherence": round(phase_coherence, 4),
            "epistemic_tension": round(epistemic_tension, 4),
            "ethical_alignment": round(ethical_alignment, 4),
            "timestamp": created_at,
        }

        payload_bytes = self._serialize_payload(
            cocoon_id, self.node_id, spiderweb_state,
            attractor_list, glyph_list, metrics,
        )

        return CocoonPackage(
            cocoon_id=cocoon_id,
            node_id=self.node_id,
            timestamp=created_at,
            state_snapshot=spiderweb_state,
            attractors=attractor_list,
            glyphs=glyph_list,
            metrics=metrics,
            payload_hash=hashlib.sha256(payload_bytes).hexdigest(),
            raw_payload=payload_bytes,
        )

    def encrypt_cocoon(self, package: CocoonPackage) -> CocoonPackage:
        """Encrypt cocoon payload with Fernet (AES-128-CBC + HMAC-SHA256).

        Returns a new CocoonPackage (shallow copy); does not mutate the input.
        Falls back to XOR obfuscation if cryptography is not installed; that
        fallback is NOT real encryption even though ``encrypted`` is set.

        NOTE(review): the copy keeps the plain ``state_snapshot``,
        ``attractors``, ``glyphs`` and ``metrics`` fields; only
        ``raw_payload`` becomes ciphertext.
        """
        import copy
        result = copy.copy(package)

        # Packages built without raw bytes are serialized on demand.
        if result.raw_payload is None:
            result.raw_payload = self._serialize_payload(
                result.cocoon_id, result.node_id, result.state_snapshot,
                result.attractors, result.glyphs, result.metrics,
            )

        if HAS_CRYPTO:
            result.raw_payload = Fernet(self.key_manager.key).encrypt(result.raw_payload)
        else:
            result.raw_payload = self._xor_with_key(result.raw_payload)
        result.encrypted = True
        return result

    def sign_cocoon(self, package: CocoonPackage) -> CocoonPackage:
        """Sign cocoon with HMAC-SHA256 for integrity verification.

        Returns a new CocoonPackage (shallow copy) with ``signature`` set;
        does not mutate the input.
        """
        import copy
        result = copy.copy(package)
        result.signature = self._compute_signature(result)
        return result

    # ------------------------------------------------------------------
    # Inbound pipeline
    # ------------------------------------------------------------------

    def verify_cocoon(self, package: CocoonPackage) -> bool:
        """Verify HMAC signature of incoming cocoon.

        Returns False for a missing, mismatching, or malformed signature.
        """
        if not package.signature:
            return False
        expected = self._compute_signature(package)
        try:
            return hmac.compare_digest(expected, package.signature)
        except TypeError:
            # compare_digest rejects non-ASCII or non-str signatures; a
            # remote node must not be able to crash verification with one.
            return False

    def decrypt_cocoon(self, package: CocoonPackage) -> Dict[str, Any]:
        """Decrypt cocoon payload.

        Returns:
            The deserialized payload dict. For a package that is not marked
            encrypted, or has no ``raw_payload``, a dict built from the
            package's plain fields is returned instead.

        Raises:
            Whatever the Fernet or JSON layer raises for a wrong key or a
            malformed payload.
        """
        if not package.encrypted or package.raw_payload is None:
            return {
                "state": package.state_snapshot,
                "attractors": package.attractors,
                "glyphs": package.glyphs,
                "metrics": package.metrics,
            }

        if HAS_CRYPTO:
            decrypted = Fernet(self.key_manager.key).decrypt(package.raw_payload)
        else:
            decrypted = self._xor_with_key(package.raw_payload)

        return json.loads(decrypted.decode())

    def merge_attractors(
        self,
        local_attractors: List[Dict],
        remote_attractors: List[Dict],
        local_coherence: float = 0.95,
        merge_radius: float = 2.0,
    ) -> List[Dict]:
        """Weighted attractor merger via mean-field coupling (Eq. 12).

        alpha = local_coherence clamped to [0, 0.95]: higher coherence means
        the local center is trusted more, but the remote always contributes.

        A remote attractor is folded into the first merged attractor whose
        center has the same dimensionality and lies within ``merge_radius``
        (Euclidean); otherwise it is appended as a new ``"source": "remote"``
        entry. Attractors without a ``"center"`` default to the 5-D origin.

        Note:
            The returned list is new, but a matched local attractor dict is
            updated in place (``"center"`` and ``"remote_members"``).

        Args:
            local_attractors: This node's attractors.
            remote_attractors: Attractors received from the remote node.
            local_coherence: Local Gamma, used as the blend weight.
            merge_radius: Maximum center distance for two attractors to merge.

        Returns:
            The merged attractor list.
        """
        alpha = max(0.0, min(local_coherence, 0.95))
        merged = list(local_attractors)

        for remote_att in remote_attractors:
            r_center = remote_att.get("center", [0] * 5)
            matched = False

            for local_att in merged:
                l_center = local_att.get("center", [0] * 5)
                # Centers of different dimensionality are not comparable;
                # zipping them would silently truncate the local center.
                if len(l_center) != len(r_center):
                    continue

                dist = sum((a - b) ** 2 for a, b in zip(l_center, r_center)) ** 0.5
                if dist <= merge_radius:
                    local_att["center"] = [
                        alpha * lc + (1 - alpha) * rc
                        for lc, rc in zip(l_center, r_center)
                    ]
                    local_att.setdefault("remote_members", []).extend(
                        remote_att.get("members", [])
                    )
                    matched = True
                    break

            if not matched:
                merged.append({
                    "attractor_id": remote_att.get("attractor_id", f"remote_{len(merged)}"),
                    "center": r_center,
                    "members": remote_att.get("members", []),
                    "source": "remote",
                })

        return merged

    def sync_with_remote(
        self,
        incoming_package: CocoonPackage,
        local_spiderweb_state: Dict[str, Any],
        local_coherence: float,
        local_tension: float,
    ) -> SyncResult:
        """Full sync protocol: verify -> decrypt -> merge -> report.

        Side effects on success: the merged attractors replace this node's
        stored attractors, new remote glyphs are appended to the stored
        glyphs, and an entry is added to the sync history. Matched attractor
        dicts inside ``local_spiderweb_state`` are updated in place by
        ``merge_attractors``.

        A remote ethical alignment below ``ethical_target`` is reported in
        ``errors`` but does not abort the sync.

        Args:
            incoming_package: Encrypted cocoon from remote node.
            local_spiderweb_state: Current local web state.
            local_coherence: Current local Gamma.
            local_tension: Current local xi.

        Returns:
            SyncResult with merge statistics; ``success`` is False when the
            signature check or payload decoding fails.
        """
        errors: List[str] = []

        # 1. Integrity: reject anything that does not carry a valid HMAC.
        if not self.verify_cocoon(incoming_package):
            return self._failed_sync(local_coherence, "HMAC verification failed")

        # 2. Decode. Broad catch is deliberate: this is the boundary with
        #    remote input, and any decoding failure becomes a failed result.
        try:
            remote_data = self._read_verified_payload(incoming_package)
        except Exception as e:
            return self._failed_sync(local_coherence, f"Decryption failed: {e}")
        if not isinstance(remote_data, dict):
            return self._failed_sync(
                local_coherence, "Decryption failed: payload is not a JSON object"
            )

        remote_metrics = remote_data.get("metrics", {})
        if not isinstance(remote_metrics, dict):
            remote_metrics = {}

        # 3. Ethical gate (non-fatal: recorded as a warning in ``errors``).
        remote_eta = remote_metrics.get("ethical_alignment", 0)
        if remote_eta < self.ethical_target:
            errors.append(
                f"Remote ethical alignment {remote_eta:.3f} below target {self.ethical_target}"
            )

        # 4. Attractors: merge and keep the result as this node's attractors.
        remote_attractors = remote_data.get("attractors", [])
        local_attractors = self._extract_attractors(local_spiderweb_state)
        merged = self.merge_attractors(
            local_attractors, remote_attractors, local_coherence
        )
        new_attractor_count = len(merged) - len(local_attractors)
        self._local_attractors = merged

        # 5. Glyphs: adopt those whose id is not known yet, including ids
        #    repeated within this same remote batch.
        known_ids = {g.get("glyph_id") for g in self._local_glyphs}
        new_glyphs: List[Dict] = []
        for glyph in remote_data.get("glyphs", []):
            glyph_id = glyph.get("glyph_id")
            if glyph_id in known_ids:
                continue
            new_glyphs.append(glyph)
            if glyph_id is not None:
                known_ids.add(glyph_id)
        self._local_glyphs.extend(new_glyphs)

        # 6. Metrics: fixed 70/30 local/remote coherence blend.
        remote_coherence = remote_metrics.get("phase_coherence", 0.5)
        new_coherence = 0.7 * local_coherence + 0.3 * remote_coherence

        remote_tension = remote_metrics.get("epistemic_tension", 0.5)
        tension_delta = remote_tension - local_tension

        # 7. Audit trail.
        self._sync_history.append({
            "timestamp": time.time(),
            "remote_node": incoming_package.node_id,
            "merged_attractors": len(merged),
            "new_glyphs": len(new_glyphs),
            "coherence_after": new_coherence,
        })

        return SyncResult(
            success=True,
            merged_attractors=new_attractor_count,
            new_glyphs=len(new_glyphs),
            coherence_before=local_coherence,
            coherence_after=round(new_coherence, 4),
            tension_delta=round(tension_delta, 4),
            errors=errors,
        )

    def check_consensus(
        self,
        local_coherence: float,
        local_tension: float,
        local_eta: float,
    ) -> Dict[str, bool]:
        """Check if local node meets consensus criteria.

        Target (defaults): Gamma >= 0.98, xi <= 0.05, eta >= 0.90

        Returns:
            One flag per criterion plus ``"consensus"``, which is True only
            when all three criteria are met.
        """
        coherence_ok = local_coherence >= self.coherence_target
        tension_ok = local_tension <= self.tension_target
        ethics_ok = local_eta >= self.ethical_target
        return {
            "phase_coherence_met": coherence_ok,
            "tension_met": tension_ok,
            "ethical_met": ethics_ok,
            "consensus": coherence_ok and tension_ok and ethics_ok,
        }

    def _extract_attractors(self, web_state: Dict) -> List[Dict]:
        """Extract attractors from spiderweb state dict.

        Falls back to this node's stored attractors when ``web_state`` is not
        a dict or has no ``"attractors"`` key.
        """
        if isinstance(web_state, dict) and "attractors" in web_state:
            return web_state["attractors"]
        return self._local_attractors

    def get_sync_status(self) -> Dict[str, Any]:
        """Return sync protocol status, including the five most recent syncs."""
        return {
            "node_id": self.node_id,
            "total_syncs": len(self._sync_history),
            "local_attractors": len(self._local_attractors),
            "local_glyphs": len(self._local_glyphs),
            "has_encryption": HAS_CRYPTO,
            "recent_syncs": self._sync_history[-5:],
        }
|
|