| """Evolution runner — starts the overnight Karpathy loop with safety guardrails.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import logging |
| import sys |
| from pathlib import Path |
|
|
# Force UTF-8 output on Windows; characters that cannot be encoded are
# replaced rather than raising. The hasattr() guard skips the call when
# sys.stdout is None or a stand-in stream that has no reconfigure() method,
# which would otherwise raise AttributeError at import time.
if sys.platform == "win32" and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")

# Put the parent of this file's directory at the front of sys.path
# (presumably so the `mcphunter` package imported below resolves when this
# file is run directly as a script — confirm against the repository layout).
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
|
|
| from mcphunter.config import ( |
| EVOLUTION_CONFIG, |
| EVOLUTION_LOG, |
| RESULTS_DIR, |
| RULES_PATH, |
| ShieldConfig, |
| EvolutionConfig, |
| ) |
| from mcphunter.evolution.loop import EvolutionLoop |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# --- Safety guardrails ------------------------------------------------------
# Upper bound on the number of iterations a single run may perform.
MAX_ITERATIONS_HARD_CAP = 100

# File-size thresholds, in bytes, consulted by check_disk_limits().
MAX_RULES_FILE_BYTES = 10 * 1024**2  # 10 MB
MAX_EVOLUTION_LOG_BYTES = 50 * 1024**2  # 50 MB
MAX_ATTACKS_LOG_BYTES = 100 * 1024**2  # 100 MB
|
|
|
|
def _size_if_present(path: Path) -> int | None:
    """Return the size of *path* in bytes, or ``None`` if it does not exist."""
    try:
        return path.stat().st_size
    except (FileNotFoundError, NotADirectoryError):
        return None


def _rotate_with_size_suffix(path: Path, size_bytes: int) -> Path:
    """Rename *path* to an unused ``<stem>_<N>mb<suffix>`` sibling and return it.

    The first choice is the plain size-stamped name. If that name is already
    taken (an earlier rotation at the same size), a numeric ``_1``, ``_2``, ...
    suffix is appended so the earlier archive is neither overwritten (POSIX)
    nor the cause of a ``FileExistsError`` (Windows).
    """
    base = f"{path.stem}_{size_bytes // 1024 // 1024}mb"
    target = path.with_name(f"{base}{path.suffix}")
    attempt = 0
    while target.exists():
        attempt += 1
        target = path.with_name(f"{base}_{attempt}{path.suffix}")
    path.rename(target)
    return target


def check_disk_limits() -> list[str]:
    """Check file sizes against the safety limits and rotate oversized logs.

    The rules file is only reported when it exceeds ``MAX_RULES_FILE_BYTES``;
    it is never moved. The evolution log and ``attacks.jsonl`` (under
    ``RESULTS_DIR``) are renamed to a size-stamped sibling when they exceed
    their limits. Files that do not exist are skipped.

    Returns:
        Human-readable warning strings, one per limit exceeded; empty when
        every file is within its limit.

    Raises:
        OSError: If renaming an oversized log fails.
    """
    warnings: list[str] = []

    rules_size = _size_if_present(RULES_PATH)
    if rules_size is not None and rules_size > MAX_RULES_FILE_BYTES:
        warnings.append(
            f"pattern_rules.json exceeds {MAX_RULES_FILE_BYTES // 1024 // 1024}MB limit "
            f"({rules_size // 1024 // 1024}MB)"
        )

    # (path, size limit in bytes, label used in the warning message)
    rotatable = (
        (EVOLUTION_LOG, MAX_EVOLUTION_LOG_BYTES, "evolution_log.jsonl"),
        (RESULTS_DIR / "attacks.jsonl", MAX_ATTACKS_LOG_BYTES, "attacks.jsonl"),
    )
    for path, limit, label in rotatable:
        size = _size_if_present(path)
        if size is not None and size > limit:
            rotated = _rotate_with_size_suffix(path, size)
            warnings.append(f"{label} rotated to {rotated.name}")

    return warnings
|
|
|
|
def print_banner(
    config: EvolutionConfig,
    shield_config: ShieldConfig,
    n: int,
    sandbox: bool,
) -> None:
    """Print the start-up banner summarising the run that is about to begin.

    Args:
        config: Supplies attacks per iteration, sleep seconds and the target
            detection rate shown in the banner.
        shield_config: Supplies the four layer-enabled flags shown as L1-L4.
        n: Number of iterations that will be run.
        sandbox: Whether sandbox mode is on; adds an extra notice when true.
    """

    def on_off(flag: object) -> str:
        return "ON" if flag else "OFF"

    layer_flags = (
        ("L1", shield_config.regex_layer_enabled),
        ("L2", shield_config.encoding_layer_enabled),
        ("L3", shield_config.heuristic_layer_enabled),
        ("L4", shield_config.llm_layer_enabled),
    )
    layer_summary = " ".join(f"{tag}={on_off(flag)}" for tag, flag in layer_flags)

    heavy_rule = "=" * 60
    lines = [
        "",
        heavy_rule,
        " MCPHunter Evolution Engine",
        " The Karpathy Loop: HUNTER attacks SHIELD, SHIELD learns",
        heavy_rule,
        "",
        f" Iterations: {n} (hard cap: {MAX_ITERATIONS_HARD_CAP})",
        f" Attacks/iter: {config.attacks_per_iteration}",
        f" Sleep between: {config.sleep_seconds}s",
        f" Target det. rate: {config.target_detection_rate:.0%}",
        f" Sandbox mode: {on_off(sandbox)}",
        "",
        f" SHIELD layers: {layer_summary}",
        "",
    ]

    if sandbox:
        lines.append(" SANDBOX: No network calls, no git, writes only to results/")
        lines.append("")

    # The tip is printed on every platform, exactly as the banner always has.
    lines.extend(
        [
            " Windows overnight tip: disable sleep in Settings > Power,",
            " or run in a separate terminal:",
            " powershell -Command \"while(1){(New-Object -ComObject WScript.Shell).SendKeys('{F15}');Start-Sleep 240}\"",
            "",
            "-" * 60,
            "",
        ]
    )

    for line in lines:
        print(line)
|
|
|
|
def main(argv: list[str] | None = None) -> None:
    """Parse CLI options, apply the safety guardrails, and run the evolution loop.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behaviour.

    Raises:
        SystemExit: Raised by argparse for ``--help``, unparseable arguments,
            or out-of-range values for ``--iterations``, ``--sleep`` or
            ``--attacks``.
    """
    parser = argparse.ArgumentParser(description="MCPHunter Evolution Engine")
    parser.add_argument(
        "-n", "--iterations", type=int, default=50,
        help=f"Number of iterations (default: 50, hard cap: {MAX_ITERATIONS_HARD_CAP})",
    )
    parser.add_argument("--sleep", type=int, default=None, help="Seconds between iterations")
    parser.add_argument("--attacks", type=int, default=None, help="Attacks per iteration")
    parser.add_argument("--no-llm", action="store_true", help="Disable LLM calls (faster, deterministic)")
    parser.add_argument(
        "--sandbox", action="store_true",
        help="Sandbox mode: no network, no git, writes only to results/",
    )
    parser.add_argument("--resume", action="store_true", help="Resume from last checkpoint")
    parser.add_argument("--seed", type=int, default=None, help="Random seed for reproducibility")
    args = parser.parse_args(argv)

    # min() below only bounds the value from above, so reject nonsensical
    # lower values here, before any configuration or loop object is built.
    if args.iterations < 1:
        parser.error("--iterations must be at least 1")
    if args.sleep is not None and args.sleep < 0:
        parser.error("--sleep must not be negative")
    if args.attacks is not None and args.attacks < 1:
        parser.error("--attacks must be at least 1")

    # Enforce the hard cap on iterations.
    iterations = min(args.iterations, MAX_ITERATIONS_HARD_CAP)
    if args.iterations > MAX_ITERATIONS_HARD_CAP:
        print(f" Warning: capped iterations from {args.iterations} to {MAX_ITERATIONS_HARD_CAP}")

    # CLI values win; otherwise fall back to the shared EVOLUTION_CONFIG.
    sleep_sec = args.sleep if args.sleep is not None else EVOLUTION_CONFIG.sleep_seconds
    attacks_per = args.attacks if args.attacks is not None else EVOLUTION_CONFIG.attacks_per_iteration

    # Only these three fields are passed explicitly; every other field takes
    # EvolutionConfig's own default.
    # NOTE(review): any other field customised on EVOLUTION_CONFIG is not
    # carried over here — confirm that is intended.
    evo_config = EvolutionConfig(
        attacks_per_iteration=attacks_per,
        sleep_seconds=sleep_sec,
        max_iterations=iterations,
    )

    # The LLM layer is off when either --no-llm or --sandbox is given.
    use_llm = not args.no_llm and not args.sandbox
    shield_config = ShieldConfig(llm_layer_enabled=use_llm)

    print_banner(evo_config, shield_config, iterations, args.sandbox)

    # Report oversized files (and rotate oversized logs) before starting.
    disk_warnings = check_disk_limits()
    for w in disk_warnings:
        print(f" DISK WARNING: {w}")

    loop = EvolutionLoop(
        config=evo_config, shield_config=shield_config, random_seed=args.seed
    )

    try:
        loop.run(max_iterations=iterations, resume=args.resume)
    except KeyboardInterrupt:
        print("\n\n Interrupted by user (Ctrl+C)")

    # Printed after a normal finish and after Ctrl+C alike.
    loop.metrics.print_final_summary()
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|