| """ |
| Script to ingest CBT book data into Pinecone vector database. |
Ingests the book once per chunking technique (see CHUNKING_TECHNIQUES) for an ablation study.
| All chunks are stored in a SINGLE index with metadata to differentiate. |
| Run this once before starting the API server. |
| """ |
| import os |
| import time |
| from dotenv import load_dotenv |
| from config_loader import cfg |
| from data.data_loader import load_cbt_book, get_book_stats |
| from data.vector_db import get_pinecone_index, refresh_pinecone_index |
| from retriever.processor import ChunkProcessor |
|
|
|
|
| |
# Chunking techniques for the ablation study. Each entry drives one pass of
# ChunkProcessor.process(); "name" doubles as the chunk-id prefix and the
# 'chunking_technique' metadata tag in the shared index, so names MUST be
# unique. (A previously duplicated "semantic" entry chunked the book twice
# and produced identical vector ids whose second upsert silently overwrote
# the first — duplicate removed.)
CHUNKING_TECHNIQUES = [
    {
        "name": "fixed",
        "description": "Fixed-size chunking - splits every N characters (may cut sentences mid-way)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {"separator": ""},
    },
    {
        "name": "sentence",
        "description": "Sentence-level chunking - respects sentence boundaries (NLTK)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {},
    },
    {
        "name": "paragraph",
        "description": "Paragraph-level chunking - uses natural paragraph breaks",
        "chunk_size": 2500,
        "chunk_overlap": 100,
        "kwargs": {"separator": "\n\n"},
    },
    {
        "name": "semantic",
        "description": "Semantic chunking - splits where topic/meaning shifts (embedding similarity)",
        "chunk_size": 2000,
        "chunk_overlap": 100,
        "kwargs": {"breakpoint_threshold_type": "percentile", "breakpoint_threshold_amount": 70},
    },
    {
        "name": "recursive",
        "description": "Recursive chunking - hierarchical splitting (paragraphs → sentences → words → chars)",
        "chunk_size": 2000,
        "chunk_overlap": 100,
        "kwargs": {"separators": ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " ", ""], "keep_separator": True},
    },
    {
        "name": "page",
        "description": "Page-level chunking - uses entire book pages as-is",
        "chunk_size": 10000,
        "chunk_overlap": 0,
        "kwargs": {"separator": "--- Page"},
    },
    {
        "name": "markdown",
        "description": "Markdown header chunking - splits by headers (#, ##, ###, ####) with 4k char limit",
        "chunk_size": 4000,
        "chunk_overlap": 0,
        "kwargs": {},
    },
]
|
|
|
|
def ingest_single_technique(
    raw_data,
    proc,
    technique_config,
    technique_index,
    total_techniques,
):
    """Chunk the book with one technique and tag the resulting chunks.

    Args:
        raw_data: Loaded book pages/documents to be chunked.
        proc: ChunkProcessor instance that performs the actual splitting.
        technique_config: One entry from CHUNKING_TECHNIQUES.
        technique_index: 1-based position of this technique (progress output).
        total_techniques: Total number of techniques being processed.

    Returns:
        List of chunk dicts; each chunk's metadata carries the technique name
        under 'chunking_technique' and its id is prefixed with that name so
        techniques can be told apart inside the shared index.
    """
    name = technique_config["name"]
    size = technique_config["chunk_size"]
    overlap = technique_config["chunk_overlap"]
    extra_kwargs = technique_config.get("kwargs", {})

    print(f"\n[{technique_index}/{total_techniques}] Processing '{name}'...")
    print(f" Description: {technique_config['description']}")
    print(f" Chunk size: {size}, Overlap: {overlap}")

    chunks = proc.process(
        raw_data,
        technique=name,
        chunk_size=size,
        chunk_overlap=overlap,
        max_docs=cfg.project.get("doc_limit"),
        verbose=False,
        **extra_kwargs,
    )

    # Tag every chunk in place so the shared index stays filterable and ids
    # from different techniques never collide.
    for entry in chunks:
        entry["metadata"]["chunking_technique"] = name
        entry["id"] = f"{name}-{entry['id']}"

    print(f" Created {len(chunks)} chunks")

    return chunks
|
|
|
|
def _upload_sparse_bm25(pinecone_key, all_chunks):
    """Best-effort BM25 sparse-vector upload for hybrid retrieval.

    Creates (if needed) a dedicated sparse index, fits a BM25 encoder on the
    chunk texts, and upserts the resulting sparse vectors. Every failure
    mode — including a missing `pinecone-text` package — is reported and
    swallowed so sparse indexing never blocks the main dense ingestion.
    (Previously an ImportError returned early out of ingest_data and skipped
    the summary; all failures now fall through uniformly.)

    Args:
        pinecone_key: Pinecone API key.
        all_chunks: Chunk dicts with 'id' and 'metadata' (containing 'text').
    """
    print("Preparing to upload sparse vectors for BM25...")
    try:
        from pinecone import Pinecone, ServerlessSpec
        try:
            from pinecone_text.sparse import BM25Encoder
        except ImportError:
            print("Skipping BM25 indexing - run pip install pinecone-text")
            return
        pc = Pinecone(api_key=pinecone_key)

        sparse_index_name = "cbt-book-sparse"
        existing_indexes = [idx.name for idx in pc.list_indexes()]
        if sparse_index_name not in existing_indexes:
            print(f"Creating sparse index: {sparse_index_name}")
            # NOTE(review): dotproduct is required for sparse values, but
            # upserting vectors WITHOUT dense `values` into a dense serverless
            # index may be rejected — confirm against the Pinecone SDK in use.
            pc.create_index(
                name=sparse_index_name,
                dimension=512,
                metric="dotproduct",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Block until the fresh index is ready to accept upserts.
            while not pc.describe_index(sparse_index_name).status["ready"]:
                time.sleep(1)

        sparse_index = pc.Index(sparse_index_name)

        print("Encoding sparse vectors...")
        # Start from the pretrained defaults, then refit on this corpus so
        # term statistics reflect the book itself.
        bm25 = BM25Encoder().default()
        corpus = [chunk["metadata"]["text"] for chunk in all_chunks]
        bm25.fit(corpus)

        sparse_chunks = []
        for chunk in all_chunks:
            sparse_values = bm25.encode_documents(chunk["metadata"]["text"])
            # Skip chunks whose text produced no BM25 terms at all.
            if not sparse_values.get("indices"):
                continue
            sparse_chunks.append({
                "id": chunk["id"],
                "sparse_values": sparse_values,
                "metadata": chunk["metadata"],
            })

        print(f"Upserting {len(sparse_chunks)} valid sparse vectors to {sparse_index_name}...")
        if sparse_chunks:
            batch_size = cfg.db.get("batch_size", 100)
            for start in range(0, len(sparse_chunks), batch_size):
                sparse_index.upsert(vectors=sparse_chunks[start:start + batch_size])
            print("Sparse vector upsert complete.")
        else:
            print("No valid sparse vectors to upsert.")
    except Exception as e:
        # Best-effort by design: log and continue to the summary.
        print(f"Error during sparse vector upload: {e}")


def _print_summary(results, index_name, all_chunks):
    """Print the per-technique success/failure table and next-step hints.

    Args:
        results: Mapping of technique name -> {'status', 'chunks'/'error'}.
        index_name: Name of the dense index everything was uploaded to.
        all_chunks: All chunks across techniques (for the total count).
    """
    print(f"\n{'='*80}")
    print("INGESTION COMPLETE - SUMMARY")
    print(f"{'='*80}")
    print(f"\n{'Technique':<15} {'Status':<12} {'Chunks':<10}")
    print("-" * 40)
    total_chunks = 0
    for tech in CHUNKING_TECHNIQUES:
        name = tech["name"]
        result = results.get(name, {})
        status = result.get("status", "unknown")
        chunks = result.get("chunks", 0)
        if status == "success":
            total_chunks += chunks
        print(f"{name:<15} {status:<12} {chunks:<10}")
    print("-" * 40)
    print(f"{'TOTAL':<15} {'':<12} {total_chunks:<10}")

    print(f"\nSingle index: {index_name}")
    print(f"Total vectors: {len(all_chunks)}")
    print("\nChunks can be filtered by 'chunking_technique' metadata field:")
    for tech in CHUNKING_TECHNIQUES:
        if results.get(tech["name"], {}).get("status") == "success":
            print(f" - chunking_technique: '{tech['name']}'")

    print("\nYou can now start the API server with:")
    print(" python -m uvicorn api:app --host 0.0.0.0 --port 8000")


def ingest_data():
    """Load the CBT book, chunk it with every configured technique, and
    upload ALL chunks to a SINGLE Pinecone index (plus a best-effort BM25
    sparse index for hybrid retrieval).

    Returns:
        Tuple (all_chunks, configured_technique_chunks, proc, index):
            all_chunks: every chunk from every technique, tagged via metadata.
            configured_technique_chunks: chunks for cfg.processing['technique'].
            proc: the ChunkProcessor instance, reusable by the retrieval pipeline.
            index: the dense Pinecone index handle.

    Raises:
        RuntimeError: if PINECONE_API_KEY is not set in the environment.
    """
    load_dotenv()

    pinecone_key = os.getenv("PINECONE_API_KEY")
    if not pinecone_key:
        raise RuntimeError("PINECONE_API_KEY not found in environment variables")

    # Derive the count from the list so banners never drift out of date
    # (they previously hardcoded "6").
    n_techniques = len(CHUNKING_TECHNIQUES)

    print("=" * 80)
    print(f"CBT BOOK INGESTION PIPELINE - {n_techniques} TECHNIQUES (SINGLE INDEX)")
    print("=" * 80)
    print(f"\nTechniques to process: {n_techniques}")
    for i, tech in enumerate(CHUNKING_TECHNIQUES, 1):
        print(f" {i}. {tech['name']}: {tech['description']}")
    print(f"\nAll chunks will be stored in a SINGLE index: {cfg.db['base_index_name']}-{cfg.processing['technique']}")
    print("Chunks are differentiated by 'chunking_technique' metadata field.")

    print(f"\n{'='*80}")
    print("STEP 1: LOADING CBT BOOK")
    print(f"{'='*80}")
    print("\nLoading CBT book from EntireBookCleaned.txt...")
    raw_data = load_cbt_book("data/EntireBookCleaned.txt")
    stats = get_book_stats(raw_data)
    print(f" Loaded {stats['total_pages']} pages")
    print(f" Total characters: {stats['total_characters']:,}")
    print(f" Average chars per page: {stats['average_chars_per_page']:.0f}")

    print(f"\nInitializing embedding model: {cfg.processing['embedding_model']}")
    proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False)

    print(f"\n{'='*80}")
    print(f"STEP 2: CHUNKING WITH {n_techniques} TECHNIQUES")
    print(f"{'='*80}")

    all_chunks = []
    configured_technique_chunks = []
    results = {}

    for i, technique in enumerate(CHUNKING_TECHNIQUES, 1):
        try:
            chunks = ingest_single_technique(
                raw_data=raw_data,
                proc=proc,
                technique_config=technique,
                technique_index=i,
                total_techniques=n_techniques,
            )
            all_chunks.extend(chunks)

            # Keep the chunks for the app's configured technique so the
            # caller can reuse them without re-chunking.
            if technique["name"] == cfg.processing['technique']:
                configured_technique_chunks = chunks

            results[technique["name"]] = {
                "status": "success",
                "chunks": len(chunks),
            }

            # Pause between techniques to stay under embedding-API rate limits.
            # `time` is imported at module level; the old local re-imports
            # were redundant.
            if i < n_techniques:
                print(" Waiting 5 seconds before next technique (rate limit protection)...")
                time.sleep(5)

        except Exception as e:
            # One failed technique must not abort the whole ablation run;
            # record the error and move on.
            print(f" ERROR with technique '{technique['name']}': {e}")
            results[technique["name"]] = {
                "status": "failed",
                "error": str(e),
            }

    print(f"\n{'='*80}")
    print("STEP 3: UPLOADING TO SINGLE PINECONE INDEX")
    print(f"{'='*80}")

    index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}"
    print(f"\nIndex name: {index_name}")
    print(f"Dimension: {cfg.db['dimension']}")
    print(f"Metric: {cfg.db['metric']}")
    print(f"Total chunks to upload: {len(all_chunks)}")

    index = get_pinecone_index(
        pinecone_key,
        cfg.db['base_index_name'],
        technique=cfg.processing['technique'],
        dimension=cfg.db['dimension'],
        metric=cfg.db['metric'],
    )

    print(f"Uploading {len(all_chunks)} vectors to Pinecone...")
    refresh_pinecone_index(index, all_chunks, batch_size=cfg.db['batch_size'])

    _upload_sparse_bm25(pinecone_key, all_chunks)

    _print_summary(results, index_name, all_chunks)

    return all_chunks, configured_technique_chunks, proc, index
|
|
|
|
# Script entry point: run the full ingestion pipeline (the return value is
# only needed when called from other modules, so it is discarded here).
if __name__ == "__main__":
    ingest_data()