| | import io |
| | import json |
| | import os |
| | import zipfile |
| | from dataclasses import dataclass |
| | from typing import Any, Dict, List, Optional, Tuple |
| |
|
| | from .canon import DRP_BUNDLE_SPEC, DRP_EVENT_SPEC, hash_event, now_utc_iso |
| |
|
| |
|
@dataclass
class Bundle:
    """In-memory form of a DRP bundle archive.

    Groups the two archive members that ``load_bundle`` reads from the zip.
    """

    # Parsed contents of ``manifest.json``.
    manifest: Dict[str, Any]
    # Parsed records of ``events.jsonl``, in file order.
    events: List[Dict[str, Any]]
| |
|
| |
|
| | def _read_json_from_zip(z: zipfile.ZipFile, name: str) -> Dict[str, Any]: |
| | with z.open(name, "r") as f: |
| | return json.loads(f.read().decode("utf-8")) |
| |
|
| |
|
| | def _read_jsonl_from_zip(z: zipfile.ZipFile, name: str) -> List[Dict[str, Any]]: |
| | out: List[Dict[str, Any]] = [] |
| | with z.open(name, "r") as f: |
| | for line in f.read().decode("utf-8").splitlines(): |
| | line = line.strip() |
| | if not line: |
| | continue |
| | out.append(json.loads(line)) |
| | return out |
| |
|
| |
|
def load_bundle(zip_path: str) -> Bundle:
    """Open the archive at ``zip_path`` and load it into a ``Bundle``.

    Reads ``manifest.json`` first, then ``events.jsonl``. Any error raised
    while opening the zip or reading either member propagates.
    """
    with zipfile.ZipFile(zip_path, mode="r") as archive:
        return Bundle(
            manifest=_read_json_from_zip(archive, "manifest.json"),
            events=_read_jsonl_from_zip(archive, "events.jsonl"),
        )
| |
|
| |
|
def verify_bundle(zip_path: str) -> Tuple[bool, Dict[str, Any]]:
    """
    Verify the DRP bundle stored at ``zip_path``.

    Checks:
    - the manifest is a JSON object declaring the expected bundle spec
    - at least one event is present
    - each event is a JSON object declaring the expected event spec
    - each event's stored ``hash`` equals ``hash_event(event)``
    - from the second event on, ``prev`` equals the previous event's
      stored ``hash`` (the first event's ``prev`` is not checked)

    Returns ``(ok, summary)``. ``summary`` has the same keys on every path:
    ``ok``, ``issues``, ``event_count``, ``run_id``, ``created_at``,
    ``framework`` and ``model_id``.

    Errors raised by ``load_bundle`` (unreadable zip, missing member,
    invalid JSON) are not caught here and propagate to the caller.
    """
    b = load_bundle(zip_path)

    issues: List[str] = []

    # A manifest that parsed to something other than an object is reported
    # as an issue instead of crashing on ``.get``.
    if isinstance(b.manifest, dict):
        manifest: Dict[str, Any] = b.manifest
        if manifest.get("spec") != DRP_BUNDLE_SPEC:
            issues.append(f"manifest.spec mismatch (expected {DRP_BUNDLE_SPEC})")
    else:
        manifest = {}
        issues.append("manifest is not a JSON object")

    events = b.events
    if not events:
        # No early return: the summary below must keep its full key set so
        # callers can read e.g. ``summary["event_count"]`` unconditionally.
        issues.append("no events found")

    prev_hash: Optional[str] = None
    for idx, ev in enumerate(events):
        if not isinstance(ev, dict):
            # A line that parsed to a list/number/string cannot be an event.
            issues.append(f"event[{idx}] is not a JSON object")
            prev_hash = None
            continue

        if ev.get("spec") != DRP_EVENT_SPEC:
            issues.append(f"event[{idx}].spec mismatch (expected {DRP_EVENT_SPEC})")

        computed = hash_event(ev)
        if ev.get("hash") != computed:
            issues.append(f"event[{idx}] hash mismatch")

        if idx > 0 and ev.get("prev") != prev_hash:
            issues.append(f"event[{idx}] prev pointer mismatch")

        # The chain is followed through the *stored* hash, as written.
        prev_hash = ev.get("hash")

    ok = not issues
    summary = {
        "ok": ok,
        "issues": issues,
        "event_count": len(events),
        "run_id": manifest.get("run_id"),
        "created_at": manifest.get("created_at"),
        "framework": manifest.get("framework"),
        "model_id": manifest.get("model_id"),
    }
    return (ok, summary)
| |
|
| |
|
def write_bundle_zip(
    out_zip_path: str,
    *,
    run_id: str,
    framework: str,
    model_id: str,
    env_fingerprint: Dict[str, Any],
    events_payloads: List[Dict[str, Any]],
    created_at: Optional[str] = None,
    replay: Optional[Dict[str, Any]] = None,
    run_url: Optional[str] = None,
) -> str:
    """
    Build a DRP bundle zip at ``out_zip_path`` and return that path.

    Archive members:
    - ``manifest.json``: spec, run_id, created_at, framework, model_id and
      env, plus ``replay`` / ``run_url`` when those arguments are truthy
    - ``events.jsonl``: one compact JSON object per line; each event's
      ``prev`` holds the hash of the event before it (``None`` for the first)

    ``created_at`` falls back to ``now_utc_iso()`` when not given. For each
    entry of ``events_payloads``, ``ts`` falls back to ``now_utc_iso()``,
    ``kind`` to ``"state_snapshot"``, ``step`` to ``"step-<index>"`` and
    ``payload`` to ``{}``. Missing parent directories are created.
    """
    if not created_at:
        created_at = now_utc_iso()

    manifest: Dict[str, Any] = {
        "spec": DRP_BUNDLE_SPEC,
        "run_id": run_id,
        "created_at": created_at,
        "framework": framework,
        "model_id": model_id,
        "env": env_fingerprint,
    }
    # Optional manifest entries are only written when they carry a value.
    if replay:
        manifest["replay"] = replay
    if run_url:
        manifest["run_url"] = run_url

    # Build the hash chain: every record is hashed before its own "hash"
    # key is attached, and that hash becomes the next record's "prev".
    chain: List[Dict[str, Any]] = []
    last_hash: Optional[str] = None
    for index, item in enumerate(events_payloads):
        record = {
            "spec": DRP_EVENT_SPEC,
            "i": index,
            "ts": item.get("ts") or now_utc_iso(),
            "kind": item.get("kind", "state_snapshot"),
            "step": item.get("step", f"step-{index}"),
            "payload": item.get("payload", {}),
            "prev": last_hash,
        }
        digest = hash_event(record)
        record["hash"] = digest
        last_hash = digest
        chain.append(record)

    # A bare filename has an empty dirname; fall back to the current dir.
    parent_dir = os.path.dirname(out_zip_path)
    os.makedirs(parent_dir if parent_dir else ".", exist_ok=True)

    with zipfile.ZipFile(out_zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(
            "manifest.json",
            json.dumps(manifest, ensure_ascii=False, indent=2),
        )
        archive.writestr(
            "events.jsonl",
            "".join(
                json.dumps(record, ensure_ascii=False, separators=(",", ":")) + "\n"
                for record in chain
            ),
        )

    return out_zip_path