| import os |
| import time |
| import threading |
| import logging |
| from typing import Dict, Any, Optional, Tuple, List, Callable, Generic, TypeVar, Union |
| from datetime import datetime |
| from dotenv import load_dotenv |
| import json |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
| |
| load_dotenv() |
|
|
| |
| DEFAULT_CACHE_TTL = int(os.getenv("CACHE_TTL_SECONDS", "300")) |
| DEFAULT_CACHE_CLEANUP_INTERVAL = int(os.getenv("CACHE_CLEANUP_INTERVAL", "60")) |
| DEFAULT_CACHE_MAX_SIZE = int(os.getenv("CACHE_MAX_SIZE", "1000")) |
|
|
| |
| T = TypeVar('T') |
|
|
| |
| class CacheItem(Generic[T]): |
| def __init__(self, value: T, ttl: int = DEFAULT_CACHE_TTL): |
| self.value = value |
| self.expire_at = time.time() + ttl |
| self.last_accessed = time.time() |
| |
| def is_expired(self) -> bool: |
| """Kiểm tra xem item có hết hạn chưa""" |
| return time.time() > self.expire_at |
| |
| def touch(self) -> None: |
| """Cập nhật thời gian truy cập lần cuối""" |
| self.last_accessed = time.time() |
| |
| def extend(self, ttl: int = DEFAULT_CACHE_TTL) -> None: |
| """Gia hạn thời gian sống của item""" |
| self.expire_at = time.time() + ttl |
|
|
| |
| class InMemoryCache: |
| def __init__( |
| self, |
| ttl: int = DEFAULT_CACHE_TTL, |
| cleanup_interval: int = DEFAULT_CACHE_CLEANUP_INTERVAL, |
| max_size: int = DEFAULT_CACHE_MAX_SIZE |
| ): |
| self.cache: Dict[str, CacheItem] = {} |
| self.ttl = ttl |
| self.cleanup_interval = cleanup_interval |
| self.max_size = max_size |
| self.lock = threading.RLock() |
| |
| |
| self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True) |
| self.cleanup_thread.start() |
| |
| def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None: |
| """Lưu một giá trị vào cache""" |
| with self.lock: |
| ttl_value = ttl if ttl is not None else self.ttl |
| |
| |
| if len(self.cache) >= self.max_size and key not in self.cache: |
| self._evict_lru_items() |
| |
| self.cache[key] = CacheItem(value, ttl_value) |
| logger.debug(f"Cache set: {key} (expires in {ttl_value}s)") |
| |
| def get(self, key: str, default: Any = None) -> Any: |
| """ |
| Lấy giá trị từ cache. Nếu key không tồn tại hoặc đã hết hạn, trả về giá trị mặc định. |
| Áp dụng lazy expiration: kiểm tra và xóa các item hết hạn khi truy cập. |
| """ |
| with self.lock: |
| item = self.cache.get(key) |
| |
| |
| if item is None or item.is_expired(): |
| |
| if item is not None: |
| logger.debug(f"Cache miss (expired): {key}") |
| del self.cache[key] |
| else: |
| logger.debug(f"Cache miss (not found): {key}") |
| return default |
| |
| |
| item.touch() |
| logger.debug(f"Cache hit: {key}") |
| return item.value |
| |
| def delete(self, key: str) -> bool: |
| """Xóa một key khỏi cache""" |
| with self.lock: |
| if key in self.cache: |
| del self.cache[key] |
| logger.debug(f"Cache delete: {key}") |
| return True |
| return False |
| |
| def clear(self) -> None: |
| """Xóa tất cả dữ liệu trong cache""" |
| with self.lock: |
| self.cache.clear() |
| logger.debug("Cache cleared") |
| |
| def get_or_set(self, key: str, callback: Callable[[], T], ttl: Optional[int] = None) -> T: |
| """ |
| Lấy giá trị từ cache nếu tồn tại, nếu không thì gọi callback để lấy giá trị |
| và lưu vào cache trước khi trả về. |
| """ |
| with self.lock: |
| value = self.get(key) |
| if value is None: |
| value = callback() |
| self.set(key, value, ttl) |
| return value |
| |
| def _cleanup_task(self) -> None: |
| """Thread để dọn dẹp các item đã hết hạn (active expiration)""" |
| while True: |
| time.sleep(self.cleanup_interval) |
| try: |
| self._remove_expired_items() |
| except Exception as e: |
| logger.error(f"Error in cache cleanup task: {e}") |
| |
| def _remove_expired_items(self) -> None: |
| """Xóa tất cả các item đã hết hạn trong cache""" |
| with self.lock: |
| now = time.time() |
| expired_keys = [k for k, v in self.cache.items() if v.is_expired()] |
| for key in expired_keys: |
| del self.cache[key] |
| |
| if expired_keys: |
| logger.debug(f"Cleaned up {len(expired_keys)} expired cache items") |
| |
| def _evict_lru_items(self, count: int = 1) -> None: |
| """Xóa bỏ các item ít được truy cập nhất khi cache đầy""" |
| items = sorted(self.cache.items(), key=lambda x: x[1].last_accessed) |
| for i in range(min(count, len(items))): |
| del self.cache[items[i][0]] |
| logger.debug(f"Evicted {min(count, len(items))} least recently used items from cache") |
| |
| def stats(self) -> Dict[str, Any]: |
| """Trả về thống kê về cache""" |
| with self.lock: |
| now = time.time() |
| total_items = len(self.cache) |
| expired_items = sum(1 for item in self.cache.values() if item.is_expired()) |
| memory_usage = self._estimate_memory_usage() |
| return { |
| "total_items": total_items, |
| "expired_items": expired_items, |
| "active_items": total_items - expired_items, |
| "memory_usage_bytes": memory_usage, |
| "memory_usage_mb": memory_usage / (1024 * 1024), |
| "max_size": self.max_size |
| } |
| |
| def _estimate_memory_usage(self) -> int: |
| """Ước tính dung lượng bộ nhớ của cache (gần đúng)""" |
| |
| cache_size = sum(len(k) for k in self.cache.keys()) |
| for item in self.cache.values(): |
| try: |
| |
| if isinstance(item.value, (str, bytes)): |
| cache_size += len(item.value) |
| elif isinstance(item.value, (dict, list)): |
| cache_size += len(json.dumps(item.value)) |
| else: |
| |
| cache_size += 100 |
| except: |
| cache_size += 100 |
| |
| return cache_size |
|
|
| |
| _cache_instance = None |
|
|
| def get_cache() -> InMemoryCache: |
| """Trả về instance singleton của InMemoryCache""" |
| global _cache_instance |
| if _cache_instance is None: |
| _cache_instance = InMemoryCache() |
| return _cache_instance |