| """Service for processing documents and ingesting to vector store.""" |
|
|
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_core.documents import Document as LangChainDocument |
| from src.db.postgres.vector_store import get_vector_store |
| from src.storage.az_blob.az_blob import blob_storage |
| from src.db.postgres.models import Document as DBDocument |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from src.middlewares.logging import get_logger |
| from typing import List |
| import sys |
| import docx |
| import pandas as pd |
| import pytesseract |
| from pdf2image import convert_from_bytes |
| from io import BytesIO |
|
|
# Module-scoped structured logger for this service (supports kwargs such as
# `error=...` in log calls — see usage in KnowledgeProcessingService).
logger = get_logger("knowledge_processing")
|
|
|
|
| class KnowledgeProcessingService: |
| """Service for processing documents and ingesting to vector store.""" |
|
|
| def __init__(self): |
| self.text_splitter = RecursiveCharacterTextSplitter( |
| chunk_size=1000, |
| chunk_overlap=200, |
| length_function=len |
| ) |
|
|
| async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int: |
| """Process document and ingest to vector store. |
| |
| Returns: |
| Number of chunks ingested |
| """ |
| try: |
| logger.info(f"Processing document {db_doc.id}") |
| content = await blob_storage.download_file(db_doc.blob_name) |
|
|
| if db_doc.file_type == "pdf": |
| documents = await self._build_pdf_documents(content, db_doc) |
| elif db_doc.file_type == "csv": |
| documents = self._build_csv_documents(content, db_doc) |
| elif db_doc.file_type == "xlsx": |
| documents = self._build_excel_documents(content, db_doc) |
| else: |
| text = self._extract_text(content, db_doc.file_type) |
| if not text.strip(): |
| raise ValueError("No text extracted from document") |
| chunks = self.text_splitter.split_text(text) |
| documents = [ |
| LangChainDocument( |
| page_content=chunk, |
| metadata={ |
| "user_id": db_doc.user_id, |
| "source_type": "document", |
| "data": { |
| "document_id": db_doc.id, |
| "filename": db_doc.filename, |
| "file_type": db_doc.file_type, |
| "chunk_index": i, |
| }, |
| } |
| ) |
| for i, chunk in enumerate(chunks) |
| ] |
|
|
| if not documents: |
| raise ValueError("No text extracted from document") |
|
|
| vector_store = get_vector_store() |
| await vector_store.aadd_documents(documents) |
|
|
| logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested") |
| return len(documents) |
|
|
| except Exception as e: |
| logger.error(f"Failed to process document {db_doc.id}", error=str(e)) |
| raise |
|
|
| async def _build_pdf_documents( |
| self, content: bytes, db_doc: DBDocument |
| ) -> List[LangChainDocument]: |
| """Build LangChain documents from PDF with page_label metadata using Tesseract OCR.""" |
| documents: List[LangChainDocument] = [] |
|
|
| poppler_path = None |
| if sys.platform == "win32": |
| pytesseract.pytesseract.tesseract_cmd = r"./software/Tesseract-OCR/tesseract.exe" |
| poppler_path = "./software/poppler-24.08.0/Library/bin" |
|
|
| images = convert_from_bytes(content, poppler_path=poppler_path) |
| logger.info(f"Tesseract OCR: converting {len(images)} pages") |
|
|
| for page_num, image in enumerate(images, start=1): |
| page_text = pytesseract.image_to_string(image) |
| if not page_text.strip(): |
| continue |
| for chunk in self.text_splitter.split_text(page_text): |
| documents.append(LangChainDocument( |
| page_content=chunk, |
| metadata={ |
| "user_id": db_doc.user_id, |
| "source_type": "document", |
| "data": { |
| "document_id": db_doc.id, |
| "filename": db_doc.filename, |
| "file_type": db_doc.file_type, |
| "chunk_index": len(documents), |
| "page_label": page_num, |
| }, |
| } |
| )) |
|
|
| return documents |
|
|
| def _profile_dataframe( |
| self, df: pd.DataFrame, source_name: str, db_doc: DBDocument |
| ) -> List[LangChainDocument]: |
| """Profile each column of a dataframe → one chunk per column.""" |
| documents = [] |
| row_count = len(df) |
|
|
| for col_name in df.columns: |
| col = df[col_name] |
| is_numeric = pd.api.types.is_numeric_dtype(col) |
| null_count = int(col.isnull().sum()) |
| distinct_count = int(col.nunique()) |
| distinct_ratio = distinct_count / row_count if row_count > 0 else 0 |
|
|
| text = f"Source: {source_name} ({row_count} rows)\n" |
| text += f"Column: {col_name} ({col.dtype})\n" |
| text += f"Null count: {null_count}\n" |
| text += f"Distinct count: {distinct_count} ({distinct_ratio:.1%})\n" |
|
|
| if is_numeric: |
| text += f"Min: {col.min()}, Max: {col.max()}\n" |
| text += f"Mean: {col.mean():.4f}, Median: {col.median():.4f}\n" |
|
|
| if 0 < distinct_ratio <= 0.05: |
| top_values = col.value_counts().head(10) |
| top_str = ", ".join(f"{v} ({c})" for v, c in top_values.items()) |
| text += f"Top values: {top_str}\n" |
|
|
| text += f"Sample values: {col.dropna().head(5).tolist()}" |
|
|
| documents.append(LangChainDocument( |
| page_content=text, |
| metadata={ |
| "user_id": db_doc.user_id, |
| "source_type": "document", |
| "data": { |
| "document_id": db_doc.id, |
| "filename": db_doc.filename, |
| "file_type": db_doc.file_type, |
| "source": source_name, |
| "column_name": col_name, |
| "column_type": str(col.dtype), |
| } |
| } |
| )) |
| return documents |
|
|
| def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]: |
| """Profile each column of a CSV file.""" |
| df = pd.read_csv(BytesIO(content)) |
| return self._profile_dataframe(df, db_doc.filename, db_doc) |
|
|
| def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]: |
| """Profile each column of every sheet in an Excel file.""" |
| sheets = pd.read_excel(BytesIO(content), sheet_name=None) |
| documents = [] |
| for sheet_name, df in sheets.items(): |
| source_name = f"{db_doc.filename} / sheet: {sheet_name}" |
| documents.extend(self._profile_dataframe(df, source_name, db_doc)) |
| return documents |
|
|
| def _extract_text(self, content: bytes, file_type: str) -> str: |
| """Extract text from DOCX or TXT content.""" |
| if file_type == "docx": |
| doc = docx.Document(BytesIO(content)) |
| return "\n".join(p.text for p in doc.paragraphs) |
| elif file_type == "txt": |
| return content.decode("utf-8") |
| else: |
| raise ValueError(f"Unsupported file type: {file_type}") |
|
|
|
|
# Module-level singleton instance of the service — presumably imported by
# API routes / background workers; verify against callers before refactoring.
knowledge_processor = KnowledgeProcessingService()
|
|