| """ |
| Semantic caching system for cost optimization. |
| """ |
| import json |
| import logging |
| from pathlib import Path |
| from typing import Optional, Dict, Any |
| import hashlib |
| import numpy as np |
|
|
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| logger = logging.getLogger(__name__) |
|
|
|
|
| class SemanticCache: |
| """Semantic cache using embeddings and cosine similarity.""" |
|
|
| def __init__( |
| self, |
| cache_dir: str = "data/cache", |
| similarity_threshold: float = 0.95 |
| ): |
| """ |
| Initialize semantic cache. |
| |
| Args: |
| cache_dir: Directory to store cache files |
| similarity_threshold: Cosine similarity threshold for cache hits |
| """ |
| self.cache_dir = Path(cache_dir) |
| self.cache_dir.mkdir(parents=True, exist_ok=True) |
| self.similarity_threshold = similarity_threshold |
| self.cache_file = self.cache_dir / "semantic_cache.json" |
| self.cache_data = self._load_cache() |
|
|
| def _load_cache(self) -> Dict[str, Any]: |
| """Load cache from disk.""" |
| if self.cache_file.exists(): |
| try: |
| with open(self.cache_file, 'r') as f: |
| return json.load(f) |
| except Exception as e: |
| logger.error(f"Error loading cache: {str(e)}") |
| return {} |
| return {} |
|
|
| def _save_cache(self): |
| """Save cache to disk.""" |
| try: |
| with open(self.cache_file, 'w') as f: |
| json.dump(self.cache_data, f, indent=2) |
| except Exception as e: |
| logger.error(f"Error saving cache: {str(e)}") |
|
|
| def _cosine_similarity( |
| self, |
| embedding1: list, |
| embedding2: list |
| ) -> float: |
| """ |
| Calculate cosine similarity between two embeddings. |
| |
| Args: |
| embedding1: First embedding vector |
| embedding2: Second embedding vector |
| |
| Returns: |
| Cosine similarity score |
| """ |
| vec1 = np.array(embedding1) |
| vec2 = np.array(embedding2) |
|
|
| dot_product = np.dot(vec1, vec2) |
| norm1 = np.linalg.norm(vec1) |
| norm2 = np.linalg.norm(vec2) |
|
|
| if norm1 == 0 or norm2 == 0: |
| return 0.0 |
|
|
| return dot_product / (norm1 * norm2) |
|
|
| def _generate_key(self, query: str, category: Optional[str] = None) -> str: |
| """Generate cache key from query and category.""" |
| content = f"{query}_{category or 'none'}" |
| return hashlib.sha256(content.encode()).hexdigest() |
|
|
| def get( |
| self, |
| query: str, |
| query_embedding: list, |
| category: Optional[str] = None |
| ) -> Optional[Dict[str, Any]]: |
| """ |
| Try to retrieve cached result. |
| |
| Args: |
| query: Search query |
| query_embedding: Query embedding vector |
| category: Optional category filter |
| |
| Returns: |
| Cached result if found, None otherwise |
| """ |
| try: |
| |
| exact_key = self._generate_key(query, category) |
| if exact_key in self.cache_data: |
| logger.info("Exact cache hit") |
| return self.cache_data[exact_key]["result"] |
|
|
| |
| best_similarity = 0.0 |
| best_result = None |
|
|
| for key, cached_item in self.cache_data.items(): |
| |
| if cached_item.get("category") != (category or "none"): |
| continue |
|
|
| cached_embedding = cached_item.get("embedding") |
| if not cached_embedding: |
| continue |
|
|
| similarity = self._cosine_similarity(query_embedding, cached_embedding) |
|
|
| if similarity > best_similarity: |
| best_similarity = similarity |
| best_result = cached_item["result"] |
|
|
| if best_similarity >= self.similarity_threshold: |
| logger.info(f"Semantic cache hit with similarity {best_similarity:.3f}") |
| return best_result |
|
|
| logger.info("Cache miss") |
| return None |
|
|
| except Exception as e: |
| logger.error(f"Error retrieving from cache: {str(e)}") |
| return None |
|
|
| def set( |
| self, |
| query: str, |
| query_embedding: list, |
| result: Dict[str, Any], |
| category: Optional[str] = None |
| ): |
| """ |
| Store result in cache. |
| |
| Args: |
| query: Search query |
| query_embedding: Query embedding vector |
| result: Result to cache |
| category: Optional category filter |
| """ |
| try: |
| key = self._generate_key(query, category) |
|
|
| self.cache_data[key] = { |
| "query": query, |
| "category": category or "none", |
| "embedding": query_embedding, |
| "result": result |
| } |
|
|
| self._save_cache() |
| logger.info(f"Cached result for query: {query[:50]}...") |
|
|
| except Exception as e: |
| logger.error(f"Error storing in cache: {str(e)}") |
|
|
|
|