# NLP-RAG / data/ingest.py — synced from GitHub main (commit c64aaec)
"""
Script to ingest CBT book data into the Pinecone vector database.

Ingests the book once per chunking technique (see CHUNKING_TECHNIQUES) for an
ablation study. All chunks are stored in a SINGLE index, with a
'chunking_technique' metadata field to differentiate them.

Run this once before starting the API server.
"""
import os
import time
from dotenv import load_dotenv
from config_loader import cfg
from data.data_loader import load_cbt_book, get_book_stats
from data.vector_db import get_pinecone_index, refresh_pinecone_index
from retriever.processor import ChunkProcessor
# 7 distinct chunking techniques for the ablation study. Each entry drives one
# chunking/embedding pass over the book; all resulting chunks land in the same
# Pinecone index, tagged with a 'chunking_technique' metadata field and an
# ID prefix derived from "name".
# NOTE: names must be unique — a duplicated name (the "semantic" entry was
# previously listed twice) re-runs the same pass, upserts the same prefixed
# IDs twice, and overwrites its own entry in the ingestion summary.
CHUNKING_TECHNIQUES = [
    {
        "name": "fixed",
        "description": "Fixed-size chunking - splits every N characters (may cut sentences mid-way)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {"separator": ""},  # No separator for fixed splitting
    },
    {
        "name": "sentence",
        "description": "Sentence-level chunking - respects sentence boundaries (NLTK)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {},
    },
    {
        "name": "paragraph",
        "description": "Paragraph-level chunking - uses natural paragraph breaks",
        "chunk_size": 2500,
        "chunk_overlap": 100,
        "kwargs": {"separator": "\n\n"},  # Split on paragraph breaks
    },
    {
        "name": "semantic",
        "description": "Semantic chunking - splits where topic/meaning shifts (embedding similarity)",
        "chunk_size": 2000,
        "chunk_overlap": 100,
        "kwargs": {"breakpoint_threshold_type": "percentile", "breakpoint_threshold_amount": 70},
    },
    {
        "name": "recursive",
        "description": "Recursive chunking - hierarchical splitting (paragraphs → sentences → words → chars)",
        "chunk_size": 2000,
        "chunk_overlap": 100,
        "kwargs": {"separators": ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " ", ""], "keep_separator": True},
    },
    {
        "name": "page",
        "description": "Page-level chunking - uses entire book pages as-is",
        "chunk_size": 10000,  # Very large to keep full pages
        "chunk_overlap": 0,  # No overlap between pages
        "kwargs": {"separator": "--- Page"},  # Split on page markers
    },
    {
        "name": "markdown",
        "description": "Markdown header chunking - splits by headers (#, ##, ###, ####) with 4k char limit",
        "chunk_size": 4000,  # Max 4k chars per chunk
        "chunk_overlap": 0,  # No overlap for markdown
        "kwargs": {},  # Custom implementation
    },
]
def ingest_single_technique(
    raw_data,
    proc,
    technique_config,
    technique_index,
    total_techniques,
):
    """Chunk the book with one technique and return its tagged chunks.

    Args:
        raw_data: Loaded book data, passed straight through to the processor.
        proc: ChunkProcessor instance used for chunking/embedding.
        technique_config: One entry from CHUNKING_TECHNIQUES (name, sizes, kwargs).
        technique_index: 1-based position, used only for progress output.
        total_techniques: Total number of techniques, used only for progress output.

    Returns:
        List of chunk dicts, each with a 'chunking_technique' metadata field
        and an ID prefixed by the technique name.
    """
    name = technique_config["name"]
    size = technique_config["chunk_size"]
    overlap = technique_config["chunk_overlap"]
    extra_kwargs = technique_config.get("kwargs", {})

    print(f"\n[{technique_index}/{total_techniques}] Processing '{name}'...")
    print(f" Description: {technique_config['description']}")
    print(f" Chunk size: {size}, Overlap: {overlap}")

    # Chunk and embed via the shared processor; doc_limit comes from config.
    chunked = proc.process(
        raw_data,
        technique=name,
        chunk_size=size,
        chunk_overlap=overlap,
        max_docs=cfg.project.get("doc_limit"),
        verbose=False,
        **extra_kwargs,
    )

    # Tag every chunk with its technique and namespace its ID so the same
    # source chunk never collides across techniques in the shared index.
    for item in chunked:
        item["metadata"]["chunking_technique"] = name
        item["id"] = f"{name}-{item['id']}"

    print(f" Created {len(chunked)} chunks")
    return chunked
def ingest_data():
    """Load the CBT book, chunk it with every configured technique, and upload
    ALL chunks to a SINGLE Pinecone index (plus a separate sparse BM25 index).

    Chunks from different techniques share the dense index and are
    differentiated by the 'chunking_technique' metadata field; IDs are
    prefixed per technique by ingest_single_technique.

    Returns:
        Tuple of (all_chunks, configured_technique_chunks, processor, index)
        for reuse in the retrieval pipeline.

    Raises:
        RuntimeError: if PINECONE_API_KEY is not set in the environment.
    """
    load_dotenv()
    pinecone_key = os.getenv("PINECONE_API_KEY")
    if not pinecone_key:
        raise RuntimeError("PINECONE_API_KEY not found in environment variables")

    # Banner reflects however many techniques are actually configured
    # (the old text hardcoded "6", which drifted from the list).
    num_techniques = len(CHUNKING_TECHNIQUES)
    print("=" * 80)
    print(f"CBT BOOK INGESTION PIPELINE - {num_techniques} TECHNIQUES (SINGLE INDEX)")
    print("=" * 80)
    print(f"\nTechniques to process: {num_techniques}")
    for i, tech in enumerate(CHUNKING_TECHNIQUES, 1):
        print(f" {i}. {tech['name']}: {tech['description']}")
    print(f"\nAll chunks will be stored in a SINGLE index: {cfg.db['base_index_name']}-{cfg.processing['technique']}")
    print("Chunks are differentiated by 'chunking_technique' metadata field.")

    # 1. Load the CBT book (once, reused for all techniques)
    print(f"\n{'='*80}")
    print("STEP 1: LOADING CBT BOOK")
    print(f"{'='*80}")
    print("\nLoading CBT book from EntireBookCleaned.txt...")
    raw_data = load_cbt_book("data/EntireBookCleaned.txt")
    stats = get_book_stats(raw_data)
    print(f" Loaded {stats['total_pages']} pages")
    print(f" Total characters: {stats['total_characters']:,}")
    print(f" Average chars per page: {stats['average_chars_per_page']:.0f}")

    # 2. Initialize processor (once, reused for all techniques)
    print(f"\nInitializing embedding model: {cfg.processing['embedding_model']}")
    proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False)

    # 3. Process each technique sequentially and collect all chunks
    print(f"\n{'='*80}")
    print(f"STEP 2: CHUNKING WITH {num_techniques} TECHNIQUES")
    print(f"{'='*80}")
    all_chunks = []
    configured_technique_chunks = []
    results = {}
    for i, technique in enumerate(CHUNKING_TECHNIQUES, 1):
        try:
            chunks = ingest_single_technique(
                raw_data=raw_data,
                proc=proc,
                technique_config=technique,
                technique_index=i,
                total_techniques=num_techniques,
            )
            all_chunks.extend(chunks)
            # Save chunks for the configured technique (for retrieval pipeline)
            if technique["name"] == cfg.processing['technique']:
                configured_technique_chunks = chunks
            results[technique["name"]] = {
                "status": "success",
                "chunks": len(chunks),
            }
            # Wait between techniques to avoid rate limits (for embedding API)
            if i < num_techniques:
                print(" Waiting 5 seconds before next technique (rate limit protection)...")
                time.sleep(5)  # module-level `import time` is already in scope
        except Exception as e:
            # Best-effort: a failing technique is recorded but does not abort
            # the remaining techniques or the upload of successful chunks.
            print(f" ERROR with technique '{technique['name']}': {e}")
            results[technique["name"]] = {
                "status": "failed",
                "error": str(e),
            }

    # 4. Upload ALL chunks to a SINGLE Pinecone index
    print(f"\n{'='*80}")
    print("STEP 3: UPLOADING TO SINGLE PINECONE INDEX")
    print(f"{'='*80}")
    index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}"
    print(f"\nIndex name: {index_name}")
    print(f"Dimension: {cfg.db['dimension']}")
    print(f"Metric: {cfg.db['metric']}")
    print(f"Total chunks to upload: {len(all_chunks)}")
    index = get_pinecone_index(
        pinecone_key,
        cfg.db['base_index_name'],
        technique=cfg.processing['technique'],
        dimension=cfg.db['dimension'],
        metric=cfg.db['metric'],
    )
    print(f"Uploading {len(all_chunks)} vectors to Pinecone...")
    refresh_pinecone_index(index, all_chunks, batch_size=cfg.db['batch_size'])

    # Upload sparse (BM25) vectors to a separate index for hybrid retrieval.
    print("Preparing to upload sparse vectors for BM25...")
    try:
        from pinecone import Pinecone, ServerlessSpec
        try:
            from pinecone_text.sparse import BM25Encoder
        except ImportError:
            # pinecone-text is optional; dense ingestion already succeeded.
            print("Skipping BM25 indexing - run pip install pinecone-text")
            return all_chunks, configured_technique_chunks, proc, index
        pc = Pinecone(api_key=pinecone_key)
        sparse_index_name = "cbt-book-sparse"
        existing_indexes = [idx.name for idx in pc.list_indexes()]
        if sparse_index_name not in existing_indexes:
            print(f"Creating sparse index: {sparse_index_name}")
            pc.create_index(
                name=sparse_index_name,
                dimension=512,  # required space-filler dimension
                metric="dotproduct",  # sparse vectors require dotproduct
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
            # Block until the new index is ready to accept upserts.
            while not pc.describe_index(sparse_index_name).status["ready"]:
                time.sleep(1)
        sparse_index = pc.Index(sparse_index_name)

        # Fit BM25 on the full corpus, then encode each chunk.
        print("Encoding sparse vectors...")
        bm25 = BM25Encoder().default()
        sparse_chunks = []
        corpus = [chunk["metadata"]["text"] for chunk in all_chunks]
        bm25.fit(corpus)
        for chunk in all_chunks:
            sparse_values = bm25.encode_documents(chunk["metadata"]["text"])
            # Skip empty sparse vectors to prevent Pinecone errors
            if not sparse_values.get("indices"):
                continue
            sparse_chunks.append({
                "id": chunk["id"],
                "sparse_values": sparse_values,
                "metadata": chunk["metadata"],
            })
        print(f"Upserting {len(sparse_chunks)} valid sparse vectors to {sparse_index_name}...")
        if sparse_chunks:
            batch_size = cfg.db.get("batch_size", 100)
            for start in range(0, len(sparse_chunks), batch_size):
                sparse_index.upsert(vectors=sparse_chunks[start:start + batch_size])
            print("Sparse vector upsert complete.")
        else:
            print("No valid sparse vectors to upsert.")
    except Exception as e:
        # Sparse indexing is best-effort; dense index is already populated.
        print(f"Error during sparse vector upload: {e}")

    # 5. Summary — iterate `results` (unique technique names, insertion order)
    # rather than CHUNKING_TECHNIQUES, so a duplicated list entry can never
    # double-print or double-count a single result.
    print(f"\n{'='*80}")
    print("INGESTION COMPLETE - SUMMARY")
    print(f"{'='*80}")
    print(f"\n{'Technique':<15} {'Status':<12} {'Chunks':<10}")
    print("-" * 40)
    total_chunks = 0
    for name, result in results.items():
        status = result.get("status", "unknown")
        chunks = result.get("chunks", 0)
        if status == "success":
            total_chunks += chunks
        print(f"{name:<15} {status:<12} {chunks:<10}")
    print("-" * 40)
    print(f"{'TOTAL':<15} {'':<12} {total_chunks:<10}")
    print(f"\nSingle index: {index_name}")
    print(f"Total vectors: {len(all_chunks)}")
    print("\nChunks can be filtered by 'chunking_technique' metadata field:")
    for name, result in results.items():
        if result.get("status") == "success":
            print(f" - chunking_technique: '{name}'")
    print("\nYou can now start the API server with:")
    print(" python -m uvicorn api:app --host 0.0.0.0 --port 8000")

    # Return chunks and processor for reuse in retrieval pipeline
    return all_chunks, configured_technique_chunks, proc, index
# Script entry point: run the full ingestion pipeline (intended as a one-off
# step before starting the API server).
if __name__ == "__main__":
    ingest_data()