Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
cr_parser.py — Parse a CR DOCX's tracked changes into a JSON manifest.
| Each entry in the manifest is one of: | |
| {"type": "text_replace", "location": {...}, "old": "...", "new": "..."} | |
| {"type": "para_insert", "location": {...}, "paragraphs": [...]} | |
  {"type": "row_insert",      "location": {...}, "cells": [...]}
  {"type": "section_replace", "location": {...}, "elements_xml": [...]}
| Usage: | |
| python3 cr_parser.py <cr.docx> [--output manifest.json] | |
| # or import: from cr_parser import parse_cr | |
| """ | |
| import argparse | |
| import json | |
| import re | |
| import sys | |
| from pathlib import Path | |
| import docx | |
| from docx.oxml.ns import qn | |
# ── Low-level text helpers ────────────────────────────────────────────────────
def _del_text(elem):
    """Return the joined text of every ``w:delText`` element beneath *elem*.

    A ``w:delText`` node whose text is ``None`` contributes an empty string.
    """
    deleted_nodes = elem.findall('.//' + qn('w:delText'))
    fragments = []
    for node in deleted_nodes:
        fragments.append(node.text or '')
    return ''.join(fragments)
def _ins_text(elem):
    """Return the joined text of every ``w:t`` element beneath *elem*.

    The search is not restricted to ``w:ins`` ancestors: whatever element is
    passed in, all of its ``w:t`` descendants are collected.
    """
    xpath = './/{}'.format(qn('w:t'))
    return ''.join([(node.text if node.text else '') for node in elem.findall(xpath)])
def _para_new_text(p_elem):
    """Return the paragraph text with tracked changes accepted.

    Only ``w:t`` descendants are collected, so inserted runs are included and
    deleted text (held in ``w:delText``) is left out.
    """
    text_nodes = p_elem.findall('.//' + qn('w:t'))
    return ''.join(map(lambda node: node.text or '', text_nodes))
def _para_orig_text(p_elem):
    """Return the paragraph text as it stood before the tracked changes.

    Deleted text (``w:delText``) is kept; ``w:t`` text is kept only when none
    of its ancestors is a ``w:ins`` element.
    """
    del_text_tag = qn('w:delText')
    run_text_tag = qn('w:t')
    ins_tag = qn('w:ins')

    def _inside_insertion(node):
        # True when any ancestor of *node* is a tracked insertion.
        for ancestor in node.iterancestors():
            if ancestor.tag == ins_tag:
                return True
        return False

    pieces = []
    for node in p_elem.iter():
        if not node.text:
            continue  # nothing to contribute
        if node.tag == del_text_tag:
            pieces.append(node.text)
        elif node.tag == run_text_tag and not _inside_insertion(node):
            pieces.append(node.text)
    return ''.join(pieces)
def _style_val(p_elem):
    """Return the ``w:val`` of the paragraph's ``w:pPr/w:pStyle``, or ``None``.

    ``None`` is returned when either ``w:pPr`` or ``w:pStyle`` is absent, or
    when ``w:pStyle`` carries no ``w:val`` attribute.
    """
    node = p_elem
    # Walk down pPr -> pStyle, bailing out at the first missing level.
    for child_tag in ('w:pPr', 'w:pStyle'):
        node = node.find(qn(child_tag))
        if node is None:
            return None
    return node.get(qn('w:val'))
def _is_rpr_ins(ins_elem):
    """Return True when the parent of *ins_elem* is a ``w:rPr`` element.

    Such a ``w:ins`` marks a formatting change rather than inserted content.
    An element without a parent yields False.
    """
    parent = ins_elem.getparent()
    if parent is None:
        return False
    return parent.tag == qn('w:rPr')
def _is_inserted_para(p_elem):
    """Return True when the paragraph mark itself is tracked as inserted.

    That is, when ``w:pPr/w:rPr`` exists and has a ``w:ins`` child.
    """
    para_props = p_elem.find(qn('w:pPr'))
    mark_props = para_props.find(qn('w:rPr')) if para_props is not None else None
    if mark_props is None:
        return False
    return mark_props.find(qn('w:ins')) is not None
def _is_deleted_para(p_elem):
    """Return True when the paragraph mark itself is tracked as deleted.

    That is, when ``w:pPr/w:rPr`` exists and has a ``w:del`` child.
    """
    node = p_elem
    # Descend pPr -> rPr -> del; any missing level means "not deleted".
    for step in ('w:pPr', 'w:rPr', 'w:del'):
        node = node.find(qn(step))
        if node is None:
            return False
    return True
def _is_fully_deleted_tbl(tbl_elem):
    """Return True when every direct ``w:tr`` row carries ``w:trPr/w:del``.

    A table with no rows is reported as not deleted.
    """
    rows = tbl_elem.findall(qn('w:tr'))
    if not rows:
        return False
    for row in rows:
        row_props = row.find(qn('w:trPr'))
        if row_props is None or row_props.find(qn('w:del')) is None:
            return False
    return True
def _is_fully_inserted_tbl(tbl_elem):
    """Return True when every direct ``w:tr`` row carries ``w:trPr/w:ins``.

    A table with no rows is reported as not inserted.
    """
    rows = tbl_elem.findall(qn('w:tr'))
    row_props = [row.find(qn('w:trPr')) for row in rows]
    return bool(rows) and all(
        props is not None and props.find(qn('w:ins')) is not None
        for props in row_props
    )
# ── Table helpers ─────────────────────────────────────────────────────────────
def _table_header(tbl_elem):
    """Return the stripped text of each cell in the table's first row.

    For every ``w:tc`` of the first ``w:tr`` the first ``w:p`` found beneath
    it is read with ``_para_new_text``; a cell without a paragraph yields ''.
    A table without rows yields an empty list. Used as a table identifier.
    """
    header_row = tbl_elem.find(qn('w:tr'))
    if header_row is None:
        return []
    first_paras = (
        cell.find('.//' + qn('w:p')) for cell in header_row.findall(qn('w:tc'))
    )
    return [
        '' if para is None else _para_new_text(para).strip()
        for para in first_paras
    ]
def _row_col0(tr_elem):
    """Return the stripped text of the first paragraph in the row's first cell.

    Yields '' when the row has no ``w:tc`` or that cell has no ``w:p``.
    Used as the row anchor.
    """
    first_cell = tr_elem.find(qn('w:tc'))
    first_para = None if first_cell is None else first_cell.find('.//' + qn('w:p'))
    if first_para is None:
        return ''
    return _para_new_text(first_para).strip()
# ── Inline del+ins extraction (from a single paragraph) ──────────────────────
def _extract_inline_replacements(p_elem):
    """
    Return a list of ``(old_text, new_text)`` pairs for the tracked deletions
    among the direct children of a paragraph.

    Rules:
      * Consecutive ``w:del`` siblings are merged into one deletion.
      * A deletion is paired with the ``w:ins`` sibling that immediately
        follows the run of dels, or failing that with the ``w:ins`` sibling
        immediately before it.
      * Each ``w:ins`` is paired with at most one deletion; an insertion
        already claimed by an earlier deletion is never reused.
      * A deletion with no partner is reported as ``(old_text, '')``, unless
        its text is whitespace-only, in which case it is dropped.
      * A ``w:del`` with no text at all is dropped.

    Args:
        p_elem: paragraph element whose direct children are scanned.

    Returns:
        list of ``(old_text, new_text)`` tuples in document order.
    """
    del_tag = qn('w:del')
    ins_tag = qn('w:ins')
    children = list(p_elem)
    pairs = []
    # Indices of siblings already merged into, or paired with, an earlier del.
    consumed = set()

    def _usable_ins(idx):
        # True when children[idx] exists, is an unclaimed content insertion.
        if idx < 0 or idx >= len(children) or idx in consumed:
            return False
        node = children[idx]
        return node.tag == ins_tag and not _is_rpr_ins(node)

    for i, child in enumerate(children):
        if i in consumed or child.tag != del_tag:
            continue

        old_text = _del_text(child)
        if not old_text:
            # Empty del (paragraph-mark or line-break deletion): discard.
            continue

        # Merge the following consecutive del siblings (multi-fragment deletion).
        j = i + 1
        while j < len(children) and children[j].tag == del_tag:
            old_text += _del_text(children[j])
            consumed.add(j)
            j += 1

        if _usable_ins(j):
            # del-then-ins
            new_text = _ins_text(children[j])
            consumed.add(j)
        elif _usable_ins(i - 1):
            # ins-then-del; skipped when that ins already belongs to the
            # previous deletion, otherwise its text would be applied twice.
            new_text = _ins_text(children[i - 1])
            consumed.add(i - 1)
        elif not old_text.strip():
            # Whitespace artefact with no counterpart.
            continue
        else:
            # Pure deletion (no replacement): record with empty new text.
            new_text = ''

        pairs.append((old_text, new_text))
    return pairs
# ── Table change extraction ───────────────────────────────────────────────────
def _parse_table(tbl_elem, changes, section_heading=''):
    """
    Append the tracked changes found in one table to *changes*.

    For each direct ``w:tr`` row of the table:
      * a row whose ``w:trPr`` has a ``w:ins`` child produces one
        ``row_insert`` entry, anchored on the col-0 text of the nearest
        preceding row that is not itself a tracked insertion ('' if none);
      * any other row is scanned cell by cell, and every inline deletion
        found in its paragraphs produces one ``text_replace`` entry.

    Args:
        tbl_elem: the ``w:tbl`` element to scan.
        changes: list that receives the change dicts (mutated in place).
        section_heading: text stored verbatim in each entry's location.

    Returns:
        None.
    """
    tr_tag = qn('w:tr')
    tc_tag = qn('w:tc')
    trpr_tag = qn('w:trPr')
    ins_tag = qn('w:ins')
    para_path = './/' + qn('w:p')

    header_key = _table_header(tbl_elem)[:3]  # first 3 columns enough for matching
    rows = tbl_elem.findall(tr_tag)

    def _is_inserted_row(tr):
        # True when the row itself is tracked as inserted.
        trPr = tr.find(trpr_tag)
        return trPr is not None and trPr.find(ins_tag) is not None

    def _cell_info(tc):
        # Describe one cell of an inserted row: text, width, vMerge, style.
        tcPr = tc.find(qn('w:tcPr'))

        width = None
        tcW = tcPr.find(qn('w:tcW')) if tcPr is not None else None
        if tcW is not None:
            try:
                width = int(tcW.get(qn('w:w'), 0))
            except (ValueError, TypeError):
                width = None

        # A w:vMerge without a w:val attribute marks a continuation cell.
        is_vmerge = False
        if tcPr is not None:
            vm = tcPr.find(qn('w:vMerge'))
            is_vmerge = vm is not None and vm.get(qn('w:val')) is None

        p = tc.find(para_path)
        cell_text = _ins_text(tc)
        # Explicit None test: the truth value of an lxml element reflects its
        # child count (and is deprecated), not whether the element was found.
        if not cell_text and p is not None:
            cell_text = _para_new_text(p)

        return {
            'text': cell_text.strip(),
            'width': width,
            'vmerge': is_vmerge,
            'style': _style_val(p) if p is not None else None,
        }

    for tr_idx, tr in enumerate(rows):
        # ── Tracked row insertion ────────────────────────────────────────
        if _is_inserted_row(tr):
            # Walk backwards to the closest row that is not an insertion.
            after_anchor = ''
            for prev_tr in reversed(rows[:tr_idx]):
                if not _is_inserted_row(prev_tr):
                    after_anchor = _row_col0(prev_tr)
                    break

            changes.append({
                'type': 'row_insert',
                'location': {
                    'kind': 'table_row',
                    'table_header': header_key,
                    'after_row_anchor': after_anchor,
                    'section_heading': section_heading,
                },
                'cells': [_cell_info(tc) for tc in tr.findall(tc_tag)],
            })
            continue

        # ── Cell-level text_replace ──────────────────────────────────────
        row_anchor = _row_col0(tr)
        for col_idx, tc in enumerate(tr.findall(tc_tag)):
            for p in tc.findall(para_path):
                for old_text, new_text in _extract_inline_replacements(p):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'table_cell',
                            'table_header': header_key,
                            'row_anchor': row_anchor,
                            'col_idx': col_idx,
                            'section_heading': section_heading,
                        },
                        'old': old_text,
                        'new': new_text,
                    })
# ── Body paragraph extraction ─────────────────────────────────────────────────
def _parse_body(body, changes):
    """
    Walk the direct children of ``w:body`` and append change dicts to *changes*.

    Change types emitted:
      section_replace - a contiguous block of fully-deleted elements
                        (paragraphs and/or tables, tracked at the
                        paragraph-mark / row level) followed by a contiguous
                        block of fully-inserted elements. The XML of all
                        those elements (deleted, separator, inserted - in
                        that order) is serialized into ``elements_xml``.
                        A fully-inserted table with no preceding deletion
                        also starts a section_replace.
      text_replace    - an inline deletion (with or without a paired
                        insertion) inside an otherwise-stable paragraph.
      para_insert     - one or more consecutive wholly-new paragraphs with
                        no preceding deletion block, grouped into a single
                        entry anchored on the last stable paragraph text.

    Tables that are neither fully deleted nor fully inserted are handed to
    ``_parse_table``. Children other than ``w:p`` and ``w:tbl`` are ignored.

    Args:
        body: the ``w:body`` element.
        changes: list that receives the change dicts (mutated in place).

    Returns:
        None.
    """
    from lxml import etree

    def local_name(node):
        # Tag without its '{namespace}' prefix.
        tag = node.tag
        return tag.split('}')[-1] if '}' in tag else tag

    # Text of the most recent stable paragraph; used as the anchor for changes.
    prev_stable_text = ''

    # ── Section-replace accumulator ──────────────────────────────────────
    sec_del = []          # fully-deleted elements (CR del block)
    sec_sep = []          # empty/separator paragraphs between del and ins blocks
    sec_ins = []          # fully-inserted elements (CR ins block)
    sec_state = 'stable'  # 'stable' | 'del' | 'sep' | 'ins'
    sec_anchor = ''

    def flush_section():
        # Emit the pending section_replace (if any) and reset the accumulator.
        nonlocal sec_state
        if sec_del or sec_ins:
            # del_heading: text of the first deleted paragraph that has any.
            del_heading = ''
            for e in sec_del:
                if local_name(e) != 'p':
                    continue
                t = _del_text(e).strip() or _para_orig_text(e).strip()
                if t:
                    del_heading = t
                    break

            changes.append({
                'type': 'section_replace',
                'location': {
                    'kind': 'body',
                    'del_heading': del_heading,
                    'has_del_table': any(local_name(e) == 'tbl' for e in sec_del),
                    'anchor_text': sec_anchor,
                },
                'elements_xml': [
                    etree.tostring(e, encoding='unicode')
                    for e in sec_del + sec_sep + sec_ins
                ],
            })
        sec_del.clear()
        sec_sep.clear()
        sec_ins.clear()
        sec_state = 'stable'

    # ── Para-insert accumulator (for standalone new paragraphs) ──────────
    insert_group = []

    def flush_group():
        # Emit the pending para_insert (if any) and reset the accumulator.
        if not insert_group:
            return
        paras = [
            {'text': _para_new_text(p).strip(), 'style': _style_val(p)}
            for p in insert_group
        ]
        paras = [p for p in paras if p['text'] or p['style']]
        if paras:
            changes.append({
                'type': 'para_insert',
                'location': {
                    'kind': 'body',
                    'anchor_text': prev_stable_text,
                },
                'paragraphs': paras,
            })
        insert_group.clear()

    def extend_del_block(elem):
        # Start or continue the del block with a fully-deleted element.
        nonlocal sec_state, sec_anchor
        if sec_state == 'ins':
            flush_section()  # del after ins = a new, separate section_replace
        if sec_state == 'stable':
            flush_group()
            sec_anchor = prev_stable_text
            sec_state = 'del'
        sec_del.append(elem)

    for elem in body:
        tag = local_name(elem)

        if tag == 'p':
            if _is_deleted_para(elem):
                extend_del_block(elem)

            elif _is_inserted_para(elem):
                if sec_state in ('del', 'sep', 'ins'):
                    # Insertion following a del block: part of section_replace.
                    sec_state = 'ins'
                    sec_ins.append(elem)
                else:
                    # Standalone inserted paragraph. Do not flush here:
                    # consecutive insertions must accumulate so they are
                    # emitted together as one para_insert.
                    insert_group.append(elem)

            elif (not _para_orig_text(elem).strip()
                    and not _para_new_text(elem).strip()):
                # Empty paragraph.
                if sec_state == 'del':
                    # Separator between del and ins blocks.
                    sec_state = 'sep'
                    sec_sep.append(elem)
                elif sec_state in ('sep', 'ins'):
                    sec_ins.append(elem)
                # In a stable region an empty paragraph is ignored.

            else:
                # Stable (or inline-changed) paragraph.
                flush_section()
                flush_group()

                orig = _para_orig_text(elem).strip()
                for old_text, new_text in _extract_inline_replacements(elem):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'body_para',
                            'para_context': orig,
                        },
                        'old': old_text,
                        'new': new_text,
                    })

                # Bracketed-dots placeholders such as "[...]" never become anchors.
                if orig and not re.fullmatch(r'\[\.[\s\.]*\]', orig):
                    prev_stable_text = orig

        elif tag == 'tbl':
            if _is_fully_deleted_tbl(elem):
                extend_del_block(elem)

            elif _is_fully_inserted_tbl(elem):
                if sec_state not in ('del', 'sep', 'ins'):
                    # Standalone fully-inserted table (no del block):
                    # treated as a section_replace with an empty del side.
                    flush_group()
                    sec_anchor = prev_stable_text
                sec_state = 'ins'
                sec_ins.append(elem)

            else:
                # Table with inline cell changes.
                flush_section()
                flush_group()
                _parse_table(elem, changes, section_heading=prev_stable_text)

    flush_section()
    flush_group()
# ── Public API ────────────────────────────────────────────────────────────────
def parse_cr(cr_path, output_json=None):
    """
    Parse the tracked changes of a CR DOCX into a list of change dicts.

    Args:
        cr_path: path of the CR DOCX file (converted with ``str`` before opening).
        output_json: optional path; when truthy, the changes are also written
            there as UTF-8 JSON (indent 2, non-ASCII characters kept as-is).

    Returns:
        The list of change dicts collected by ``_parse_body``.
    """
    document = docx.Document(str(cr_path))
    changes = []
    _parse_body(document.element.body, changes)

    if output_json:
        serialized = json.dumps(changes, indent=2, ensure_ascii=False)
        Path(output_json).write_text(serialized, encoding='utf-8')

    return changes
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point.

    Parses a CR DOCX path and an optional ``--output`` path. With
    ``--output`` the manifest is written to that file and a one-line summary
    is printed; without it the manifest JSON is printed to stdout.
    """
    parser = argparse.ArgumentParser(
        description='Parse CR DOCX tracked changes into JSON manifest.'
    )
    parser.add_argument('cr_docx', help='CR DOCX file path')
    parser.add_argument(
        '--output',
        default=None,
        help='Output JSON path (default: print to stdout)',
    )
    options = parser.parse_args()

    changes = parse_cr(options.cr_docx, output_json=options.output)

    if not options.output:
        print(json.dumps(changes, indent=2, ensure_ascii=False))
        return
    # NOTE(review): the separator glyph in this message looks mis-encoded;
    # confirm the intended character before changing it.
    print(f'Wrote {len(changes)} change(s) β {options.output}')


if __name__ == '__main__':
    main()