"""Service for processing documents and ingesting to vector store.""" from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_core.documents import Document as LangChainDocument from src.db.postgres.vector_store import get_vector_store from src.storage.az_blob.az_blob import blob_storage from src.db.postgres.models import Document as DBDocument from sqlalchemy.ext.asyncio import AsyncSession from src.middlewares.logging import get_logger from typing import List import sys import docx import pandas as pd import pytesseract from pdf2image import convert_from_bytes from io import BytesIO logger = get_logger("knowledge_processing") class KnowledgeProcessingService: """Service for processing documents and ingesting to vector store.""" def __init__(self): self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=len ) async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int: """Process document and ingest to vector store. Returns: Number of chunks ingested """ try: logger.info(f"Processing document {db_doc.id}") content = await blob_storage.download_file(db_doc.blob_name) if db_doc.file_type == "pdf": documents = await self._build_pdf_documents(content, db_doc) elif db_doc.file_type == "csv": documents = self._build_csv_documents(content, db_doc) elif db_doc.file_type == "xlsx": documents = self._build_excel_documents(content, db_doc) else: text = self._extract_text(content, db_doc.file_type) if not text.strip(): raise ValueError("No text extracted from document") chunks = self.text_splitter.split_text(text) documents = [ LangChainDocument( page_content=chunk, metadata={ "user_id": db_doc.user_id, "source_type": "document", "data": { "document_id": db_doc.id, "filename": db_doc.filename, "file_type": db_doc.file_type, "chunk_index": i, }, } ) for i, chunk in enumerate(chunks) ] if not documents: raise ValueError("No text extracted from document") vector_store = get_vector_store() await vector_store.aadd_documents(documents) logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested") return len(documents) except Exception as e: logger.error(f"Failed to process document {db_doc.id}", error=str(e)) raise async def _build_pdf_documents( self, content: bytes, db_doc: DBDocument ) -> List[LangChainDocument]: """Build LangChain documents from PDF with page_label metadata using Tesseract OCR.""" documents: List[LangChainDocument] = [] poppler_path = None if sys.platform == "win32": pytesseract.pytesseract.tesseract_cmd = r"./software/Tesseract-OCR/tesseract.exe" poppler_path = "./software/poppler-24.08.0/Library/bin" images = convert_from_bytes(content, poppler_path=poppler_path) logger.info(f"Tesseract OCR: converting {len(images)} pages") for page_num, image in enumerate(images, start=1): page_text = pytesseract.image_to_string(image) if not page_text.strip(): continue for chunk in self.text_splitter.split_text(page_text): documents.append(LangChainDocument( page_content=chunk, metadata={ "user_id": db_doc.user_id, "source_type": "document", "data": { "document_id": db_doc.id, "filename": db_doc.filename, "file_type": db_doc.file_type, "chunk_index": len(documents), "page_label": page_num, }, } )) return documents def _profile_dataframe( self, df: pd.DataFrame, source_name: str, db_doc: DBDocument ) -> List[LangChainDocument]: """Profile each column of a dataframe → one chunk per column.""" documents = [] row_count = len(df) for col_name in df.columns: col = df[col_name] is_numeric = pd.api.types.is_numeric_dtype(col) null_count = int(col.isnull().sum()) distinct_count = int(col.nunique()) distinct_ratio = distinct_count / row_count if row_count > 0 else 0 text = f"Source: {source_name} ({row_count} rows)\n" text += f"Column: {col_name} ({col.dtype})\n" text += f"Null count: {null_count}\n" text += f"Distinct count: {distinct_count} ({distinct_ratio:.1%})\n" if is_numeric: text += f"Min: {col.min()}, Max: {col.max()}\n" text += f"Mean: {col.mean():.4f}, Median: {col.median():.4f}\n" if 0 < distinct_ratio <= 0.05: top_values = col.value_counts().head(10) top_str = ", ".join(f"{v} ({c})" for v, c in top_values.items()) text += f"Top values: {top_str}\n" text += f"Sample values: {col.dropna().head(5).tolist()}" documents.append(LangChainDocument( page_content=text, metadata={ "user_id": db_doc.user_id, "source_type": "document", "data": { "document_id": db_doc.id, "filename": db_doc.filename, "file_type": db_doc.file_type, "source": source_name, "column_name": col_name, "column_type": str(col.dtype), } } )) return documents def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]: """Profile each column of a CSV file.""" df = pd.read_csv(BytesIO(content)) return self._profile_dataframe(df, db_doc.filename, db_doc) def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]: """Profile each column of every sheet in an Excel file.""" sheets = pd.read_excel(BytesIO(content), sheet_name=None) documents = [] for sheet_name, df in sheets.items(): source_name = f"{db_doc.filename} / sheet: {sheet_name}" documents.extend(self._profile_dataframe(df, source_name, db_doc)) return documents def _extract_text(self, content: bytes, file_type: str) -> str: """Extract text from DOCX or TXT content.""" if file_type == "docx": doc = docx.Document(BytesIO(content)) return "\n".join(p.text for p in doc.paragraphs) elif file_type == "txt": return content.decode("utf-8") else: raise ValueError(f"Unsupported file type: {file_type}") knowledge_processor = KnowledgeProcessingService()