"""Service for processing documents and ingesting to vector store."""
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document as LangChainDocument
from src.db.postgres.vector_store import get_vector_store
from src.storage.az_blob.az_blob import blob_storage
from src.db.postgres.models import Document as DBDocument
from sqlalchemy.ext.asyncio import AsyncSession
from src.middlewares.logging import get_logger
from typing import List
import sys
import docx
import pandas as pd
import pytesseract
from pdf2image import convert_from_bytes
from io import BytesIO
# Structured logger scoped to this module's processing pipeline.
logger = get_logger("knowledge_processing")
class KnowledgeProcessingService:
    """Service for processing documents and ingesting to vector store.

    Supported file types:
      - pdf: per-page Tesseract OCR, chunked, with ``page_label`` metadata
      - csv / xlsx: per-column statistical profile, one chunk per column
      - docx / txt: plain text extraction, chunked
    """

    def __init__(self):
        # Shared splitter for the PDF-OCR and plain-text ingestion paths.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

    def _chunk_metadata(self, db_doc: DBDocument, **extra) -> dict:
        """Build the metadata payload attached to every ingested chunk.

        Centralizes the structure previously duplicated across the PDF,
        dataframe, and plain-text builders. ``extra`` keys (for example
        ``chunk_index``, ``page_label``, ``column_name``) are merged into
        the nested ``data`` dict.
        """
        return {
            "user_id": db_doc.user_id,
            "source_type": "document",
            "data": {
                "document_id": db_doc.id,
                "filename": db_doc.filename,
                "file_type": db_doc.file_type,
                **extra,
            },
        }

    async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int:
        """Process a document and ingest its chunks into the vector store.

        Args:
            db_doc: Database record describing the uploaded file; its
                ``blob_name`` is downloaded from blob storage.
            db: Active async session. Currently unused here; kept for
                interface compatibility with callers.

        Returns:
            Number of chunks ingested.

        Raises:
            ValueError: If no text/chunks could be extracted, or the file
                type is unsupported.
        """
        try:
            logger.info(f"Processing document {db_doc.id}")
            content = await blob_storage.download_file(db_doc.blob_name)

            # Dispatch by file type; every branch yields LangChain documents.
            if db_doc.file_type == "pdf":
                documents = await self._build_pdf_documents(content, db_doc)
            elif db_doc.file_type == "csv":
                documents = self._build_csv_documents(content, db_doc)
            elif db_doc.file_type == "xlsx":
                documents = self._build_excel_documents(content, db_doc)
            else:
                documents = self._build_text_documents(content, db_doc)

            if not documents:
                raise ValueError("No text extracted from document")

            vector_store = get_vector_store()
            await vector_store.aadd_documents(documents)
            logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested")
            return len(documents)
        except Exception as e:
            # Log with context, then propagate so callers can mark the
            # document as failed.
            logger.error(f"Failed to process document {db_doc.id}", error=str(e))
            raise

    def _build_text_documents(
        self, content: bytes, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Chunk plain-text content (docx/txt) into LangChain documents.

        Raises:
            ValueError: If no text could be extracted from the content.
        """
        text = self._extract_text(content, db_doc.file_type)
        if not text.strip():
            raise ValueError("No text extracted from document")
        return [
            LangChainDocument(
                page_content=chunk,
                metadata=self._chunk_metadata(db_doc, chunk_index=i),
            )
            for i, chunk in enumerate(self.text_splitter.split_text(text))
        ]

    async def _build_pdf_documents(
        self, content: bytes, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Build LangChain documents from PDF with page_label metadata using Tesseract OCR.

        NOTE(review): OCR and page rasterization are CPU-bound and run
        synchronously inside this coroutine — consider offloading to a
        thread/process pool if this blocks the event loop.
        """
        documents: List[LangChainDocument] = []
        poppler_path = None
        if sys.platform == "win32":
            # Bundled Windows binaries; other platforms rely on system
            # installs of tesseract and poppler found on PATH.
            pytesseract.pytesseract.tesseract_cmd = r"./software/Tesseract-OCR/tesseract.exe"
            poppler_path = "./software/poppler-24.08.0/Library/bin"
        images = convert_from_bytes(content, poppler_path=poppler_path)
        logger.info(f"Tesseract OCR: converting {len(images)} pages")
        for page_num, image in enumerate(images, start=1):
            page_text = pytesseract.image_to_string(image)
            if not page_text.strip():
                continue  # skip blank / unrecognized pages
            for chunk in self.text_splitter.split_text(page_text):
                documents.append(LangChainDocument(
                    page_content=chunk,
                    # chunk_index is a running index across the whole
                    # document, not per page.
                    metadata=self._chunk_metadata(
                        db_doc,
                        chunk_index=len(documents),
                        page_label=page_num,
                    ),
                ))
        return documents

    def _profile_dataframe(
        self, df: pd.DataFrame, source_name: str, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Profile each column of a dataframe → one chunk per column."""
        documents: List[LangChainDocument] = []
        row_count = len(df)
        for col_name in df.columns:
            col = df[col_name]
            null_count = int(col.isnull().sum())
            distinct_count = int(col.nunique())
            distinct_ratio = distinct_count / row_count if row_count > 0 else 0

            lines = [
                f"Source: {source_name} ({row_count} rows)",
                f"Column: {col_name} ({col.dtype})",
                f"Null count: {null_count}",
                f"Distinct count: {distinct_count} ({distinct_ratio:.1%})",
            ]
            if pd.api.types.is_numeric_dtype(col):
                lines.append(f"Min: {col.min()}, Max: {col.max()}")
                lines.append(f"Mean: {col.mean():.4f}, Median: {col.median():.4f}")
            # Low-cardinality columns (≤5% distinct): list most frequent values.
            if 0 < distinct_ratio <= 0.05:
                top_values = col.value_counts().head(10)
                top_str = ", ".join(f"{v} ({c})" for v, c in top_values.items())
                lines.append(f"Top values: {top_str}")
            lines.append(f"Sample values: {col.dropna().head(5).tolist()}")

            documents.append(LangChainDocument(
                page_content="\n".join(lines),
                metadata=self._chunk_metadata(
                    db_doc,
                    source=source_name,
                    column_name=col_name,
                    column_type=str(col.dtype),
                ),
            ))
        return documents

    def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
        """Profile each column of a CSV file."""
        df = pd.read_csv(BytesIO(content))
        return self._profile_dataframe(df, db_doc.filename, db_doc)

    def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
        """Profile each column of every sheet in an Excel file."""
        # sheet_name=None loads every sheet as a {name: DataFrame} mapping.
        sheets = pd.read_excel(BytesIO(content), sheet_name=None)
        documents: List[LangChainDocument] = []
        for sheet_name, df in sheets.items():
            source_name = f"{db_doc.filename} / sheet: {sheet_name}"
            documents.extend(self._profile_dataframe(df, source_name, db_doc))
        return documents

    def _extract_text(self, content: bytes, file_type: str) -> str:
        """Extract text from DOCX or TXT content.

        The docx path reads paragraph text only (tables, headers, and
        footers are not included).

        Raises:
            ValueError: For any other file type.
        """
        if file_type == "docx":
            doc = docx.Document(BytesIO(content))
            return "\n".join(p.text for p in doc.paragraphs)
        elif file_type == "txt":
            return content.decode("utf-8")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
# Shared module-level instance for importers of this module.
knowledge_processor = KnowledgeProcessingService()