| | """ |
| | FAISS Database Toolkit for EvoAgentX |
| | |
| | This module provides a comprehensive toolkit for interacting with FAISS vector databases |
| | through the existing RAG engine and storage infrastructure. It includes tools for querying, |
| | inserting, deleting, and managing vector data with semantic search capabilities. |
| | |
| | The toolkit wraps the existing RAGEngine and StorageHandler to provide a unified interface |
| | for vector database operations that can be easily used by agents. |
| | |
| | Key Features: |
| | - Automatic database path validation and creation |
| | - Support for existing database detection and reuse |
| | - Robust error handling for invalid paths |
| | - Default configuration with sensible defaults |
| | - Seamless integration with the RAG pipeline |
| | - Automatic file path detection and content processing |
| | - Support for multiple file formats (PDF, text, Markdown, etc.) |
| | |
| | Usage: |
| | # Using default configuration with automatic path handling |
| | toolkit = FaissToolkit(db_path="./my_database.db") |
| | |
| | # Using custom configuration |
| | toolkit = FaissToolkit( |
| | storage_config=custom_storage_config, |
| | rag_config=custom_rag_config |
| | ) |
| | |
| | # Insert documents including file paths |
| | toolkit.get_tool("faiss_insert")( |
| | documents=[ |
| | "This is some text content", |
| | "./documents/report.pdf", # Will be automatically read and processed |
| | "./data/notes.txt" # Will be automatically read and processed |
| | ] |
| | ) |
| | """ |
| |
|
| | import os |
| | import asyncio |
| | import concurrent.futures |
| | from typing import Dict, Any, List, Optional |
| | from uuid import uuid4 |
| | from datetime import datetime |
| | from pathlib import Path |
| |
|
| | from .tool import Tool, Toolkit |
| | from .storage_handler import LocalStorageHandler |
| | from ..core.module import BaseModule |
| | from ..core.logging import logger |
| | from ..rag.rag import RAGEngine |
| | from ..rag.rag_config import RAGConfig |
| | from ..rag.schema import Query, Document, Corpus, DocumentMetadata |
| | from ..storages.base import StorageHandler |
| | from ..storages.storages_config import StoreConfig |
| | from .storage_handler import FileStorageHandler |
| |
|
| |
|
def _ensure_database_path(db_path: str) -> str:
    """
    Validate a database file path and prepare it for use.

    Creates any missing parent directories, detects whether a database
    already exists at the location, and removes the file when it fails a
    basic SQLite catalog probe so that a fresh database can be created.

    Args:
        db_path (str): The database file path

    Returns:
        str: The validated and prepared database path

    Raises:
        ValueError: If the path is invalid or cannot be created
    """
    if not db_path:
        raise ValueError("Database path cannot be empty")

    resolved = Path(db_path).resolve()

    # A directory can never serve as the SQLite database file.
    if resolved.exists() and resolved.is_dir():
        raise ValueError(f"Database path points to a directory: {db_path}")

    # Make sure the parent directory hierarchy is in place before use.
    try:
        resolved.parent.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        raise ValueError(f"Cannot create directory for database path {db_path}: {e}")

    if not resolved.exists():
        logger.info(f"Database not found at: {db_path}. Will create new database.")
        return str(resolved)

    logger.info(f"Found existing database at: {db_path}")

    # Probe the existing file with a trivial catalog query; a failure is
    # treated as corruption and the file is removed so it can be recreated.
    try:
        import sqlite3
        connection = sqlite3.connect(str(resolved))
        connection.execute("SELECT name FROM sqlite_master WHERE type='table';")
        connection.close()
        logger.info("Database validation successful")
    except Exception as e:
        logger.warning(f"Database validation failed: {e}. Will create new database.")
        try:
            resolved.unlink()
        except Exception as unlink_error:
            logger.error(f"Failed to remove corrupted database file: {unlink_error}")
            raise ValueError(f"Cannot remove corrupted database file: {unlink_error}")

    return str(resolved)
| |
|
| |
|
def _create_default_storage_config(db_path: Optional[str] = None) -> StoreConfig:
    """
    Build the default storage configuration with a validated database path.

    Args:
        db_path (str, optional): Custom database path

    Returns:
        StoreConfig: Configured storage configuration
    """
    from ..storages.storages_config import StoreConfig, DBConfig, VectorStoreConfig

    # Fall back to a local SQLite file when no path was supplied.
    chosen_path = db_path if db_path is not None else "./faiss_db.sqlite"

    validated_db_path = _ensure_database_path(chosen_path)
    logger.info(f"Using validated database path: {validated_db_path}")

    # FAISS index files are cached in a sibling directory of the database.
    index_cache_path = str(Path(validated_db_path).parent.resolve() / "index_cache")

    db_settings = DBConfig(
        db_name="sqlite",
        path=validated_db_path
    )
    vector_settings = VectorStoreConfig(
        vector_name="faiss",
        dimensions=1536,
        index_type="flat_l2"
    )
    storage_config = StoreConfig(
        dbConfig=db_settings,
        vectorConfig=vector_settings,
        path=index_cache_path
    )

    # Ensure the cache directory exists before the config is used.
    Path(index_cache_path).mkdir(parents=True, exist_ok=True)

    return storage_config
| |
|
| |
|
def _create_default_rag_config() -> RAGConfig:
    """
    Build the default RAG configuration (OpenAI embeddings, 500-char chunks).

    Returns:
        RAGConfig: Configured RAG configuration
    """
    from ..rag.rag_config import RAGConfig, EmbeddingConfig, ChunkerConfig

    embedding_settings = EmbeddingConfig(
        provider="openai",
        model_name="text-embedding-ada-002"
    )
    chunker_settings = ChunkerConfig(
        chunk_size=500,
        chunk_overlap=50
    )
    return RAGConfig(embedding=embedding_settings, chunker=chunker_settings)
| |
|
| |
|
class FaissDatabase(BaseModule):
    """
    A high-level interface for FAISS vector database operations.

    This class wraps the RAGEngine and StorageHandler to provide a unified interface
    for vector database operations including document ingestion, semantic search,
    and corpus management.

    Attributes:
        rag_engine (RAGEngine): The RAG engine for document processing and retrieval
        storage_handler (StorageHandler): The storage handler for persistence
        file_storage_handler: Handler used to read file contents during insertion
        default_corpus_id (str): Default corpus ID for operations
        default_index_type (str): Default index type for vector operations
    """

    def __init__(
        self,
        storage_config: StoreConfig,
        rag_config: RAGConfig,
        default_corpus_id: str = "default",
        default_index_type: str = "vector",
        storage_handler: StorageHandler = None,
        file_handler: FileStorageHandler = None,
        **kwargs
    ):
        """
        Initialize the FAISS database.

        Args:
            storage_config (StoreConfig): Configuration for storage backends
            rag_config (RAGConfig): Configuration for RAG pipeline
            default_corpus_id (str): Default corpus ID for operations
            default_index_type (str): Default index type for vector operations
            storage_handler (StorageHandler, optional): Storage handler for file operations
                (legacy parameter, kept for backward compatibility)
            file_handler (FileStorageHandler, optional): File handler for reading file
                contents; takes precedence over storage_handler when both are provided
            **kwargs: Additional arguments for BaseModule
        """
        super().__init__(**kwargs)

        # Persistence layer (SQLite metadata + FAISS vectors) shared with the RAG engine.
        self.storage_handler = StorageHandler(storageConfig=storage_config)

        # The RAG engine performs chunking, embedding, indexing and retrieval.
        self.rag_engine = RAGEngine(config=rag_config, storage_handler=self.storage_handler)

        # Resolve the handler used to read files referenced in insert() calls.
        # Bug fix: file_handler was previously accepted but silently ignored;
        # it now takes precedence, with storage_handler kept as a fallback for
        # backward compatibility with existing callers.
        if file_handler is not None:
            self.file_storage_handler = file_handler
        elif storage_handler is not None:
            self.file_storage_handler = storage_handler
        else:
            self.file_storage_handler = LocalStorageHandler(base_path="./workplace/storage")

        self.default_corpus_id = default_corpus_id
        self.default_index_type = default_index_type

        logger.info(f"Initialized FAISS database with corpus_id: {default_corpus_id}")

    def query(
        self,
        query: str,
        corpus_id: Optional[str] = None,
        top_k: int = 5,
        similarity_threshold: float = 0.0,
        metadata_filters: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Query the vector database with semantic search.

        Args:
            query (str): The query string to search for
            corpus_id (str, optional): Corpus ID to search in
            top_k (int): Number of top results to return
            similarity_threshold (float): Minimum similarity threshold
            metadata_filters (Dict[str, Any], optional): Metadata filters for search

        Returns:
            Dict[str, Any]: Search results with chunks and scores
        """
        try:
            # If we are already inside an event loop, the RAG engine's internal
            # async machinery cannot run on this thread, so delegate to a worker
            # thread; otherwise run the query directly.
            try:
                asyncio.get_running_loop()
                logger.info("Detected running event loop, using thread executor for query")
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(self._query_sync, query, corpus_id, top_k, similarity_threshold, metadata_filters)
                    return future.result()
            except RuntimeError:
                # get_running_loop() raises RuntimeError when no loop is running.
                logger.info("No event loop detected, using direct query processing")
                return self._query_sync(query, corpus_id, top_k, similarity_threshold, metadata_filters)

        except Exception as e:
            logger.error(f"Query failed: {str(e)}")
            return {"success": False, "error": str(e)}

    def _query_sync(
        self,
        query: str,
        corpus_id: Optional[str] = None,
        top_k: int = 5,
        similarity_threshold: float = 0.0,
        metadata_filters: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Synchronous version of query that can be safely called from a thread.

        Args:
            query (str): The query string to search for
            corpus_id (str, optional): Corpus ID to search in
            top_k (int): Number of top results to return
            similarity_threshold (float): Minimum similarity threshold
            metadata_filters (Dict[str, Any], optional): Metadata filters for search

        Returns:
            Dict[str, Any]: Search results with chunks and scores
        """
        try:
            corpus_id = corpus_id or self.default_corpus_id

            # An unknown corpus yields an empty (but successful) result set
            # rather than an error, so callers can probe safely.
            if corpus_id not in self.rag_engine.indices:
                logger.warning(f"Corpus {corpus_id} not found. Returning empty results.")
                return {"success": True, "data": {
                    "query": query,
                    "corpus_id": corpus_id,
                    "total_results": 0,
                    "results": []
                }}

            query_obj = Query(
                query_str=query,
                top_k=top_k,
                similarity_cutoff=similarity_threshold,
                metadata_filters=metadata_filters
            )

            results = self.rag_engine.query(query_obj, corpus_id=corpus_id)

            if not results or not results.corpus:
                logger.warning(f"Query returned no results for corpus {corpus_id}")
                return {"success": True, "data": {
                    "query": query,
                    "corpus_id": corpus_id,
                    "total_results": 0,
                    "results": []
                }}

            chunks = results.corpus.chunks if results.corpus.chunks else []
            formatted_results = {
                "query": query,
                "corpus_id": corpus_id,
                "total_results": len(chunks),
                "results": []
            }

            for i, chunk in enumerate(chunks):
                # Scores may be shorter than chunks; default missing scores to 0.0.
                score = results.scores[i] if results.scores and i < len(results.scores) else 0.0
                formatted_results["results"].append({
                    "chunk_id": chunk.chunk_id,
                    "content": chunk.text,
                    "score": score,
                    "metadata": chunk.metadata.model_dump() if chunk.metadata else {},
                    "doc_id": chunk.metadata.doc_id if chunk.metadata else None
                })

            logger.info(f"Query executed successfully. Found {len(formatted_results['results'])} results.")
            return {"success": True, "data": formatted_results}

        except Exception as e:
            logger.error(f"Query failed: {str(e)}")
            return {"success": False, "error": str(e)}

    def _is_file_path(self, text: str) -> bool:
        """
        Check if a string appears to be a file path.

        Args:
            text (str): The string to check

        Returns:
            bool: True if the string both looks like a path (contains a path
                separator or a known file extension) AND exists on disk
        """
        path_indicators = ['/', '\\', '.txt', '.pdf', '.md', '.doc', '.docx', '.csv', '.json', '.xml', '.html', '.htm']
        return any(indicator in text for indicator in path_indicators) and os.path.exists(text)

    def _make_error_document(
        self,
        file_path: str,
        doc_index: int,
        metadata: Optional[Dict[str, Any]],
        error: Exception
    ) -> List[Document]:
        """
        Build a single placeholder Document describing a file-read failure.

        Args:
            file_path (str): Path of the file that failed to process
            doc_index (int): Index of the document in the batch
            metadata (Dict[str, Any], optional): Additional metadata
            error (Exception): The exception raised while processing

        Returns:
            List[Document]: A one-element list with the error placeholder
        """
        doc_metadata = metadata.copy() if metadata else {}
        doc_metadata.update({
            "doc_index": doc_index,
            "insertion_time": datetime.now().isoformat(),
            "source_file": file_path,
            "error": str(error)
        })
        return [Document(
            text=f"Error reading file {file_path}: {str(error)}",
            metadata=DocumentMetadata(**doc_metadata),
            doc_id=str(uuid4())
        )]

    def _process_file_path(self, file_path: str, doc_index: int, metadata: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Process a file path and return Document objects.

        Args:
            file_path (str): Path to the file
            doc_index (int): Index of the document in the batch
            metadata (Dict[str, Any], optional): Additional metadata

        Returns:
            List[Document]: List of Document objects created from the file;
                on failure, a single placeholder Document describing the error
        """
        try:
            # Same event-loop guard as query(): chunking may use async internals.
            try:
                asyncio.get_running_loop()
                logger.info(f"Detected running event loop, using thread executor for {file_path}")
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(self._process_file_path_sync, file_path, doc_index, metadata)
                    return future.result()
            except RuntimeError:
                logger.info(f"No event loop detected, using direct processing for {file_path}")
                return self._process_file_path_sync(file_path, doc_index, metadata)

        except Exception as e:
            logger.error(f"Failed to process file {file_path}: {str(e)}")
            return self._make_error_document(file_path, doc_index, metadata, e)

    def _process_file_path_sync(self, file_path: str, doc_index: int, metadata: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Synchronous version of file processing that can be safely called from a thread.

        Args:
            file_path (str): Path to the file
            doc_index (int): Index of the document in the batch
            metadata (Dict[str, Any], optional): Additional metadata

        Returns:
            List[Document]: List of Document objects created from the file;
                on failure, a single placeholder Document describing the error
        """
        try:
            # Prefer the configured file handler; fall back to a plain UTF-8 read.
            if self.file_storage_handler:
                result = self.file_storage_handler.read(file_path)
                if result["success"]:
                    file_content = result["content"]
                else:
                    raise Exception(f"Failed to read file: {result.get('error', 'Unknown error')}")
            else:
                with open(file_path, 'r', encoding='utf-8') as f:
                    file_content = f.read()

            # Chunk the file through a throwaway corpus so we can reuse the
            # RAG engine's chunker, then discard the temporary index.
            temp_corpus_id = f"temp_file_{uuid4().hex[:8]}"

            temp_doc = Document(
                text=file_content,
                metadata=DocumentMetadata(
                    source_file=file_path,
                    doc_index=doc_index,
                    insertion_time=datetime.now().isoformat()
                ),
                doc_id=str(uuid4())
            )

            corpus = self.rag_engine.process_documents([temp_doc], corpus_id=temp_corpus_id)

            # Convert each chunk back into a standalone Document, preserving
            # the chunk id for traceability.
            documents = []
            for chunk in corpus.chunks:
                doc_metadata = metadata.copy() if metadata else {}
                doc_metadata.update({
                    "doc_index": doc_index,
                    "insertion_time": datetime.now().isoformat(),
                    "source_file": file_path,
                    "original_chunk_id": chunk.chunk_id
                })

                document_metadata = DocumentMetadata(**doc_metadata)

                documents.append(Document(
                    text=chunk.text,
                    metadata=document_metadata,
                    doc_id=chunk.chunk_id
                ))

            # Drop the temporary corpus; its chunks now live in `documents`.
            self.rag_engine.clear(corpus_id=temp_corpus_id)

            logger.info(f"Processed file {file_path} into {len(documents)} chunks")
            return documents

        except Exception as e:
            logger.error(f"Failed to process file {file_path} in sync mode: {str(e)}")
            return self._make_error_document(file_path, doc_index, metadata, e)

    def insert(
        self,
        documents: list,
        corpus_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        batch_size: int = 100
    ) -> Dict[str, Any]:
        """
        Insert documents into the vector database.

        Args:
            documents (Union[List[str], List[Dict[str, Any]]]): Documents to insert.
                Strings can be either text content or file paths (if they look like paths and exist)
            corpus_id (str, optional): Corpus ID to insert into
            metadata (Dict[str, Any], optional): Additional metadata for all documents
            batch_size (int): Batch size for processing

        Returns:
            Dict[str, Any]: Insertion results
        """
        try:
            corpus_id = corpus_id or self.default_corpus_id

            processed_docs = []
            file_paths_processed = []

            for i, doc in enumerate(documents):
                if isinstance(doc, str):
                    # Strings that look like existing file paths are read and
                    # chunked; other strings are treated as raw text content.
                    if self._is_file_path(doc):
                        logger.info(f"Detected file path: {doc}")
                        file_docs = self._process_file_path(doc, i, metadata)
                        processed_docs.extend(file_docs)
                        file_paths_processed.append(doc)
                    else:
                        doc_metadata = metadata.copy() if metadata else {}
                        doc_metadata.update({
                            "doc_index": i,
                            "insertion_time": datetime.now().isoformat()
                        })

                        document_metadata = DocumentMetadata(**doc_metadata)

                        processed_docs.append(Document(
                            text=doc,
                            metadata=document_metadata,
                            doc_id=str(uuid4())
                        ))
                elif isinstance(doc, dict):
                    # Dict entries may carry their own metadata and doc_id;
                    # per-document metadata overrides the shared metadata.
                    doc_metadata = metadata.copy() if metadata else {}
                    doc_metadata.update(doc.get("metadata", {}))
                    doc_metadata.update({
                        "doc_index": i,
                        "insertion_time": datetime.now().isoformat()
                    })

                    document_metadata = DocumentMetadata(**doc_metadata)

                    processed_docs.append(Document(
                        text=doc.get("text", ""),
                        metadata=document_metadata,
                        doc_id=doc.get("doc_id", str(uuid4()))
                    ))
                else:
                    # Previously skipped silently; log so callers can see why
                    # counts may not match their input list.
                    logger.warning(f"Skipping unsupported document type at index {i}: {type(doc).__name__}")

            # Accumulates all chunks across batches for reporting.
            corpus = Corpus(corpus_id=corpus_id)

            total_processed = 0
            for i in range(0, len(processed_docs), batch_size):
                batch = processed_docs[i:i+batch_size]

                batch_corpus = self.rag_engine.chunker.chunk(batch)
                batch_corpus.corpus_id = corpus_id

                self.rag_engine.add(self.default_index_type, batch_corpus, corpus_id=corpus_id)

                corpus.chunks.extend(batch_corpus.chunks)
                total_processed += len(batch)

                logger.info(f"Processed batch {i//batch_size + 1}, total processed: {total_processed}")

            # Persist the updated index to storage.
            self.rag_engine.save(corpus_id=corpus_id, index_type=self.default_index_type)

            result = {
                "corpus_id": corpus_id,
                "documents_inserted": len(documents),
                "chunks_created": len(corpus.chunks),
                "total_processed": total_processed,
                "file_paths_processed": file_paths_processed
            }

            logger.info(f"Successfully inserted {len(documents)} documents into corpus {corpus_id}")
            if file_paths_processed:
                logger.info(f"Processed {len(file_paths_processed)} file paths: {file_paths_processed}")
            return {"success": True, "data": result}

        except Exception as e:
            logger.error(f"Insert failed: {str(e)}")
            return {"success": False, "error": str(e)}

    def delete(
        self,
        corpus_id: Optional[str] = None,
        doc_ids: Optional[List[str]] = None,
        metadata_filters: Optional[Dict[str, Any]] = None,
        clear_all: bool = False
    ) -> Dict[str, Any]:
        """
        Delete documents or chunks from the vector database.

        Args:
            corpus_id (str, optional): Corpus ID to delete from
            doc_ids (List[str], optional): Document IDs to delete
            metadata_filters (Dict[str, Any], optional): Metadata filters for deletion
            clear_all (bool): Whether to clear the entire corpus

        Returns:
            Dict[str, Any]: Deletion results
        """
        try:
            corpus_id = corpus_id or self.default_corpus_id

            if clear_all:
                # Destructive path: wipe the whole corpus regardless of filters.
                self.rag_engine.clear(corpus_id=corpus_id)
                logger.info(f"Cleared entire corpus: {corpus_id}")
                return {"success": True, "data": {"operation": "clear_all", "corpus_id": corpus_id}}

            if corpus_id not in self.rag_engine.indices:
                logger.warning(f"Corpus {corpus_id} not found. Nothing to delete.")
                return {"success": True, "data": {"operation": "selective_delete", "corpus_id": corpus_id, "message": "Corpus not found, nothing to delete"}}

            if doc_ids or metadata_filters:
                self.rag_engine.delete(
                    corpus_id=corpus_id,
                    index_type=self.default_index_type,
                    node_ids=doc_ids,
                    metadata_filters=metadata_filters
                )

                result = {
                    "corpus_id": corpus_id,
                    "operation": "selective_delete",
                    "doc_ids": doc_ids,
                    "metadata_filters": metadata_filters
                }

                logger.info(f"Successfully deleted from corpus {corpus_id}")
                return {"success": True, "data": result}
            else:
                # No criteria supplied: report success with an explanatory message
                # rather than deleting anything implicitly.
                logger.warning(f"No deletion criteria provided for corpus {corpus_id}")
                return {"success": True, "data": {"operation": "selective_delete", "corpus_id": corpus_id, "message": "No deletion criteria provided"}}

        except Exception as e:
            logger.error(f"Delete failed: {str(e)}")
            return {"success": False, "error": str(e)}

    def list_corpora(self) -> Dict[str, Any]:
        """
        List all available corpora and their metadata.

        Returns:
            Dict[str, Any]: List of corpora with metadata
        """
        try:
            corpora = []

            for corpus_id, indices in self.rag_engine.indices.items():
                corpus_info = {
                    "corpus_id": corpus_id,
                    "index_types": list(indices.keys()),
                    "retrievers": list(self.rag_engine.retrievers.get(corpus_id, {}).keys())
                }
                corpora.append(corpus_info)

            return {"success": True, "data": {"corpora": corpora, "total": len(corpora)}}

        except Exception as e:
            logger.error(f"List corpora failed: {str(e)}")
            return {"success": False, "error": str(e)}

    def get_stats(self, corpus_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Get statistics about the database or a specific corpus.

        Args:
            corpus_id (str, optional): Corpus ID to get stats for

        Returns:
            Dict[str, Any]: Database statistics
        """
        try:
            if corpus_id:
                stats = {
                    "corpus_id": corpus_id,
                    "exists": corpus_id in self.rag_engine.indices,
                    "index_types": list(self.rag_engine.indices.get(corpus_id, {}).keys()),
                    "retrievers": list(self.rag_engine.retrievers.get(corpus_id, {}).keys())
                }

                # Best-effort introspection of the underlying FAISS index;
                # silently skipped when the index shape does not match.
                if corpus_id in self.rag_engine.indices:
                    vector_index = self.rag_engine.indices[corpus_id].get(self.default_index_type)
                    if vector_index and hasattr(vector_index, 'get_index'):
                        try:
                            index = vector_index.get_index()
                            if hasattr(index, 'vector_store'):
                                vector_store = index.vector_store
                                if hasattr(vector_store, 'faiss_index'):
                                    stats["vector_count"] = vector_store.faiss_index.ntotal
                                    stats["dimensions"] = vector_store.faiss_index.d
                        except Exception:
                            pass

                return {"success": True, "data": stats}
            else:
                # Global view across all corpora.
                stats = {
                    "total_corpora": len(self.rag_engine.indices),
                    "corpora": list(self.rag_engine.indices.keys()),
                    "embedding_model": self.rag_engine.config.embedding.model_name,
                    "vector_store_type": self.rag_engine.storage_handler.storageConfig.vectorConfig.vector_name if self.rag_engine.storage_handler.storageConfig.vectorConfig else None
                }

                return {"success": True, "data": stats}

        except Exception as e:
            logger.error(f"Get stats failed: {str(e)}")
            return {"success": False, "error": str(e)}
| |
|
| |
|
class FaissQueryTool(Tool):
    """Semantic-search tool backed by a FaissDatabase instance."""

    name: str = "faiss_query"
    description: str = "Query the FAISS vector database using semantic search to find relevant documents and chunks"
    inputs: Dict[str, Dict[str, Any]] = {
        "query": {
            "type": "string",
            "description": "The search query text to find semantically similar content"
        },
        "corpus_id": {
            "type": "string",
            "description": "Optional corpus ID to search in. If not provided, uses default corpus"
        },
        "top_k": {
            "type": "integer",
            "description": "Number of top results to return (default: 5)",
            "default": 5
        },
        "similarity_threshold": {
            "type": "number",
            "description": "Minimum similarity threshold for results (default: 0.0)",
            "default": 0.0
        },
        "metadata_filters": {
            "type": "object",
            "description": "Optional metadata filters to apply to search results (e.g., {'source': 'file1.txt'})"
        }
    }
    required: Optional[List[str]] = ["query"]

    def __init__(self, faiss_database: FaissDatabase = None):
        super().__init__()
        self.faiss_database = faiss_database

    def __call__(
        self,
        query: str,
        corpus_id: str = None,
        top_k: int = 5,
        similarity_threshold: float = 0.0,
        metadata_filters: dict = None
    ) -> Dict[str, Any]:
        """Run a semantic search and return the formatted result payload."""
        search_kwargs = {
            "query": query,
            "corpus_id": corpus_id,
            "top_k": top_k,
            "similarity_threshold": similarity_threshold,
            "metadata_filters": metadata_filters,
        }
        return self.faiss_database.query(**search_kwargs)
| |
|
| |
|
class FaissInsertTool(Tool):
    """Document-insertion tool backed by a FaissDatabase instance."""

    name: str = "faiss_insert"
    description: str = "Insert documents into the FAISS vector database with automatic chunking and embedding. Supports both text content and file paths - if a string looks like a file path and exists, it will automatically read and process the file content."
    inputs: Dict[str, Dict[str, Any]] = {
        "documents": {
            "type": "array",
            "description": "Array of documents to insert. Can be strings (text content or file paths), or objects with 'text', 'metadata', and 'doc_id' fields. If a string contains path separators or file extensions and the file exists, it will be treated as a file path and its content will be read and processed."
        },
        "corpus_id": {
            "type": "string",
            "description": "Optional corpus ID to insert into. If not provided, uses default corpus"
        },
        "metadata": {
            "type": "object",
            "description": "Optional metadata to add to all documents (e.g., {'source': 'file1.txt', 'category': 'research'})"
        },
        "batch_size": {
            "type": "integer",
            "description": "Batch size for processing documents (default: 100)",
            "default": 100
        }
    }
    required: Optional[List[str]] = ["documents"]

    def __init__(self, faiss_database: FaissDatabase = None):
        super().__init__()
        self.faiss_database = faiss_database

    def __call__(
        self,
        documents: list,
        corpus_id: str = None,
        metadata: dict = None,
        batch_size: int = 100
    ) -> Dict[str, Any]:
        """Insert the given documents and return the insertion summary."""
        insert_kwargs = {
            "documents": documents,
            "corpus_id": corpus_id,
            "metadata": metadata,
            "batch_size": batch_size,
        }
        return self.faiss_database.insert(**insert_kwargs)
| |
|
| |
|
class FaissDeleteTool(Tool):
    """Deletion tool backed by a FaissDatabase instance."""

    name: str = "faiss_delete"
    description: str = "Delete documents or chunks from the FAISS vector database. You can delete specific documents by ID, filter by metadata, or clear the entire corpus."
    inputs: Dict[str, Dict[str, Any]] = {
        "corpus_id": {
            "type": "string",
            "description": "Optional corpus ID to delete from. If not provided, uses default corpus"
        },
        "doc_ids": {
            "type": "array",
            "description": "Optional list of document IDs to delete. Use this to delete specific documents",
            "items": {"type": "string"}
        },
        "metadata_filters": {
            "type": "object",
            "description": "Optional metadata filters to select documents for deletion (e.g., {'source': 'file1.txt'})"
        },
        "clear_all": {
            "type": "boolean",
            "description": "Set to true to clear the entire corpus. WARNING: This will delete all documents in the corpus",
            "default": False
        }
    }
    required: Optional[List[str]] = []

    def __init__(self, faiss_database: FaissDatabase = None):
        super().__init__()
        self.faiss_database = faiss_database

    def __call__(
        self,
        corpus_id: str = None,
        doc_ids: list = None,
        metadata_filters: dict = None,
        clear_all: bool = False
    ) -> Dict[str, Any]:
        """Delete matching documents (or the whole corpus) and return the outcome."""
        delete_kwargs = {
            "corpus_id": corpus_id,
            "doc_ids": doc_ids,
            "metadata_filters": metadata_filters,
            "clear_all": clear_all,
        }
        return self.faiss_database.delete(**delete_kwargs)
| |
|
| |
|
class FaissListTool(Tool):
    """Corpus-listing tool backed by a FaissDatabase instance."""

    name: str = "faiss_list"
    description: str = "List all available corpora and their metadata in the FAISS vector database. This tool takes no parameters."
    inputs: Dict[str, Dict[str, Any]] = {}
    required: Optional[List[str]] = []

    def __init__(self, faiss_database: FaissDatabase = None):
        super().__init__()
        self.faiss_database = faiss_database

    def __call__(self) -> Dict[str, Any]:
        """Return the corpora listing from the underlying database."""
        database = self.faiss_database
        return database.list_corpora()
| |
|
| |
|
class FaissStatsTool(Tool):
    """Statistics tool backed by a FaissDatabase instance."""

    name: str = "faiss_stats"
    description: str = "Get statistics about the FAISS vector database or a specific corpus. Optionally provide a corpus_id to get stats for a specific corpus."
    inputs: Dict[str, Dict[str, Any]] = {
        "corpus_id": {
            "type": "string",
            "description": "Optional corpus ID to get statistics for. If not provided, returns global statistics"
        }
    }
    required: Optional[List[str]] = []

    def __init__(self, faiss_database: FaissDatabase = None):
        super().__init__()
        self.faiss_database = faiss_database

    def __call__(self, corpus_id: str = None) -> Dict[str, Any]:
        """Return corpus-level or global statistics from the underlying database."""
        database = self.faiss_database
        return database.get_stats(corpus_id=corpus_id)
| |
|
| |
|
class FaissToolkit(Toolkit):
    """
    Toolkit for FAISS vector database operations.

    Bundles query, insert, delete, list, and stats tools around a single
    shared FaissDatabase instance so agents get a unified interface to the
    underlying RAG engine and storage infrastructure.
    """

    def __init__(
        self,
        name: str = "FaissToolkit",
        storage_config: Optional[StoreConfig] = None,
        rag_config: Optional[RAGConfig] = None,
        default_corpus_id: str = "default",
        default_index_type: str = "vector",
        db_path: Optional[str] = None,
        storage_handler: StorageHandler = None,
        file_handler: FileStorageHandler = None,
        **kwargs
    ):
        """
        Initialize the FAISS toolkit.

        Args:
            name (str): Name of the toolkit
            storage_config (StoreConfig, optional): Configuration for storage backends
            rag_config (RAGConfig, optional): Configuration for RAG pipeline
            default_corpus_id (str): Default corpus ID for operations
            default_index_type (str): Default index type for vector operations
            db_path (str, optional): Custom database path. If provided, will check for existing database or create new one
            storage_handler (StorageHandler, optional): Storage handler for file operations
            file_handler (FileStorageHandler, optional): File handler for file operations
            **kwargs: Additional arguments
        """
        # Fall back to sensible defaults for any configuration not supplied.
        if storage_config is None:
            storage_config = _create_default_storage_config(db_path)
        if rag_config is None:
            rag_config = _create_default_rag_config()

        # One shared database instance backs every tool in the toolkit.
        database = FaissDatabase(
            storage_config=storage_config,
            rag_config=rag_config,
            default_corpus_id=default_corpus_id,
            default_index_type=default_index_type,
            storage_handler=storage_handler,
            file_handler=file_handler
        )

        tool_classes = [
            FaissQueryTool,
            FaissInsertTool,
            FaissDeleteTool,
            FaissListTool,
            FaissStatsTool,
        ]
        tools = [tool_cls(database) for tool_cls in tool_classes]

        super().__init__(name=name, tools=tools, **kwargs)

        # Keep a direct handle so callers can bypass the tool layer.
        self.faiss_database = database

        logger.info(f"Initialized {name} with {len(tools)} tools")

    def get_database(self) -> FaissDatabase:
        """
        Get the underlying FAISS database instance.

        Returns:
            FaissDatabase: The FAISS database instance
        """
        return self.faiss_database