""" Script to ingest CBT book data into Pinecone vector database. Ingests the book 6 times with different chunking formats for ablation study. All chunks are stored in a SINGLE index with metadata to differentiate. Run this once before starting the API server. """ import os import time from dotenv import load_dotenv from config_loader import cfg from data.data_loader import load_cbt_book, get_book_stats from data.vector_db import get_pinecone_index, refresh_pinecone_index from retriever.processor import ChunkProcessor # 6 different chunking techniques for ablation study CHUNKING_TECHNIQUES = [ { "name": "fixed", "description": "Fixed-size chunking - splits every N characters (may cut sentences mid-way)", "chunk_size": 1000, "chunk_overlap": 100, "kwargs": {"separator": ""}, # No separator for fixed splitting }, { "name": "sentence", "description": "Sentence-level chunking - respects sentence boundaries (NLTK)", "chunk_size": 1000, "chunk_overlap": 100, "kwargs": {}, }, { "name": "paragraph", "description": "Paragraph-level chunking - uses natural paragraph breaks", "chunk_size": 2500, "chunk_overlap": 100, "kwargs": {"separator": "\n\n"}, # Split on paragraph breaks }, { "name": "semantic", "description": "Semantic chunking - splits where topic/meaning shifts (embedding similarity)", "chunk_size": 2000, "chunk_overlap": 100, "kwargs": {"breakpoint_threshold_type": "percentile", "breakpoint_threshold_amount": 70}, }, { "name": "semantic", "description": "Semantic chunking - splits where topic/meaning shifts (embedding similarity)", "chunk_size": 2000, "chunk_overlap": 100, "kwargs": {"breakpoint_threshold_type": "percentile", "breakpoint_threshold_amount": 70}, }, { "name": "recursive", "description": "Recursive chunking - hierarchical splitting (paragraphs → sentences → words → chars)", "chunk_size": 2000, "chunk_overlap": 100, "kwargs": {"separators": ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " ", ""], "keep_separator": True}, }, { "name": "page", "description": "Page-level chunking - uses entire book pages as-is", "chunk_size": 10000, # Very large to keep full pages "chunk_overlap": 0, # No overlap between pages "kwargs": {"separator": "--- Page"}, # Split on page markers }, { "name": "markdown", "description": "Markdown header chunking - splits by headers (#, ##, ###, ####) with 4k char limit", "chunk_size": 4000, # Max 4k chars per chunk "chunk_overlap": 0, # No overlap for markdown "kwargs": {}, # Custom implementation }, ] def ingest_single_technique( raw_data, proc, technique_config, technique_index, total_techniques, ): """Chunk the book using a single technique and return chunks with metadata.""" technique_name = technique_config["name"] chunk_size = technique_config["chunk_size"] chunk_overlap = technique_config["chunk_overlap"] kwargs = technique_config.get("kwargs", {}) print(f"\n[{technique_index}/{total_techniques}] Processing '{technique_name}'...") print(f" Description: {technique_config['description']}") print(f" Chunk size: {chunk_size}, Overlap: {chunk_overlap}") # Chunk and embed final_chunks = proc.process( raw_data, technique=technique_name, chunk_size=chunk_size, chunk_overlap=chunk_overlap, max_docs=cfg.project.get("doc_limit"), verbose=False, **kwargs, ) # Add technique metadata to each chunk for differentiation # Prefix ID with technique name to ensure uniqueness across techniques for chunk in final_chunks: chunk["metadata"]["chunking_technique"] = technique_name chunk["id"] = f"{technique_name}-{chunk['id']}" print(f" Created {len(final_chunks)} chunks") return final_chunks def ingest_data(): """Load CBT book, chunk it 6 ways, and upload ALL to a SINGLE Pinecone index. Returns: Tuple of (all_chunks, configured_technique_chunks, processor) for reuse in retrieval pipeline. """ load_dotenv() pinecone_key = os.getenv("PINECONE_API_KEY") if not pinecone_key: raise RuntimeError("PINECONE_API_KEY not found in environment variables") print("=" * 80) print("CBT BOOK INGESTION PIPELINE - 6 TECHNIQUES (SINGLE INDEX)") print("=" * 80) print(f"\nTechniques to process: {len(CHUNKING_TECHNIQUES)}") for i, tech in enumerate(CHUNKING_TECHNIQUES, 1): print(f" {i}. {tech['name']}: {tech['description']}") print(f"\nAll chunks will be stored in a SINGLE index: {cfg.db['base_index_name']}-{cfg.processing['technique']}") print("Chunks are differentiated by 'chunking_technique' metadata field.") # 1. Load the CBT book (once, reused for all techniques) print(f"\n{'='*80}") print("STEP 1: LOADING CBT BOOK") print(f"{'='*80}") print("\nLoading CBT book from EntireBookCleaned.txt...") raw_data = load_cbt_book("data/EntireBookCleaned.txt") stats = get_book_stats(raw_data) print(f" Loaded {stats['total_pages']} pages") print(f" Total characters: {stats['total_characters']:,}") print(f" Average chars per page: {stats['average_chars_per_page']:.0f}") # 2. Initialize processor (once, reused for all techniques) print(f"\nInitializing embedding model: {cfg.processing['embedding_model']}") proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False) # 3. Process each technique sequentially and collect all chunks print(f"\n{'='*80}") print("STEP 2: CHUNKING WITH 6 TECHNIQUES") print(f"{'='*80}") all_chunks = [] configured_technique_chunks = [] results = {} for i, technique in enumerate(CHUNKING_TECHNIQUES, 1): try: chunks = ingest_single_technique( raw_data=raw_data, proc=proc, technique_config=technique, technique_index=i, total_techniques=len(CHUNKING_TECHNIQUES), ) all_chunks.extend(chunks) # Save chunks for the configured technique (for retrieval pipeline) if technique["name"] == cfg.processing['technique']: configured_technique_chunks = chunks results[technique["name"]] = { "status": "success", "chunks": len(chunks), } # Wait between techniques to avoid rate limits (for embedding API) if i < len(CHUNKING_TECHNIQUES): print(f" Waiting 5 seconds before next technique (rate limit protection)...") import time time.sleep(5) except Exception as e: print(f" ERROR with technique '{technique['name']}': {e}") results[technique["name"]] = { "status": "failed", "error": str(e), } # 4. Upload ALL chunks to a SINGLE Pinecone index print(f"\n{'='*80}") print("STEP 3: UPLOADING TO SINGLE PINECONE INDEX") print(f"{'='*80}") index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}" print(f"\nIndex name: {index_name}") print(f"Dimension: {cfg.db['dimension']}") print(f"Metric: {cfg.db['metric']}") print(f"Total chunks to upload: {len(all_chunks)}") index = get_pinecone_index( pinecone_key, cfg.db['base_index_name'], technique=cfg.processing['technique'], dimension=cfg.db['dimension'], metric=cfg.db['metric'], ) print("Uploading " + str(len(all_chunks)) + " vectors to Pinecone...") refresh_pinecone_index(index, all_chunks, batch_size=cfg.db['batch_size']) # Upload sparse vectors to a separate index print("Preparing to upload sparse vectors for BM25...") try: from pinecone import Pinecone, ServerlessSpec try: from pinecone_text.sparse import BM25Encoder except ImportError: print("Skipping BM25 indexing - run pip install pinecone-text") return all_chunks, configured_technique_chunks, proc, index pc = Pinecone(api_key=pinecone_key) sparse_index_name = "cbt-book-sparse" existing_indexes = [idx.name for idx in pc.list_indexes()] if sparse_index_name not in existing_indexes: print(f"Creating sparse index: {sparse_index_name}") pc.create_index( name=sparse_index_name, dimension=512, # required space-filler dimension metric="dotproduct", spec=ServerlessSpec(cloud="aws", region="us-east-1") ) # wait for index import time while not pc.describe_index(sparse_index_name).status["ready"]: time.sleep(1) sparse_index = pc.Index(sparse_index_name) # Encode sparse vectors print("Encoding sparse vectors...") bm25 = BM25Encoder().default() sparse_chunks = [] # Learn BM25 corpus = [chunk["metadata"]["text"] for chunk in all_chunks] bm25.fit(corpus) for chunk in all_chunks: sparse_values = bm25.encode_documents(chunk["metadata"]["text"]) # Skip empty sparse vectors to prevent Pinecone errors if not sparse_values.get("indices") or len(sparse_values.get("indices", [])) == 0: continue new_chunk = { "id": chunk["id"], "sparse_values": sparse_values, "metadata": chunk["metadata"] } sparse_chunks.append(new_chunk) print(f"Upserting {len(sparse_chunks)} valid sparse vectors to {sparse_index_name}...") # Upsert sparse vectors if sparse_chunks: batch_size = cfg.db.get("batch_size", 100) for i in range(0, len(sparse_chunks), batch_size): batch = sparse_chunks[i:i+batch_size] sparse_index.upsert(vectors=batch) print("Sparse vector upsert complete.") else: print("No valid sparse vectors to upsert.") except Exception as e: print(f"Error during sparse vector upload: {e}") # 5. Summary print(f"\n{'='*80}") print("INGESTION COMPLETE - SUMMARY") print(f"{'='*80}") print(f"\n{'Technique':<15} {'Status':<12} {'Chunks':<10}") print("-" * 40) total_chunks = 0 for tech in CHUNKING_TECHNIQUES: name = tech["name"] result = results.get(name, {}) status = result.get("status", "unknown") chunks = result.get("chunks", 0) if status == "success": total_chunks += chunks print(f"{name:<15} {status:<12} {chunks:<10}") print("-" * 40) print(f"{'TOTAL':<15} {'':<12} {total_chunks:<10}") print(f"\nSingle index: {index_name}") print(f"Total vectors: {len(all_chunks)}") print("\nChunks can be filtered by 'chunking_technique' metadata field:") for tech in CHUNKING_TECHNIQUES: if results.get(tech["name"], {}).get("status") == "success": print(f" - chunking_technique: '{tech['name']}'") print("\nYou can now start the API server with:") print(" python -m uvicorn api:app --host 0.0.0.0 --port 8000") # Return chunks and processor for reuse in retrieval pipeline return all_chunks, configured_technique_chunks, proc, index if __name__ == "__main__": ingest_data()