| import fs from 'fs'; |
| import path from 'path'; |
| import { modalClient } from './modal-client'; |
| import { nebiusClient } from './nebius-client'; |
| import { FileProcessor } from './file-upload'; |
| import { storage } from './storage'; |
| import { type Document, type InsertDocument } from '@shared/schema'; |
|
|
/**
 * Outcome of running the requested operations on a single document.
 */
export interface ProcessingResult {
  /** `false` only when processing threw; individual step failures are logged and leave this `true`. */
  success: boolean;
  /** Text obtained from extraction, or the document's stored content when extraction did not replace it. */
  extractedText?: string;
  /** Embedding vector for the text, present when embedding generation was requested and succeeded. */
  embeddings?: number[];
  /** Task identifier reported by Modal when the text was extracted there. */
  modalTaskId?: string;
  /** Message of the error that aborted processing. */
  error?: string;
  /** Elapsed time in milliseconds, measured with `Date.now()`. */
  processingTime: number;
}
|
|
/**
 * Aggregate outcome of processing a list of documents.
 */
export interface BatchProcessingResult {
  /** `false` only when the batch as a whole aborted with an error. */
  success: boolean;
  /** Number of entries in `results` whose `success` is `true`. */
  processedCount: number;
  /** Number of entries in `results` whose `success` is `false`. */
  failedCount: number;
  /** Per-document outcomes, keyed by the document's id. */
  results: Array<{
    documentId: number;
    success: boolean;
    extractedText?: string;
    embeddings?: number[];
    error?: string;
  }>;
  /** Elapsed time for the whole batch in milliseconds, measured with `Date.now()`. */
  totalProcessingTime: number;
}
|
|
/**
 * Runs text extraction, embedding generation and vector-index operations for
 * documents, delegating to `FileProcessor`, `modalClient`, `nebiusClient` and
 * `storage`.
 *
 * Public methods report failures through their result objects instead of
 * throwing. Obtain the shared instance through {@link DocumentProcessor.getInstance}.
 */
export class DocumentProcessor {
  /** Vector index used when a caller does not name one. */
  private static readonly DEFAULT_INDEX_NAME = 'research_papers_clean_v2';

  /** Maximum number of characters of text submitted for embedding. */
  private static readonly MAX_EMBEDDING_INPUT_CHARS = 8000;

  private static instance: DocumentProcessor;

  /** Returns the shared processor, creating it on first use. */
  static getInstance(): DocumentProcessor {
    if (!DocumentProcessor.instance) {
      DocumentProcessor.instance = new DocumentProcessor();
    }
    return DocumentProcessor.instance;
  }

  /** Converts any thrown value into a printable message. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Processes one document.
   *
   * Step failures (extraction or embedding) are logged with `console.warn`
   * and do not fail the call; `success` is `false` only when something throws.
   *
   * @param document - Document to process. Extraction runs only when it has a `filePath`.
   * @param operations - Steps to run. `'build_index'` is accepted but this method
   *   takes no action for it; use {@link buildVectorIndex} to build an index.
   * @returns The extracted text (or stored content), embeddings when generated,
   *   the Modal task id when Modal performed the extraction, and the elapsed time.
   */
  async processDocument(
    document: Document,
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'> = ['extract_text']
  ): Promise<ProcessingResult> {
    const startTime = Date.now();

    try {
      let extractedText = document.content;
      let embeddings: number[] | undefined;
      let modalTaskId: string | undefined;

      if (operations.includes('extract_text') && document.filePath) {
        const textResult = await this.extractText(document);
        if (textResult.success) {
          // `||` rather than `??`: an empty extraction also falls back to the stored content.
          extractedText = textResult.extractedText || document.content;
          modalTaskId = textResult.modalTaskId;
        } else {
          console.warn(`Text extraction failed for document ${document.id}: ${textResult.error}`);
        }
      }

      if (operations.includes('generate_embedding') && extractedText) {
        const embeddingResult = await this.generateEmbeddings(extractedText);
        if (embeddingResult.success) {
          embeddings = embeddingResult.embeddings;
        } else {
          console.warn(`Embedding generation failed for document ${document.id}: ${embeddingResult.error}`);
        }
      }

      return {
        success: true,
        extractedText,
        embeddings,
        modalTaskId,
        processingTime: Date.now() - startTime
      };
    } catch (error) {
      return {
        success: false,
        error: DocumentProcessor.describeError(error),
        processingTime: Date.now() - startTime
      };
    }
  }

  /**
   * Processes several documents.
   *
   * When `'extract_text'` is requested, documents that have a file and a MIME
   * type for which `FileProcessor.requiresOCR` is true are extracted together
   * through Modal; every other document goes through {@link processDocument}
   * one at a time. When `'extract_text'` is not requested, all documents take
   * the per-document path so that none is left out of the result.
   *
   * @param documents - Documents to process.
   * @param operations - Steps to run for each document.
   * @returns One result entry per document together with success/failure counts.
   */
  async batchProcessDocuments(
    documents: Document[],
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'> = ['extract_text']
  ): Promise<BatchProcessingResult> {
    const startTime = Date.now();
    const results: BatchProcessingResult['results'] = [];

    try {
      const wantsExtraction = operations.includes('extract_text');
      const documentsForModal: Document[] = [];
      const documentsForLocal: Document[] = [];

      for (const doc of documents) {
        const needsOcr = Boolean(doc.filePath) && FileProcessor.requiresOCR(doc.mimeType || '');
        // OCR documents use the Modal batch only when extraction was asked for;
        // otherwise they are handled individually instead of being dropped.
        if (needsOcr && wantsExtraction) {
          documentsForModal.push(doc);
        } else {
          documentsForLocal.push(doc);
        }
      }

      if (documentsForModal.length > 0) {
        try {
          const modalResults = await this.batchExtractTextModal(documentsForModal);

          // The batch call only extracts text, so embeddings are produced here
          // to match what the per-document path returns.
          if (operations.includes('generate_embedding')) {
            for (const entry of modalResults) {
              if (!entry.success || !entry.extractedText) {
                continue;
              }
              const embeddingResult = await this.generateEmbeddings(entry.extractedText);
              if (embeddingResult.success) {
                entry.embeddings = embeddingResult.embeddings;
              } else {
                console.warn(
                  `Embedding generation failed for document ${entry.documentId}: ${embeddingResult.error}`
                );
              }
            }
          }

          // Pushed only after the whole batch step finished, so the fallback
          // below can never produce duplicate entries.
          results.push(...modalResults);
        } catch (error) {
          // batchExtractTextModal reports failures through its entries, so this
          // is a safety net for an unexpected throw: process each one singly.
          console.error('Modal batch processing failed:', error);
          for (const doc of documentsForModal) {
            results.push(await this.processForBatch(doc, operations));
          }
        }
      }

      for (const doc of documentsForLocal) {
        results.push(await this.processForBatch(doc, operations));
      }

      const successCount = results.filter(r => r.success).length;

      return {
        success: true,
        processedCount: successCount,
        failedCount: results.length - successCount,
        results,
        totalProcessingTime: Date.now() - startTime
      };
    } catch (error) {
      const message = DocumentProcessor.describeError(error);
      return {
        success: false,
        processedCount: 0,
        failedCount: documents.length,
        results: documents.map(doc => ({
          documentId: doc.id,
          success: false,
          error: message
        })),
        totalProcessingTime: Date.now() - startTime
      };
    }
  }

  /** Runs {@link processDocument} and reshapes its result into a batch entry. */
  private async processForBatch(
    doc: Document,
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'>
  ): Promise<BatchProcessingResult['results'][number]> {
    const result = await this.processDocument(doc, operations);
    return {
      documentId: doc.id,
      success: result.success,
      extractedText: result.extractedText,
      embeddings: result.embeddings,
      error: result.error
    };
  }

  /**
   * Obtains the text of a document: read directly for text files, extracted
   * through Modal for files that require OCR, and the stored content otherwise
   * (including documents without a file).
   */
  private async extractText(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    if (!document.filePath) {
      return { success: true, extractedText: document.content };
    }

    const mimeType = document.mimeType || '';

    try {
      if (FileProcessor.isTextFile(mimeType)) {
        const content = await FileProcessor.readTextFile(document.filePath);
        return { success: true, extractedText: content };
      }

      if (FileProcessor.requiresOCR(mimeType)) {
        return await this.extractTextModal(document);
      }

      // Neither plain text nor OCR-capable: keep whatever content is stored.
      return { success: true, extractedText: document.content };
    } catch (error) {
      return { success: false, error: DocumentProcessor.describeError(error) };
    }
  }

  /**
   * Sends one file to Modal, base64-encoded, and returns the extracted text.
   * Any failure is logged and returned as `success: false`.
   */
  private async extractTextModal(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    try {
      if (!document.filePath) {
        throw new Error('No file path provided for Modal processing');
      }

      const fileBuffer = await fs.promises.readFile(document.filePath);
      const modalDocument = {
        id: document.id.toString(),
        content: fileBuffer.toString('base64'),
        contentType: document.mimeType || 'application/octet-stream'
      };

      const result = await modalClient.extractTextFromDocuments([modalDocument]);

      if (result.status !== 'completed' || (result.results?.length ?? 0) === 0) {
        return { success: false, error: result.error || 'Modal processing failed' };
      }

      // Only one document was submitted, so only the first result is relevant.
      const extractionResult = result.results[0];
      if (extractionResult.status !== 'completed') {
        return { success: false, error: extractionResult.error || 'Modal extraction failed' };
      }

      return {
        success: true,
        extractedText: extractionResult.extracted_text,
        modalTaskId: result.task_id
      };
    } catch (error) {
      console.error('Modal text extraction failed:', error);
      return { success: false, error: DocumentProcessor.describeError(error) };
    }
  }

  /**
   * Extracts text for several files with a single Modal batch request.
   *
   * Always returns exactly one entry per input document, in input order:
   * documents whose file could not be read, or for which Modal returned no
   * result, are reported as failed rather than being left out.
   */
  private async batchExtractTextModal(
    documents: Document[]
  ): Promise<BatchProcessingResult['results']> {
    const payloads = await Promise.all(
      documents.map(async (doc) => {
        if (!doc.filePath) return null;

        try {
          const fileBuffer = await fs.promises.readFile(doc.filePath);
          return {
            id: doc.id.toString(),
            content: fileBuffer.toString('base64'),
            contentType: doc.mimeType || 'application/octet-stream'
          };
        } catch (error) {
          console.error(`Failed to read file for document ${doc.id}:`, error);
          return null;
        }
      })
    );

    const validDocuments = payloads.filter(
      (payload): payload is { id: string; content: string; contentType: string } => payload !== null
    );

    if (validDocuments.length === 0) {
      return documents.map(doc => ({
        documentId: doc.id,
        success: false,
        error: 'No valid documents for processing'
      }));
    }

    try {
      const batchResult = await modalClient.batchProcessDocuments({
        documents: validDocuments,
        modelName: 'text-embedding-3-small',
        batchSize: Math.min(validDocuments.length, 10)
      });

      if (batchResult.status !== 'completed' || !batchResult.extraction_results) {
        throw new Error(batchResult.error || 'Batch processing failed');
      }

      // Index Modal's rows by numeric document id so they can be matched back
      // to the input documents.
      const returned = new Map<number, BatchProcessingResult['results'][number]>();
      batchResult.extraction_results.forEach((result: any) => {
        const documentId = Number.parseInt(String(result.id), 10);
        returned.set(documentId, {
          documentId,
          success: result.status === 'completed',
          extractedText: result.extracted_text,
          error: result.error
        });
      });

      const submittedIds = new Set(validDocuments.map(payload => payload.id));

      return documents.map(doc =>
        returned.get(doc.id) ?? {
          documentId: doc.id,
          success: false,
          error: submittedIds.has(doc.id.toString())
            ? 'No extraction result returned for document'
            : 'Failed to read document file'
        }
      );
    } catch (error) {
      console.error('Modal batch processing failed:', error);
      const message = DocumentProcessor.describeError(error);
      return documents.map(doc => ({
        documentId: doc.id,
        success: false,
        error: message
      }));
    }
  }

  /**
   * Requests an embedding for `text` from the Nebius client. Input longer than
   * `MAX_EMBEDDING_INPUT_CHARS` is cut to that many characters first.
   */
  private async generateEmbeddings(text: string): Promise<{
    success: boolean;
    embeddings?: number[];
    error?: string;
  }> {
    try {
      const limit = DocumentProcessor.MAX_EMBEDDING_INPUT_CHARS;
      const truncatedText = text.length > limit ? text.substring(0, limit) : text;

      const result = await nebiusClient.generateEmbeddings(truncatedText);

      if (result.success && result.embeddings) {
        return { success: true, embeddings: result.embeddings };
      }
      return { success: false, error: result.error || 'Embedding generation failed' };
    } catch (error) {
      return { success: false, error: DocumentProcessor.describeError(error) };
    }
  }

  /**
   * Builds a vector index on Modal from the documents' stored content.
   *
   * @param documents - Documents to index; id, content, title and source are sent.
   * @param indexName - Name of the index to build.
   * @returns The index name and document count reported by Modal, or an error message.
   */
  async buildVectorIndex(
    documents: Document[],
    indexName = DocumentProcessor.DEFAULT_INDEX_NAME
  ): Promise<{
    success: boolean;
    indexName?: string;
    documentCount?: number;
    error?: string;
  }> {
    try {
      const modalDocuments = documents.map(doc => ({
        id: doc.id.toString(),
        content: doc.content,
        title: doc.title,
        source: doc.source
      }));

      const result = await modalClient.buildVectorIndex(modalDocuments, {
        indexName,
        dimension: 1536,
        indexType: 'IVF',
        // One list per ten documents, clamped to the range 10..100.
        nlist: Math.min(100, Math.max(10, Math.floor(documents.length / 10)))
      });

      if (result.status === 'completed') {
        return {
          success: true,
          indexName: result.index_name,
          documentCount: result.document_count
        };
      }
      return { success: false, error: result.error || 'Index building failed' };
    } catch (error) {
      return { success: false, error: DocumentProcessor.describeError(error) };
    }
  }

  /**
   * Searches a Modal vector index and enriches each hit with the matching
   * stored document when one can be found.
   *
   * @param query - Search text.
   * @param indexName - Index to search.
   * @param maxResults - Maximum number of hits requested from Modal.
   * @returns The hits, or an error message. A completed search without a
   *   result list yields an empty array.
   */
  async searchVectorIndex(
    query: string,
    indexName = DocumentProcessor.DEFAULT_INDEX_NAME,
    maxResults = 10
  ): Promise<{
    success: boolean;
    results?: Array<{
      id: string;
      title: string;
      content: string;
      source: string;
      relevanceScore: number;
      rank: number;
      snippet: string;
    }>;
    error?: string;
  }> {
    try {
      const result = await modalClient.vectorSearch(query, indexName, maxResults);

      if (result.status !== 'completed') {
        return { success: false, error: result.error || 'Vector search failed' };
      }

      const hits: any[] = Array.isArray(result.results) ? result.results : [];
      const enrichedResults = await Promise.all(hits.map(hit => this.enrichSearchHit(hit)));

      return { success: true, results: enrichedResults };
    } catch (error) {
      return { success: false, error: DocumentProcessor.describeError(error) };
    }
  }

  /**
   * Merges a vector-search hit with its stored document. The hit is returned
   * unchanged when its id is not numeric, no document is stored under that id,
   * or the lookup fails.
   */
  private async enrichSearchHit(vectorResult: any): Promise<any> {
    try {
      const documentId = Number.parseInt(String(vectorResult.id), 10);
      if (Number.isNaN(documentId)) {
        return vectorResult;
      }

      const dbDocument = await storage.getDocument(documentId);
      if (!dbDocument) {
        return vectorResult;
      }

      return {
        // NOTE(review): this is the stored document's own id, which this class
        // uses as a number elsewhere (`documentId`), while the declared result
        // type says `string` - confirm what callers expect before changing it.
        id: dbDocument.id,
        title: dbDocument.title,
        content: dbDocument.content,
        source: dbDocument.source,
        sourceType: dbDocument.sourceType,
        url: dbDocument.url,
        metadata: dbDocument.metadata,
        createdAt: dbDocument.createdAt,
        relevanceScore: vectorResult.relevanceScore,
        rank: vectorResult.rank,
        snippet: vectorResult.snippet || dbDocument.content.substring(0, 200) + '...'
      };
    } catch (error) {
      console.warn(`Failed to enrich vector result for ID ${vectorResult.id}:`, error);
      return vectorResult;
    }
  }
}
|
|
/** Shared {@link DocumentProcessor} instance, obtained from `DocumentProcessor.getInstance()`. */
export const documentProcessor = DocumentProcessor.getInstance();