"""
Script to ingest CBT book data into Pinecone vector database.
Ingests the book once per configured chunking technique for an ablation study.
All chunks are stored in a SINGLE index with metadata to differentiate.
Run this once before starting the API server.
"""
import os
import time
from dotenv import load_dotenv
from config_loader import cfg
from data.data_loader import load_cbt_book, get_book_stats
from data.vector_db import get_pinecone_index, refresh_pinecone_index
from retriever.processor import ChunkProcessor
# 7 different chunking techniques for the ablation study.
# NOTE: technique names must be unique — chunk ids are prefixed with the
# technique name, so a duplicated entry would re-embed the whole book and
# silently overwrite its own vectors in the shared Pinecone index.
CHUNKING_TECHNIQUES = [
    {
        "name": "fixed",
        "description": "Fixed-size chunking - splits every N characters (may cut sentences mid-way)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {"separator": ""},  # No separator for fixed splitting
    },
    {
        "name": "sentence",
        "description": "Sentence-level chunking - respects sentence boundaries (NLTK)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {},
    },
    {
        "name": "paragraph",
        "description": "Paragraph-level chunking - uses natural paragraph breaks",
        "chunk_size": 2500,
        "chunk_overlap": 100,
        "kwargs": {"separator": "\n\n"},  # Split on paragraph breaks
    },
    {
        "name": "semantic",
        "description": "Semantic chunking - splits where topic/meaning shifts (embedding similarity)",
        "chunk_size": 2000,
        "chunk_overlap": 100,
        "kwargs": {"breakpoint_threshold_type": "percentile", "breakpoint_threshold_amount": 70},
    },
    {
        "name": "recursive",
        "description": "Recursive chunking - hierarchical splitting (paragraphs → sentences → words → chars)",
        "chunk_size": 2000,
        "chunk_overlap": 100,
        "kwargs": {"separators": ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " ", ""], "keep_separator": True},
    },
    {
        "name": "page",
        "description": "Page-level chunking - uses entire book pages as-is",
        "chunk_size": 10000,  # Very large to keep full pages
        "chunk_overlap": 0,  # No overlap between pages
        "kwargs": {"separator": "--- Page"},  # Split on page markers
    },
    {
        "name": "markdown",
        "description": "Markdown header chunking - splits by headers (#, ##, ###, ####) with 4k char limit",
        "chunk_size": 4000,  # Max 4k chars per chunk
        "chunk_overlap": 0,  # No overlap for markdown
        "kwargs": {},  # Custom implementation handles header splitting
    },
]
def ingest_single_technique(
    raw_data,
    proc,
    technique_config,
    technique_index,
    total_techniques,
):
    """Chunk the book with one technique and return the tagged chunks.

    Args:
        raw_data: Loaded book pages/documents to be chunked.
        proc: ChunkProcessor instance performing chunking and embedding.
        technique_config: Dict with 'name', 'description', 'chunk_size',
            'chunk_overlap', and optional splitter 'kwargs'.
        technique_index: 1-based position of this technique in the run.
        total_techniques: Total number of techniques being processed.

    Returns:
        List of chunk dicts. Each chunk id is prefixed with the technique
        name and each metadata dict gains a 'chunking_technique' field.
    """
    name = technique_config["name"]
    size = technique_config["chunk_size"]
    overlap = technique_config["chunk_overlap"]
    splitter_kwargs = technique_config.get("kwargs", {})

    print(f"\n[{technique_index}/{total_techniques}] Processing '{name}'...")
    print(f"  Description: {technique_config['description']}")
    print(f"  Chunk size: {size}, Overlap: {overlap}")

    # Chunk and embed the book with the selected technique.
    final_chunks = proc.process(
        raw_data,
        technique=name,
        chunk_size=size,
        chunk_overlap=overlap,
        max_docs=cfg.project.get("doc_limit"),
        verbose=False,
        **splitter_kwargs,
    )

    # Tag every chunk with its technique and namespace its id so chunks
    # from different techniques never collide in the shared index.
    for chunk in final_chunks:
        chunk["metadata"]["chunking_technique"] = name
        chunk["id"] = f"{name}-{chunk['id']}"

    print(f"  Created {len(final_chunks)} chunks")
    return final_chunks
def _upload_sparse_vectors(pinecone_key, all_chunks):
    """Encode all chunks with BM25 and upsert them to a dedicated sparse index.

    Best-effort: a missing optional dependency (pinecone-text) or any
    Pinecone error is reported and swallowed so the dense ingestion result
    is unaffected.

    Args:
        pinecone_key: Pinecone API key.
        all_chunks: Chunk dicts with 'id' and 'metadata' (incl. 'text').
    """
    print("Preparing to upload sparse vectors for BM25...")
    try:
        from pinecone import Pinecone, ServerlessSpec
        try:
            from pinecone_text.sparse import BM25Encoder
        except ImportError:
            print("Skipping BM25 indexing - run pip install pinecone-text")
            return

        pc = Pinecone(api_key=pinecone_key)
        sparse_index_name = "cbt-book-sparse"
        existing_indexes = [idx.name for idx in pc.list_indexes()]
        if sparse_index_name not in existing_indexes:
            print(f"Creating sparse index: {sparse_index_name}")
            pc.create_index(
                name=sparse_index_name,
                dimension=512,  # required space-filler dimension
                metric="dotproduct",  # sparse vectors require dotproduct
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Block until the new index is ready to accept upserts.
            while not pc.describe_index(sparse_index_name).status["ready"]:
                time.sleep(1)
        sparse_index = pc.Index(sparse_index_name)

        # Fit BM25 corpus statistics on all chunk texts before encoding.
        print("Encoding sparse vectors...")
        bm25 = BM25Encoder().default()
        corpus = [chunk["metadata"]["text"] for chunk in all_chunks]
        bm25.fit(corpus)

        sparse_chunks = []
        for chunk in all_chunks:
            sparse_values = bm25.encode_documents(chunk["metadata"]["text"])
            # Skip empty sparse vectors to prevent Pinecone upsert errors.
            if not sparse_values.get("indices"):
                continue
            sparse_chunks.append({
                "id": chunk["id"],
                "sparse_values": sparse_values,
                "metadata": chunk["metadata"],
            })

        print(f"Upserting {len(sparse_chunks)} valid sparse vectors to {sparse_index_name}...")
        if not sparse_chunks:
            print("No valid sparse vectors to upsert.")
            return
        batch_size = cfg.db.get("batch_size", 100)
        for start in range(0, len(sparse_chunks), batch_size):
            sparse_index.upsert(vectors=sparse_chunks[start:start + batch_size])
        print("Sparse vector upsert complete.")
    except Exception as e:
        # Sparse indexing is an enhancement; never abort dense ingestion.
        print(f"Error during sparse vector upload: {e}")


def ingest_data():
    """Load the CBT book, chunk it with every configured technique, and
    upload ALL resulting chunks to a SINGLE Pinecone index.

    Chunks from different techniques share one index and are differentiated
    by the 'chunking_technique' metadata field (ids are prefixed with the
    technique name). A secondary sparse (BM25) index is also populated on a
    best-effort basis.

    Returns:
        Tuple of (all_chunks, configured_technique_chunks, proc, index) for
        reuse in the retrieval pipeline.

    Raises:
        RuntimeError: If PINECONE_API_KEY is not set in the environment.
    """
    load_dotenv()
    pinecone_key = os.getenv("PINECONE_API_KEY")
    if not pinecone_key:
        raise RuntimeError("PINECONE_API_KEY not found in environment variables")

    # Derive counts/names once; the banner text stays correct if the
    # technique list changes.
    num_techniques = len(CHUNKING_TECHNIQUES)
    index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}"

    print("=" * 80)
    print(f"CBT BOOK INGESTION PIPELINE - {num_techniques} TECHNIQUES (SINGLE INDEX)")
    print("=" * 80)
    print(f"\nTechniques to process: {num_techniques}")
    for i, tech in enumerate(CHUNKING_TECHNIQUES, 1):
        print(f"  {i}. {tech['name']}: {tech['description']}")
    print(f"\nAll chunks will be stored in a SINGLE index: {index_name}")
    print("Chunks are differentiated by 'chunking_technique' metadata field.")

    # 1. Load the CBT book (once, reused for all techniques)
    print(f"\n{'='*80}")
    print("STEP 1: LOADING CBT BOOK")
    print(f"{'='*80}")
    print("\nLoading CBT book from EntireBookCleaned.txt...")
    raw_data = load_cbt_book("data/EntireBookCleaned.txt")
    stats = get_book_stats(raw_data)
    print(f"  Loaded {stats['total_pages']} pages")
    print(f"  Total characters: {stats['total_characters']:,}")
    print(f"  Average chars per page: {stats['average_chars_per_page']:.0f}")

    # 2. Initialize processor (once, reused for all techniques)
    print(f"\nInitializing embedding model: {cfg.processing['embedding_model']}")
    proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False)

    # 3. Process each technique sequentially and collect all chunks
    print(f"\n{'='*80}")
    print(f"STEP 2: CHUNKING WITH {num_techniques} TECHNIQUES")
    print(f"{'='*80}")
    all_chunks = []
    configured_technique_chunks = []
    results = {}
    for i, technique in enumerate(CHUNKING_TECHNIQUES, 1):
        try:
            chunks = ingest_single_technique(
                raw_data=raw_data,
                proc=proc,
                technique_config=technique,
                technique_index=i,
                total_techniques=num_techniques,
            )
            all_chunks.extend(chunks)
            # Save chunks for the configured technique (for retrieval pipeline)
            if technique["name"] == cfg.processing['technique']:
                configured_technique_chunks = chunks
            results[technique["name"]] = {
                "status": "success",
                "chunks": len(chunks),
            }
            # Wait between techniques to avoid embedding-API rate limits.
            if i < num_techniques:
                print("  Waiting 5 seconds before next technique (rate limit protection)...")
                time.sleep(5)
        except Exception as e:
            # One failing technique must not abort the whole ingestion run.
            print(f"  ERROR with technique '{technique['name']}': {e}")
            results[technique["name"]] = {
                "status": "failed",
                "error": str(e),
            }

    # 4. Upload ALL chunks to a SINGLE Pinecone index
    print(f"\n{'='*80}")
    print("STEP 3: UPLOADING TO SINGLE PINECONE INDEX")
    print(f"{'='*80}")
    print(f"\nIndex name: {index_name}")
    print(f"Dimension: {cfg.db['dimension']}")
    print(f"Metric: {cfg.db['metric']}")
    print(f"Total chunks to upload: {len(all_chunks)}")
    index = get_pinecone_index(
        pinecone_key,
        cfg.db['base_index_name'],
        technique=cfg.processing['technique'],
        dimension=cfg.db['dimension'],
        metric=cfg.db['metric'],
    )
    print(f"Uploading {len(all_chunks)} vectors to Pinecone...")
    refresh_pinecone_index(index, all_chunks, batch_size=cfg.db['batch_size'])

    # Upload sparse (BM25) vectors to a separate index; best-effort, so the
    # summary below always runs even if the optional dependency is missing.
    _upload_sparse_vectors(pinecone_key, all_chunks)

    # 5. Summary — iterate `results` (not CHUNKING_TECHNIQUES) so a
    # duplicated technique name can never be printed or counted twice.
    print(f"\n{'='*80}")
    print("INGESTION COMPLETE - SUMMARY")
    print(f"{'='*80}")
    print(f"\n{'Technique':<15} {'Status':<12} {'Chunks':<10}")
    print("-" * 40)
    total_chunks = 0
    for name, result in results.items():
        status = result.get("status", "unknown")
        chunks = result.get("chunks", 0)
        if status == "success":
            total_chunks += chunks
        print(f"{name:<15} {status:<12} {chunks:<10}")
    print("-" * 40)
    print(f"{'TOTAL':<15} {'':<12} {total_chunks:<10}")
    print(f"\nSingle index: {index_name}")
    print(f"Total vectors: {len(all_chunks)}")
    print("\nChunks can be filtered by 'chunking_technique' metadata field:")
    for name, result in results.items():
        if result.get("status") == "success":
            print(f"  - chunking_technique: '{name}'")
    print("\nYou can now start the API server with:")
    print("  python -m uvicorn api:app --host 0.0.0.0 --port 8000")
    # Return chunks and processor for reuse in retrieval pipeline
    return all_chunks, configured_technique_chunks, proc, index
# Allow running this module directly as a one-off ingestion script.
if __name__ == "__main__":
    ingest_data()