| """ |
| Document Intelligence Bridge for RAG |
| |
| Bridges the document_intelligence subsystem with the RAG indexer/retriever. |
| Converts ParseResult to a format compatible with DocumentIndexer. |
| """ |
|
|
| from typing import List, Optional, Dict, Any |
| from pathlib import Path |
| from pydantic import BaseModel |
| from loguru import logger |
|
|
| from .store import VectorStore, get_vector_store |
| from .embeddings import EmbeddingAdapter, get_embedding_adapter |
| from .indexer import IndexingResult, IndexerConfig |
|
|
| |
# Optional dependency guard: the document_intelligence subsystem may be
# absent in some deployments.  Every entry point below checks
# DOCINT_AVAILABLE and degrades gracefully (failed IndexingResult / empty
# evidence list) instead of raising at import time.
try:
    from ..document_intelligence.chunks import (
        ParseResult,
        DocumentChunk,
        BoundingBox,
        EvidenceRef,
        ChunkType,
    )
    DOCINT_AVAILABLE = True
except ImportError:
    DOCINT_AVAILABLE = False
    logger.warning("document_intelligence module not available")
|
|
|
|
class DocIntIndexer:
    """
    Indexes ParseResult from document_intelligence into the vector store.

    This bridges the new document_intelligence subsystem with the existing
    RAG infrastructure: chunks are flattened into metadata records, embedded
    in batches, and written to the configured VectorStore.
    """

    def __init__(
        self,
        config: Optional[IndexerConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Args:
            config: Indexing options; defaults to a fresh IndexerConfig().
            vector_store: Optional pre-built store; resolved lazily otherwise.
            embedding_adapter: Optional pre-built embedder; resolved lazily otherwise.
        """
        self.config = config or IndexerConfig()
        # Backends are resolved lazily via the properties below so that
        # constructing the indexer never forces backend initialization.
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        """Vector store backend, created on first access when not injected."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Embedding adapter, created on first access when not injected."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def _should_skip(self, chunk: "DocumentChunk") -> bool:
        """Return True when empty-chunk filtering is enabled and the chunk's
        stripped text is missing or shorter than config.min_chunk_length."""
        if not self.config.skip_empty_chunks:
            return False
        return not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length

    def _chunk_record(
        self,
        chunk: "DocumentChunk",
        document_id: str,
        source: str,
    ) -> Dict[str, Any]:
        """Flatten a DocumentChunk into the metadata dict stored with its embedding."""
        record: Dict[str, Any] = {
            "chunk_id": chunk.chunk_id,
            "document_id": document_id,
            "source_path": source,
            "text": chunk.text,
            "sequence_index": chunk.sequence_index,
            "confidence": chunk.confidence,
        }
        if self.config.include_page:
            record["page"] = chunk.page
        if self.config.include_chunk_type:
            record["chunk_type"] = chunk.chunk_type.value
        if self.config.include_bbox and chunk.bbox:
            record["bbox"] = {
                "x_min": chunk.bbox.x_min,
                "y_min": chunk.bbox.y_min,
                "x_max": chunk.bbox.x_max,
                "y_max": chunk.bbox.y_max,
            }
        return record

    def _embed_in_batches(self, texts: List[str]) -> List[Any]:
        """Embed texts in config.batch_size slices to bound per-call payloads."""
        batch_size = self.config.batch_size
        embeddings: List[Any] = []
        for start in range(0, len(texts), batch_size):
            embeddings.extend(self.embedder.embed_batch(texts[start:start + batch_size]))
        return embeddings

    def index_parse_result(
        self,
        parse_result: "ParseResult",
        source_path: Optional[str] = None,
    ) -> IndexingResult:
        """
        Index a ParseResult from document_intelligence.

        Args:
            parse_result: ParseResult from DocumentParser
            source_path: Optional override for source path

        Returns:
            IndexingResult with indexing stats
        """
        if not DOCINT_AVAILABLE:
            # Report the caller-supplied path when we have one (instead of a
            # blanket "unknown") so failures remain traceable to a document.
            return IndexingResult(
                document_id="unknown",
                source_path=source_path or "unknown",
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error="document_intelligence module not available",
            )

        document_id = parse_result.doc_id
        source = source_path or parse_result.filename

        try:
            skipped = 0
            chunks_to_index: List[Dict[str, Any]] = []
            for chunk in parse_result.chunks:
                if self._should_skip(chunk):
                    skipped += 1
                else:
                    chunks_to_index.append(
                        self._chunk_record(chunk, document_id, source)
                    )

            if not chunks_to_index:
                # Everything was filtered out; treat as a successful no-op.
                return IndexingResult(
                    document_id=document_id,
                    source_path=source,
                    num_chunks_indexed=0,
                    num_chunks_skipped=skipped,
                    success=True,
                )

            logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
            embeddings = self._embed_in_batches([c["text"] for c in chunks_to_index])

            logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
            self.store.add_chunks(chunks_to_index, embeddings)

            logger.info(
                f"Indexed document {document_id}: "
                f"{len(chunks_to_index)} chunks, {skipped} skipped"
            )

            return IndexingResult(
                document_id=document_id,
                source_path=source,
                num_chunks_indexed=len(chunks_to_index),
                num_chunks_skipped=skipped,
                success=True,
            )

        except Exception as e:
            # logger.exception records the traceback, unlike logger.error.
            logger.exception(f"Failed to index parse result: {e}")
            return IndexingResult(
                document_id=document_id,
                source_path=source,
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def index_document(
        self,
        path: str,
        max_pages: Optional[int] = None,
    ) -> IndexingResult:
        """
        Parse and index a document in one step.

        Args:
            path: Path to document file
            max_pages: Optional limit on pages to process

        Returns:
            IndexingResult
        """
        if not DOCINT_AVAILABLE:
            return IndexingResult(
                document_id=str(path),
                source_path=str(path),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error="document_intelligence module not available",
            )

        try:
            # Imported here (not at module level) so this module stays
            # importable when document_intelligence is absent.
            from ..document_intelligence import DocumentParser, ParserConfig

            config = ParserConfig(max_pages=max_pages)
            parser = DocumentParser(config=config)

            logger.info(f"Parsing document: {path}")
            parse_result = parser.parse(path)

            return self.index_parse_result(parse_result, source_path=str(path))

        except Exception as e:
            logger.exception(f"Failed to parse and index document: {e}")
            return IndexingResult(
                document_id=str(path),
                source_path=str(path),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def delete_document(self, document_id: str) -> int:
        """Remove a document from the index.

        Returns whatever VectorStore.delete_document reports (annotated int;
        presumably the number of chunks removed — confirm against the store).
        """
        return self.store.delete_document(document_id)

    def get_stats(self) -> Dict[str, Any]:
        """Get indexing statistics: total chunk count plus embedder identity."""
        return {
            "total_chunks": self.store.count(),
            "embedding_model": self.embedder.model_name,
            "embedding_dimension": self.embedder.embedding_dimension,
        }
|
|
|
|
class DocIntRetriever:
    """
    Retriever with document_intelligence EvidenceRef support.

    Performs vector-store similarity search and converts hits into
    document_intelligence types (BoundingBox, EvidenceRef) on request.
    """

    def __init__(
        self,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
        similarity_threshold: float = 0.5,
    ):
        """
        Args:
            vector_store: Optional pre-built store; resolved lazily otherwise.
            embedding_adapter: Optional pre-built embedder; resolved lazily otherwise.
            similarity_threshold: Hits scoring below this are dropped by retrieve().
        """
        self._store = vector_store
        self._embedder = embedding_adapter
        self.similarity_threshold = similarity_threshold

    @property
    def store(self) -> VectorStore:
        """Vector store backend, created on first access when not injected."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Embedding adapter, created on first access when not injected."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def retrieve(
        self,
        query: str,
        top_k: int = 5,
        document_id: Optional[str] = None,
        chunk_types: Optional[List[str]] = None,
        page_range: Optional[tuple] = None,
    ) -> List[Dict[str, Any]]:
        """
        Retrieve relevant chunks.

        Args:
            query: Search query
            top_k: Number of results
            document_id: Filter by document
            chunk_types: Filter by chunk type(s)
            page_range: Filter by page range (start, end)

        Returns:
            List of chunk dicts with metadata; hits below
            similarity_threshold are excluded.
        """
        filters: Dict[str, Any] = {}
        if document_id:
            filters["document_id"] = document_id
        if chunk_types:
            filters["chunk_type"] = chunk_types
        if page_range:
            filters["page"] = {"min": page_range[0], "max": page_range[1]}

        query_embedding = self.embedder.embed_text(query)

        results = self.store.search(
            query_embedding=query_embedding,
            top_k=top_k,
            filters=filters if filters else None,
        )

        chunks = []
        for result in results:
            if result.similarity < self.similarity_threshold:
                continue
            chunks.append({
                "chunk_id": result.chunk_id,
                "document_id": result.document_id,
                "text": result.text,
                "similarity": result.similarity,
                "page": result.page,
                "chunk_type": result.chunk_type,
                "bbox": result.bbox,
                "source_path": result.metadata.get("source_path"),
                "confidence": result.metadata.get("confidence"),
            })

        return chunks

    def retrieve_with_evidence(
        self,
        query: str,
        top_k: int = 5,
        document_id: Optional[str] = None,
        chunk_types: Optional[List[str]] = None,
        page_range: Optional[tuple] = None,
    ) -> tuple:
        """
        Retrieve chunks with EvidenceRef objects.

        Returns:
            Tuple of (chunks, evidence_refs).  evidence_refs is empty when
            the document_intelligence module is unavailable.
        """
        chunks = self.retrieve(query, top_k, document_id, chunk_types, page_range)

        evidence_refs = []
        if DOCINT_AVAILABLE:
            for chunk in chunks:
                bbox_data = chunk.get("bbox")
                if bbox_data:
                    bbox = BoundingBox(
                        x_min=bbox_data.get("x_min", 0),
                        y_min=bbox_data.get("y_min", 0),
                        x_max=bbox_data.get("x_max", 1),
                        y_max=bbox_data.get("y_max", 1),
                        normalized=True,
                    )
                else:
                    # Whole-page placeholder when no bbox was stored.
                    bbox = BoundingBox(x_min=0, y_min=0, x_max=1, y_max=1)

                # BUG FIX: retrieve() always sets these keys but may set them
                # to None (page/chunk_type/confidence can be null in the
                # store), so dict.get(key, default) never applied its default
                # and None leaked into EvidenceRef.  Fall back on None too.
                page = chunk.get("page")
                source_type = chunk.get("chunk_type")
                confidence = chunk.get("confidence")
                evidence = EvidenceRef(
                    chunk_id=chunk["chunk_id"],
                    doc_id=chunk["document_id"],
                    page=page if page is not None else 1,
                    bbox=bbox,
                    source_type=source_type if source_type is not None else "text",
                    snippet=chunk["text"][:200],
                    confidence=confidence if confidence is not None else chunk["similarity"],
                )
                evidence_refs.append(evidence)

        return chunks, evidence_refs

    def build_context(
        self,
        chunks: List[Dict[str, Any]],
        max_length: int = 8000,
    ) -> str:
        """Build a numbered context string from retrieved chunks, truncated
        to max_length characters plus a truncation marker."""
        if not chunks:
            return ""

        parts = []
        for i, chunk in enumerate(chunks, 1):
            header = f"[{i}]"
            if chunk.get("page"):
                header += f" Page {chunk['page']}"
            if chunk.get("chunk_type"):
                header += f" ({chunk['chunk_type']})"
            header += f" [sim={chunk['similarity']:.2f}]"

            parts.append(header)
            parts.append(chunk["text"])
            parts.append("")  # blank separator line between chunks

        context = "\n".join(parts)

        if len(context) > max_length:
            context = context[:max_length] + "\n...[truncated]"

        return context
|
|
|
|
| |
# Process-wide singleton instances, managed by the get_*/reset_* helpers.
_docint_indexer: Optional[DocIntIndexer] = None
_docint_retriever: Optional[DocIntRetriever] = None
|
|
|
|
def get_docint_indexer(
    config: Optional[IndexerConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocIntIndexer:
    """Return the shared DocIntIndexer, constructing it on first call.

    NOTE: the arguments only take effect on the call that creates the
    singleton; later calls return the cached instance unchanged.
    """
    global _docint_indexer
    if _docint_indexer is None:
        _docint_indexer = DocIntIndexer(config, vector_store, embedding_adapter)
    return _docint_indexer
|
|
|
|
def get_docint_retriever(
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
    similarity_threshold: float = 0.5,
) -> DocIntRetriever:
    """Return the shared DocIntRetriever, constructing it on first call.

    NOTE: the arguments only take effect on the call that creates the
    singleton; later calls return the cached instance unchanged.
    """
    global _docint_retriever
    if _docint_retriever is None:
        _docint_retriever = DocIntRetriever(
            vector_store, embedding_adapter, similarity_threshold
        )
    return _docint_retriever
|
|
|
|
def reset_docint_components():
    """Discard the cached singletons so the next get_* call rebuilds them."""
    global _docint_indexer, _docint_retriever
    _docint_retriever = None
    _docint_indexer = None
|
|