import gc
import heapq
import inspect
import logging
import os
import sys
import threading
import time
import uuid
from datetime import datetime, timedelta
from functools import wraps
from typing import Callable, Any, Dict, Optional, List, Tuple, Set

import pytz
|
|
| |
# Process-wide logging configuration: INFO level with timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# Shared module-level logger used by all helpers below.
logger = logging.getLogger(__name__)


# Timezone used by the "local time" helpers below.
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh')
|
|
def generate_uuid():
    """Return a fresh random (version 4) UUID as its canonical string form."""
    return f"{uuid.uuid4()}"
|
|
def get_current_time():
    """Return the current (naive, system-local) time as an ISO-8601 string."""
    now = datetime.now()
    return now.isoformat()
|
|
def get_local_time():
    """Return the current time in Asia/Ho_Chi_Minh as 'YYYY-MM-DD HH:MM:SS'."""
    now = datetime.now(asia_tz)
    return f"{now:%Y-%m-%d %H:%M:%S}"
|
|
def get_local_datetime():
    """Return a timezone-aware datetime for the Asia/Ho_Chi_Minh zone."""
    return datetime.now(tz=asia_tz)
|
|
| |
# Backwards-compatible aliases for the local-time helpers above.
get_vietnam_time = get_local_time
get_vietnam_datetime = get_local_datetime
|
|
def timer_decorator(func: Callable) -> Callable:
    """
    Decorator that logs how long a function takes to execute.

    Works for both sync and async callables. The previous version always
    produced an ``async def`` wrapper, so decorating a plain function made
    every call return an un-awaited coroutine instead of the result; we now
    pick the wrapper flavor based on the decorated function.

    Args:
        func: The callable to time (plain function or coroutine function).

    Returns:
        A wrapped callable with the same signature that logs the elapsed
        time on success and on failure, re-raising any exception.
    """
    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                elapsed_time = time.time() - start_time
                logger.info(f"Function {func.__name__} executed in {elapsed_time:.4f} seconds")
                return result
            except Exception as e:
                elapsed_time = time.time() - start_time
                logger.error(f"Function {func.__name__} failed after {elapsed_time:.4f} seconds: {e}")
                raise
        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            elapsed_time = time.time() - start_time
            logger.info(f"Function {func.__name__} executed in {elapsed_time:.4f} seconds")
            return result
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"Function {func.__name__} failed after {elapsed_time:.4f} seconds: {e}")
            raise
    return sync_wrapper
|
|
def sanitize_input(text):
    """Return *text* with surrounding whitespace removed; "" for falsy input."""
    return text.strip() if text else ""
|
|
def truncate_text(text, max_length=100):
    """
    Shorten *text* to at most *max_length* characters, appending an
    ellipsis when truncation occurs. Falsy inputs are returned unchanged.
    """
    if text and len(text) > max_length:
        return text[:max_length] + "..."
    return text
|
|
class CacheStrategy:
    """Cache loading strategy enumeration"""
    # NOTE(review): EnhancedCache stores the strategy string but the code
    # visible here never branches on it — confirm it is consumed elsewhere.
    LAZY = "lazy"
    EAGER = "eager"
    MIXED = "mixed"
|
|
class CacheItem:
    """A single cache entry: a value plus expiry, priority and usage metadata."""

    def __init__(self, key: str, value: Any, ttl: int = 300, priority: int = 1):
        """
        Args:
            key: Cache key (may carry a "namespace:" prefix).
            value: The cached payload.
            ttl: Time-to-live in seconds from now.
            priority: Eviction priority; higher values are kept longer.
        """
        self.key = key
        self.value = value
        # Absolute expiry timestamp (naive local time, matching is_expired()).
        self.expiry = datetime.now() + timedelta(seconds=ttl)
        self.priority = priority
        self.access_count = 0
        self.last_accessed = datetime.now()

    def is_expired(self) -> bool:
        """Return True once the item's TTL has elapsed."""
        return datetime.now() > self.expiry

    def touch(self):
        """Record an access: refresh last_accessed and bump the counter."""
        self.last_accessed = datetime.now()
        self.access_count += 1

    def __lt__(self, other):
        """
        Ordering used when sorting eviction candidates — "smaller" items are
        evicted first. Compares priority, then access count, then recency.
        """
        if self.priority != other.priority:
            return self.priority < other.priority

        if self.access_count != other.access_count:
            return self.access_count < other.access_count

        return self.last_accessed < other.last_accessed

    def get_size(self) -> int:
        """
        Approximate memory footprint of this item in bytes.

        Uses sys.getsizeof (shallow — does not follow references into the
        value) plus a flat 64-byte allowance for the metadata. Falls back to
        1KB if the value's size cannot be measured.
        """
        try:
            return sys.getsizeof(self.value) + sys.getsizeof(self.key) + 64
        except Exception:
            # Was a bare `except:`, which would also swallow KeyboardInterrupt
            # and SystemExit; it also hid failures of the (now hoisted) import.
            return 1024
|
|
| |
class EnhancedCache:
    """
    Thread-safe in-memory cache with per-item TTL and priority, optional
    namespaces, count/size limits with priority-aware eviction, a background
    cleanup thread, and optional hit/miss statistics.
    """

    def __init__(self,
                 strategy: str = "lazy",
                 max_items: int = 10000,
                 max_size_mb: int = 100,
                 cleanup_interval: int = 60,
                 stats_enabled: bool = True):
        """
        Initialize enhanced cache with configurable strategy.

        Args:
            strategy: Cache loading strategy (lazy, eager, mixed)
            max_items: Maximum number of items to store in cache
            max_size_mb: Maximum size of cache in MB
            cleanup_interval: Interval in seconds to run cleanup; a value
                <= 0 disables the background cleanup thread
            stats_enabled: Whether to collect cache statistics
        """
        self._cache: Dict[str, CacheItem] = {}
        # namespace -> set of fully-qualified keys ("ns:key") in that namespace
        self._namespace_cache: Dict[str, Set[str]] = {}
        self._strategy = strategy  # informational; not consulted by lookups here
        self._max_items = max_items
        self._max_size_bytes = max_size_mb * 1024 * 1024
        self._current_size_bytes = 0
        self._stats_enabled = stats_enabled

        # Statistics counters (only maintained when stats_enabled is True).
        self._hits = 0
        self._misses = 0
        self._evictions = 0
        self._sets = 0  # number of set() calls; used for average set latency
        self._total_get_time = 0
        self._total_set_time = 0

        self._last_cleanup = datetime.now()
        self._cleanup_interval = cleanup_interval
        # Re-entrant lock: _evict_items() runs while set() already holds it.
        self._lock = threading.RLock()

        if cleanup_interval > 0:
            self._start_cleanup_thread(cleanup_interval)

        logger.info(f"Enhanced cache initialized with strategy={strategy}, max_items={max_items}, max_size={max_size_mb}MB")

    def _start_cleanup_thread(self, interval: int):
        """Start a daemon thread that calls cleanup() every *interval* seconds."""
        def cleanup_worker():
            while True:
                time.sleep(interval)
                try:
                    self.cleanup()
                except Exception as e:
                    logger.error(f"Error in cache cleanup: {e}")

        thread = threading.Thread(target=cleanup_worker, daemon=True)
        thread.start()
        logger.info(f"Cache cleanup thread started with interval {interval}s")

    def get(self, key: str, namespace: str = None) -> Optional[Any]:
        """
        Get value from cache if it exists and hasn't expired.

        Args:
            key: Cache key.
            namespace: Optional namespace; the stored key is "namespace:key".

        Returns:
            The cached value, or None on a miss or expired entry. Note that
            a cached value of None is indistinguishable from a miss.
        """
        if self._stats_enabled:
            start_time = time.time()

        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            cache_item = self._cache.get(cache_key)

            if cache_item:
                if cache_item.is_expired():
                    # Lazily purge the expired entry on access.
                    self._remove_item(cache_key, namespace)
                    if self._stats_enabled:
                        self._misses += 1
                    value = None
                else:
                    cache_item.touch()
                    if self._stats_enabled:
                        self._hits += 1
                    value = cache_item.value
            else:
                if self._stats_enabled:
                    self._misses += 1
                value = None

        if self._stats_enabled:
            self._total_get_time += time.time() - start_time

        return value

    def set(self, key: str, value: Any, ttl: int = 300, priority: int = 1, namespace: str = None) -> None:
        """
        Set a value in the cache with TTL in seconds.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Time-to-live in seconds.
            priority: Eviction priority; higher survives longer.
            namespace: Optional namespace grouping for bulk clears.
        """
        if self._stats_enabled:
            start_time = time.time()

        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            cache_item = CacheItem(cache_key, value, ttl, priority)
            item_size = cache_item.get_size()

            # Make room before inserting if we'd exceed either limit.
            if (len(self._cache) >= self._max_items or
                    self._current_size_bytes + item_size > self._max_size_bytes):
                self._evict_items(item_size)

            # BUG FIX: the size of brand-new items was previously never added
            # to the running total (only replacements were accounted for), so
            # the size limit was effectively unenforced. Subtract the replaced
            # item's size if present, then always add the new item's size.
            if cache_key in self._cache:
                self._current_size_bytes -= self._cache[cache_key].get_size()
            self._current_size_bytes += item_size

            self._cache[cache_key] = cache_item

            # Track namespace membership for namespaced clears.
            if namespace:
                if namespace not in self._namespace_cache:
                    self._namespace_cache[namespace] = set()
                self._namespace_cache[namespace].add(cache_key)

        if self._stats_enabled:
            self._sets += 1
            self._total_set_time += time.time() - start_time

    def delete(self, key: str, namespace: str = None) -> None:
        """Delete a key from the cache (no-op if absent)."""
        cache_key = f"{namespace}:{key}" if namespace else key

        with self._lock:
            self._remove_item(cache_key, namespace)

    def _remove_item(self, key: str, namespace: str = None):
        """Internal: remove an item and keep size/namespace tracking in sync."""
        if key in self._cache:
            self._current_size_bytes -= self._cache[key].get_size()
            del self._cache[key]

            if namespace and namespace in self._namespace_cache:
                if key in self._namespace_cache[namespace]:
                    self._namespace_cache[namespace].remove(key)
                # Drop empty namespace buckets entirely.
                if not self._namespace_cache[namespace]:
                    del self._namespace_cache[namespace]

    def _evict_items(self, needed_space: int = 0) -> None:
        """
        Evict items to make room in the cache.

        Items are sorted by CacheItem.__lt__ (priority, then access count,
        then recency) and removed until both the item-count and size budgets
        have ~10% headroom and *needed_space* bytes have been freed.
        """
        if not self._cache:
            return

        with self._lock:
            items = list(self._cache.values())

            # Least valuable items first (see CacheItem.__lt__).
            items.sort()

            space_freed = 0
            evicted_count = 0

            for item in items:
                # Stop once we are under 90% of both budgets and have freed
                # the requested amount of space.
                if (len(self._cache) - evicted_count <= self._max_items * 0.9 and
                    (space_freed >= needed_space or
                     self._current_size_bytes - space_freed <= self._max_size_bytes * 0.9)):
                    break

                # Spare very-high-priority items until half the candidates
                # have been considered for eviction.
                if item.priority > 9 and evicted_count < len(items) // 2:
                    continue

                item_size = item.get_size()
                # Best-effort namespace recovery from the "ns:key" convention.
                namespace = item.key.split(':', 1)[0] if ':' in item.key else None
                self._remove_item(item.key, namespace)

                space_freed += item_size
                evicted_count += 1
                if self._stats_enabled:
                    self._evictions += 1

            logger.info(f"Cache eviction: removed {evicted_count} items, freed {space_freed / 1024:.2f}KB")

    def clear(self, namespace: str = None) -> None:
        """
        Clear the cache or a specific namespace.

        Args:
            namespace: When given, only that namespace's entries are removed;
                otherwise the whole cache is dropped.
        """
        with self._lock:
            if namespace:
                if namespace in self._namespace_cache:
                    # Copy: _remove_item mutates the namespace set.
                    keys_to_remove = list(self._namespace_cache[namespace])
                    for key in keys_to_remove:
                        self._remove_item(key, namespace)
            else:
                self._cache.clear()
                self._namespace_cache.clear()
                self._current_size_bytes = 0

            logger.info(f"Cache cleared{' for namespace ' + namespace if namespace else ''}")

    def cleanup(self) -> None:
        """
        Remove expired items; run garbage collection after large purges.

        Skips the work entirely if called again within _cleanup_interval
        seconds of the previous run.
        """
        with self._lock:
            now = datetime.now()

            if (now - self._last_cleanup).total_seconds() < self._cleanup_interval:
                return

            # Collect first, then remove, to avoid mutating while iterating.
            expired_keys = []
            for key, item in self._cache.items():
                if item.is_expired():
                    expired_keys.append((key, key.split(':', 1)[0] if ':' in key else None))

            for key, namespace in expired_keys:
                self._remove_item(key, namespace)

            self._last_cleanup = now

            # Large purges can leave a lot of garbage; collect proactively.
            if len(expired_keys) > 100:
                gc.collect()

            logger.info(f"Cache cleanup: removed {len(expired_keys)} expired items")

    def get_stats(self) -> Dict:
        """Return a snapshot of cache statistics (or a stub when disabled)."""
        with self._lock:
            if not self._stats_enabled:
                return {"stats_enabled": False}

            total_requests = self._hits + self._misses
            hit_rate = (self._hits / total_requests) * 100 if total_requests > 0 else 0

            avg_get_time = (self._total_get_time / total_requests) * 1000 if total_requests > 0 else 0
            # BUG FIX: average set latency was previously divided by the
            # eviction count; divide by the number of set() calls instead.
            avg_set_time = (self._total_set_time / self._sets) * 1000 if self._sets > 0 else 0

            return {
                "stats_enabled": True,
                "item_count": len(self._cache),
                "max_items": self._max_items,
                "size_bytes": self._current_size_bytes,
                "max_size_bytes": self._max_size_bytes,
                "hits": self._hits,
                "misses": self._misses,
                "hit_rate_percent": round(hit_rate, 2),
                "evictions": self._evictions,
                "avg_get_time_ms": round(avg_get_time, 3),
                "avg_set_time_ms": round(avg_set_time, 3),
                "namespace_count": len(self._namespace_cache),
                "namespaces": list(self._namespace_cache.keys())
            }

    def preload(self, items: List[Tuple[str, Any, int, int]], namespace: str = None) -> None:
        """
        Preload a list of items into the cache.

        Args:
            items: List of (key, value, ttl, priority) tuples
            namespace: Optional namespace for all items
        """
        for key, value, ttl, priority in items:
            self.set(key, value, ttl, priority, namespace)

        logger.info(f"Preloaded {len(items)} items into cache{' namespace ' + namespace if namespace else ''}")

    def get_or_load(self, key: str, loader_func: Callable[[], Any],
                    ttl: int = 300, priority: int = 1, namespace: str = None) -> Any:
        """
        Get from cache or load using the provided function.

        Args:
            key: Cache key
            loader_func: Function to call if cache miss occurs
            ttl: TTL in seconds
            priority: Item priority
            namespace: Optional namespace

        Returns:
            Cached or freshly loaded value. A loader returning None is not
            cached, so it will be invoked again on the next call.
        """
        value = self.get(key, namespace)

        if value is None:
            value = loader_func()

            if value is not None:
                self.set(key, value, ttl, priority, namespace)

        return value
|
|
| |
# Cache configuration, overridable via environment variables.
CACHE_STRATEGY = os.getenv("CACHE_STRATEGY", "mixed")
CACHE_MAX_ITEMS = int(os.getenv("CACHE_MAX_ITEMS", "10000"))
CACHE_MAX_SIZE_MB = int(os.getenv("CACHE_MAX_SIZE_MB", "100"))
CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60"))
# Any of "true"/"1"/"yes" (case-insensitive) enables statistics collection.
CACHE_STATS_ENABLED = os.getenv("CACHE_STATS_ENABLED", "true").lower() in ("true", "1", "yes")
|
|
| |
# Shared module-level cache instance (also backs the legacy SimpleCache).
cache = EnhancedCache(
    strategy=CACHE_STRATEGY,
    max_items=CACHE_MAX_ITEMS,
    max_size_mb=CACHE_MAX_SIZE_MB,
    cleanup_interval=CACHE_CLEANUP_INTERVAL,
    stats_enabled=CACHE_STATS_ENABLED
)
|
|
| |
class SimpleCache:
    """
    Deprecated compatibility shim: forwards every operation to the shared
    module-level EnhancedCache instance. Prefer using `cache` directly.
    """

    def __init__(self):
        """Legacy SimpleCache implementation that uses EnhancedCache underneath"""
        logger.warning("SimpleCache is deprecated, please use EnhancedCache directly")

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None when absent or expired."""
        return cache.get(key)

    def set(self, key: str, value: Any, ttl: int = 300) -> None:
        """Store *value* under *key*, expiring after *ttl* seconds."""
        cache.set(key, value, ttl)

    def delete(self, key: str) -> None:
        """Remove *key* from the shared cache if present."""
        cache.delete(key)

    def clear(self) -> None:
        """Drop every entry from the shared cache."""
        cache.clear()
|
|
def get_host_url(request) -> str:
    """
    Build the externally-visible base URL for *request*.

    Reads the `host` and `x-forwarded-proto` headers, defaulting to
    "localhost" and "http" respectively when they are absent.
    """
    headers = request.headers
    scheme = headers.get("x-forwarded-proto", "http")
    host = headers.get("host", "localhost")
    return "{}://{}".format(scheme, host)
|
|
def format_time(timestamp):
    """Render a datetime as a human-readable 'YYYY-MM-DD HH:MM:SS' string."""
    return f"{timestamp:%Y-%m-%d %H:%M:%S}"