| | ''' |
| | Synthesize triplet and positive pair datasets from chunked code files.''' |
| |
|
import argparse
import hashlib
import json
import random
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Maximum number of chunks sampled per run (caps dataset size).
MAX_DOCUMENTS = 200
# Number of synthetic anchor questions generated per chunk.
POSITIVE_VARIANTS = 5
# Vocabulary cap for the TF-IDF vectorizer.
TFIDF_MAX_FEATURES = 5000
# Seed for the module-level RNG (chunk shuffling / template selection).
RANDOM_SEED = 42

# Root directory under which each named run writes its outputs.
BASE_OUTPUT_DIR = Path("data/synthetic")

# Seed the global RNG once at import time for reproducible sampling.
random.seed(RANDOM_SEED)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def load_chunks(file_path) -> List[Dict]:
    """Load code chunks from a .json or .jsonl file.

    Args:
        file_path: Path or str pointing at the chunk file.

    Returns:
        A list of chunk dicts.

    Raises:
        ValueError: on malformed JSON, a non-list ``.json`` payload, or an
            unsupported file extension.
    """
    path = Path(file_path)
    # Normalize so e.g. ".JSONL" (case-insensitive filesystems, sloppy
    # exports) is treated the same as ".jsonl".
    suffix = path.suffix.lower()

    if suffix == ".jsonl":
        chunks = []
        with open(path, "r", encoding="utf-8") as f:
            for line_no, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines between records
                try:
                    chunks.append(json.loads(line))
                except json.JSONDecodeError as e:
                    raise ValueError(
                        f"Invalid JSON on line {line_no} in {path}"
                    ) from e
        return chunks

    if suffix == ".json":
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, list):
            raise ValueError(f"{path} must contain a list of chunks")
        return data

    raise ValueError(
        f"Unsupported file format {path.suffix}. Use .json or .jsonl"
    )
| |
|
| |
|
| |
|
def save_jsonl(path: Path, records: List[Dict]):
    """Write *records* to *path* as JSONL (one JSON object per line).

    Parent directories are created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in records
        )
| |
|
| |
|
def save_json(path: Path, data):
    """Serialize *data* to *path* as pretty-printed JSON.

    Parent directories are created as needed. Uses ``ensure_ascii=False``
    for consistency with :func:`save_jsonl`, so non-ASCII text (e.g. in
    code chunks) is written verbatim instead of being \\u-escaped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
| |
|
| |
|
def stable_document_id(chunk: Dict, idx: int) -> str:
    """Derive a canonical, reproducible document id for *chunk*.

    The id is the SHA-1 digest of ``"<file_path>::<idx>"``, so the same
    chunk at the same position always maps to the same id across runs.
    """
    source = chunk.get("file_path", "unknown")
    fingerprint = hashlib.sha1(f"{source}::{idx}".encode())
    return f"doc_{fingerprint.hexdigest()}"
| |
|
| |
|
def infer_framework(input_path: Path) -> str:
    """Guess the agent framework from the path components (fallback-safe).

    Checks the known frameworks in a fixed priority order against the
    lower-cased path parts; returns ``"unknown"`` when none match.
    """
    known = ("crewai", "langchain", "langgraph", "autogen")
    lowered_parts = {part.lower() for part in input_path.parts}
    for candidate in known:
        if candidate in lowered_parts:
            return candidate
    return "unknown"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def generate_anchor_questions(code: str, n: int) -> List[str]:
    """Generate up to *n* synthetic anchor questions for a code chunk.

    Deterministic placeholder (LLM-ready): the same *code* always yields
    the same questions. The original implementation shuffled with the
    *global* RNG, so results depended on call order despite the seeded
    module RNG; a local RNG keyed on the code text fixes that.

    Args:
        code: Source code of the chunk.
        n: Maximum number of questions to return.

    Returns:
        A list of at most *n* question strings mentioning the extracted symbol.
    """
    # Crude symbol extraction: everything before the first "(", with
    # def/class keywords stripped. Falls back to the whole (stripped)
    # string when no parenthesis is present.
    symbol = code.split("(")[0].replace("def ", "").replace("class ", "").strip()

    templates = [
        f"How does {symbol} work in Python?",
        f"How to implement {symbol}?",
        f"Example usage of {symbol}",
        f"Explain the {symbol} logic",
        f"Best practices for {symbol}",
    ]

    # Local RNG seeded from the code text -> truly deterministic per input.
    rng = random.Random(hashlib.sha1(code.encode()).hexdigest())
    rng.shuffle(templates)
    return templates[:n]
| |
|
| |
|
| | |
| | |
| | |
| |
|
def build_tfidf(chunks: List[Dict]):
    """Fit a TF-IDF vectorizer over the chunk code bodies.

    Returns:
        The fitted vectorizer and the document-term matrix, in that order.
    """
    documents = [chunk["code"] for chunk in chunks]
    vectorizer = TfidfVectorizer(
        stop_words="english",
        max_features=TFIDF_MAX_FEATURES,
    )
    return vectorizer, vectorizer.fit_transform(documents)
| |
|
| |
|
def mine_hard_negative(
    anchor: str,
    positive_idx: int,
    chunks: List[Dict],
    vectorizer,
    matrix,
) -> Dict:
    """Return the TF-IDF-closest chunk that is not the positive itself.

    The anchor question is projected into the fitted TF-IDF space and
    scored against every chunk; candidates are walked from most to least
    similar and the first one whose index differs from *positive_idx* is
    returned as the hard negative.

    Raises:
        RuntimeError: if no candidate other than the positive exists.
    """
    anchor_vec = vectorizer.transform([anchor])
    similarities = cosine_similarity(anchor_vec, matrix)[0]

    # Stable sort keeps the original index order among tied scores,
    # matching the previous (index, score)-tuple implementation.
    order = sorted(
        range(len(similarities)),
        key=similarities.__getitem__,
        reverse=True,
    )
    for candidate in order:
        if candidate != positive_idx:
            return chunks[candidate]

    raise RuntimeError("No negative candidate found")
| |
|
| |
|
| | |
| | |
| | |
| |
|
def generate_datasets(input_path: Path, run_name: str):
    """Build positive-pair and triplet datasets from a chunked code file.

    Loads chunks from *input_path*, keeps only class/method/function chunks
    that carry code, samples up to ``MAX_DOCUMENTS`` of them, and writes
    ``positive_pairs``/``triplets`` (both .jsonl and .json) plus a
    ``metadata.json`` under ``BASE_OUTPUT_DIR / run_name``.

    Args:
        input_path: Path to the chunked .json/.jsonl input file.
        run_name: Name of the output run directory.

    Raises:
        ValueError: if no usable chunks remain after filtering.
    """
    output_dir = BASE_OUTPUT_DIR / run_name
    framework = infer_framework(input_path)

    chunks = load_chunks(input_path)

    # Keep only code-bearing chunks of the types we can ask questions about.
    chunks = [
        c for c in chunks
        if c.get("chunk_type") in {"class", "method", "function"}
        and "code" in c
    ]
    if not chunks:
        # Fail early with a clear message instead of letting TfidfVectorizer
        # raise an opaque error on an empty corpus.
        raise ValueError(f"No usable chunks found in {input_path}")

    # Down-sample (reproducible via the module-level seed) to bound size.
    random.shuffle(chunks)
    chunks = chunks[:MAX_DOCUMENTS]

    # Assign canonical ids *after* sampling so ids reflect final positions.
    for idx, c in enumerate(chunks):
        c["document_id"] = stable_document_id(c, idx)

    vectorizer, matrix = build_tfidf(chunks)

    positive_pairs = []
    triplets = []

    for idx, chunk in enumerate(chunks):
        code = chunk["code"]
        doc_id = chunk["document_id"]

        # One (anchor, positive) pair per question variant.
        anchors = generate_anchor_questions(code, POSITIVE_VARIANTS)
        for anchor_q in anchors:
            positive_pairs.append({
                "document_id": doc_id,
                "anchor": anchor_q,
                "positive": code,
                "framework": framework,
                "source": "synthetic_positive_v2",
            })

        # One triplet per chunk: first anchor + TF-IDF hard negative.
        anchor = anchors[0]
        negative_chunk = mine_hard_negative(
            anchor, idx, chunks, vectorizer, matrix
        )
        triplets.append({
            "document_id": doc_id,
            "anchor": anchor,
            "positive": code,
            "negative": negative_chunk["code"],
            "framework": framework,
            "source": "synthetic_triplet_v2",
        })

    # Emit both JSONL (training-friendly) and JSON (inspection-friendly).
    save_jsonl(output_dir / "positive_pairs.jsonl", positive_pairs)
    save_jsonl(output_dir / "triplets.jsonl", triplets)
    save_json(output_dir / "positive_pairs.json", positive_pairs)
    save_json(output_dir / "triplets.json", triplets)

    metadata = {
        "name": run_name,
        "framework": framework,
        "input_file": str(input_path),
        "num_chunks": len(chunks),
        "positive_pairs": len(positive_pairs),
        "triplets": len(triplets),
        # Timezone-aware UTC timestamp; datetime.utcnow() is naive and
        # deprecated since Python 3.12.
        "created_at": datetime.now(timezone.utc).isoformat(),
        "random_seed": RANDOM_SEED,
    }
    save_json(output_dir / "metadata.json", metadata)

    print(f"✅ Dataset generated at: {output_dir}")
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    # CLI entry point: synthesize datasets from a chunked code file.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--input", required=True, help="Chunked JSONL file")
    arg_parser.add_argument("--name", required=True, help="Synthetic dataset name")
    cli_args = arg_parser.parse_args()

    generate_datasets(
        input_path=Path(cli_args.input),
        run_name=cli_args.name,
    )
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| | |