ApplyCRs / scripts /cr_parser.py
heymenn's picture
init
7eedaf8
#!/usr/bin/env python3
"""
cr_parser.py — Parse a CR DOCX's tracked changes into a JSON manifest.
Each entry in the manifest is one of:
{"type": "text_replace", "location": {...}, "old": "...", "new": "..."}
{"type": "para_insert", "location": {...}, "paragraphs": [...]}
{"type": "row_insert", "location": {...}, "cells": [...]}
{"type": "section_replace", "location": {...}, "elements_xml": [...]}
Usage:
python3 cr_parser.py <cr.docx> [--output manifest.json]
# or import: from cr_parser import parse_cr
"""
import argparse
import json
import re
import sys
from pathlib import Path
import docx
from docx.oxml.ns import qn
# ── Low-level text helpers ────────────────────────────────────────────────────
def _del_text(elem):
    """Return the joined text of every ``w:delText`` descendant of *elem*.

    Nodes whose ``text`` is ``None`` contribute an empty string.
    """
    deleted_nodes = elem.findall('.//' + qn('w:delText'))
    return ''.join([node.text or '' for node in deleted_nodes])
def _ins_text(elem):
    """Return the joined text of every ``w:t`` descendant of *elem*.

    Intended for ``w:ins`` elements, but any element may be passed; nodes
    whose ``text`` is ``None`` contribute an empty string.
    """
    fragments = []
    for node in elem.findall('.//' + qn('w:t')):
        fragments.append(node.text or '')
    return ''.join(fragments)
def _para_new_text(p_elem):
    """Return the paragraph text as it reads with tracked changes accepted.

    Only ``w:t`` nodes are collected, so inserted text is kept while deleted
    text (held in ``w:delText`` nodes) is left out.
    """
    text_path = './/' + qn('w:t')
    pieces = (node.text or '' for node in p_elem.findall(text_path))
    return ''.join(pieces)
def _para_orig_text(p_elem):
    """Return the paragraph text as it reads before the tracked changes.

    ``w:delText`` content is kept; ``w:t`` content is kept only when the node
    has no ``w:ins`` ancestor. Nodes with empty or missing text are skipped.
    """
    del_text_tag = qn('w:delText')
    run_text_tag = qn('w:t')
    ins_tag = qn('w:ins')

    collected = []
    for node in p_elem.iter():
        tag = node.tag
        if tag == del_text_tag:
            if node.text:
                collected.append(node.text)
        elif tag == run_text_tag:
            # Text under a tracked insertion is not part of the original.
            if node.text and all(anc.tag != ins_tag for anc in node.iterancestors()):
                collected.append(node.text)
    return ''.join(collected)
def _style_val(p_elem):
    """Return the ``w:val`` of the paragraph's ``w:pPr/w:pStyle``, or ``None``.

    ``None`` is returned when either ``w:pPr`` or ``w:pStyle`` is missing.
    """
    props = p_elem.find(qn('w:pPr'))
    style_node = props.find(qn('w:pStyle')) if props is not None else None
    if style_node is not None:
        return style_node.get(qn('w:val'))
    return None
def _is_rpr_ins(ins_elem):
    """Tell whether a ``w:ins`` sits directly inside a ``w:rPr``.

    Such an element marks a formatting change rather than inserted content.
    An element without a parent yields ``False``.
    """
    parent = ins_elem.getparent()
    if parent is None:
        return False
    return parent.tag == qn('w:rPr')
def _is_inserted_para(p_elem):
    """Tell whether the paragraph mark is tracked as inserted.

    True when ``w:pPr/w:rPr/w:ins`` exists under the paragraph, which is
    treated as "the whole paragraph is new".
    """
    node = p_elem
    # Walk pPr -> rPr -> ins one level at a time; any missing link means False.
    for child_name in ('w:pPr', 'w:rPr', 'w:ins'):
        node = node.find(qn(child_name))
        if node is None:
            return False
    return True
def _is_deleted_para(p_elem):
    """Tell whether the paragraph mark is tracked as deleted.

    True when ``w:pPr/w:rPr/w:del`` exists under the paragraph, which is
    treated as "the whole paragraph is deleted".
    """
    para_props = p_elem.find(qn('w:pPr'))
    mark_props = None if para_props is None else para_props.find(qn('w:rPr'))
    if mark_props is None:
        return False
    return mark_props.find(qn('w:del')) is not None
def _is_fully_deleted_tbl(tbl_elem):
    """Tell whether every direct row of the table carries ``w:trPr/w:del``.

    A table without any ``w:tr`` child is never considered fully deleted.
    """
    row_props_tag = qn('w:trPr')
    del_tag = qn('w:del')

    saw_row = False
    for row in tbl_elem.findall(qn('w:tr')):
        saw_row = True
        row_props = row.find(row_props_tag)
        if row_props is None or row_props.find(del_tag) is None:
            return False
    return saw_row
def _is_fully_inserted_tbl(tbl_elem):
    """Tell whether every direct row of the table carries ``w:trPr/w:ins``.

    A table without any ``w:tr`` child is never considered fully inserted.
    """
    def _row_is_inserted(row):
        row_props = row.find(qn('w:trPr'))
        return row_props is not None and row_props.find(qn('w:ins')) is not None

    table_rows = tbl_elem.findall(qn('w:tr'))
    return bool(table_rows) and all(map(_row_is_inserted, table_rows))
# ── Table helpers ─────────────────────────────────────────────────────────────
def _table_header(tbl_elem):
    """Return the stripped cell texts of the table's first row.

    Each cell contributes the accepted-changes text of its first paragraph
    (an empty string when the cell has no paragraph). A table without rows
    yields an empty list. Callers use the result to identify the table.
    """
    header_row = tbl_elem.find(qn('w:tr'))
    if header_row is None:
        return []

    para_path = './/' + qn('w:p')

    def _cell_label(cell):
        first_para = cell.find(para_path)
        if first_para is None:
            return ''
        return _para_new_text(first_para).strip()

    return [_cell_label(cell) for cell in header_row.findall(qn('w:tc'))]
def _row_col0(tr_elem):
    """Return the stripped text of the row's first cell, used as a row anchor.

    The text comes from the first paragraph of the first ``w:tc`` with tracked
    changes accepted; a missing cell or paragraph yields an empty string.
    """
    first_cell = tr_elem.find(qn('w:tc'))
    first_para = None if first_cell is None else first_cell.find('.//' + qn('w:p'))
    if first_para is None:
        return ''
    return _para_new_text(first_para).strip()
# ── Inline del+ins extraction (from a single paragraph) ───────────────────────
def _extract_inline_replacements(p_elem):
    """
    Return a list of (old_text, new_text) pairs for the tracked deletions
    found among the direct children of *p_elem*.

    Handles: del-then-ins, ins-then-del, multi-fragment consecutive dels.
    Filters: whitespace-only dels with no adjacent ins, empty dels, rPr ins.

    A deletion with no adjacent insertion is reported with ``new_text == ''``.
    Each ``w:ins`` is paired with at most one deletion: in a ``del, ins, del``
    sequence the insertion belongs to the first deletion and the second one
    is reported as a pure deletion rather than reusing the same inserted text.
    """
    del_tag = qn('w:del')
    ins_tag = qn('w:ins')
    children = list(p_elem)
    pairs = []
    # Indices of dels merged into an earlier del and of ins already paired.
    consumed = set()

    def _usable_ins(idx):
        """True if children[idx] is a content w:ins that is still unpaired."""
        if idx < 0 or idx >= len(children) or idx in consumed:
            return False
        node = children[idx]
        return node.tag == ins_tag and not _is_rpr_ins(node)

    for i, child in enumerate(children):
        if i in consumed or child.tag != del_tag:
            continue

        old_text = _del_text(child)
        # Empty del (paragraph-mark or line-break deletion) — discard
        if not old_text:
            continue

        # Merge consecutive del siblings (multi-fragment deletion)
        j = i + 1
        while j < len(children) and children[j].tag == del_tag:
            old_text += _del_text(children[j])
            consumed.add(j)
            j += 1

        if _usable_ins(j):
            # del-then-ins: the insertion right after the deleted run
            new_text = _ins_text(children[j])
            consumed.add(j)
        elif _usable_ins(i - 1):
            # ins-then-del: the insertion right before, unless already paired
            new_text = _ins_text(children[i - 1])
            consumed.add(i - 1)
        elif old_text.strip():
            # Pure deletion (no replacement) — record with empty new
            new_text = ''
        else:
            # Whitespace artefact with no counterpart
            continue

        pairs.append((old_text, new_text))
    return pairs
# ── Table change extraction ───────────────────────────────────────────────────
def _parse_table(tbl_elem, changes, section_heading=''):
    """
    Append the tracked changes of one table to *changes*.

    For every direct row of the table:
      * a row whose ``w:trPr`` contains ``w:ins`` produces one ``row_insert``
        entry, anchored on the col-0 text of the closest preceding row that
        is not itself a tracked insertion ('' if there is none);
      * any other row produces one ``text_replace`` entry per inline
        replacement found in the paragraphs of its cells.

    Args:
        tbl_elem: the ``w:tbl`` element to scan.
        changes: list receiving the change dicts (mutated in place).
        section_heading: text stored verbatim in each entry's location.
    """
    header = _table_header(tbl_elem)
    header_key = header[:3]  # first 3 columns enough for matching
    rows = tbl_elem.findall(qn('w:tr'))

    for tr_idx, tr in enumerate(rows):
        trPr = tr.find(qn('w:trPr'))

        # ── Tracked row insertion ─────────────────────────────────────────
        if trPr is not None and trPr.find(qn('w:ins')) is not None:
            # Find preceding stable row for anchor
            after_anchor = ''
            for prev_tr in reversed(rows[:tr_idx]):
                prev_trPr = prev_tr.find(qn('w:trPr'))
                if prev_trPr is None or prev_trPr.find(qn('w:ins')) is None:
                    after_anchor = _row_col0(prev_tr)
                    break

            cells = []
            for tc in tr.findall(qn('w:tc')):
                tcPr = tc.find(qn('w:tcPr'))
                width = None
                is_vmerge = False
                if tcPr is not None:
                    # Width; a non-numeric value is recorded as None
                    tcW = tcPr.find(qn('w:tcW'))
                    if tcW is not None:
                        try:
                            width = int(tcW.get(qn('w:w'), 0))
                        except (ValueError, TypeError):
                            width = None
                    # vMerge (no w:val attribute = continuation)
                    vm = tcPr.find(qn('w:vMerge'))
                    if vm is not None and vm.get(qn('w:val')) is None:
                        is_vmerge = True

                # Text — prefer the cell-wide w:t text, fall back to the
                # first paragraph. Compare against None explicitly: the truth
                # value of an lxml element reflects its child count, not
                # whether the element was found.
                cell_ins_text = _ins_text(tc)
                p = tc.find('.//' + qn('w:p'))
                if cell_ins_text:
                    cell_text = cell_ins_text
                elif p is not None:
                    cell_text = _para_new_text(p)
                else:
                    cell_text = ''
                style = _style_val(p) if p is not None else None

                cells.append({
                    'text': cell_text.strip(),
                    'width': width,
                    'vmerge': is_vmerge,
                    'style': style,
                })

            changes.append({
                'type': 'row_insert',
                'location': {
                    'kind': 'table_row',
                    'table_header': header_key,
                    'after_row_anchor': after_anchor,
                    'section_heading': section_heading,
                },
                'cells': cells,
            })
            continue

        # ── Cell-level text_replace ───────────────────────────────────────
        row_anchor = _row_col0(tr)
        for col_idx, tc in enumerate(tr.findall(qn('w:tc'))):
            for p in tc.findall('.//' + qn('w:p')):
                for old_text, new_text in _extract_inline_replacements(p):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'table_cell',
                            'table_header': header_key,
                            'row_anchor': row_anchor,
                            'col_idx': col_idx,
                            'section_heading': section_heading,
                        },
                        'old': old_text,
                        'new': new_text,
                    })
# ── Body paragraph extraction ─────────────────────────────────────────────────
def _parse_body(body, changes):
    """
    Walk direct children of w:body, emitting changes.

    Change types emitted:
      section_replace — a contiguous block of fully-deleted elements (para and/or
                        table, tracked at the paragraph-mark / row level) followed
                        immediately by a contiguous block of fully-inserted
                        elements. The raw XML of ALL those CR elements is stored
                        verbatim so the applicator can transplant them into the TS.
                        A fully-inserted table with no preceding deletion also
                        starts a section_replace (with an empty del block).
      text_replace    — an inline del+ins pair inside an otherwise-stable
                        paragraph; tables with inline changes are delegated to
                        _parse_table.
      para_insert     — one or more consecutive wholly-new paragraphs with no
                        corresponding deletion, grouped into a single entry.

    Args:
        body: the w:body element; only its direct children are visited.
        changes: list receiving the change dicts (mutated in place).
    """
    from lxml import etree

    # Original text of the latest stable, non-empty paragraph; used as anchor.
    prev_stable_text = ''

    # ── Section-replace accumulator ───────────────────────────────────────────
    sec_del = []          # fully-deleted elements (CR del block)
    sec_sep = []          # empty/separator paragraphs between del and ins blocks
    sec_ins = []          # fully-inserted elements (CR ins block)
    sec_state = 'stable'  # 'stable' | 'del' | 'sep' | 'ins'
    sec_anchor = ''

    def local_name(node):
        """Tag of *node* without its '{namespace}' prefix."""
        tag = node.tag
        return tag.split('}')[-1] if '}' in tag else tag

    def flush_section():
        """Emit the pending section_replace (if any) and reset the accumulator."""
        nonlocal sec_state
        if sec_del or sec_ins:
            # The del_heading is the text of the first deleted paragraph that
            # has any text.
            del_heading = ''
            for e in sec_del:
                if local_name(e) != 'p':
                    continue
                t = _del_text(e).strip() or _para_orig_text(e).strip()
                if t:
                    del_heading = t
                    break

            # Serialize all elements for the manifest (del + sep + ins)
            elements_xml = [
                etree.tostring(e, encoding='unicode')
                for e in sec_del + sec_sep + sec_ins
            ]
            has_del_table = any(local_name(e) == 'tbl' for e in sec_del)

            changes.append({
                'type': 'section_replace',
                'location': {
                    'kind': 'body',
                    'del_heading': del_heading,
                    'has_del_table': has_del_table,
                    'anchor_text': sec_anchor,
                },
                'elements_xml': elements_xml,
            })

        sec_del.clear()
        sec_sep.clear()
        sec_ins.clear()
        sec_state = 'stable'

    # ── Para-insert accumulator (for standalone new paragraphs) ───────────────
    insert_group = []

    def flush_group():
        """Emit the pending para_insert (if any) and empty the group."""
        if not insert_group:
            return
        paras = [
            {'text': _para_new_text(p).strip(), 'style': _style_val(p)}
            for p in insert_group
        ]
        # Drop paragraphs that carry neither text nor a style.
        paras = [p for p in paras if p['text'] or p['style']]
        if paras:
            changes.append({
                'type': 'para_insert',
                'location': {
                    'kind': 'body',
                    'anchor_text': prev_stable_text,
                },
                'paragraphs': paras,
            })
        insert_group.clear()

    for elem in body:
        tag = local_name(elem)

        if tag == 'p':
            is_del = _is_deleted_para(elem)
            is_ins = _is_inserted_para(elem)
            is_empty = (
                not _para_orig_text(elem).strip()
                and not _para_new_text(elem).strip()
            )

            if is_del:
                # Start or continue the del block
                if sec_state == 'ins':
                    flush_section()  # ins before del = two separate section_replaces
                if sec_state == 'stable':
                    flush_group()
                    sec_anchor = prev_stable_text
                    sec_state = 'del'
                # NOTE(review): in the 'sep' state the element joins sec_del while
                # the separators stay in sec_sep, so the serialized order differs
                # from document order — confirm this is intended.
                sec_del.append(elem)

            elif is_ins:
                if sec_state in ('del', 'sep'):
                    # ins block follows a del block → part of section_replace
                    sec_state = 'ins'
                    sec_ins.append(elem)
                elif sec_state == 'ins':
                    sec_ins.append(elem)
                else:
                    # Standalone ins paragraph (no preceding del block). Do not
                    # flush here: consecutive inserted paragraphs must accumulate
                    # so they are emitted as ONE para_insert, in document order.
                    insert_group.append(elem)

            elif is_empty:
                if sec_state == 'del':
                    # Separator between del and ins blocks
                    sec_state = 'sep'
                    sec_sep.append(elem)
                elif sec_state in ('sep', 'ins'):
                    sec_ins.append(elem)
                # else: empty para in stable region — ignore for anchoring

            else:
                # Stable (or inline-changed) paragraph
                flush_section()
                flush_group()

                for old_text, new_text in _extract_inline_replacements(elem):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'body_para',
                            'para_context': _para_orig_text(elem).strip(),
                        },
                        'old': old_text,
                        'new': new_text,
                    })

                # Bracketed-dots placeholders such as "[...]" are not anchors.
                orig = _para_orig_text(elem).strip()
                if orig and not re.fullmatch(r'\[\.[\s\.]*\]', orig):
                    prev_stable_text = orig

        elif tag == 'tbl':
            if _is_fully_deleted_tbl(elem):
                if sec_state == 'ins':
                    flush_section()
                if sec_state == 'stable':
                    flush_group()
                    sec_anchor = prev_stable_text
                    sec_state = 'del'
                sec_del.append(elem)

            elif _is_fully_inserted_tbl(elem):
                if sec_state in ('del', 'sep', 'ins'):
                    sec_state = 'ins'
                    sec_ins.append(elem)
                else:
                    # Standalone fully-inserted table (no del block) — treat as
                    # section_replace
                    flush_group()
                    sec_anchor = prev_stable_text
                    sec_state = 'ins'
                    sec_ins.append(elem)

            else:
                # Table with inline cell changes
                flush_section()
                flush_group()
                _parse_table(elem, changes, section_heading=prev_stable_text)

    # Emit whatever is still pending at the end of the body.
    flush_section()
    flush_group()
# ── Public API ────────────────────────────────────────────────────────────────
def parse_cr(cr_path, output_json=None):
    """
    Collect every tracked change of a CR DOCX as a list of change dicts.

    Args:
        cr_path: path of the CR document; converted with ``str()`` before
            being handed to ``docx.Document``.
        output_json: when truthy, the manifest is also written to this path
            as UTF-8 JSON (2-space indent, non-ASCII characters kept as-is).

    Returns:
        The list of change dicts, whether or not a file was written.
    """
    document = docx.Document(str(cr_path))
    changes = []
    _parse_body(document.element.body, changes)

    if not output_json:
        return changes

    manifest = json.dumps(changes, indent=2, ensure_ascii=False)
    Path(output_json).write_text(manifest, encoding='utf-8')
    return changes
# ── CLI ───────────────────────────────────────────────────────────────────────
def main(argv=None):
    """
    Command-line entry point: parse a CR DOCX and output its manifest.

    With ``--output`` the manifest is written to that file by ``parse_cr``
    and a one-line summary is printed; otherwise the manifest itself is
    printed to stdout as JSON.

    Args:
        argv: optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.
    """
    ap = argparse.ArgumentParser(description='Parse CR DOCX tracked changes into JSON manifest.')
    ap.add_argument('cr_docx', help='CR DOCX file path')
    ap.add_argument('--output', default=None, help='Output JSON path (default: print to stdout)')
    args = ap.parse_args(argv)

    changes = parse_cr(args.cr_docx, output_json=args.output)
    if args.output:
        print(f'Wrote {len(changes)} change(s) → {args.output}')
    else:
        print(json.dumps(changes, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()