| """ |
| 과제 공고 벡터 DB 캐시 시스템 |
| - ChromaDB를 사용한 로컬 캐시 |
| - 메타 캐시 + 본문 벡터 캐시 (2개 컬렉션) |
| - 사용자 프로필 저장 (이메일 기반) |
| - 매일 KST 10:00, 22:00 자동 동기화 |
| - 백그라운드 본문 인덱싱 (서비스 무중단) |
| - Hugging Face Space 영구 스토리지 활용 (/data) |
| """ |
| import os |
| import json |
| import hashlib |
| import threading |
| import logging |
| import tempfile |
| import time |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Tuple, Optional, Generator |
| from pathlib import Path |
| import pytz |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
|
|
# Hugging Face Spaces mounts its persistent volume at /data when enabled.
PERSISTENT_DIR = "/data"
# Ephemeral fallback used when /data is unavailable (lost on restart).
LOCAL_FALLBACK_DIR = "./data"


# Cached result of _determine_storage_path(); resolved once per process.
_STORAGE_BASE_PATH = None
|
|
def _try_enable_persistent_storage_via_api() -> bool:
    """Try to enable HF Spaces Persistent Storage through the Hub API.

    Requires HF_TOKEN and SPACE_ID environment variables. Returns True when
    storage is already active or an activation request was accepted (the
    Space then restarts and /data appears), False otherwise (missing env
    vars, missing huggingface_hub, or an API failure).
    """
    hf_token = os.getenv("HF_TOKEN")
    space_id = os.getenv("SPACE_ID")

    if not hf_token:
        logger.warning("⚠️ HF_TOKEN 환경변수 없음 - API 활성화 불가")
        return False
    if not space_id:
        logger.warning("⚠️ SPACE_ID 환경변수 없음 - HF Spaces 환경이 아닌 것 같습니다")
        return False

    logger.info(f"🔄 Persistent Storage API 활성화 시도... Space ID: {space_id}")

    try:
        from huggingface_hub import HfApi, SpaceStorage
        api = HfApi(token=hf_token)

        # Check current runtime first: storage may already be enabled.
        try:
            runtime = api.get_space_runtime(repo_id=space_id)
            current_storage = getattr(runtime, 'storage', None)
            if current_storage:
                logger.info(f"✅ Persistent Storage 이미 활성화됨: {current_storage}")
                return True
            else:
                logger.info("📦 Persistent Storage 비활성화 상태 - 활성화 시도...")
        except Exception as e:
            # Non-fatal: fall through and attempt the storage request anyway.
            logger.warning(f"⚠️ Space 상태 확인 실패: {e}")

        # Requesting storage restarts the Space; /data is mounted afterwards.
        api.request_space_storage(repo_id=space_id, storage=SpaceStorage.SMALL)
        logger.info("✅ Persistent Storage SMALL 활성화 요청 완료!")
        logger.info("⏳ Space가 재시작됩니다. 잠시 후 /data 디렉토리가 생성됩니다.")
        return True

    except ImportError:
        logger.warning("⚠️ huggingface_hub 라이브러리 없음")
        return False
    except Exception as e:
        # HF API errors are stringly typed; classify from the message text.
        error_msg = str(e)
        if "already" in error_msg.lower() or "exists" in error_msg.lower():
            # fixed: was an f-string with no placeholders (F541)
            logger.info("✅ Persistent Storage 이미 활성화됨")
            return True
        elif "payment" in error_msg.lower() or "billing" in error_msg.lower():
            logger.warning(f"⚠️ 결제 정보 필요: {error_msg}")
        else:
            logger.warning(f"⚠️ API 활성화 실패: {error_msg}")
        return False
|
|
def _determine_storage_path() -> Path:
    """Resolve the base path for persistent storage (AETHER pattern).

    Resolution order: cached result -> writable /data -> (HF API activation
    attempt) -> writable $PERSISTENT_DATA_DIR -> local ./data fallback.
    The chosen path is memoized in the module global _STORAGE_BASE_PATH.
    """
    global _STORAGE_BASE_PATH

    if _STORAGE_BASE_PATH is not None:
        return _STORAGE_BASE_PATH

    def _probe_write(directory: str) -> None:
        # Raises if we cannot create and delete a small file in `directory`.
        probe = os.path.join(directory, ".write_test")
        with open(probe, "w") as fh:
            fh.write("test")
        os.remove(probe)

    print("\n" + "=" * 60)
    print("🔍 CoBIZ 캐시 스토리지 초기화 중...")
    print("=" * 60)

    # 1) Preferred: the HF Spaces persistent volume at /data.
    if os.path.exists(PERSISTENT_DIR):
        try:
            _probe_write(PERSISTENT_DIR)
            _STORAGE_BASE_PATH = Path(PERSISTENT_DIR)
            logger.info(f"✅ HF Spaces 영구 스토리지 활성화: {PERSISTENT_DIR}")

            leftovers = [
                f for f in os.listdir(PERSISTENT_DIR)
                if f.endswith('.db') or f.endswith('.json') or f == 'announcement_cache'
            ]
            if leftovers:
                logger.info(f"📁 기존 파일 발견: {leftovers}")
            else:
                logger.info("📁 기존 파일 없음 (새로운 스토리지)")

            print("=" * 60 + "\n")
            return _STORAGE_BASE_PATH
        except Exception as e:
            logger.warning(f"⚠️ /data 쓰기 테스트 실패: {e}")
    else:
        logger.warning(f"⚠️ {PERSISTENT_DIR} 디렉토리 없음")

    # 2) /data missing or unwritable: ask the HF API to enable storage.
    logger.info("\n🚀 API로 Persistent Storage 활성화 시도...")
    if _try_enable_persistent_storage_via_api():
        logger.info("💡 API 요청 완료. Space 재시작 후 /data 사용 가능")

    # 3) Operator-provided directory via environment variable.
    env_path = os.environ.get("PERSISTENT_DATA_DIR")
    if env_path:
        try:
            os.makedirs(env_path, exist_ok=True)
            _probe_write(env_path)
            _STORAGE_BASE_PATH = Path(env_path)
            logger.info(f"✅ 환경변수 스토리지 사용: {env_path}")
            print("=" * 60 + "\n")
            return _STORAGE_BASE_PATH
        except Exception as e:
            logger.warning(f"⚠️ PERSISTENT_DATA_DIR 쓰기 불가: {e}")

    # 4) Last resort: ephemeral local directory (data lost on restart).
    os.makedirs(LOCAL_FALLBACK_DIR, exist_ok=True)
    _STORAGE_BASE_PATH = Path(LOCAL_FALLBACK_DIR)

    logger.warning(f"\n🟡 현재 로컬 스토리지 사용: {LOCAL_FALLBACK_DIR}")
    logger.warning(" ⚠️ 재시작 시 모든 데이터가 손실됩니다!")
    logger.warning(" 💡 HF Space Settings → Persistent Storage → Enable 필요!")
    print("=" * 60 + "\n")

    return _STORAGE_BASE_PATH
|
|
def get_storage_info() -> dict:
    """Return a snapshot of the current storage configuration.

    Keys: base_path, is_persistent, cache_dir, cache_exists, files
    (non-hidden entries of the cache directory, [] when absent).
    """
    base_path = _determine_storage_path()
    cache_dir = base_path / "announcement_cache"

    files = []
    if cache_dir.exists():
        files = [name for name in os.listdir(cache_dir) if not name.startswith('.')]

    return {
        "base_path": str(base_path),
        "is_persistent": str(base_path) == PERSISTENT_DIR,
        "cache_dir": str(cache_dir),
        "cache_exists": cache_dir.exists(),
        "files": files,
    }
|
|
def verify_storage_persistence():
    """Log a diagnostic summary of the storage setup (debugging helper)."""
    base_path = _determine_storage_path()
    cache_dir = base_path / "announcement_cache"
    is_persistent = str(base_path) == PERSISTENT_DIR

    logger.info(f"\n🔍 스토리지 영속성 검증:")
    logger.info(f" 기본 경로: {base_path}")
    logger.info(f" 영구 저장: {'✅ 예' if is_persistent else '❌ 아니오'}")
    logger.info(f" 캐시 디렉토리: {cache_dir}")
    logger.info(f" 캐시 존재: {cache_dir.exists()}")

    if cache_dir.exists():
        entries = list(cache_dir.iterdir())
        logger.info(f" 파일 수: {len(entries)}")
        # Only the first five entries are listed to keep logs short.
        for entry in entries[:5]:
            logger.info(f" - {entry.name}")
|
|
| |
# Resolve the storage base once at import time and log a diagnostic summary.
PERSISTENT_DIR_PATH = _determine_storage_path()
verify_storage_persistence()

# On-disk layout, all rooted at <base>/announcement_cache.
CACHE_DIR = PERSISTENT_DIR_PATH / "announcement_cache"
DB_PATH = CACHE_DIR / "chroma_db"
METADATA_FILE = CACHE_DIR / "sync_metadata.json"
CONTENT_INDEX_FILE = CACHE_DIR / "content_index_status.json"
PROFILE_DIR = CACHE_DIR / "user_profiles"

# Make sure the directories exist before any cache/profile class touches them.
CACHE_DIR.mkdir(parents=True, exist_ok=True)
PROFILE_DIR.mkdir(parents=True, exist_ok=True)

logger.info(f"Cache directory: {CACHE_DIR}")
logger.info(f"Profile directory: {PROFILE_DIR}")
logger.info(f"Persistent storage: {'✅ YES' if str(PERSISTENT_DIR_PATH) == PERSISTENT_DIR else '❌ NO (local only)'}")

# Optional dependency: ChromaDB (vector store). Classes degrade to no-ops without it.
try:
    import chromadb
    from chromadb.config import Settings
    CHROMADB_AVAILABLE = True
except ImportError:
    CHROMADB_AVAILABLE = False
    logger.warning("ChromaDB not available. Using JSON fallback.")

# Optional dependency: APScheduler (periodic auto-sync jobs).
try:
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.triggers.cron import CronTrigger
    SCHEDULER_AVAILABLE = True
except ImportError:
    SCHEDULER_AVAILABLE = False
    logger.warning("APScheduler not available. Auto-sync disabled.")

# Optional dependency: sentence-transformers (Korean embedding model).
try:
    from sentence_transformers import SentenceTransformer
    EMBEDDING_AVAILABLE = True
except ImportError:
    EMBEDDING_AVAILABLE = False
    logger.warning("sentence-transformers not available. Using ChromaDB default embedding.")

# All timestamps in this module use Korea Standard Time.
KST = pytz.timezone('Asia/Seoul')

# Lazily-loaded SentenceTransformer instance (see get_embedding_model()).
_embedding_model = None
def get_embedding_model():
    """Return the shared Korean sentence-embedding model, loading it lazily.

    Returns None when sentence-transformers is unavailable or loading fails.
    """
    global _embedding_model
    # Fast path: already loaded, or the library is missing entirely.
    if _embedding_model is not None or not EMBEDDING_AVAILABLE:
        return _embedding_model
    try:
        _embedding_model = SentenceTransformer('jhgan/ko-sroberta-multitask')
        logger.info("Loaded Korean embedding model: jhgan/ko-sroberta-multitask")
    except Exception as e:
        logger.error(f"Failed to load embedding model: {e}")
    return _embedding_model
class AnnouncementCache:
    """Announcement meta-information cache backed by ChromaDB.

    One document per announcement is kept in the "announcements" collection
    (cosine HNSW space), combining title/description/author/category/tags/
    target into a searchable text blob. Every public method degrades
    gracefully (no-op or empty result) when ChromaDB is unavailable or
    failed to initialize.
    """

    def __init__(self):
        self.client = None       # chromadb.PersistentClient, or None on failure
        self.collection = None   # "announcements" collection handle, or None
        self._init_db()

    def _init_db(self):
        """Create the persistent client/collection; reset both to None on failure."""
        if not CHROMADB_AVAILABLE:
            return
        try:
            self.client = chromadb.PersistentClient(path=str(DB_PATH))
            self.collection = self.client.get_or_create_collection(
                name="announcements",
                metadata={"hnsw:space": "cosine"}
            )
            logger.info(f"ChromaDB initialized at {DB_PATH}")
        except Exception as e:
            logger.error(f"ChromaDB init error: {e}")
            self.client = None
            self.collection = None

    def _generate_id(self, item: dict) -> str:
        """Derive a stable unique ID for an announcement: md5 of "<id>_<title>"."""
        pblanc_id = item.get("pblancId") or item.get("seq") or ""
        title = item.get("title") or item.get("pblancNm") or ""
        unique_str = f"{pblanc_id}_{title}"
        return hashlib.md5(unique_str.encode()).hexdigest()

    def _item_to_document(self, item: dict) -> str:
        """Join the searchable fields of an announcement into one text blob.

        HTML tags are stripped from the description and it is capped at
        500 characters; other fields are used verbatim when present.
        """
        parts = []
        title = item.get("title") or item.get("pblancNm") or ""
        if title:
            parts.append(title)
        description = item.get("description") or item.get("bsnsSumryCn") or ""
        if description:
            import re
            description = re.sub(r'<[^>]+>', '', description).strip()
            parts.append(description[:500])
        author = item.get("author") or item.get("jrsdInsttNm") or ""
        if author:
            parts.append(author)
        category = item.get("lcategory") or item.get("pldirSportRealmLclasCodeNm") or ""
        if category:
            parts.append(category)
        hash_tags = item.get("hashTags") or ""
        if hash_tags:
            parts.append(hash_tags)
        target = item.get("trgetNm") or ""
        if target:
            parts.append(target)
        return " ".join(parts)

    def _item_to_metadata(self, item: dict) -> dict:
        """Normalize an announcement into a flat, string-only metadata dict.

        Values are stringified and capped at 500 chars (ChromaDB metadata
        must be scalar); missing fields become "".
        """
        def safe_str(val):
            if val is None:
                return ""
            return str(val)[:500]
        return {
            "pblancId": safe_str(item.get("pblancId") or item.get("seq")),
            "title": safe_str(item.get("title") or item.get("pblancNm")),
            "author": safe_str(item.get("author") or item.get("jrsdInsttNm")),
            "category": safe_str(item.get("lcategory") or item.get("pldirSportRealmLclasCodeNm")),
            "reqstDt": safe_str(item.get("reqstDt") or item.get("reqstBeginEndDe")),
            "pubDate": safe_str(item.get("pubDate") or item.get("creatPnttm")),
            "link": safe_str(item.get("link") or item.get("pblancUrl")),
            "hashTags": safe_str(item.get("hashTags")),
            "target": safe_str(item.get("trgetNm")),
            "excInsttNm": safe_str(item.get("excInsttNm")),
            "fileUrl": safe_str(item.get("flpthNm")),
            "fileName": safe_str(item.get("fileNm")),
            "printFileUrl": safe_str(item.get("printFlpthNm")),
            "printFileName": safe_str(item.get("printFileNm")),
            "description": safe_str(item.get("description") or item.get("bsnsSumryCn")),
        }

    def bulk_upsert(self, items: List[dict]) -> Tuple[int, int]:
        """Insert or update announcements in batches of 100.

        Returns (added, updated) counts. A failing batch is logged and
        skipped without aborting the remaining batches.
        """
        if not self.collection or not items:
            return 0, 0
        added = 0
        updated = 0
        batch_size = 100
        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]
            ids = []
            documents = []
            metadatas = []
            for item in batch:
                doc_id = self._generate_id(item)
                ids.append(doc_id)
                documents.append(self._item_to_document(item))
                metadatas.append(self._item_to_metadata(item))
            try:
                # Look up which IDs already exist so the upsert can be split
                # into added vs updated counts for reporting.
                existing = self.collection.get(ids=ids)
                existing_ids = set(existing['ids']) if existing['ids'] else set()
                self.collection.upsert(
                    ids=ids,
                    documents=documents,
                    metadatas=metadatas
                )
                for doc_id in ids:
                    if doc_id in existing_ids:
                        updated += 1
                    else:
                        added += 1
                logger.info(f"Upserted batch {i//batch_size + 1}: {len(batch)} items")
            except Exception as e:
                logger.error(f"Batch upsert error: {e}")
        logger.info(f"Bulk upsert complete: {added} added, {updated} updated")
        return added, updated

    def search(self, query: str, n_results: int = 20, filters: Optional[dict] = None) -> List[dict]:
        """Semantic search over the meta cache.

        Optional metadata filters: "category" (exact match; the sentinel
        "전체" disables the filter) and "author" (substring match). Each
        returned item carries `similarity_score` = 1 - cosine distance.
        """
        if not self.collection:
            return []
        try:
            where_filter = None
            if filters:
                conditions = []
                if filters.get("category") and filters["category"] != "전체":
                    conditions.append({"category": {"$eq": filters["category"]}})
                if filters.get("author"):
                    conditions.append({"author": {"$contains": filters["author"]}})
                if conditions:
                    where_filter = {"$and": conditions} if len(conditions) > 1 else conditions[0]
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                where=where_filter,
                include=["documents", "metadatas", "distances"]
            )
            items = []
            if results and results['ids'] and results['ids'][0]:
                for i, doc_id in enumerate(results['ids'][0]):
                    metadata = results['metadatas'][0][i] if results['metadatas'] else {}
                    distance = results['distances'][0][i] if results['distances'] else 0
                    item = {
                        "id": doc_id,
                        "similarity_score": 1 - distance,
                        **metadata
                    }
                    items.append(item)
            return items
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []

    def get_all(self, limit: int = 1000) -> List[dict]:
        """Return up to `limit` cached announcements (metadata only)."""
        if not self.collection:
            return []
        try:
            results = self.collection.get(
                limit=limit,
                include=["metadatas"]
            )
            items = []
            if results and results['ids']:
                for i, doc_id in enumerate(results['ids']):
                    metadata = results['metadatas'][i] if results['metadatas'] else {}
                    items.append({"id": doc_id, **metadata})
            return items
        except Exception as e:
            logger.error(f"Get all error: {e}")
            return []

    def get_count(self) -> int:
        """Total number of cached announcements (0 when unavailable)."""
        if not self.collection:
            return 0
        try:
            return self.collection.count()
        except Exception:  # fixed: was a bare except (swallowed SystemExit/KeyboardInterrupt)
            return 0

    def clear(self):
        """Drop and recreate the announcements collection."""
        if self.client:
            try:
                self.client.delete_collection("announcements")
                self._init_db()
                logger.info("Cache cleared")
            except Exception as e:
                logger.error(f"Clear error: {e}")
class ContentVectorCache:
    """Full-text (body) vector cache using a Korean embedding model.

    Documents live in the "announcement_contents" ChromaDB collection and
    are embedded via get_embedding_model(). The ChromaDB client is shared
    with the meta cache to avoid two PersistentClient instances on one
    path. The list of failed IDs is persisted to CONTENT_INDEX_FILE so
    indexing can resume after a restart.
    """

    def __init__(self, shared_client=None):
        self.client = shared_client   # shared chromadb client (may be bound lazily)
        self.collection = None        # "announcement_contents" collection handle
        self.embedding_model = None   # lazily bound SentenceTransformer
        # Volatile indexing progress; only "failed" is persisted to disk.
        self._index_status = {"in_progress": False, "current": 0, "total": 0, "failed": []}
        self._load_index_status()

    def _load_index_status(self):
        """Restore the persisted failed-ID list, if the status file exists."""
        if CONTENT_INDEX_FILE.exists():
            try:
                with open(CONTENT_INDEX_FILE, 'r', encoding='utf-8') as f:
                    saved = json.load(f)
                self._index_status["failed"] = saved.get("failed", [])
            except Exception as e:  # fixed: was a bare silent except
                # Best-effort: a corrupt status file just means starting fresh.
                logger.warning(f"Content index status load failed: {e}")

    def _save_index_status(self):
        """Persist the last 100 failed IDs plus a timestamp (best-effort)."""
        try:
            with open(CONTENT_INDEX_FILE, 'w', encoding='utf-8') as f:
                json.dump({
                    "failed": self._index_status["failed"][-100:],
                    "last_updated": datetime.now(KST).isoformat()
                }, f, ensure_ascii=False)
        except Exception as e:  # fixed: was a bare silent except
            logger.warning(f"Content index status save failed: {e}")

    def _init_db(self):
        """Bind the collection, borrowing the meta cache's client if needed."""
        if not CHROMADB_AVAILABLE:
            return
        try:
            if self.client is None:
                meta_cache = get_cache()
                self.client = meta_cache.client
            if self.client is None:
                logger.error("No ChromaDB client available for content cache")
                return
            self.collection = self.client.get_or_create_collection(
                name="announcement_contents",
                metadata={"hnsw:space": "cosine"}
            )
            logger.info("Content VectorDB initialized (shared client)")
        except Exception as e:
            logger.error(f"Content VectorDB init error: {e}")

    def _get_embedding(self, text: str) -> Optional[List[float]]:
        """Embed `text` (truncated to 8000 chars); None if no model or failure."""
        if self.embedding_model is None:
            self.embedding_model = get_embedding_model()
        if self.embedding_model is None:
            return None
        try:
            text = text[:8000]
            embedding = self.embedding_model.encode(text, show_progress_bar=False)
            return embedding.tolist()
        except Exception as e:
            logger.error(f"Embedding error: {e}")
            return None

    def add_content(self, pblanc_id: str, title: str, content: str) -> bool:
        """Upsert one announcement body under the id "content_<pblanc_id>".

        Stores the first 5000 chars as the document and the first 500 as a
        preview in metadata. Returns False when the collection or the
        embedding is unavailable.
        """
        if not self.collection:
            self._init_db()
        if not self.collection:
            return False
        try:
            embedding = self._get_embedding(content)
            if embedding is None:
                return False
            doc_id = f"content_{pblanc_id}"
            self.collection.upsert(
                ids=[doc_id],
                embeddings=[embedding],
                documents=[content[:5000]],
                metadatas=[{
                    "pblancId": pblanc_id,
                    "title": title[:200],
                    "content_preview": content[:500],
                    "indexed_at": datetime.now(KST).isoformat()
                }]
            )
            return True
        except Exception as e:
            logger.error(f"Add content error: {e}")
            return False

    def search_similar(self, query: str, n_results: int = 20) -> List[dict]:
        """Vector search over indexed bodies; similarity = 1 - cosine distance."""
        if not self.collection:
            self._init_db()
        if not self.collection:
            return []
        try:
            embedding = self._get_embedding(query)
            if embedding is None:
                return []
            results = self.collection.query(
                query_embeddings=[embedding],
                n_results=n_results,
                include=["documents", "metadatas", "distances"]
            )
            items = []
            if results and results['ids'] and results['ids'][0]:
                for i, doc_id in enumerate(results['ids'][0]):
                    metadata = results['metadatas'][0][i] if results['metadatas'] else {}
                    distance = results['distances'][0][i] if results['distances'] else 0
                    items.append({
                        "id": doc_id,
                        "similarity_score": 1 - distance,
                        "pblancId": metadata.get("pblancId", ""),
                        "title": metadata.get("title", ""),
                        "content_preview": metadata.get("content_preview", ""),
                    })
            return items
        except Exception as e:
            logger.error(f"Content search error: {e}")
            return []

    def get_indexed_ids(self) -> set:
        """Collect indexed announcement IDs from metadata and doc-id prefixes.

        Both sources are scanned because older records may carry the ID only
        in one of the two places.
        """
        if not self.collection:
            self._init_db()
        if not self.collection:
            return set()
        try:
            results = self.collection.get(include=["metadatas"])
            ids = set()
            if results:
                if results.get('metadatas'):
                    for meta in results['metadatas']:
                        if meta and meta.get("pblancId"):
                            ids.add(meta["pblancId"])
                if results.get('ids'):
                    for doc_id in results['ids']:
                        if doc_id and doc_id.startswith("content_"):
                            pblanc_id = doc_id.replace("content_", "", 1)
                            if pblanc_id:
                                ids.add(pblanc_id)
            logger.info(f"Found {len(ids)} indexed content IDs")
            return ids
        except Exception as e:
            logger.error(f"Get indexed IDs error: {e}")
            return set()

    def get_indexed_count(self) -> int:
        """Number of indexed bodies (0 when unavailable)."""
        if not self.collection:
            self._init_db()
        if not self.collection:
            return 0
        try:
            return self.collection.count()
        except Exception:  # fixed: was a bare except
            return 0

    def get_status(self) -> dict:
        """Snapshot of indexing progress for status endpoints."""
        return {
            "total_indexed": self.get_indexed_count(),
            "in_progress": self._index_status["in_progress"],
            "progress_current": self._index_status["current"],
            "progress_total": self._index_status["total"],
            "failed_count": len(self._index_status["failed"])
        }
class UserProfileCache:
    """Email-keyed user profile store (single JSON file), with up to 5 keywords."""

    def __init__(self):
        self.profiles_file = PROFILE_DIR / "profiles.json"
        self.profiles = {}   # email -> {"email", "profile", "keywords", timestamps}
        self._load_profiles()

    def _load_profiles(self):
        """Load profiles.json if present; fall back to an empty store on error."""
        if self.profiles_file.exists():
            try:
                with open(self.profiles_file, 'r', encoding='utf-8') as f:
                    self.profiles = json.load(f)
                logger.info(f"Loaded {len(self.profiles)} user profiles")
            except Exception as e:
                logger.error(f"Profile load error: {e}")
                self.profiles = {}

    def _save_profiles(self):
        """Write the whole store back to disk (errors are logged, not raised)."""
        try:
            with open(self.profiles_file, 'w', encoding='utf-8') as f:
                json.dump(self.profiles, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.error(f"Profile save error: {e}")

    def _normalize_email(self, email: str) -> str:
        """Normalize an email for use as a dict key (trim + lowercase)."""
        return email.strip().lower()

    def _padded_keywords(self, keywords: Optional[List[str]]) -> List[str]:
        """Return a 5-slot COPY of `keywords`, padded with "".

        Fix: the previous implementation padded the stored list in place,
        silently mutating the persisted profile data on every read.
        """
        padded = list(keywords or [])
        while len(padded) < 5:
            padded.append("")
        return padded[:5]

    def save_profile(self, email: str, profile_data: dict, keywords: Optional[List[str]] = None) -> Tuple[bool, str]:
        """Create or replace the profile for `email`.

        At most 5 non-empty, stripped keywords are kept; created_at is
        preserved across updates. Returns (success, user-facing message).
        """
        if not email or '@' not in email:
            return False, "올바른 이메일 주소를 입력해주세요."
        email_key = self._normalize_email(email)

        clean_keywords = []
        if keywords:
            clean_keywords = [kw.strip() for kw in keywords if kw and kw.strip()][:5]

        self.profiles[email_key] = {
            "email": email_key,
            "profile": profile_data,
            "keywords": clean_keywords,
            "updated_at": datetime.now(KST).isoformat(),
            # Reads the OLD record (dict literal is evaluated before assignment).
            "created_at": self.profiles.get(email_key, {}).get("created_at", datetime.now(KST).isoformat())
        }
        self._save_profiles()
        kw_str = ", ".join(clean_keywords) if clean_keywords else "없음"
        logger.info(f"Profile saved for: {email_key} (keywords: {kw_str})")
        return True, f"✅ 프로필이 저장되었습니다. ({email_key}, 키워드 {len(clean_keywords)}개)"

    def load_profile(self, email: str) -> Tuple[Optional[dict], str, List[str]]:
        """Load a stored profile.

        Returns (profile_or_None, status message, 5-slot keyword list).
        The keyword list is a padded copy; the stored record is not mutated.
        """
        if not email or '@' not in email:
            return None, "올바른 이메일 주소를 입력해주세요.", []
        email_key = self._normalize_email(email)
        if email_key not in self.profiles:
            return None, f"ℹ️ '{email_key}'에 저장된 프로필이 없습니다. 새로 입력해주세요.", []
        data = self.profiles[email_key]
        profile = data.get("profile", {})
        keywords = self._padded_keywords(data.get("keywords", []))
        logger.info(f"Profile loaded for: {email_key}")
        return profile, f"✅ 프로필을 불러왔습니다. (마지막 수정: {data.get('updated_at', '알 수 없음')[:10]})", keywords

    def get_profile_keywords(self, email: str) -> List[str]:
        """Return the user's 5 keyword slots ("" for unused), or [] if no profile."""
        if not email or '@' not in email:
            return []
        email_key = self._normalize_email(email)
        if email_key not in self.profiles:
            return []
        return self._padded_keywords(self.profiles[email_key].get("keywords", []))

    def delete_profile(self, email: str) -> Tuple[bool, str]:
        """Remove the stored profile for `email`, if any."""
        email_key = self._normalize_email(email)
        if email_key in self.profiles:
            del self.profiles[email_key]
            self._save_profiles()
            return True, f"✅ 프로필이 삭제되었습니다. ({email_key})"
        return False, f"⚠️ 삭제할 프로필이 없습니다."

    def get_profile_count(self) -> int:
        """Number of stored profiles."""
        return len(self.profiles)

    def get_all_profiles(self) -> dict:
        """Return the raw profile store (admin use)."""
        return self.profiles
|
|
class FundProfileCache:
    """Email-keyed store of policy-fund pre-screening analyses.

    Each user keeps a newest-first history of analysis records, capped at
    MAX_HISTORY entries, persisted as a single JSON file.
    """

    MAX_HISTORY = 20  # records kept per user, newest first

    def __init__(self):
        self.fund_file = PROFILE_DIR / "fund_profiles.json"
        self.fund_data = {}
        self._load_fund_data()

    def _load_fund_data(self):
        """Read fund_profiles.json; reset to empty on any read/parse error."""
        if not self.fund_file.exists():
            return
        try:
            with open(self.fund_file, 'r', encoding='utf-8') as fh:
                self.fund_data = json.load(fh)
            logger.info(f"Loaded fund profiles for {len(self.fund_data)} users")
        except Exception as e:
            logger.error(f"Fund profile load error: {e}")
            self.fund_data = {}

    def _save_fund_data(self):
        """Write the whole store back to disk (best-effort, errors logged)."""
        try:
            with open(self.fund_file, 'w', encoding='utf-8') as fh:
                json.dump(self.fund_data, fh, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.error(f"Fund profile save error: {e}")

    def _normalize_email(self, email: str) -> str:
        """Trim and lowercase an email for keying."""
        return email.strip().lower()

    def save_fund_analysis(self, email: str, company_info: dict, financial_info: dict,
                           caution_checks: List[str], analysis_results: dict) -> Tuple[bool, str]:
        """Prepend one analysis record to the user's history (capped at MAX_HISTORY)."""
        if not email or '@' not in email:
            return False, "올바른 이메일 주소를 입력해주세요."

        key = self._normalize_email(email)
        record = {
            "timestamp": datetime.now(KST).isoformat(),
            "company_info": company_info,
            "financial_info": financial_info,
            "caution_checks": caution_checks,
            "analysis_results": analysis_results
        }

        entry = self.fund_data.setdefault(key, {
            "email": key,
            "history": [],
            "created_at": datetime.now(KST).isoformat()
        })
        entry["history"].insert(0, record)
        if len(entry["history"]) > self.MAX_HISTORY:
            entry["history"] = entry["history"][:self.MAX_HISTORY]
        entry["updated_at"] = datetime.now(KST).isoformat()
        self._save_fund_data()

        count = len(entry["history"])
        company_name = company_info.get("company_name", "")
        logger.info(f"Fund analysis saved for: {key} ({company_name}), history: {count}")
        return True, f"✅ 분석 결과가 저장되었습니다. (총 {count}건 기록)"

    def load_fund_profile(self, email: str) -> Tuple[Optional[dict], str]:
        """Return the user's most recent analysis record plus a status message."""
        if not email or '@' not in email:
            return None, "올바른 이메일 주소를 입력해주세요."

        key = self._normalize_email(email)
        entry = self.fund_data.get(key)
        if entry is None:
            return None, f"ℹ️ '{key}'의 정책자금 분석 기록이 없습니다."
        history = entry.get("history", [])
        if not history:
            return None, "ℹ️ 저장된 분석 기록이 없습니다."
        latest = history[0]
        return latest, f"✅ 최근 분석 기록 불러옴 ({latest.get('timestamp', '')[:10]})"

    def get_fund_history(self, email: str, limit: int = 10) -> List[dict]:
        """Return up to `limit` most-recent records for the user ([] if none)."""
        if not email or '@' not in email:
            return []
        entry = self.fund_data.get(self._normalize_email(email))
        if not entry:
            return []
        return entry.get("history", [])[:limit]

    def delete_fund_profile(self, email: str) -> Tuple[bool, str]:
        """Remove all stored analyses for the given email."""
        key = self._normalize_email(email)
        if key not in self.fund_data:
            return False, "⚠️ 삭제할 프로필이 없습니다."
        del self.fund_data[key]
        self._save_fund_data()
        return True, f"✅ 정책자금 프로필이 삭제되었습니다. ({key})"

    def get_fund_profile_count(self) -> int:
        """Number of users with at least one stored record."""
        return len(self.fund_data)

    def get_all_fund_profiles(self) -> dict:
        """Return the raw store (admin use)."""
        return self.fund_data

    def get_fund_stats(self) -> dict:
        """Aggregate usage statistics across all users."""
        users = len(self.fund_data)
        analyses = sum(len(entry.get("history", [])) for entry in self.fund_data.values())
        return {
            "total_users": users,
            "total_analyses": analyses,
            "avg_per_user": round(analyses / users, 1) if users > 0 else 0
        }
|
|
# Lazily-created module singletons (see the get_* accessors below).
_cache_instance = None
_content_cache_instance = None
_profile_cache_instance = None
# NOTE(review): scheduler singleton — not referenced in this chunk; presumably
# used by the auto-sync setup elsewhere in the file.
_scheduler = None
def get_cache() -> AnnouncementCache:
    """Return the process-wide AnnouncementCache, creating it on first call."""
    global _cache_instance
    if _cache_instance is not None:
        return _cache_instance
    _cache_instance = AnnouncementCache()
    return _cache_instance
def get_content_cache() -> ContentVectorCache:
    """Return the content-vector cache singleton (shares the meta cache's client)."""
    global _content_cache_instance
    if _content_cache_instance is not None:
        return _content_cache_instance
    _content_cache_instance = ContentVectorCache(shared_client=get_cache().client)
    return _content_cache_instance
def get_profile_cache() -> UserProfileCache:
    """Return the user-profile cache singleton, creating it on first call."""
    global _profile_cache_instance
    if _profile_cache_instance is not None:
        return _profile_cache_instance
    _profile_cache_instance = UserProfileCache()
    return _profile_cache_instance
|
|
_fund_cache_instance = None  # lazily-created FundProfileCache singleton (see get_fund_cache)
|
|
def get_fund_cache() -> FundProfileCache:
    """Return the fund-profile cache singleton, creating it on first call."""
    global _fund_cache_instance
    if _fund_cache_instance is not None:
        return _fund_cache_instance
    _fund_cache_instance = FundProfileCache()
    return _fund_cache_instance
|
|
def save_sync_metadata(api_count: int, added: int, updated: int, total: int):
    """Append one sync record to METADATA_FILE (keeps the last 50 entries).

    Any read/write failure is logged and swallowed — sync metadata is
    best-effort bookkeeping, never a reason to fail a sync.
    """
    try:
        previous = {}
        if METADATA_FILE.exists():
            with open(METADATA_FILE, 'r', encoding='utf-8') as f:
                previous = json.load(f)

        record = {
            "timestamp": datetime.now(KST).isoformat(),
            "api_count": api_count,
            "added": added,
            "updated": updated,
            "total_cached": total
        }
        history = previous.get("sync_history", [])
        history.append(record)

        payload = {
            "last_sync": datetime.now(KST).isoformat(),
            "total_count": total,
            "sync_history": history[-50:]
        }
        with open(METADATA_FILE, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"Save metadata error: {e}")
def get_sync_status() -> dict:
    """Assemble a status dict covering the meta cache, content index and sync history."""
    cache = get_cache()
    content_cache = get_content_cache()
    content_status = content_cache.get_status()
    status = {
        "total_count": cache.get_count(),
        "last_sync": None,
        "sync_history": [],
        "db_path": str(DB_PATH),
        "chromadb_available": CHROMADB_AVAILABLE,
        "scheduler_available": SCHEDULER_AVAILABLE,
        "content_indexed": content_status["total_indexed"],
        "content_indexing_in_progress": content_status["in_progress"],
        "content_progress": f"{content_status['progress_current']}/{content_status['progress_total']}",
        "content_failed_count": content_status["failed_count"],
        "embedding_available": EMBEDDING_AVAILABLE
    }
    if METADATA_FILE.exists():
        try:
            with open(METADATA_FILE, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
            status["last_sync"] = metadata.get("last_sync")
            status["sync_history"] = metadata.get("sync_history", [])[-5:]
        except Exception as e:  # fixed: was a bare silent `except: pass`
            # Best-effort: a corrupt metadata file must not break status reporting.
            logger.warning(f"Sync metadata unreadable: {e}")
    return status
def sync_from_api() -> Tuple[int, int, str]:
    """Pull announcements from the external API into the meta cache.

    Returns (added, updated, status_message); on failure returns
    (0, 0, <error message>) instead of raising. Kicks off background
    content indexing after a successful sync.
    """
    try:
        from file_api import fetch_all_from_api
        logger.info(f"Starting sync at {datetime.now(KST).strftime('%Y-%m-%d %H:%M:%S %Z')}")
        api_result = fetch_all_from_api()

        # fetch_all_from_api may return either items or (items, error_msg).
        if isinstance(api_result, tuple):
            announcements, error_msg = api_result
            if error_msg and not announcements:
                return 0, 0, f"API 오류: {error_msg}"
        else:
            announcements = api_result

        if not announcements:
            return 0, 0, "API에서 데이터를 가져오지 못했습니다."

        cache = get_cache()
        added, updated = cache.bulk_upsert(announcements)
        total = cache.get_count()
        save_sync_metadata(len(announcements), added, updated, total)

        msg = f"✅ 동기화 완료: API {len(announcements)}건 → 신규 {added}건, 업데이트 {updated}건 (총 {total}건)"
        logger.info(msg)
        start_background_indexing()
        return added, updated, msg
    except Exception as e:
        logger.error(f"Sync error: {e}")
        return 0, 0, f"동기화 오류: {str(e)}"
def get_cached_announcements(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[dict], str]:
    """Fetch announcements from the local cache.

    Returns (items, status message). With a keyword a semantic search is
    performed; otherwise up to 1000 cached items are returned.
    NOTE(review): `category` and `region` are accepted but not used here.
    """
    cache = get_cache()
    if not keyword:
        items = cache.get_all(limit=1000)
        return items, f"⚡ 캐시에서 {len(items)}건 로드"
    hits = cache.search(keyword, n_results=100)
    return hits, f"🔍 '{keyword}' 검색"
# Handle for the background content-indexing thread.
_indexing_thread = None
# NOTE(review): this lock is not acquired anywhere in this chunk — presumably
# it serializes indexing-thread startup; confirm against the rest of the file.
_indexing_lock = threading.Lock()
def background_index_contents():
    """Index announcement body text into the content vector cache.

    Intended to run as a daemon thread (see start_background_indexing).
    Walks every item in the meta cache, skips announcements that are
    already indexed or whose deadline has passed, extracts body text
    (attached print file first, then the detail page as fallback) and
    stores it via ``content_cache.add_content``.

    Progress bookkeeping is written into ``content_cache._index_status``
    so callers can poll it; flipping ``_index_status["in_progress"]`` to
    False from outside requests a graceful stop of the loop.
    """
    content_cache = get_content_cache()
    if content_cache._index_status["in_progress"]:
        logger.info("Background indexing already running")
        return
    content_cache._index_status["in_progress"] = True
    try:
        from file_api import fetch_announcement_detail, download_file, extract_text_from_file
        import re

        cache = get_cache()
        all_items = cache.get_all(limit=2000)
        indexed_ids = content_cache.get_indexed_ids()

        # Deadline formats seen in the API: "2024.01.31" / "2024-1-3" /
        # "2024/01/31" and Korean "2024년 1월 31일".
        # Compiled once here instead of per-item inside the helper.
        date_patterns = [
            re.compile(r'(\d{4})[.\-/](\d{1,2})[.\-/](\d{1,2})'),
            re.compile(r'(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일'),
        ]

        def safe_str(val) -> str:
            """Coerce possibly-None API field values to str ('' for falsy)."""
            return str(val) if val else ""

        def is_ongoing_check(req_dt: str) -> bool:
            """Return True if the deadline text is absent, unparsable, or not past.

            The latest date found in the text is treated as the deadline and
            compared against naive local now() — announcements whose deadline
            cannot be determined are assumed to be ongoing.
            """
            if not req_dt:
                return True
            try:
                dates_found = []
                for pattern in date_patterns:
                    for m in pattern.findall(req_dt):
                        try:
                            dates_found.append(datetime(int(m[0]), int(m[1]), int(m[2])))
                        except (ValueError, TypeError):
                            # e.g. "2024.13.45" — skip impossible dates.
                            continue
                if dates_found:
                    return max(dates_found) >= datetime.now()
                return True
            except Exception:
                # Parsing problems must never block indexing; assume ongoing.
                return True

        def extract_content(item) -> str:
            """Best-effort body text for one announcement.

            Tries the attached print file first, then the detail page, then a
            print file scraped from the detail page. Returns '' on failure.
            """
            pblanc_id = safe_str(item.get("pblancId") or item.get("seq"))
            print_url = safe_str(item.get("printFileUrl") or item.get("printFlpthNm"))
            print_name = safe_str(item.get("printFileName") or item.get("printFileNm"))
            if print_url and print_name:
                try:
                    with tempfile.TemporaryDirectory() as tmp_dir:
                        file_path, err = download_file(print_url, tmp_dir, print_name)
                        if file_path and not err:
                            text, _ = extract_text_from_file(file_path)
                            if text:
                                return text
                except Exception as e:
                    logger.warning(f"Text extraction failed for {pblanc_id}: {e}")
            link = safe_str(item.get("link") or item.get("pblancUrl"))
            if link:
                try:
                    detail_content, _, scraped_print = fetch_announcement_detail(link)
                    if detail_content:
                        return detail_content
                    if scraped_print and scraped_print.get("url"):
                        with tempfile.TemporaryDirectory() as tmp_dir:
                            file_path, err = download_file(
                                scraped_print["url"], tmp_dir,
                                scraped_print.get("filename", "file")
                            )
                            if file_path and not err:
                                text, _ = extract_text_from_file(file_path)
                                if text:
                                    return text
                except Exception as e:
                    # Best-effort fallback; log instead of silently swallowing.
                    logger.debug(f"Detail scrape failed for {pblanc_id}: {e}")
            return ""

        # Phase 1: decide which items actually need indexing.
        items_to_index = []
        skipped_expired = 0
        skipped_indexed = 0
        for item in all_items:
            pblanc_id = safe_str(item.get("pblancId") or item.get("seq"))
            if not pblanc_id:
                continue
            if pblanc_id in indexed_ids:
                skipped_indexed += 1
                continue
            req_dt = safe_str(item.get("reqstDt") or item.get("reqstBeginEndDe"))
            if not is_ongoing_check(req_dt):
                skipped_expired += 1
                continue
            items_to_index.append(item)

        total = len(items_to_index)
        content_cache._index_status["total"] = total
        content_cache._index_status["current"] = 0
        logger.info(f"Starting background content indexing: {total} items (skipped {skipped_indexed} already indexed, {skipped_expired} expired)")

        # Phase 2: extract and index, checkpointing progress periodically.
        for i, item in enumerate(items_to_index):
            # External code may flip this flag to request a graceful stop.
            if not content_cache._index_status["in_progress"]:
                break
            content_cache._index_status["current"] = i + 1
            pblanc_id = safe_str(item.get("pblancId") or item.get("seq"))
            title = safe_str(item.get("title") or item.get("pblancNm"))
            content_text = extract_content(item)
            # Very short extracts are treated as failures (boilerplate/empty pages).
            if content_text and len(content_text) > 100:
                if content_cache.add_content(pblanc_id, title, content_text):
                    logger.info(f"Indexed [{i+1}/{total}]: {title[:50]}...")
                else:
                    content_cache._index_status["failed"].append(pblanc_id)
            else:
                content_cache._index_status["failed"].append(pblanc_id)
            if (i + 1) % 50 == 0:
                # Checkpoint so progress survives a crash or restart.
                content_cache._save_index_status()

        content_cache._save_index_status()
        logger.info(f"Background indexing complete: {content_cache.get_indexed_count()} indexed")
    except Exception as e:
        logger.error(f"Background indexing error: {e}")
    finally:
        content_cache._index_status["in_progress"] = False
def start_background_indexing() -> bool:
    """Spawn the daemon thread that indexes announcement body text.

    Returns True when a new worker thread was started, False when one is
    already alive. Start attempts are serialized with ``_indexing_lock``.
    """
    global _indexing_thread
    with _indexing_lock:
        already_running = _indexing_thread is not None and _indexing_thread.is_alive()
        if already_running:
            logger.info("Background indexing already running")
            return False
        worker = threading.Thread(target=background_index_contents, daemon=True)
        _indexing_thread = worker
        worker.start()
        logger.info("Background content indexing thread started")
        return True
def stop_scheduler():
    """Shut down the periodic sync scheduler, if one is running."""
    global _scheduler
    if _scheduler is None:
        return
    _scheduler.shutdown()
    _scheduler = None
    logger.info("Scheduler stopped")
def start_scheduler():
    """Start the APScheduler that runs sync_from_api at 10:00 and 22:00 KST.

    Returns True when the scheduler is (or was already) running, False when
    APScheduler is unavailable or startup fails.

    Fix: the scheduler is built locally and published to the ``_scheduler``
    global only after ``start()`` succeeds. Previously the global was
    assigned first, so a failure in add_job/start left a dead scheduler
    assigned and every later call returned True without anything running.
    """
    global _scheduler
    if not SCHEDULER_AVAILABLE:
        logger.warning("Scheduler not available")
        return False
    if _scheduler:
        # Already started earlier in this process.
        return True
    try:
        scheduler = BackgroundScheduler(timezone=KST)
        # Two fixed daily sync slots (KST).
        for job_id, job_name, hour in (
            ("sync_10am", "Daily sync at 10:00 KST", 10),
            ("sync_10pm", "Daily sync at 22:00 KST", 22),
        ):
            scheduler.add_job(
                sync_from_api,
                CronTrigger(hour=hour, minute=0, timezone=KST),
                id=job_id,
                name=job_name,
                replace_existing=True,
            )
        scheduler.start()
        _scheduler = scheduler  # publish only once fully started
        logger.info("Scheduler started: sync at 10:00 and 22:00 KST")
        return True
    except Exception as e:
        logger.error(f"Scheduler start error: {e}")
        return False
def manual_sync() -> str:
    """Run one synchronization immediately and return its status message."""
    _added, _updated, message = sync_from_api()
    return message
def initialize_cache_system():
    """Bootstrap the cache system at application startup.

    Loads the meta cache (performing an initial sync when it is empty),
    reports content/profile cache sizes, starts the periodic scheduler and
    the background content indexer, then returns the current sync status.
    """
    logger.info("Initializing cache system...")
    meta_cache = get_cache()
    announcement_count = meta_cache.get_count()
    if announcement_count == 0:
        logger.info("Cache is empty, performing initial sync...")
        sync_from_api()
    else:
        logger.info(f"Cache loaded with {announcement_count} announcements")
    logger.info(f"Content cache: {get_content_cache().get_indexed_count()} indexed")
    logger.info(f"Profile cache: {get_profile_cache().get_profile_count()} profiles")
    start_scheduler()
    start_background_indexing()
    return get_sync_status()
if __name__ == "__main__":
    # Smoke test: bring the whole cache system up and dump its status.
    print("Testing cache system...")
    system_status = initialize_cache_system()
    print(f"Status: {json.dumps(system_status, ensure_ascii=False, indent=2)}")
|
|
| |