| | """ |
| | Vector Store Interface and ChromaDB Implementation |
| | |
| | Provides: |
| | - Abstract VectorStore interface |
| | - ChromaDB implementation with local persistence |
| | - Chunk storage with metadata |
| | """ |
| |
|
| | from abc import ABC, abstractmethod |
| | from typing import List, Optional, Dict, Any, Tuple |
| | from pathlib import Path |
| | from pydantic import BaseModel, Field |
| | from loguru import logger |
| | import hashlib |
| | import json |
| |
|
# Optional dependency: keep the module importable without chromadb so the
# abstract interface and config models can still be used; ChromaVectorStore
# checks this flag and raises a helpful ImportError on construction.
try:
    import chromadb
    from chromadb.config import Settings
    CHROMADB_AVAILABLE = True
except ImportError:
    CHROMADB_AVAILABLE = False
    logger.warning("ChromaDB not available. Install with: pip install chromadb")
| |
|
| |
|
class VectorStoreConfig(BaseModel):
    """Configuration for vector store."""

    # Filesystem location of the ChromaDB persistent client; created on init.
    persist_directory: str = Field(
        default="./data/vectorstore",
        description="Directory for persistent storage"
    )
    # Collection that holds all document chunks.
    collection_name: str = Field(
        default="sparknet_documents",
        description="Name of the collection"
    )

    # Search defaults: how many candidates to fetch, and the minimum
    # similarity (1 - cosine distance) a result must reach to be returned.
    default_top_k: int = Field(default=5, ge=1, description="Default number of results")
    similarity_threshold: float = Field(
        default=0.7,
        ge=0.0,
        le=1.0,
        description="Minimum similarity score"
    )

    # Forwarded to chromadb.config.Settings; off by default for privacy.
    anonymized_telemetry: bool = Field(default=False)
| |
|
| |
|
class VectorSearchResult(BaseModel):
    """Result from vector search."""

    # Identity of the matched chunk and its parent document.
    chunk_id: str
    document_id: str
    # Raw chunk text as stored in the collection.
    text: str
    # Full metadata dict stored alongside the chunk in ChromaDB.
    metadata: Dict[str, Any]
    # Computed as 1 - cosine distance by ChromaVectorStore.search().
    similarity: float

    # Convenience fields lifted out of metadata (None when absent).
    page: Optional[int] = None
    bbox: Optional[Dict[str, float]] = None
    chunk_type: Optional[str] = None
| |
|
| |
|
class VectorStore(ABC):
    """Abstract interface for vector stores.

    Concrete backends (e.g. ChromaVectorStore) implement chunk storage,
    similarity search, and per-document bookkeeping.
    """

    @abstractmethod
    def add_chunks(
        self,
        chunks: List[Dict[str, Any]],
        embeddings: List[List[float]],
    ) -> List[str]:
        """Store *chunks* together with their *embeddings*.

        Args:
            chunks: Chunk dictionaries carrying text and metadata.
            embeddings: One embedding vector per chunk, same order.

        Returns:
            The IDs under which the chunks were stored.
        """
        ...

    @abstractmethod
    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[VectorSearchResult]:
        """Find chunks most similar to *query_embedding*.

        Args:
            query_embedding: Query vector.
            top_k: Maximum number of results to return.
            filters: Optional metadata filters restricting the search.

        Returns:
            Matching chunks as search results.
        """
        ...

    @abstractmethod
    def delete_document(self, document_id: str) -> int:
        """Remove every chunk belonging to *document_id*.

        Args:
            document_id: Document ID to delete.

        Returns:
            How many chunks were removed.
        """
        ...

    @abstractmethod
    def get_chunk(self, chunk_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific chunk by ID."""
        ...

    @abstractmethod
    def count(self, document_id: Optional[str] = None) -> int:
        """Count chunks in store, optionally filtered by document."""
        ...
| |
|
| |
|
class ChromaVectorStore(VectorStore):
    """
    ChromaDB implementation of vector store.

    Features:
    - Local persistent storage
    - Metadata filtering
    - Similarity search with cosine distance
    """

    def __init__(self, config: Optional[VectorStoreConfig] = None):
        """Initialize ChromaDB store.

        Args:
            config: Store configuration; defaults are used when omitted.

        Raises:
            ImportError: If the chromadb package is not installed.
        """
        if not CHROMADB_AVAILABLE:
            raise ImportError("ChromaDB is required. Install with: pip install chromadb")

        self.config = config or VectorStoreConfig()

        # Ensure the persistence directory exists before Chroma opens it.
        persist_path = Path(self.config.persist_directory)
        persist_path.mkdir(parents=True, exist_ok=True)

        self._client = chromadb.PersistentClient(
            path=str(persist_path),
            settings=Settings(
                anonymized_telemetry=self.config.anonymized_telemetry,
            )
        )

        # Cosine space: query() distances are 1 - cosine_similarity,
        # which search() converts back into a similarity score.
        self._collection = self._client.get_or_create_collection(
            name=self.config.collection_name,
            metadata={"hnsw:space": "cosine"}
        )

        logger.info(
            f"ChromaDB initialized: {self.config.collection_name} "
            f"({self._collection.count()} chunks)"
        )

    def add_chunks(
        self,
        chunks: List[Dict[str, Any]],
        embeddings: List[List[float]],
    ) -> List[str]:
        """Add chunks with embeddings.

        Args:
            chunks: Chunk dictionaries with text and metadata.
            embeddings: Corresponding embeddings, one per chunk.

        Returns:
            List of stored chunk IDs.

        Raises:
            ValueError: If chunks and embeddings differ in length.
        """
        if not chunks:
            return []

        if len(chunks) != len(embeddings):
            raise ValueError(
                f"Chunks ({len(chunks)}) and embeddings ({len(embeddings)}) "
                "must have same length"
            )

        ids: List[str] = []
        documents: List[str] = []
        metadatas: List[Dict[str, Any]] = []

        for chunk in chunks:
            chunk_id = chunk.get("chunk_id")
            if not chunk_id:
                # Fallback ID derived from content. Bug fix: include
                # sequence_index so two chunks of the same document that share
                # their first 100 characters no longer collide (duplicate IDs
                # would silently clash inside Chroma).
                content = (
                    f"{chunk.get('document_id', '')}-"
                    f"{chunk.get('sequence_index', 0)}-"
                    f"{chunk.get('text', '')[:100]}"
                )
                chunk_id = hashlib.md5(content.encode()).hexdigest()[:16]

            ids.append(chunk_id)
            documents.append(chunk.get("text", ""))

            # Chroma metadata values must be scalars; complex values (bbox)
            # are serialized to JSON strings below and decoded in search().
            metadata = {
                "document_id": chunk.get("document_id", ""),
                "source_path": chunk.get("source_path", ""),
                "chunk_type": chunk.get("chunk_type", "text"),
                "page": chunk.get("page", 0),
                "sequence_index": chunk.get("sequence_index", 0),
                "confidence": chunk.get("confidence", 1.0),
            }

            if "bbox" in chunk and chunk["bbox"]:
                bbox = chunk["bbox"]
                if hasattr(bbox, "model_dump"):  # pydantic model
                    metadata["bbox_json"] = json.dumps(bbox.model_dump())
                elif isinstance(bbox, dict):
                    metadata["bbox_json"] = json.dumps(bbox)

            metadatas.append(metadata)

        self._collection.add(
            ids=ids,
            embeddings=embeddings,
            documents=documents,
            metadatas=metadatas,
        )

        logger.debug(f"Added {len(ids)} chunks to vector store")
        return ids

    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[VectorSearchResult]:
        """Search for similar chunks.

        Args:
            query_embedding: Query vector.
            top_k: Maximum number of candidates fetched from the index.
            filters: Optional metadata filters (see _build_where_clause).

        Returns:
            Results whose similarity reaches config.similarity_threshold.
        """
        where = self._build_where_clause(filters) if filters else None

        results = self._collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=where,
            include=["documents", "metadatas", "distances"],
        )

        search_results: List[VectorSearchResult] = []

        if results["ids"] and results["ids"][0]:
            for i, chunk_id in enumerate(results["ids"][0]):
                # Cosine distance -> similarity. A missing distances payload is
                # treated as a perfect match so results are not dropped.
                distance = results["distances"][0][i] if results["distances"] else 0.0
                similarity = 1 - distance

                # Enforce the configured similarity floor.
                if similarity < self.config.similarity_threshold:
                    continue

                metadata = results["metadatas"][0][i] if results["metadatas"] else {}

                # Restore the bbox serialized by add_chunks(); a malformed
                # payload is ignored rather than failing the whole search.
                # Bug fix: the former bare ``except:`` swallowed everything,
                # including KeyboardInterrupt/SystemExit.
                bbox = None
                if "bbox_json" in metadata:
                    try:
                        bbox = json.loads(metadata["bbox_json"])
                    except (json.JSONDecodeError, TypeError):
                        bbox = None

                result = VectorSearchResult(
                    chunk_id=chunk_id,
                    document_id=metadata.get("document_id", ""),
                    text=results["documents"][0][i] if results["documents"] else "",
                    metadata=metadata,
                    similarity=similarity,
                    page=metadata.get("page"),
                    bbox=bbox,
                    chunk_type=metadata.get("chunk_type"),
                )
                search_results.append(result)

        return search_results

    def _build_where_clause(self, filters: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build ChromaDB where clause from filters.

        Supported keys: ``document_id``, ``chunk_type`` (scalar or list),
        ``page`` (scalar or ``{"min": ..., "max": ...}`` range), and
        ``confidence_min``. Unknown keys are ignored.

        Returns:
            A Chroma ``where`` dict, or None when no condition applies.
            (Fix: return type is Optional — the original annotation claimed
            Dict but the function returns None for empty filters.)
        """
        conditions: List[Dict[str, Any]] = []

        for key, value in filters.items():
            if key == "document_id":
                conditions.append({"document_id": {"$eq": value}})
            elif key == "chunk_type":
                if isinstance(value, list):
                    conditions.append({"chunk_type": {"$in": value}})
                else:
                    conditions.append({"chunk_type": {"$eq": value}})
            elif key == "page":
                if isinstance(value, dict):
                    # Inclusive page range filter.
                    if "min" in value:
                        conditions.append({"page": {"$gte": value["min"]}})
                    if "max" in value:
                        conditions.append({"page": {"$lte": value["max"]}})
                else:
                    conditions.append({"page": {"$eq": value}})
            elif key == "confidence_min":
                conditions.append({"confidence": {"$gte": value}})

        # Chroma requires a single condition at top level, or an $and wrapper.
        if not conditions:
            return None
        if len(conditions) == 1:
            return conditions[0]
        return {"$and": conditions}

    def delete_document(self, document_id: str) -> int:
        """Delete all chunks for a document.

        Args:
            document_id: Document ID to delete.

        Returns:
            Number of chunks deleted (0 when the document is unknown).
        """
        # include=[] fetches only the matching IDs, keeping the query cheap.
        results = self._collection.get(
            where={"document_id": {"$eq": document_id}},
            include=[],
        )

        if not results["ids"]:
            return 0

        count = len(results["ids"])
        self._collection.delete(ids=results["ids"])

        logger.info(f"Deleted {count} chunks for document {document_id}")
        return count

    def get_chunk(self, chunk_id: str) -> Optional[Dict[str, Any]]:
        """Get a specific chunk by ID, or None when it does not exist."""
        results = self._collection.get(
            ids=[chunk_id],
            include=["documents", "metadatas"],
        )

        if not results["ids"]:
            return None

        metadata = results["metadatas"][0] if results["metadatas"] else {}

        return {
            "chunk_id": chunk_id,
            "text": results["documents"][0] if results["documents"] else "",
            **metadata,
        }

    def count(self, document_id: Optional[str] = None) -> int:
        """Count chunks in store, optionally filtered by document."""
        if document_id:
            results = self._collection.get(
                where={"document_id": {"$eq": document_id}},
                include=[],
            )
            return len(results["ids"]) if results["ids"] else 0
        return self._collection.count()

    def list_documents(self) -> List[str]:
        """List all unique document IDs in the store."""
        results = self._collection.get(include=["metadatas"])

        if not results["metadatas"]:
            return []

        # De-duplicate document IDs across all chunk metadata entries.
        doc_ids = {
            meta["document_id"]
            for meta in results["metadatas"]
            if meta and "document_id" in meta
        }
        return list(doc_ids)
| |
|
| |
|
| | |
# Module-level singleton, managed by get_vector_store()/reset_vector_store().
_vector_store: Optional[VectorStore] = None
| |
|
| |
|
def get_vector_store(
    config: Optional[VectorStoreConfig] = None,
    store_type: str = "chromadb",
) -> VectorStore:
    """
    Get or create singleton vector store.

    Note that once the singleton exists, both ``config`` and ``store_type``
    are ignored on subsequent calls.

    Args:
        config: Store configuration
        store_type: Type of store ("chromadb")

    Returns:
        VectorStore instance
    """
    global _vector_store

    # Reuse the cached instance when one was already built.
    if _vector_store is not None:
        return _vector_store

    if store_type != "chromadb":
        raise ValueError(f"Unknown store type: {store_type}")

    _vector_store = ChromaVectorStore(config)
    return _vector_store
| |
|
| |
|
def reset_vector_store():
    """Reset the global vector store instance."""
    # Drop the cached singleton so the next get_vector_store() call rebuilds
    # the store — useful in tests or after a configuration change.
    global _vector_store
    _vector_store = None
| |
|