"""
Local Codebase Pipeline Runner - Processes local codebases for dataset creation.

This is the main entry point for processing LOCAL CODEBASES (not Git repos).
It orchestrates the entire chunking pipeline for local files, handling both
code files and documentation with intelligent fallback strategies.

ARCHITECTURE POSITION:
    - Local Pipeline Orchestrator: Coordinates local file processing
    - Fallback Handler: Intelligent fallback from code to documentation
    - Dataset Exporter: Creates final JSONL datasets with statistics

KEY FEATURES:
    1. Unified processing of Python files and documentation
    2. Intelligent fallback (failed code chunking → documentation chunking)
    3. Hierarchical chunking for Python files
    4. Documentation-aware chunking for markdown/text files
    5. Dataset statistics and metadata generation

DATA FLOW:
    Local files → Type detection → Python chunking (or fallback) →
    Documentation chunking → JSONL export → Statistics

USE CASES:
    - Processing locally saved code examples
    - Creating datasets from example repositories
    - Testing chunking strategies on local files

USAGE:
    python run_python_pipeline.py --name crewai_examples --include crewai
    python run_python_pipeline.py --name test_dataset --exclude large_repos
"""

from pathlib import Path
import json
import argparse

from src.task_3_data_engineering.chunking.hierarchical_chunker import HierarchicalChunker
from src.task_3_data_engineering.export.jsonl_exporter import export_chunks_jsonl
from src.task_3_data_engineering.analysis.dataset_stats import compute_dataset_stats
from src.task_3_data_engineering.export.dataset_metadata import write_dataset_metadata
from src.task_3_data_engineering.chunking.doc_chunker import chunk_document, wrap_doc_chunks


INPUT_DIR = Path("data/raw/codebases")
BASE_OUTPUT_DIR = Path("data/processed/chunks")

DOC_EXTS = {".md", ".txt", ".rst"}


def run(dataset_name: str, include: list[str] | None, exclude: list[str] | None):
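    """Chunk every local codebase under INPUT_DIR into a single dataset.

    Args:
        dataset_name: Name of the dataset; output is written to
            data/processed/chunks/<dataset_name>.
        include: If given, only codebases whose top-level directory name is
            in this list are processed.
        exclude: If given, codebases whose top-level directory name is in
            this list are skipped.

    Example (mirrors the CLI usage in the module docstring):
        run("crewai_examples", include=["crewai"], exclude=None)
    """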
    output_dir = BASE_OUTPUT_DIR / dataset_name
    output_dir.mkdir(parents=True, exist_ok=True)

    chunker = HierarchicalChunker()
    all_chunks = []

    files = [p for p in INPUT_DIR.rglob("*") if p.is_file()]

    for file_path in files:
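        # Filter by top-level directory name, i.e. the codebase a file belongs to.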
        rel = file_path.relative_to(INPUT_DIR).parts
        if include and rel[0] not in include:
            continue
        if exclude and rel[0] in exclude:
            continue

        print(f"Processing: {file_path}")

        # ---- Python files ----
        if file_path.suffix.lower() == ".py":
            try:
                code_chunks = chunker.chunk_file(file_path)
                if code_chunks:
                    all_chunks.extend(code_chunks)
                    continue
            except Exception:
                pass  # fallback to doc mode

        # ---- Documentation / text ----
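        # .py files reach this branch only when hierarchical chunking raised or
        # produced no chunks; they are then chunked as plain text instead.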
        if file_path.suffix.lower() in DOC_EXTS or file_path.suffix.lower() == ".py":
            try:
                raw_text = file_path.read_text(encoding="utf-8", errors="ignore")
            except Exception:
                continue

            if not raw_text.strip():
                continue

            doc_chunks = chunk_document(
                raw_text=raw_text,
                source_name=str(file_path),
                source_url=None,
            )

            all_chunks.extend(wrap_doc_chunks(doc_chunks))

    # ---- Export ----
    export_chunks_jsonl(all_chunks, output_dir / "chunks.jsonl", print_stats=True)

    stats = compute_dataset_stats(all_chunks)

    primary = [c for c in all_chunks if c.hierarchy.is_primary]
    stats["hierarchy"] = {
        "primary_chunks": len(primary),
        "secondary_chunks": len(all_chunks) - len(primary),
    }

    with (output_dir / "dataset_stats.json").open("w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2)

    write_dataset_metadata(
        chunks=all_chunks,
        output_path=output_dir / "dataset_metadata.json",
        dataset_name=dataset_name,
        dataset_version="v1",
    )

    print("\nβœ… Dataset built successfully")
    print(f"   - Files: {len({c.file_path for c in all_chunks})}")
    print(f"   - Chunks: {len(all_chunks)}")
    print(f"   - Output: {output_dir}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Build a chunked JSONL dataset from local codebases.")
    parser.add_argument("--name", required=True, help="Dataset name; output goes under data/processed/chunks/<name>")
    parser.add_argument("--include", nargs="+", help="Only process these top-level codebase directories")
    parser.add_argument("--exclude", nargs="+", help="Skip these top-level codebase directories")
    args = parser.parse_args()

    run(args.name, args.include, args.exclude)