File size: 7,706 Bytes
027123c
 
 
 
 
 
 
 
 
 
2ba0613
027123c
2ba0613
 
 
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2ba0613
 
 
 
027123c
 
 
 
 
 
 
 
 
 
2ba0613
 
 
 
 
 
 
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2ba0613
027123c
 
2ba0613
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
 
2ba0613
027123c
 
2ba0613
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
2ba0613
 
 
 
 
 
 
 
 
 
 
 
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""Service for processing documents and ingesting to vector store."""

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document as LangChainDocument
from src.db.postgres.vector_store import get_vector_store
from src.storage.az_blob.az_blob import blob_storage
from src.db.postgres.models import Document as DBDocument
from sqlalchemy.ext.asyncio import AsyncSession
from src.middlewares.logging import get_logger
from typing import List
import sys
import docx
import pandas as pd
import pytesseract
from pdf2image import convert_from_bytes
from io import BytesIO

# Module-level structured logger, tagged with this component's name.
logger = get_logger("knowledge_processing")


class KnowledgeProcessingService:
    """Service for processing documents and ingesting to vector store.

    Supported formats:
        - pdf:  rasterized page-by-page and OCR'd with Tesseract
        - csv:  per-column statistical profile, one chunk per column
        - xlsx: per-column profile for every sheet
        - docx / txt: plain text extraction followed by recursive chunking
    """

    def __init__(self):
        # Chunking parameters shared by the plain-text and PDF OCR paths.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )

    async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int:
        """Download, chunk, and ingest one document into the vector store.

        Args:
            db_doc: Database record describing the uploaded document
                (blob name, file type, owner, etc.).
            db: Async DB session. Not used directly here; kept so the
                call signature stays stable for callers.

        Returns:
            Number of chunks ingested.

        Raises:
            ValueError: If no text could be extracted from the document.
            Exception: Any download/OCR/ingestion failure is logged and
                re-raised unchanged.
        """
        try:
            logger.info(f"Processing document {db_doc.id}")
            content = await blob_storage.download_file(db_doc.blob_name)

            # Dispatch on file type; every builder returns ready-to-ingest
            # LangChain documents carrying the uniform metadata payload.
            if db_doc.file_type == "pdf":
                documents = await self._build_pdf_documents(content, db_doc)
            elif db_doc.file_type == "csv":
                documents = self._build_csv_documents(content, db_doc)
            elif db_doc.file_type == "xlsx":
                documents = self._build_excel_documents(content, db_doc)
            else:
                documents = self._build_text_documents(content, db_doc)

            if not documents:
                raise ValueError("No text extracted from document")

            vector_store = get_vector_store()
            await vector_store.aadd_documents(documents)

            logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested")
            return len(documents)

        except Exception as e:
            logger.error(f"Failed to process document {db_doc.id}", error=str(e))
            raise

    def _document_metadata(self, db_doc: DBDocument, **extra) -> dict:
        """Build the uniform per-chunk metadata payload.

        Args:
            db_doc: Source document record.
            **extra: Format-specific fields merged into the "data" mapping
                (e.g. chunk_index, page_label, source, column_name).

        Returns:
            Metadata dict with user scoping, source type, and document data.
        """
        return {
            "user_id": db_doc.user_id,
            "source_type": "document",
            "data": {
                "document_id": db_doc.id,
                "filename": db_doc.filename,
                "file_type": db_doc.file_type,
                **extra,
            },
        }

    def _build_text_documents(
        self, content: bytes, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Extract plain text (DOCX/TXT) and split it into indexed chunks.

        Raises:
            ValueError: If the extracted text is empty or whitespace-only,
                or the file type is unsupported.
        """
        text = self._extract_text(content, db_doc.file_type)
        if not text.strip():
            raise ValueError("No text extracted from document")
        chunks = self.text_splitter.split_text(text)
        return [
            LangChainDocument(
                page_content=chunk,
                metadata=self._document_metadata(db_doc, chunk_index=i),
            )
            for i, chunk in enumerate(chunks)
        ]

    async def _build_pdf_documents(
        self, content: bytes, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Build LangChain documents from PDF with page_label metadata using Tesseract OCR.

        chunk_index is global across all pages; page_label is the 1-based
        page number. Pages whose OCR output is blank are skipped.
        """
        documents: List[LangChainDocument] = []

        # On Windows, point at the bundled Tesseract/Poppler binaries
        # shipped alongside the application.
        poppler_path = None
        if sys.platform == "win32":
            pytesseract.pytesseract.tesseract_cmd = r"./software/Tesseract-OCR/tesseract.exe"
            poppler_path = "./software/poppler-24.08.0/Library/bin"

        images = convert_from_bytes(content, poppler_path=poppler_path)
        logger.info(f"Tesseract OCR: converting {len(images)} pages")

        # NOTE(review): OCR is CPU-bound and runs synchronously inside this
        # async method, blocking the event loop for the whole document —
        # consider offloading via asyncio.to_thread if latency matters.
        for page_num, image in enumerate(images, start=1):
            page_text = pytesseract.image_to_string(image)
            if not page_text.strip():
                continue
            for chunk in self.text_splitter.split_text(page_text):
                documents.append(LangChainDocument(
                    page_content=chunk,
                    metadata=self._document_metadata(
                        db_doc,
                        chunk_index=len(documents),
                        page_label=page_num,
                    ),
                ))

        return documents

    def _profile_dataframe(
        self, df: pd.DataFrame, source_name: str, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Profile each column of a dataframe → one chunk per column.

        Each chunk summarizes one column: dtype, null/distinct counts,
        numeric min/max/mean/median, top values for low-cardinality
        columns (≤5% distinct), and a few sample values.
        """
        documents = []
        row_count = len(df)

        for col_name in df.columns:
            col = df[col_name]
            is_numeric = pd.api.types.is_numeric_dtype(col)
            null_count = int(col.isnull().sum())
            distinct_count = int(col.nunique())
            # Guard against division by zero for empty dataframes.
            distinct_ratio = distinct_count / row_count if row_count > 0 else 0

            text = f"Source: {source_name} ({row_count} rows)\n"
            text += f"Column: {col_name} ({col.dtype})\n"
            text += f"Null count: {null_count}\n"
            text += f"Distinct count: {distinct_count} ({distinct_ratio:.1%})\n"

            if is_numeric:
                text += f"Min: {col.min()}, Max: {col.max()}\n"
                text += f"Mean: {col.mean():.4f}, Median: {col.median():.4f}\n"

            # Low-cardinality columns are likely categorical: list the
            # most frequent values with their counts.
            if 0 < distinct_ratio <= 0.05:
                top_values = col.value_counts().head(10)
                top_str = ", ".join(f"{v} ({c})" for v, c in top_values.items())
                text += f"Top values: {top_str}\n"

            text += f"Sample values: {col.dropna().head(5).tolist()}"

            documents.append(LangChainDocument(
                page_content=text,
                metadata=self._document_metadata(
                    db_doc,
                    source=source_name,
                    column_name=col_name,
                    column_type=str(col.dtype),
                ),
            ))
        return documents

    def _build_csv_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
        """Profile each column of a CSV file."""
        df = pd.read_csv(BytesIO(content))
        return self._profile_dataframe(df, db_doc.filename, db_doc)

    def _build_excel_documents(self, content: bytes, db_doc: DBDocument) -> List[LangChainDocument]:
        """Profile each column of every sheet in an Excel file."""
        # sheet_name=None loads every sheet into a {name: DataFrame} dict.
        sheets = pd.read_excel(BytesIO(content), sheet_name=None)
        documents = []
        for sheet_name, df in sheets.items():
            source_name = f"{db_doc.filename} / sheet: {sheet_name}"
            documents.extend(self._profile_dataframe(df, source_name, db_doc))
        return documents

    def _extract_text(self, content: bytes, file_type: str) -> str:
        """Extract text from DOCX or TXT content.

        Raises:
            ValueError: For any file type other than "docx" or "txt".
        """
        if file_type == "docx":
            doc = docx.Document(BytesIO(content))
            return "\n".join(p.text for p in doc.paragraphs)
        elif file_type == "txt":
            # NOTE(review): assumes UTF-8; a differently-encoded upload
            # will raise UnicodeDecodeError here — confirm upstream
            # validation guarantees UTF-8.
            return content.decode("utf-8")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")


# Shared module-level instance of the processing service.
knowledge_processor = KnowledgeProcessingService()