import os
import logging
import uuid

from app.utils.pinecone_fix import PineconeConnectionManager

from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import google.generativeai as genai

logger = logging.getLogger(__name__)


class PDFProcessor:
    """Process PDF files and create embeddings in Pinecone."""

    def __init__(self, index_name="testbot768", namespace="Default", api_key=None,
                 vector_db_id=None, mock_mode=False, correlation_id=None):
        self.index_name = index_name
        self.namespace = namespace
        self.api_key = api_key
        self.vector_db_id = vector_db_id
        self.pinecone_index = None
        self.mock_mode = mock_mode
        self.correlation_id = correlation_id or str(uuid.uuid4())[:8]
        self.google_api_key = os.environ.get("GOOGLE_API_KEY")

        # Connect eagerly when an API key is provided; otherwise the index is
        # resolved lazily on first use by each method.
        if self.api_key:
            try:
                logger.info(f"[{self.correlation_id}] Initializing Pinecone connection to {self.index_name}")
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)
                logger.info(f"[{self.correlation_id}] Successfully connected to Pinecone index {self.index_name}")
            except Exception as e:
                logger.error(f"[{self.correlation_id}] Failed to initialize Pinecone: {str(e)}")

    async def process_pdf(self, file_path, document_id=None, metadata=None, progress_callback=None):
        """Process a PDF file and create vector embeddings.

        This method:
        1. Extracts text from the PDF using PyPDFLoader
        2. Splits the text into chunks using RecursiveCharacterTextSplitter
        3. Creates embeddings using a Google Gemini embedding model
        4. Stores the embeddings in Pinecone

        The optional progress_callback is awaited as
        progress_callback(None, document_id, stage, progress, message).
        """
        logger.info(f"[{self.correlation_id}] Processing PDF: {file_path}")

        try:
            if metadata is None:
                metadata = {}

            if document_id is None:
                document_id = str(uuid.uuid4())

            metadata["document_id"] = document_id

            # Derive the namespace from the vector DB ID when one is set.
            actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace

            # Step 1: extract text from the PDF.
            logger.info(f"[{self.correlation_id}] Extracting text from PDF: {file_path}")
            if progress_callback:
                await progress_callback(None, document_id, "text_extraction", 0.2, "Extracting text from PDF")

            loader = PyPDFLoader(file_path)
            documents = loader.load()
            total_text_length = sum(len(doc.page_content) for doc in documents)

            logger.info(f"[{self.correlation_id}] Extracted {len(documents)} pages, total text length: {total_text_length}")

            # Step 2: split the text into overlapping chunks.
            if progress_callback:
                await progress_callback(None, document_id, "chunking", 0.4, "Splitting text into chunks")

            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=100,
                length_function=len,
                separators=["\n\n", "\n", " ", ""]
            )

            chunks = text_splitter.split_documents(documents)

            logger.info(f"[{self.correlation_id}] Split into {len(chunks)} chunks")

            # Step 3: create embeddings.
            if progress_callback:
                await progress_callback(None, document_id, "embedding", 0.6, "Creating embeddings")

            if not self.google_api_key:
                raise ValueError("Google API key not found in environment variables")

            # Log only a masked form of the API key.
            masked_key = self.google_api_key[:8] + "..." + self.google_api_key[-4:] if len(self.google_api_key) > 12 else "***"
            logger.info(f"[{self.correlation_id}] Using Google API key: {masked_key}")

            genai.configure(api_key=self.google_api_key)

            # The index dimension determines whether embeddings need adjustment.
            logger.info(f"[{self.correlation_id}] Checking Pinecone index dimensions")
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            stats = self.pinecone_index.describe_index_stats()
            pinecone_dimension = stats.dimension
            logger.info(f"[{self.correlation_id}] Pinecone index dimension: {pinecone_dimension}")

            embedding_model = GoogleGenerativeAIEmbeddings(
                model="models/text-embedding-004",
                google_api_key=self.google_api_key,
                task_type="retrieval_document"
            )

            # Probe the model once to learn its output dimension.
            sample_embedding = embedding_model.embed_query("test")
            embedding_dimension = len(sample_embedding)

            logger.info(f"[{self.correlation_id}] Generated embeddings with dimension: {embedding_dimension}")

            # Pick an adjustment strategy if the model and index dimensions differ.
            if embedding_dimension != pinecone_dimension:
                logger.warning(f"[{self.correlation_id}] Embedding dimension mismatch: got {embedding_dimension}, need {pinecone_dimension}")

                if embedding_dimension < pinecone_dimension:
                    logger.info(f"[{self.correlation_id}] Using duplication strategy to upscale from {embedding_dimension} to {pinecone_dimension}")

                    if embedding_dimension * 2 == pinecone_dimension:
                        # Exactly half the target: repeat each value once.
                        def adjust_embedding(embedding):
                            return [val for val in embedding for _ in range(2)]
                    else:
                        # Otherwise pad the tail with zeros.
                        pad_size = pinecone_dimension - embedding_dimension
                        def adjust_embedding(embedding):
                            return embedding + [0.0] * pad_size
                else:
                    # The model output is larger than the index: truncate.
                    logger.info(f"[{self.correlation_id}] Will truncate embeddings from {embedding_dimension} to {pinecone_dimension}")

                    def adjust_embedding(embedding):
                        return embedding[:pinecone_dimension]
            else:
                # Dimensions already match: pass embeddings through unchanged.
                def adjust_embedding(embedding):
                    return embedding
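
            # Illustration (hypothetical dimensions, not taken from any config):
            # a 768-dim embedding against a 1536-dim index is duplicated
            # ([a, b, ...] -> [a, a, b, b, ...]); a 1000-dim embedding against
            # the same index is zero-padded with 536 trailing zeros. Both
            # strategies preserve relative similarity between vectors adjusted
            # the same way, so query vectors must receive the identical
            # adjustment at search time.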

            # Embed chunks in batches and build Pinecone vector payloads.
            batch_size = 10
            vectors_to_upsert = []

            for i in range(0, len(chunks), batch_size):
                batch = chunks[i:i+batch_size]

                texts = [chunk.page_content for chunk in batch]

                embeddings = embedding_model.embed_documents(texts)

                for j, (chunk, embedding) in enumerate(zip(batch, embeddings)):
                    adjusted_embedding = adjust_embedding(embedding)

                    # Guard against a bad adjustment before sending to Pinecone.
                    if len(adjusted_embedding) != pinecone_dimension:
                        raise ValueError(f"Dimension mismatch after adjustment: got {len(adjusted_embedding)}, expected {pinecone_dimension}")

                    chunk_metadata = {
                        "document_id": document_id,
                        "page": chunk.metadata.get("page", 0),
                        "chunk_id": f"{document_id}-chunk-{i+j}",
                        "text": chunk.page_content[:1000],
                        **metadata
                    }

                    vector = {
                        "id": f"{document_id}-{i+j}",
                        "values": adjusted_embedding,
                        "metadata": chunk_metadata
                    }

                    vectors_to_upsert.append(vector)

                logger.info(f"[{self.correlation_id}] Processed batch {i//batch_size + 1}/{(len(chunks)-1)//batch_size + 1}")

            # Step 4: store the vectors in Pinecone.
            if progress_callback:
                await progress_callback(None, document_id, "storing", 0.8, f"Storing {len(vectors_to_upsert)} vectors in Pinecone")

            logger.info(f"[{self.correlation_id}] Upserting {len(vectors_to_upsert)} vectors to Pinecone index {self.index_name}, namespace {actual_namespace}")

            result = PineconeConnectionManager.upsert_vectors_with_validation(
                self.pinecone_index,
                vectors_to_upsert,
                namespace=actual_namespace
            )

            logger.info(f"[{self.correlation_id}] Successfully upserted {result.get('upserted_count', 0)} vectors to Pinecone")

            if progress_callback:
                await progress_callback(None, document_id, "embedding_complete", 1.0, "Processing completed")

            return {
                "success": True,
                "document_id": document_id,
                "chunks_processed": len(chunks),
                "total_text_length": total_text_length,
                "vectors_created": len(vectors_to_upsert),
                "vectors_upserted": result.get('upserted_count', 0),
                "message": "PDF processed successfully"
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error processing PDF: {str(e)}")
            return {
                "success": False,
                "error": f"Error processing PDF: {str(e)}"
            }

    async def list_namespaces(self):
        """List all namespaces in the Pinecone index."""
        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            # Namespace names are reported by the index stats endpoint.
            stats = self.pinecone_index.describe_index_stats()
            namespaces = list(stats.get("namespaces", {}).keys())

            return {
                "success": True,
                "namespaces": namespaces
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error listing namespaces: {str(e)}")
            return {
                "success": False,
                "error": f"Error listing namespaces: {str(e)}"
            }

    async def delete_namespace(self):
        """Delete all vectors in a namespace."""
        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            logger.info(f"[{self.correlation_id}] Deleting namespace '{self.namespace}' from index '{self.index_name}'")

            # Record the vector count first so the response can report it.
            stats = self.pinecone_index.describe_index_stats()
            namespaces = stats.get("namespaces", {})

            if self.namespace in namespaces:
                vector_count = namespaces[self.namespace].get("vector_count", 0)

                self.pinecone_index.delete(delete_all=True, namespace=self.namespace)
                return {
                    "success": True,
                    "namespace": self.namespace,
                    "deleted_count": vector_count,
                    "message": f"Successfully deleted namespace '{self.namespace}' with {vector_count} vectors"
                }
            else:
                return {
                    "success": True,
                    "namespace": self.namespace,
                    "deleted_count": 0,
                    "message": f"Namespace '{self.namespace}' does not exist - nothing to delete"
                }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error deleting namespace: {str(e)}")
            return {
                "success": False,
                "namespace": self.namespace,
                "error": f"Error deleting namespace: {str(e)}"
            }

    async def delete_document(self, document_id, additional_metadata=None):
        """Delete vectors associated with a specific document ID or name."""
        logger.info(f"[{self.correlation_id}] Deleting vectors for document '{document_id}' from namespace '{self.namespace}'")

        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            # Use the same namespace convention as process_pdf.
            actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace

            # Build a list of candidate metadata filters, since documents may
            # be stored under different ID or name formats.
            filters = []

            # 1. Exact document ID match.
            filters.append({"document_id": document_id})

            # 2. Alternate UUID formats (with and without hyphens).
            if len(document_id) >= 32:
                if "-" in document_id:
                    filters.append({"document_id": document_id.replace("-", "")})
                else:
                    try:
                        formatted_uuid = str(uuid.UUID(document_id))
                        filters.append({"document_id": formatted_uuid})
                    except ValueError:
                        pass
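
            # Illustration: a lookup for "550e8400-e29b-41d4-a716-446655440000"
            # also tries "550e8400e29b41d4a716446655440000", and vice versa,
            # so both common UUID storage formats are covered.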

            # 3. Treat short, non-generated IDs as document titles.
            if not document_id.startswith("doc-") and not document_id.startswith("test-doc-") and len(document_id) < 36:
                filters.append({"title": document_id})

            # 4. Filters derived from caller-supplied metadata.
            if additional_metadata:
                if "document_name" in additional_metadata:
                    filters.append({"title": additional_metadata["document_name"]})

                    # Names containing a dot are likely filenames.
                    if "." in additional_metadata["document_name"]:
                        filters.append({"filename": additional_metadata["document_name"]})

            found_vectors = False
            deleted_count = 0
            filter_used = ""

            logger.info(f"[{self.correlation_id}] Will try {len(filters)} different filters to find document")

            # Metadata-only probe queries still need a query vector, so build a
            # zero vector sized to the index's actual dimension rather than
            # assuming a fixed size.
            stats = self.pinecone_index.describe_index_stats()
            zero_vector = [0.0] * stats.dimension

            for i, filter_query in enumerate(filters):
                logger.info(f"[{self.correlation_id}] Searching for vectors with filter #{i+1}: {filter_query}")

                try:
                    results = self.pinecone_index.query(
                        vector=zero_vector,
                        top_k=1,
                        include_metadata=True,
                        filter=filter_query,
                        namespace=actual_namespace
                    )

                    if results and results.get("matches") and len(results.get("matches", [])) > 0:
                        logger.info(f"[{self.correlation_id}] Found vectors matching filter: {filter_query}")
                        found_vectors = True
                        filter_used = str(filter_query)

                        delete_result = self.pinecone_index.delete(
                            filter=filter_query,
                            namespace=actual_namespace
                        )

                        deleted_count = delete_result.get("deleted_count", 0)
                        logger.info(f"[{self.correlation_id}] Deleted {deleted_count} vectors with filter: {filter_query}")
                        break
                except Exception as filter_error:
                    logger.warning(f"[{self.correlation_id}] Error searching with filter {filter_query}: {str(filter_error)}")
                    continue

            if not found_vectors:
                logger.warning(f"[{self.correlation_id}] No vectors found for document '{document_id}' in namespace '{actual_namespace}'")
                return {
                    "success": True,
                    "document_id": document_id,
                    "namespace": actual_namespace,
                    "deleted_count": 0,
                    "warning": f"No vectors found for document '{document_id}' in namespace '{actual_namespace}'",
                    "message": f"Found 0 vectors for document '{document_id}' in namespace '{actual_namespace}'",
                    "vectors_found": False,
                    "vectors_deleted": 0
                }

            return {
                "success": True,
                "document_id": document_id,
                "namespace": actual_namespace,
                "deleted_count": deleted_count,
                "filter_used": filter_used,
                "message": f"Successfully deleted {deleted_count} vectors for document '{document_id}' from namespace '{actual_namespace}'",
                "vectors_found": True,
                "vectors_deleted": deleted_count
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error deleting document vectors: {str(e)}")
            return {
                "success": False,
                "document_id": document_id,
                "error": f"Error deleting document vectors: {str(e)}",
                "vectors_found": False,
                "vectors_deleted": 0
            }

    async def list_documents(self):
        """List all documents in a namespace."""
        # Use the same namespace convention as process_pdf.
        actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace

        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            logger.info(f"[{self.correlation_id}] Listing documents in namespace '{actual_namespace}'")

            stats = self.pinecone_index.describe_index_stats()
            namespace_stats = stats.get("namespaces", {}).get(actual_namespace, {})
            vector_count = namespace_stats.get("vector_count", 0)

            if vector_count == 0:
                return {
                    "success": True,
                    "total_vectors": 0,
                    "namespace": actual_namespace,
                    "index_name": self.index_name,
                    "documents": []
                }

            # Fetch up to 1000 vectors with a zero query vector; this is a
            # metadata scan, not a similarity search.
            results = self.pinecone_index.query(
                vector=[0.0] * stats.dimension,
                top_k=min(vector_count, 1000),
                include_metadata=True,
                namespace=actual_namespace
            )

            # Deduplicate by document_id and count the chunks per document.
            documents_by_id = {}

            for match in results.get("matches", []):
                metadata = match.get("metadata", {})
                document_id = metadata.get("document_id")

                if not document_id:
                    continue

                if document_id not in documents_by_id:
                    documents_by_id[document_id] = {
                        "id": document_id,
                        "title": metadata.get("title"),
                        "filename": metadata.get("filename"),
                        "content_type": metadata.get("content_type"),
                        "chunk_count": 0
                    }

                documents_by_id[document_id]["chunk_count"] += 1

            return {
                "success": True,
                "total_vectors": vector_count,
                "namespace": actual_namespace,
                "index_name": self.index_name,
                "documents": list(documents_by_id.values())
            }

        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error listing documents: {str(e)}")
            return {
                "success": False,
                "error": f"Error listing documents: {str(e)}"
            }
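

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The PINECONE_API_KEY environment variable
# name and the "sample.pdf" path below are placeholders, not values defined by
# this module.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def log_progress(_, document_id, stage, progress, message):
        # Mirrors the positional signature process_pdf uses for its callback.
        print(f"{document_id} [{stage}] {progress:.0%}: {message}")

    async def main():
        processor = PDFProcessor(
            index_name="testbot768",
            namespace="Default",
            api_key=os.environ.get("PINECONE_API_KEY"),
        )
        result = await processor.process_pdf(
            "sample.pdf",
            metadata={"title": "Sample"},
            progress_callback=log_progress,
        )
        print(result)

    asyncio.run(main())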