| import argparse
|
| import json
|
| import os
|
| import re
|
| from typing import Any, Dict
|
|
|
|
|
# Captures a trailing "_YYYYMMDD_HHMMSS" timestamp immediately before the
# ".json" extension, e.g. "foo_20230115_091500.json" -> "20230115_091500".
TIMESTAMP_REGEX = re.compile(r"_(\d{8}_\d{6})\.json$")

# Captures both the kind (analysis/iterations/messages) and the timestamp from
# filenames ending in "<kind>_<timestamp>.json" (case-insensitive match);
# used by compute_new_basename to build the canonical lowercase basename.
KIND_TS_INFIX_REGEX = re.compile(r"(analysis|iterations|messages)_(\d{8}_\d{6})\.json$", re.IGNORECASE)
|
|
|
|
|
def extract_timestamp_from_filename(filename: str) -> str:
    """Return the YYYYMMDD_HHMMSS timestamp embedded in *filename*, or ""."""
    found = TIMESTAMP_REGEX.search(filename)
    if found is None:
        return ""
    return found.group(1)
|
|
|
|
|
def remove_name_keys(obj: Any) -> Any:
    """Return a deep copy of *obj* with every dict entry keyed "name" removed.

    Recurses through nested dicts and lists; any other value is returned
    unchanged.
    """
    if isinstance(obj, list):
        return [remove_name_keys(item) for item in obj]
    if not isinstance(obj, dict):
        return obj
    return {key: remove_name_keys(value) for key, value in obj.items() if key != "name"}
|
|
|
|
|
def reduce_payload(original: Any, filename: str) -> Dict[str, Any]:
    """Reduce a parsed JSON payload to its "function"/"analysis" fields.

    All "name" keys are stripped recursively first. If the cleaned payload
    is a dict containing "function" and/or "analysis", only those fields
    are kept; otherwise the entire cleaned payload is wrapped as
    {"analysis": ...}. A "timestamp" field is appended when the filename
    carries a YYYYMMDD_HHMMSS suffix.

    Note: the original implementation branched on whether "analysis"
    appeared in the basename, but every branch assigned the identical
    value, so that dead check has been removed — behavior is unchanged.
    """
    cleaned = remove_name_keys(original)
    timestamp = extract_timestamp_from_filename(filename)

    if isinstance(cleaned, dict):
        # Keep only the recognized fields, in a fixed order.
        result: Dict[str, Any] = {
            key: cleaned[key] for key in ("function", "analysis") if key in cleaned
        }
        if not result:
            # No recognized fields: treat the whole payload as the analysis.
            result = {"analysis": cleaned}
    else:
        # Non-dict payloads (lists, scalars) are wrapped as the analysis.
        result = {"analysis": cleaned}

    if timestamp:
        result["timestamp"] = timestamp
    return result
|
|
|
|
|
def compute_new_basename(filename: str) -> str | None:
    """Derive the canonical "<kind>_<timestamp>.json" basename, or None.

    Returns None when the basename does not end in a recognized
    kind + timestamp pattern; otherwise the kind is lowercased.
    """
    matched = KIND_TS_INFIX_REGEX.search(os.path.basename(filename))
    if matched is None:
        return None
    kind, ts = matched.group(1).lower(), matched.group(2)
    return "{}_{}.json".format(kind, ts)
|
|
|
|
|
def safe_rename(path: str, new_basename: str) -> str:
    """Rename *path* to *new_basename* inside its directory without clobbering.

    Returns the resulting path. Renaming a file onto itself is a no-op.
    If the target name is taken, numeric suffixes (_1, _2, ...) are tried
    before the extension until a free name is found.
    """
    directory = os.path.dirname(path)
    target = os.path.join(directory, new_basename)

    # Already has the desired name: nothing to do.
    if os.path.abspath(target) == os.path.abspath(path):
        return path

    if not os.path.exists(target):
        os.replace(path, target)
        return target

    # Target taken: probe numbered alternatives until one is free.
    stem, ext = os.path.splitext(new_basename)
    counter = 1
    candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
    while os.path.exists(candidate):
        counter += 1
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
    os.replace(path, candidate)
    return candidate
|
|
|
|
|
def process_file(path: str, do_rename: bool) -> str:
    """Trim one JSON file in place and optionally rename it.

    The file is reduced via reduce_payload and rewritten with a trailing
    newline. When *do_rename* is true and the filename matches the
    kind+timestamp pattern, the file is renamed to its canonical
    basename. Returns the file's final path.
    """
    with open(path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)

    trimmed = reduce_payload(payload, os.path.basename(path))

    with open(path, "w", encoding="utf-8") as handle:
        json.dump(trimmed, handle, ensure_ascii=False, indent=2)
        handle.write("\n")

    if do_rename:
        replacement = compute_new_basename(path)
        if replacement is not None:
            path = safe_rename(path, replacement)

    print(f"Processed: {os.path.basename(path)}")
    return path
|
|
|
|
|
def process_directory(target_dir: str, do_rename: bool) -> None:
    """Recursively process every .json file under *target_dir*.

    Failures on individual files are reported to stdout and do not stop
    the walk. Raises FileNotFoundError when *target_dir* is not a
    directory.
    """
    if not os.path.isdir(target_dir):
        raise FileNotFoundError(f"Directory not found: {target_dir}")

    for root, _dirs, filenames in os.walk(target_dir):
        for name in (n for n in filenames if n.lower().endswith(".json")):
            full_path = os.path.join(root, name)
            try:
                process_file(full_path, do_rename)
            except Exception as exc:
                # Best-effort batch processing: report and continue.
                print(f"Failed: {os.path.relpath(full_path, start=target_dir)}: {exc}")
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse arguments and dispatch to file or directory processing."""
    parser = argparse.ArgumentParser(description="Trim JSONs to keep only function, analysis, and timestamp; remove name fields. Recurses directories.")
    parser.add_argument(
        "--path",
        help="A file or directory path to process. If omitted, defaults to the bitsadmin analysis dir.",
    )
    parser.add_argument("--no-rename", action="store_true", help="Do not rename files to kind_timestamp.json")
    args = parser.parse_args()

    default_dir = os.path.join("mordor_dataset", "eval_output", "analysis")
    target = args.path or default_dir
    rename = not args.no_rename

    if os.path.isfile(target):
        process_file(target, do_rename=rename)
    elif os.path.isdir(target):
        process_directory(target, do_rename=rename)
    else:
        raise FileNotFoundError(f"Path not found: {target}")
|
|
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|
|
|
|
|