#!/usr/bin/env python3
"""
cr_parser.py — Parse a CR DOCX's tracked changes into a JSON manifest.

Each entry in the manifest is one of:
  {"type": "text_replace",    "location": {...}, "old": "...", "new": "..."}
  {"type": "para_insert",     "location": {...}, "paragraphs": [...]}
  {"type": "row_insert",      "location": {...}, "cells": [...]}
  {"type": "section_replace", "location": {...}, "elements_xml": [...]}

Usage:
  python3 cr_parser.py CR_DOCX [--output manifest.json]
  # or import:
  from cr_parser import parse_cr
"""

import argparse
import json
import re
import sys
from pathlib import Path

import docx
from docx.oxml.ns import qn

# Paragraphs whose whole text is a bracketed run of dots, e.g. "[...]" or
# "[. . .]", are never used as anchor text by _parse_body.
_DOTS_PLACEHOLDER_RE = re.compile(r'\[\.[\s\.]*\]')


# ── Low-level text helpers ────────────────────────────────────────────────────

def _local_name(elem):
    """Return the element's tag without its ``{namespace}`` prefix.

    Comment and processing-instruction nodes have a non-string ``tag``; they
    yield ``''`` so that callers simply ignore them instead of raising.
    """
    tag = elem.tag
    if not isinstance(tag, str):
        return ''
    return tag.rsplit('}', 1)[-1]


def _del_text(elem):
    """Concatenate the text of all w:delText descendants of *elem*."""
    return ''.join(t.text or '' for t in elem.findall('.//' + qn('w:delText')))


def _ins_text(elem):
    """Concatenate the text of all w:t descendants of *elem*.

    Intended to be called on a w:ins element, but it does not itself check
    that the w:t nodes are inside a w:ins.
    """
    return ''.join(t.text or '' for t in elem.findall('.//' + qn('w:t')))


def _para_new_text(p_elem):
    """Text of a paragraph after accepting tracked changes.

    All w:t descendants are included (inserted text included); w:delText
    nodes are not w:t and are therefore excluded.
    """
    return ''.join(t.text or '' for t in p_elem.findall('.//' + qn('w:t')))


def _para_orig_text(p_elem):
    """Text of a paragraph as it exists in the TS (del included, ins excluded)."""
    del_text_tag = qn('w:delText')
    text_tag = qn('w:t')
    ins_tag = qn('w:ins')

    parts = []
    for node in p_elem.iter():
        if node.tag == del_text_tag and node.text:
            parts.append(node.text)
        elif node.tag == text_tag and node.text:
            # Skip text that sits anywhere inside a w:ins
            if not any(a.tag == ins_tag for a in node.iterancestors()):
                parts.append(node.text)
    return ''.join(parts)


def _style_val(p_elem):
    """Return the w:val of the paragraph's w:pStyle, or None if it has none."""
    pPr = p_elem.find(qn('w:pPr'))
    if pPr is None:
        return None
    pStyle = pPr.find(qn('w:pStyle'))
    if pStyle is None:
        return None
    return pStyle.get(qn('w:val'))


def _is_rpr_ins(ins_elem):
    """True if w:ins is inside w:rPr — a formatting change, not a content insertion."""
    parent = ins_elem.getparent()
    return parent is not None and parent.tag == qn('w:rPr')


def _para_mark_has(p_elem, marker):
    """True if the paragraph's w:pPr/w:rPr contains a *marker* child (e.g. 'w:ins')."""
    pPr = p_elem.find(qn('w:pPr'))
    if pPr is None:
        return False
    rPr = pPr.find(qn('w:rPr'))
    if rPr is None:
        return False
    return rPr.find(qn(marker)) is not None


def _is_inserted_para(p_elem):
    """True if this paragraph's paragraph-mark is tracked as inserted (whole new para)."""
    return _para_mark_has(p_elem, 'w:ins')


def _is_deleted_para(p_elem):
    """True if this paragraph's paragraph-mark is tracked as deleted (whole para deleted)."""
    return _para_mark_has(p_elem, 'w:del')


def _row_is_tracked(tr_elem, marker):
    """True if the row's w:trPr contains a *marker* child (e.g. 'w:ins')."""
    trPr = tr_elem.find(qn('w:trPr'))
    return trPr is not None and trPr.find(qn(marker)) is not None


def _all_rows_tracked(tbl_elem, marker):
    """True if the table has at least one row and every row carries *marker*."""
    rows = tbl_elem.findall(qn('w:tr'))
    if not rows:
        return False
    return all(_row_is_tracked(tr, marker) for tr in rows)


def _is_fully_deleted_tbl(tbl_elem):
    """True if every row in the table is tracked as a row-level deletion."""
    return _all_rows_tracked(tbl_elem, 'w:del')


def _is_fully_inserted_tbl(tbl_elem):
    """True if every row in the table is tracked as a row-level insertion."""
    return _all_rows_tracked(tbl_elem, 'w:ins')


# ── Table helpers ─────────────────────────────────────────────────────────────

def _first_para_text(tc_elem):
    """Stripped post-change text of the first paragraph found in a cell ('' if none)."""
    p = tc_elem.find('.//' + qn('w:p'))
    return _para_new_text(p).strip() if p is not None else ''


def _table_header(tbl_elem):
    """First row cell texts — used as table identifier."""
    first_tr = tbl_elem.find(qn('w:tr'))
    if first_tr is None:
        return []
    return [_first_para_text(tc) for tc in first_tr.findall(qn('w:tc'))]


def _row_col0(tr_elem):
    """Col-0 text of a table row — used as row anchor."""
    tc = tr_elem.find(qn('w:tc'))
    if tc is None:
        return ''
    return _first_para_text(tc)


# ── Inline del+ins extraction (from a single paragraph) ──────────────────────

def _extract_inline_replacements(p_elem):
    """
    Return list of (old_text, new_text) pairs from del+ins sibling pairs.

    Only direct children of the paragraph are examined.

    Handles: del-then-ins, ins-then-del, multi-fragment consecutive dels.
    Filters: whitespace-only dels with no adjacent ins, empty dels, rPr ins.

    A w:ins is paired with at most one deletion: in a ``del, ins, del`` run
    the insertion belongs to the first deletion and the second is reported as
    a pure deletion (new_text == '').
    """
    del_tag = qn('w:del')
    ins_tag = qn('w:ins')

    children = list(p_elem)
    pairs = []
    skip = set()  # indices already merged into, or consumed by, an earlier pair

    for i, child in enumerate(children):
        if i in skip:
            continue
        if child.tag != del_tag:
            continue

        old_text = _del_text(child)

        # Empty del (paragraph-mark or line-break deletion) — discard
        if not old_text:
            skip.add(i)
            continue

        # Merge consecutive del siblings (multi-fragment deletion)
        j = i + 1
        while j < len(children) and children[j].tag == del_tag:
            old_text += _del_text(children[j])
            skip.add(j)
            j += 1

        next_sib = children[j] if j < len(children) else None
        prev_sib = children[i - 1] if i > 0 else None

        new_text = None
        if next_sib is not None and next_sib.tag == ins_tag and not _is_rpr_ins(next_sib):
            new_text = _ins_text(next_sib)
            skip.add(j)
        elif (
            prev_sib is not None
            and (i - 1) not in skip  # not already used by an earlier deletion
            and prev_sib.tag == ins_tag
            and not _is_rpr_ins(prev_sib)
        ):
            new_text = _ins_text(prev_sib)
            skip.add(i - 1)

        if new_text is None:
            # Whitespace-only del: only keep if there's an adjacent ins
            if not old_text.strip():
                skip.add(i)
                continue  # whitespace artefact with no counterpart
            # Pure deletion (no replacement) — record with empty new
            pairs.append((old_text, ''))
        else:
            pairs.append((old_text, new_text))

    return pairs


# ── Table change extraction ───────────────────────────────────────────────────

def _preceding_stable_anchor(rows, tr_idx):
    """Col-0 text of the nearest row before *tr_idx* that is not a tracked insertion."""
    for prev_idx in range(tr_idx - 1, -1, -1):
        prev_tr = rows[prev_idx]
        if not _row_is_tracked(prev_tr, 'w:ins'):
            return _row_col0(prev_tr)
    return ''


def _inserted_cell(tc):
    """Describe one cell of an inserted row: text, width, vmerge flag and style."""
    tcPr = tc.find(qn('w:tcPr'))

    # Width
    width = None
    if tcPr is not None:
        tcW = tcPr.find(qn('w:tcW'))
        if tcW is not None:
            try:
                width = int(tcW.get(qn('w:w'), 0))
            except (ValueError, TypeError):
                width = None

    # vMerge (no w:val attribute = continuation)
    is_vmerge = False
    if tcPr is not None:
        vm = tcPr.find(qn('w:vMerge'))
        if vm is not None and vm.get(qn('w:val')) is None:
            is_vmerge = True

    # Text — prefer the text of the whole cell, fall back to its first paragraph
    cell_ins_text = _ins_text(tc)
    p = tc.find('.//' + qn('w:p'))
    if cell_ins_text:
        cell_text = cell_ins_text
    else:
        # Compare with None explicitly: the truth value of an lxml element
        # reflects its child count, not whether it was found.
        cell_text = _para_new_text(p) if p is not None else ''
    style = _style_val(p) if p is not None else None

    return {
        'text': cell_text.strip(),
        'width': width,
        'vmerge': is_vmerge,
        'style': style,
    }


def _parse_table(tbl_elem, changes, section_heading=''):
    """Append the tracked changes found in one table to *changes*.

    Rows tracked as inserted produce a 'row_insert' entry anchored on the
    col-0 text of the nearest preceding non-inserted row. Every other row is
    scanned cell by cell for inline del/ins pairs, each producing a
    'text_replace' entry.

    Args:
        tbl_elem: the w:tbl element.
        changes: list that receives the change dicts (mutated in place).
        section_heading: stored verbatim in each entry's location.
    """
    header = _table_header(tbl_elem)
    header_key = header[:3]  # first 3 columns enough for matching

    rows = tbl_elem.findall(qn('w:tr'))
    for tr_idx, tr in enumerate(rows):
        # ── Tracked row insertion ─────────────────────────────────────────
        if _row_is_tracked(tr, 'w:ins'):
            changes.append({
                'type': 'row_insert',
                'location': {
                    'kind': 'table_row',
                    'table_header': header_key,
                    'after_row_anchor': _preceding_stable_anchor(rows, tr_idx),
                    'section_heading': section_heading,
                },
                'cells': [_inserted_cell(tc) for tc in tr.findall(qn('w:tc'))],
            })
            continue

        # ── Cell-level text_replace ───────────────────────────────────────
        row_anchor = _row_col0(tr)
        for col_idx, tc in enumerate(tr.findall(qn('w:tc'))):
            for p in tc.findall('.//' + qn('w:p')):
                for old_text, new_text in _extract_inline_replacements(p):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'table_cell',
                            'table_header': header_key,
                            'row_anchor': row_anchor,
                            'col_idx': col_idx,
                            'section_heading': section_heading,
                        },
                        'old': old_text,
                        'new': new_text,
                    })


# ── Body paragraph extraction ─────────────────────────────────────────────────

def _parse_body(body, changes):
    """
    Walk direct children of w:body, emitting changes.

    Change types emitted:
      section_replace — a contiguous block of fully-deleted elements (para
                        and/or table, tracked at the paragraph-mark / row
                        level) followed immediately by a contiguous block of
                        fully-inserted elements. The raw XML of ALL those CR
                        elements is stored verbatim so the applicator can
                        transplant them directly into the TS. A fully-inserted
                        table with no preceding del block is also emitted as a
                        section_replace.
      text_replace    — an inline del+ins pair inside an otherwise-stable
                        paragraph.
      para_insert     — wholly-new paragraphs with no corresponding deletion
                        (rare; kept for backward compatibility).

    Children that are neither w:p nor w:tbl are ignored.
    """
    from lxml import etree

    prev_stable_text = ''

    # ── Section-replace accumulator ───────────────────────────────────────────
    sec_del = []          # fully-deleted elements (CR del block)
    sec_sep = []          # empty/separator paragraphs between del and ins blocks
    sec_ins = []          # fully-inserted elements (CR ins block)
    sec_state = 'stable'  # 'stable' | 'del' | 'sep' | 'ins'
    sec_anchor = ''

    def flush_section():
        """Emit the pending section_replace (if any) and reset the accumulator."""
        nonlocal sec_state
        if not sec_del and not sec_ins:
            sec_del.clear()
            sec_sep.clear()
            sec_ins.clear()
            sec_state = 'stable'
            return

        # The del_heading is the text content of the first deleted paragraph
        # that has any text
        del_heading = ''
        for e in sec_del:
            if _local_name(e) == 'p':
                t = _del_text(e).strip() or _para_orig_text(e).strip()
                if t:
                    del_heading = t
                    break

        # Serialize all elements for the manifest (del + sep + ins)
        all_elems = sec_del + sec_sep + sec_ins
        elements_xml = [etree.tostring(e, encoding='unicode') for e in all_elems]

        has_del_table = any(_local_name(e) == 'tbl' for e in sec_del)

        changes.append({
            'type': 'section_replace',
            'location': {
                'kind': 'body',
                'del_heading': del_heading,
                'has_del_table': has_del_table,
                'anchor_text': sec_anchor,
            },
            'elements_xml': elements_xml,
        })

        sec_del.clear()
        sec_sep.clear()
        sec_ins.clear()
        sec_state = 'stable'

    # ── Para-insert accumulator (for standalone new paragraphs) ───────────────
    insert_group = []

    def flush_group():
        """Emit the pending para_insert (if any) and empty the group."""
        if not insert_group:
            return
        paras = [
            {'text': _para_new_text(p).strip(), 'style': _style_val(p)}
            for p in insert_group
        ]
        # Drop paragraphs that carry neither text nor a style
        paras = [p for p in paras if p['text'] or p['style']]
        if paras:
            changes.append({
                'type': 'para_insert',
                'location': {
                    'kind': 'body',
                    'anchor_text': prev_stable_text,
                },
                'paragraphs': paras,
            })
        insert_group.clear()

    for elem in body:
        tag = _local_name(elem)

        if tag == 'p':
            is_del = _is_deleted_para(elem)
            is_ins = _is_inserted_para(elem)
            is_empty = (
                not _para_orig_text(elem).strip()
                and not _para_new_text(elem).strip()
            )

            if is_del:
                # Start or continue the del block
                if sec_state == 'ins':
                    flush_section()  # ins before del = two separate section_replaces
                if sec_state == 'stable':
                    flush_group()
                    sec_anchor = prev_stable_text
                sec_state = 'del'
                sec_del.append(elem)

            elif is_ins:
                if sec_state in ('del', 'sep'):
                    # ins block follows a del block → part of section_replace
                    sec_state = 'ins'
                    sec_ins.append(elem)
                elif sec_state == 'ins':
                    sec_ins.append(elem)
                else:
                    # Standalone ins paragraph (no preceding del block).
                    # NOTE(review): this flush emits any previously collected
                    # standalone paragraph first, so consecutive inserted
                    # paragraphs become separate para_insert entries sharing
                    # one anchor — confirm this is what the applicator expects.
                    flush_group()
                    insert_group.append(elem)

            elif is_empty:
                if sec_state == 'del':
                    # Separator between del and ins blocks
                    sec_state = 'sep'
                    sec_sep.append(elem)
                elif sec_state in ('sep', 'ins'):
                    sec_ins.append(elem)
                else:
                    # Empty para in stable region — ignore for anchoring
                    pass

            else:
                # Stable (or inline-changed) paragraph
                flush_section()
                flush_group()

                for old_text, new_text in _extract_inline_replacements(elem):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'body_para',
                            'para_context': _para_orig_text(elem).strip(),
                        },
                        'old': old_text,
                        'new': new_text,
                    })

                # Remember this paragraph as the anchor for whatever follows,
                # unless it is only a bracketed-dots placeholder
                orig = _para_orig_text(elem).strip()
                if orig and not _DOTS_PLACEHOLDER_RE.fullmatch(orig):
                    prev_stable_text = orig

        elif tag == 'tbl':
            if _is_fully_deleted_tbl(elem):
                if sec_state == 'ins':
                    flush_section()
                if sec_state == 'stable':
                    flush_group()
                    sec_anchor = prev_stable_text
                sec_state = 'del'
                sec_del.append(elem)

            elif _is_fully_inserted_tbl(elem):
                if sec_state in ('del', 'sep', 'ins'):
                    sec_state = 'ins'
                    sec_ins.append(elem)
                else:
                    # Standalone fully-inserted table (no del block) — treat as
                    # section_replace
                    flush_group()
                    sec_anchor = prev_stable_text
                    sec_state = 'ins'
                    sec_ins.append(elem)

            else:
                # Table with inline cell changes
                flush_section()
                flush_group()
                _parse_table(elem, changes, section_heading=prev_stable_text)

    flush_section()
    flush_group()


# ── Public API ────────────────────────────────────────────────────────────────

def parse_cr(cr_path, output_json=None):
    """
    Parse all tracked changes in a CR DOCX.

    Args:
        cr_path: path of the DOCX file; converted with str() before opening.
        output_json: optional path; when truthy the manifest is also written
            there as UTF-8, indented JSON.

    Returns:
        List of change dicts, in document order.
    """
    doc = docx.Document(str(cr_path))
    body = doc.element.body

    changes = []
    _parse_body(body, changes)

    if output_json:
        Path(output_json).write_text(
            json.dumps(changes, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )

    return changes


# ── CLI ───────────────────────────────────────────────────────────────────────

def main():
    """Command-line entry point: parse a CR DOCX and write or print the manifest."""
    ap = argparse.ArgumentParser(description='Parse CR DOCX tracked changes into JSON manifest.')
    ap.add_argument('cr_docx', help='CR DOCX file path')
    ap.add_argument('--output', default=None, help='Output JSON path (default: print to stdout)')
    args = ap.parse_args()

    changes = parse_cr(args.cr_docx, output_json=args.output)

    if args.output:
        print(f'Wrote {len(changes)} change(s) → {args.output}')
    else:
        print(json.dumps(changes, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()