# Agentic-Service-Data-Eyond / src/knowledge/processing_service.py
# [KM-436][KM-437] add and modify for database and document pipeline (#3) — commit 2ba0613
"""Service for processing documents and ingesting to vector store."""
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document as LangChainDocument
from src.db.postgres.vector_store import get_vector_store
from src.storage.az_blob.az_blob import blob_storage
from src.db.postgres.models import Document as DBDocument
from sqlalchemy.ext.asyncio import AsyncSession
from src.middlewares.logging import get_logger
from typing import List
import sys
import docx
import pandas as pd
import pytesseract
from pdf2image import convert_from_bytes
from io import BytesIO
logger = get_logger("knowledge_processing")
class KnowledgeProcessingService:
"""Service for processing documents and ingesting to vector store."""
def __init__(self):
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len
)
async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int:
"""Process document and ingest to vector store.
Returns:
Number of chunks ingested
"""
try:
logger.info(f"Processing document {db_doc.id}")
content = await blob_storage.download_file(db_doc.blob_name)
if db_doc.file_type == "pdf":
documents = await self._build_pdf_documents(content, db_doc)
elif db_doc.file_type == "csv":
documents = self._build_csv_documents(content, db_doc)
elif db_doc.file_type == "xlsx":
documents = self._build_excel_documents(content, db_doc)
else:
text = self._extract_text(content, db_doc.file_type)
if not text.strip():
raise ValueError("No text extracted from document")
chunks = self.text_splitter.split_text(text)
documents = [
LangChainDocument(
page_content=chunk,
metadata={
"user_id": db_doc.user_id,
"source_type": "document",
"data": {
"document_id": db_doc.id,
"filename": db_doc.filename,
"file_type": db_doc.file_type,
"chunk_index": i,
},
}
)
for i, chunk in enumerate(chunks)
]
if not documents:
raise ValueError("No text extracted from document")
vector_store = get_vector_store()
await vector_store.aadd_documents(documents)
logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested")
return len(documents)
except Exception as e:
logger.error(f"Failed to process document {db_doc.id}", error=str(e))
raise
async def _build_pdf_documents(
self, content: bytes, db_doc: DBDocument
) -> List[LangChainDocument]:
"""Build LangChain documents from PDF with page_label metadata using Tesseract OCR."""
documents: List[LangChainDocument] = []
poppler_path = None
if sys.platform == "win32":
pytesseract.pytesseract.tesseract_cmd = r"./software/Tesseract-OCR/tesseract.exe"
poppler_path = "./software/poppler-24.08.0/Library/bin"
images = convert_from_bytes(content, poppler_path=poppler_path)
logger.info(f"Tesseract OCR: converting {len(images)} pages")
for page_num, image in enumerate(images, start=1):
page_text = pytesseract.image_to_string(image)
if not page_text.strip():
continue
for chunk in self.text_splitter.split_text(page_text):
documents.append(LangChainDocument(
page_content=chunk,
metadata={
"user_id": db_doc.user_id,
"source_type": "document",
"data": {
"document_id": db_doc.id,
"filename": db_doc.filename,
"file_type": db_doc.file_type,
"chunk_index": len(documents),
"page_label": page_num,
},
}
))
return documents
def _profile_dataframe(
self, df: pd.DataFrame, source_name: str, db_doc: DBDocument
) -> List[LangChainDocument]:
"""Profile each column of a dataframe → one chunk per column."""
documents = []
row_count = len(df)
for col_name in df.columns:
col = df[col_name]
is_numeric = pd.api.types.is_numeric_dtype(col)
null_count = int(col.isnull().sum())
distinct_count = int(col.nunique())
distinct_ratio = distinct_count / row_count if row_count > 0 else 0
text = f"Source: {source_name} ({row_count} rows)\n"
text += f"Column: {col_name} ({col.dtype})\n"
text += f"Null count: {null_count}\n"
text += f"Distinct count: {distinct_count} ({distinct_ratio:.1%})\n"
if is_numeric:
text += f"Min: {col.min()}, Max: {col.max()}\n"
text += f"Mean: {col.mean():.4f}, Median: {col.median():.4f}\n"
if 0 < distinct_ratio <= 0.05:
top_values = col.value_counts().head(10)
top_str = ", ".join(f"{v} ({c})" for v, c in top_values.items())
text += f"Top values: {top_str}\n"
text += f"Sample values: {col.dropna().head(5).tolist()}"
documents.append(LangChainDocument(
page_content=text,
metadata={
"user_id": db_doc.user_id,
"source_type": "document",
"data": {
"document_id": db_doc.id,
"filename": db_doc.filename,
"file_type": db_doc.file_type,
"source": source_name,
"column_name": col_name,
"column_type": str(col.dtype),
}
}
))
return documents
def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
"""Profile each column of a CSV file."""
df = pd.read_csv(BytesIO(content))
return self._profile_dataframe(df, db_doc.filename, db_doc)
def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
"""Profile each column of every sheet in an Excel file."""
sheets = pd.read_excel(BytesIO(content), sheet_name=None)
documents = []
for sheet_name, df in sheets.items():
source_name = f"{db_doc.filename} / sheet: {sheet_name}"
documents.extend(self._profile_dataframe(df, source_name, db_doc))
return documents
def _extract_text(self, content: bytes, file_type: str) -> str:
"""Extract text from DOCX or TXT content."""
if file_type == "docx":
doc = docx.Document(BytesIO(content))
return "\n".join(p.text for p in doc.paragraphs)
elif file_type == "txt":
return content.decode("utf-8")
else:
raise ValueError(f"Unsupported file type: {file_type}")
# Module-level singleton instance of the processing service, created at import time.
knowledge_processor = KnowledgeProcessingService()