app / src /rag_engine.py
CareerAI-app's picture
Deploy CareerAI to HuggingFace Spaces
b7934cd
"""
RAG Engine v2.0 - Advanced retrieval with:
• Multilingual embeddings (BGE-M3 / gte-multilingual / multilingual-e5)
• Hybrid search (Vector + BM25 keyword via Reciprocal Rank Fusion)
• Reranking with BGE-Reranker-v2
• Metadata filtering by document type (CV vs Job Offer vs LinkedIn)
All 100% free & local.
"""
import os
import hashlib
import logging
from typing import List, Tuple, Optional, Dict
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
# Module-level logger named after this module; used by every class below.
logger = logging.getLogger(__name__)
# ======================== EMBEDDING MODEL CATALOG ========================
# Catalog of selectable embedding models, keyed by a short identifier.
# Each entry holds:
#   "name"        - model identifier; RAGEngine passes it to
#                   HuggingFaceEmbeddings as `model_name`.
#   "display", "description", "size", "languages", "performance"
#                 - descriptive strings; nothing in this module reads them
#                   (presumably shown by a UI - confirm against callers).
# NOTE(review): several display/description strings look mis-encoded
# (UTF-8 text read through a single-byte codepage) - confirm against the
# original file before shipping.
EMBEDDING_MODELS = {
"bge-m3": {
"name": "BAAI/bge-m3",
"display": "🌍 BGE-M3 (Multilingual · Recomendado)",
"description": "Mejor modelo multilingual 2025. Dense+sparse, 100+ idiomas, ideal para RAG.",
"size": "~2.3 GB",
"languages": "100+",
"performance": "⭐⭐⭐⭐⭐",
},
"gte-multilingual": {
"name": "Alibaba-NLP/gte-multilingual-base",
"display": "πŸš€ GTE Multilingual (Ligero Β· 70+ idiomas)",
"description": "Excelente balance tamaΓ±o/calidad. 70+ idiomas, encoder-only.",
"size": "~580 MB",
"languages": "70+",
"performance": "⭐⭐⭐⭐",
},
"multilingual-e5": {
"name": "intfloat/multilingual-e5-base",
"display": "πŸ“ Multilingual E5 Base (EstΓ‘ndar)",
"description": "Modelo estΓ‘ndar multilingual para retrieval y similitud semΓ‘ntica.",
"size": "~1.1 GB",
"languages": "100+",
"performance": "⭐⭐⭐⭐",
},
"minilm-v2": {
"name": "sentence-transformers/all-MiniLM-L6-v2",
"display": "⚑ MiniLM v2 (Ultra-ligero · Solo inglés)",
"description": "Modelo original, muy rΓ‘pido pero solo inglΓ©s. Ideal para pruebas.",
"size": "~90 MB",
"languages": "InglΓ©s",
"performance": "⭐⭐⭐",
},
}
# Key into EMBEDDING_MODELS used when the caller does not pick a model;
# RAGEngine also falls back to this entry when given an unknown key.
DEFAULT_EMBEDDING = "bge-m3"
# ======================== BM25 KEYWORD INDEX ========================
class BM25Index:
    """Lightweight in-memory BM25 keyword index for hybrid search.

    Documents and their metadata are kept in two parallel lists; the
    ``rank_bm25`` index is rebuilt from scratch whenever they change.
    Tokenization is a plain lowercase whitespace split.
    """

    def __init__(self):
        self._documents: List[str] = []
        self._metadatas: List[dict] = []
        self._index = None

    @property
    def is_ready(self) -> bool:
        """True when an index has been built over at least one document."""
        return self._index is not None and len(self._documents) > 0

    def add(self, texts: List[str], metadatas: List[dict]):
        """Append documents (and their metadata) and rebuild the index."""
        self._documents.extend(texts)
        self._metadatas.extend(metadatas)
        self._rebuild()

    def _rebuild(self):
        """Rebuild the BM25 index from scratch over the stored documents."""
        # Drop the previous index first so an empty corpus or a missing
        # dependency can never leave a stale index behind.
        self._index = None
        if not self._documents:
            return
        try:
            from rank_bm25 import BM25Okapi
            tokenized = [doc.lower().split() for doc in self._documents]
            self._index = BM25Okapi(tokenized)
        except ImportError:
            logger.warning("rank_bm25 not installed – keyword search disabled. pip install rank_bm25")
            self._index = None

    @staticmethod
    def _matches_filter(meta: dict, filter_dict: Optional[dict]) -> bool:
        """Return True when ``meta`` satisfies ``filter_dict``.

        Plain ``{key: value}`` entries are equality checks.  A Chroma-style
        ``{"$and": [filter, ...]}`` entry requires every sub-filter to match,
        so the filter built for the vector store can be reused here.
        """
        if not filter_dict:
            return True
        for key, expected in filter_dict.items():
            if key == "$and":
                if not all(BM25Index._matches_filter(meta, sub) for sub in expected):
                    return False
            elif meta.get(key) != expected:
                return False
        return True

    def search(
        self, query: str, k: int = 10, filter_dict: Optional[dict] = None,
    ) -> List[Tuple[str, dict, float]]:
        """Return up to ``k`` ``(text, metadata, score)`` hits, best first.

        Only documents with a positive BM25 score that satisfy
        ``filter_dict`` are returned; an unbuilt index yields ``[]``.
        """
        if not self.is_ready:
            return []
        scores = self._index.get_scores(query.lower().split())
        hits = []
        for idx, score in enumerate(scores):
            if score <= 0:
                continue
            # Metadata may be missing or None (documents stored without it).
            meta = (self._metadatas[idx] if idx < len(self._metadatas) else None) or {}
            if not self._matches_filter(meta, filter_dict):
                continue
            hits.append((self._documents[idx], meta, float(score)))
        hits.sort(key=lambda hit: hit[2], reverse=True)
        return hits[:k]

    def clear(self):
        """Remove every document and drop the index."""
        self._documents.clear()
        self._metadatas.clear()
        self._index = None

    def rebuild_from_chroma(self, chroma_collection):
        """Replace the index contents with those of a ChromaDB collection.

        An empty collection empties the index as well, so documents deleted
        from the store stop showing up in keyword results.  Failures are
        logged and leave the current contents untouched.
        """
        try:
            data = chroma_collection.get() or {}
            documents = [doc or "" for doc in (data.get("documents") or [])]
            metadatas = [meta or {} for meta in (data.get("metadatas") or [])]
            # Keep both lists the same length as the documents.
            metadatas.extend({} for _ in range(len(documents) - len(metadatas)))
            self._documents = documents
            self._metadatas = metadatas[:len(documents)]
            self._rebuild()
            logger.info(f"BM25 index rebuilt with {len(self._documents)} documents")
        except Exception as e:
            logger.warning(f"Failed to rebuild BM25 index: {e}")
# ======================== RERANKER ========================
class Reranker:
    """Cross-encoder reranker (default model: BGE-Reranker-v2-m3).

    The model is loaded lazily through ``load()``; until then ``rerank``
    simply truncates its input.
    """

    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        self._model = None
        self.model_name = model_name

    @property
    def is_ready(self) -> bool:
        """Whether the cross-encoder model has been loaded."""
        return self._model is not None

    def load(self):
        """Load the cross-encoder once; failures are logged, not raised."""
        if self.is_ready:
            return
        try:
            from sentence_transformers import CrossEncoder
            self._model = CrossEncoder(self.model_name, max_length=512)
            logger.info(f"Reranker loaded: {self.model_name}")
        except ImportError:
            logger.warning("sentence-transformers not installed for reranking")
        except Exception as e:
            logger.warning(f"Failed to load reranker: {e}")

    def rerank(
        self,
        query: str,
        results: List[Tuple[str, dict, float]],
        top_k: int = 5,
    ) -> List[Tuple[str, dict, float]]:
        """Re-score ``results`` against ``query`` and return the best ``top_k``.

        Each returned tuple carries the cross-encoder score in place of the
        incoming one.  When the model is not loaded, the input is empty, or
        scoring fails, the first ``top_k`` inputs are returned unchanged.
        """
        if not (self.is_ready and results):
            return results[:top_k]
        try:
            scores = self._model.predict([(query, text) for text, _, _ in results])
            rescored = [
                (text, meta, float(scores[pos]))
                for pos, (text, meta, _) in enumerate(results)
            ]
            return sorted(rescored, key=lambda item: item[2], reverse=True)[:top_k]
        except Exception as e:
            logger.warning(f"Reranking failed, returning original order: {e}")
            return results[:top_k]
# ======================== RECIPROCAL RANK FUSION ========================
def reciprocal_rank_fusion(
    results_list: List[List[Tuple[str, dict, float]]],
    k: int = 60,
    top_n: int = 15,
) -> List[Tuple[str, dict, float]]:
    """
    Merge multiple ranked result lists using Reciprocal Rank Fusion (RRF).

    Every result contributes ``1 / (k + rank)`` (rank starting at 1) to the
    fused score of its content; the incoming scores are ignored.  Results
    are identified by their full text, so chunks that merely share a common
    prefix stay separate.  The metadata of the first occurrence is kept.

    Args:
        results_list: Ranked lists of ``(content, metadata, score)`` tuples,
            best result first.
        k: RRF smoothing constant.
        top_n: Maximum number of fused results to return.

    Returns:
        ``(content, metadata, fused_score)`` tuples, highest score first;
        ties keep first-seen order.
    """
    fused_scores: Dict[str, float] = {}
    first_meta: Dict[str, dict] = {}
    for results in results_list:
        for rank, (content, meta, _) in enumerate(results, start=1):
            fused_scores[content] = fused_scores.get(content, 0.0) + 1.0 / (k + rank)
            first_meta.setdefault(content, meta)
    # sorted() is stable, so equal scores keep their insertion order.
    ranked = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
    return [(content, first_meta[content], score) for content, score in ranked[:top_n]]
# ======================== RAG ENGINE v2 ========================
class RAGEngine:
    """
    Advanced RAG Engine v2.0 with:
    - Selectable multilingual embeddings
    - Hybrid search (vector + BM25 keyword)
    - Cross-encoder reranking
    - Metadata filtering

    Documents are stored per ``user_id``.  Chunks stored without a
    ``user_id`` are treated as legacy data belonging to the "anonymous"
    user by the listing, stats and deletion methods.
    """

    # Labels shown in the context header for each document type.
    # NOTE(review): these literals look mis-encoded (UTF-8 read through a
    # single-byte codepage); they are reproduced as found - confirm against
    # the original file.
    _TYPE_LABELS = {
        "cv": "πŸ“‹ CV/Resume",
        "job_offer": "πŸ’Ό Oferta de Trabajo",
        "linkedin": "πŸ‘€ LinkedIn",
        "other": "πŸ“„ Documento",
    }
    _DEFAULT_TYPE_LABEL = "πŸ“„ Documento"

    def __init__(
        self,
        persist_directory: Optional[str] = None,
        embedding_key: str = DEFAULT_EMBEDDING,
        enable_reranking: bool = True,
        enable_hybrid: bool = True,
    ):
        """Open (or create) the vector store and the auxiliary indexes.

        Args:
            persist_directory: Directory for the Chroma store.  Defaults to
                ``<parent of this file's directory>/data/vectordb``.
            embedding_key: Key into ``EMBEDDING_MODELS``; an unknown key
                falls back to the default model (with a warning).
            enable_reranking: Create a (lazily loaded) cross-encoder reranker.
            enable_hybrid: Maintain a BM25 index next to the vector store.
        """
        if persist_directory is None:
            persist_directory = os.path.join(
                os.path.dirname(os.path.dirname(__file__)), "data", "vectordb"
            )
        self.persist_directory = persist_directory
        os.makedirs(persist_directory, exist_ok=True)

        # ---- Embeddings ----
        self.embedding_key = embedding_key
        if embedding_key not in EMBEDDING_MODELS:
            logger.warning(
                f"Unknown embedding key '{embedding_key}', using '{DEFAULT_EMBEDDING}' model"
            )
        model_info = EMBEDDING_MODELS.get(embedding_key, EMBEDDING_MODELS[DEFAULT_EMBEDDING])
        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_info["name"],
            model_kwargs={"device": "cpu", "trust_remote_code": True},
            encode_kwargs={"normalize_embeddings": True},
        )

        # ---- ChromaDB Vector Store ----
        # One collection per embedding key to avoid dimension conflicts.
        collection_name = f"career_docs_{embedding_key.replace('-', '_')}"
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=self.embeddings,
            persist_directory=persist_directory,
        )

        # ---- BM25 Keyword Index (hybrid search) ----
        self.enable_hybrid = enable_hybrid
        self.bm25 = BM25Index()
        if enable_hybrid:
            try:
                self.bm25.rebuild_from_chroma(self.vectorstore._collection)
            except Exception as e:
                # Best effort: the engine still works with vector search only.
                logger.warning(f"Could not initialise BM25 index: {e}")

        # ---- Reranker (lazy-loaded on first use) ----
        self.enable_reranking = enable_reranking
        self.reranker = Reranker() if enable_reranking else None

    # ======================== DOCUMENT OPS ========================
    def add_document(self, chunks: List[str], metadata: dict, user_id: str = "anonymous") -> int:
        """Add document chunks to the vector store and the BM25 index.

        Every chunk gets a copy of ``metadata`` extended with ``user_id``,
        ``chunk_index``, ``total_chunks`` and a ``doc_id`` hash.

        Returns:
            The number of chunks added (0 for an empty ``chunks`` list).
        """
        if not chunks:
            return 0
        metadata = metadata or {}
        filename = metadata.get('filename', 'unknown')
        docs = []
        chunk_metas = []
        for i, chunk in enumerate(chunks):
            doc_id = hashlib.md5(f"{filename}_{i}_{chunk[:50]}".encode()).hexdigest()
            doc_metadata = {
                **metadata,
                "user_id": user_id,
                "chunk_index": i,
                "total_chunks": len(chunks),
                "doc_id": doc_id,
            }
            docs.append(Document(page_content=chunk, metadata=doc_metadata))
            chunk_metas.append(doc_metadata)

        self.vectorstore.add_documents(docs)
        if self.enable_hybrid:
            self.bm25.add(chunks, chunk_metas)
        logger.info(f"Added {len(docs)} chunks for '{metadata.get('filename', '?')}'")
        return len(docs)

    def delete_document(self, filename: str, user_id: str = "anonymous"):
        """Delete all chunks of ``filename`` owned by ``user_id``.

        For the anonymous user, when no explicitly owned chunks exist, legacy
        chunks of that file (stored without a ``user_id``) are deleted instead.

        Returns:
            True when at least one chunk was deleted, False otherwise
            (including on error, which is logged).
        """
        try:
            collection = self.vectorstore._collection
            owned = collection.get(
                where={"$and": [{"filename": filename}, {"user_id": user_id}]}
            ) or {}
            ids = list(owned.get("ids") or [])
            if not ids and user_id == "anonymous":
                legacy = collection.get(where={"filename": filename}) or {}
                ids = [
                    chunk_id
                    for chunk_id, meta in zip(legacy.get("ids") or [], legacy.get("metadatas") or [])
                    if "user_id" not in (meta or {})
                ]
            if not ids:
                return False
            collection.delete(ids=ids)
            if self.enable_hybrid:
                # Clear first so deleted chunks cannot survive in the keyword
                # index when the collection is now empty.
                self.bm25.clear()
                self.bm25.rebuild_from_chroma(collection)
            return True
        except Exception as e:
            logger.error(f"Error deleting document: {e}")
            return False

    # ======================== SEARCH ========================
    def search(
        self,
        query: str,
        k: int = 5,
        filter_dict: Optional[dict] = None,
        user_id: str = "anonymous"
    ) -> List[Tuple[str, dict, float]]:
        """
        Advanced search pipeline:
        1. Vector similarity search (semantic)
        2. BM25 keyword search (lexical) - if hybrid enabled
        3. Reciprocal Rank Fusion to merge results
        4. Reranking with cross-encoder - if enabled

        ``filter_dict`` holds metadata equality filters; ``user_id`` is added
        to it unless already present.  The caller's dict is not modified.

        Returns:
            Up to ``k`` ``(content, metadata, score)`` tuples.  The meaning
            of ``score`` depends on the last stage that ran (vector store
            score, fused RRF score, or cross-encoder score).
        """
        # Work on a copy: the caller's dict must not gain a user_id key.
        filters = dict(filter_dict) if filter_dict else {}
        filters.setdefault("user_id", user_id)

        # ChromaDB requires $and for multiple filter keys.
        if len(filters) > 1:
            chroma_filter = {"$and": [{key: value} for key, value in filters.items()]}
        else:
            chroma_filter = filters

        # Over-fetch so fusion and reranking have candidates to choose from.
        candidates = k * 2

        # Step 1: Vector search
        vector_results = self._vector_search(query, k=candidates, filter_dict=chroma_filter)

        # Step 2 + 3: BM25 keyword search fused with the vector results.
        if self.enable_hybrid and self.bm25.is_ready:
            # BM25 matches plain key/value metadata, so it gets the flat
            # filters rather than the Chroma "$and" wrapper.
            bm25_results = self.bm25.search(query, k=candidates, filter_dict=filters)
            merged = reciprocal_rank_fusion(
                [vector_results, bm25_results],
                top_n=candidates,
            )
        else:
            merged = vector_results

        # Step 4: Rerank (if enabled and the model could be loaded)
        if self.enable_reranking and self.reranker is not None:
            if not self.reranker.is_ready:
                self.reranker.load()
            if self.reranker.is_ready:
                return self.reranker.rerank(query, merged, top_k=k)
        return merged[:k]

    def _vector_search(
        self, query: str, k: int = 10, filter_dict: Optional[dict] = None,
    ) -> List[Tuple[str, dict, float]]:
        """Pure vector similarity search; returns ``[]`` (and logs) on failure."""
        try:
            results = self.vectorstore.similarity_search_with_score(
                query, k=k, filter=filter_dict
            )
            return [
                (doc.page_content, doc.metadata, score) for doc, score in results
            ]
        except Exception as e:
            logger.warning(f"Vector search failed: {e}")
            return []

    def search_by_type(
        self,
        query: str,
        doc_type: str,
        k: int = 5,
        user_id: str = "anonymous",
    ) -> List[Tuple[str, dict, float]]:
        """Search filtered by document type (cv, job_offer, linkedin, other)."""
        return self.search(query, k=k, filter_dict={"doc_type": doc_type}, user_id=user_id)

    # ======================== CONTEXT BUILDING ========================
    def get_context(
        self,
        query: str,
        k: int = 8,
        filter_type: Optional[str] = None,
        user_id: str = "anonymous"
    ) -> str:
        """Get a formatted context string for LLM consumption.

        Runs ``search`` and renders each distinct hit as a header line
        (type label, source filename, score) followed by its text.  Hits are
        separated by a horizontal rule.  Returns a user-facing notice when
        nothing is found.
        """
        filter_dict = {"doc_type": filter_type} if filter_type else {}
        # user_id is injected by search() when not already present.
        results = self.search(query, k=k, filter_dict=filter_dict, user_id=user_id)
        if not results:
            return "⚠️ No se encontraron documentos relevantes. Por favor, sube tu CV u otros documentos primero."

        context_parts = []
        seen_content = set()
        for content, metadata, score in results:
            metadata = metadata or {}
            # Deduplicate chunks that start with the same 100 characters.
            content_hash = hashlib.md5(content[:100].encode()).hexdigest()
            if content_hash in seen_content:
                continue
            seen_content.add(content_hash)

            source = metadata.get("filename", "Desconocido")
            doc_type = metadata.get("doc_type", "documento")
            type_label = self._TYPE_LABELS.get(doc_type, self._DEFAULT_TYPE_LABEL)
            score_str = f"{score:.3f}"
            context_parts.append(
                f"[{type_label} | Fuente: {source} | Score: {score_str}]\n{content}"
            )

        # The rule goes BETWEEN chunks.  (The previous expression,
        # "\n\n" + rule + "\n\n".join(...), emitted it once as a prefix
        # because method calls bind tighter than "+".)
        separator = "\n\n" + "─" * 50 + "\n\n"
        return separator.join(context_parts)

    # ======================== STATS & UTILS ========================
    def _get_user_records(self, user_id: str) -> Tuple[List[str], List[dict]]:
        """Return ``(chunk ids, metadatas)`` of the chunks visible to ``user_id``.

        For the anonymous user with no explicitly owned chunks, legacy chunks
        (stored without a ``user_id``) are returned instead.  ``None``
        metadata entries are replaced by empty dicts.
        """
        collection = self.vectorstore._collection
        owned = collection.get(where={"user_id": user_id}) or {}
        ids = list(owned.get("ids") or [])
        if ids or user_id != "anonymous":
            return ids, [meta or {} for meta in (owned.get("metadatas") or [])]

        everything = collection.get() or {}
        legacy = [
            (chunk_id, meta or {})
            for chunk_id, meta in zip(everything.get("ids") or [], everything.get("metadatas") or [])
            if "user_id" not in (meta or {})
        ]
        return [chunk_id for chunk_id, _ in legacy], [meta for _, meta in legacy]

    @staticmethod
    def _unique_filenames(metadatas: List[dict]) -> List[str]:
        """Return the sorted distinct ``filename`` values found in ``metadatas``."""
        return sorted({meta["filename"] for meta in metadatas if meta and "filename" in meta})

    def get_document_list(self, user_id: str = "anonymous") -> List[str]:
        """Get the sorted list of indexed document filenames for a user.

        Returns ``[]`` when the store cannot be read (the error is logged).
        """
        try:
            _, metadatas = self._get_user_records(user_id)
            return self._unique_filenames(metadatas)
        except Exception as e:
            logger.warning(f"Failed to list documents: {e}")
            return []

    def get_stats(self, user_id: str = "anonymous") -> dict:
        """Get vector store statistics for a user.

        Returns a dict with ``total_chunks``, ``total_documents``,
        ``documents``, ``embedding_model``, ``hybrid_search`` and
        ``reranking``.  On error the counts are zero and both feature flags
        are reported as False.
        """
        try:
            ids, metadatas = self._get_user_records(user_id)
            docs = self._unique_filenames(metadatas)
            return {
                "total_chunks": len(ids),
                "total_documents": len(docs),
                "documents": docs,
                "embedding_model": self.embedding_key,
                "hybrid_search": self.enable_hybrid and self.bm25.is_ready,
                "reranking": self.enable_reranking,
            }
        except Exception as e:
            logger.warning(f"Failed to compute stats: {e}")
            return {
                "total_chunks": 0,
                "total_documents": 0,
                "documents": [],
                "embedding_model": self.embedding_key,
                "hybrid_search": False,
                "reranking": False,
            }

    def get_all_text(self, user_id: str = "anonymous") -> str:
        """Get all chunk text owned by ``user_id`` (for full-context queries).

        Chunks are joined with blank lines; returns ``""`` when there are
        none or the store cannot be read (the error is logged).
        """
        try:
            collection = self.vectorstore._collection
            results = collection.get(where={"user_id": user_id}) or {}
            documents = [doc for doc in (results.get("documents") or []) if doc is not None]
            return "\n\n".join(documents)
        except Exception as e:
            logger.warning(f"Failed to read document text: {e}")
            return ""

    def get_documents_by_type(self, user_id: Optional[str] = None) -> Dict[str, List[str]]:
        """Get document filenames grouped by ``doc_type``.

        Args:
            user_id: When given, only chunks owned by that user are
                considered; ``None`` (the default) covers the whole
                collection.

        Returns:
            ``{doc_type: [filename, ...]}`` in first-seen order; chunks
            without a ``doc_type`` are grouped under "other".  Returns ``{}``
            on error (which is logged).
        """
        try:
            collection = self.vectorstore._collection
            if user_id is None:
                results = collection.get() or {}
            else:
                results = collection.get(where={"user_id": user_id}) or {}
            by_type: Dict[str, List[str]] = {}
            for meta in results.get("metadatas") or []:
                if not meta:
                    continue
                filenames = by_type.setdefault(meta.get("doc_type", "other"), [])
                filename = meta.get("filename", "?")
                if filename not in filenames:
                    filenames.append(filename)
            return by_type
        except Exception as e:
            logger.warning(f"Failed to group documents by type: {e}")
            return {}