ApplyCRs / scripts /ts_applicator.py
heymenn's picture
init
7eedaf8
#!/usr/bin/env python3
"""
ts_applicator.py β€” Apply a CR change manifest to a TS DOCX as tracked changes.
Reads a JSON manifest produced by cr_parser.py and applies every change
to the target TS using docx_helpers tracked-change primitives.
Usage:
python3 ts_applicator.py <ts.docx> <manifest.json> [--author NAME] [--output path]
# or import: from ts_applicator import apply_manifest
"""
import argparse
import json
import re
import sys
from pathlib import Path
import docx
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
sys.path.insert(0, str(Path(__file__).parent))
from docx_helpers import (
RevCounter,
tracked_modify_para,
tracked_insert_paras_after,
AUTHOR as DEFAULT_AUTHOR,
DATE as DEFAULT_DATE,
)
# ── Text normalisation ────────────────────────────────────────────────────────
def _norm(text):
"""Normalise non-breaking spaces and common Unicode dashes for comparison."""
return (text
.replace('\xa0', ' ')
.replace('\u2013', '-')
.replace('\u2014', '-')
.strip())
def _norm_ws(text):
"""
Strip all whitespace for structural matching.
ETSI TS files store structured paragraphs (references, abbreviations,
headings) with a TAB between the code and the body text, e.g.:
'[27]\\tGlobalPlatform: ...'
'CLT\\tContactLess Tunnelling'
'8.3\\tRAM implementation over HTTPS'
The CR's text extraction concatenates runs directly, losing the tab:
'[27]GlobalPlatform: ...'
'CLTContactLess Tunnelling'
'8.3RAM implementation over HTTPS'
Removing all whitespace from both sides before comparing solves this.
Used as a third-level fallback (confidence 0.8) after exact and NBSP-norm.
"""
base = (text
.replace('\xa0', '')
.replace('\u2013', '-')
.replace('\u2014', '-'))
return re.sub(r'\s+', '', base)
# ── Document search helpers ───────────────────────────────────────────────────
def _full_para_text(para):
"""All text content including w:t (normal/inserted) and w:delText (deleted runs)."""
el = para._element
return ''.join(t.text or '' for t in el.findall('.//' + qn('w:t'))) + \
''.join(t.text or '' for t in el.findall('.//' + qn('w:delText')))
def _find_para(doc, search_text, prefer_not_in_table=False):
"""
Find the first paragraph containing search_text.
Four levels of matching, in order of confidence:
1.0 β€” exact substring match
0.9 β€” NBSP/dash-normalised match (_norm)
0.8 β€” whitespace-stripped match (_norm_ws) handles tab vs nothing in
structured paragraphs (refs '[27]\\t...', abbrevs 'CLT\\t...', headings '8.3\\t...')
0.6 β€” full XML text (including w:del content): handles anchors that were
previously deleted by tracked_modify_para in an earlier apply step
Returns (para, confidence) or (None, 0.0).
"""
norm_search = _norm(search_text)
ws_search = _norm_ws(search_text)
candidates_exact = []
candidates_norm = []
candidates_ws = []
candidates_del = []
for para in doc.paragraphs:
pt = para.text
if search_text in pt:
candidates_exact.append(para)
elif norm_search and norm_search in _norm(pt):
candidates_norm.append(para)
elif ws_search and ws_search in _norm_ws(pt):
candidates_ws.append(para)
else:
# Level 4: check full XML text (catches deleted-but-still-present paragraphs)
full_pt = _full_para_text(para)
if search_text in full_pt:
candidates_del.append(para)
elif ws_search and ws_search in _norm_ws(full_pt):
candidates_del.append(para)
def _in_table(para):
p = para._element
return any(a.tag == qn('w:tc') for a in p.iterancestors())
for pool, conf in [(candidates_exact, 1.0), (candidates_norm, 0.9),
(candidates_ws, 0.8), (candidates_del, 0.6)]:
if not pool:
continue
if prefer_not_in_table:
body_only = [p for p in pool if not _in_table(p)]
if body_only:
return body_only[0], conf
return pool[0], conf
return None, 0.0
def _find_table_by_section(doc, section_heading):
"""
Find the table immediately following a paragraph that contains section_heading.
Checks both w:t (plain/inserted) and w:delText (tracked-deleted) so the match
survives even after the heading was wrapped in a tracked deletion.
Empty paragraphs between the heading and the table are tolerated.
Returns (table, confidence) or (None, 0.0).
"""
if not section_heading:
return None, 0.0
norm_h = _norm(section_heading)
ws_h = _norm_ws(section_heading)
heading_seen = False
for element in doc.element.body:
tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag
if tag == 'p':
t_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:t')))
d_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:delText')))
full = (t_text + d_text).strip()
if not full:
continue # skip empty paras, keep heading_seen state
if (section_heading in full
or norm_h in _norm(full)
or ws_h in _norm_ws(full)):
heading_seen = True
else:
heading_seen = False # non-matching non-empty para resets
elif tag == 'tbl':
if heading_seen:
for tbl in doc.tables:
if tbl._tbl is element:
return tbl, 1.0
heading_seen = False
return None, 0.0
def _find_table(doc, header_key):
"""
Find a table whose first row cell texts start with header_key.
Returns (table, confidence) or (None, 0.0).
"""
norm_key = [_norm(h) for h in header_key]
for tbl in doc.tables:
if not tbl.rows:
continue
first_row_texts = [_norm(c.text) for c in tbl.rows[0].cells]
# Match by prefix (header_key may have fewer columns)
match = all(
i < len(first_row_texts) and norm_key[i] in first_row_texts[i]
for i in range(len(norm_key))
)
if match:
return tbl, 1.0
return None, 0.0
def _find_row(tbl, anchor_text):
"""
Find first row in tbl where col-0 cell text contains anchor_text.
Returns (row_idx, confidence) or (-1, 0.0).
Three confidence levels: 1.0 exact, 0.9 norm, 0.8 whitespace-stripped.
"""
norm_anchor = _norm(anchor_text)
ws_anchor = _norm_ws(anchor_text)
best = (-1, 0.0)
for idx, row in enumerate(tbl.rows):
cell0 = row.cells[0].text if row.cells else ''
if anchor_text in cell0:
return idx, 1.0
if norm_anchor and norm_anchor in _norm(cell0) and best[1] < 0.9:
best = (idx, 0.9)
elif ws_anchor and ws_anchor in _norm_ws(cell0) and best[1] < 0.8:
best = (idx, 0.8)
return best
# ── vMerge row insertion ──────────────────────────────────────────────────────
def _build_new_tr(cells_data, rev, author, date):
"""
Build and return a new tracked-insert <w:tr> element (does NOT insert it).
cells_data: list of dicts with keys: text, width, vmerge, style.
"""
def _ins_attr():
return {qn('w:id'): rev.next(), qn('w:author'): author, qn('w:date'): date}
def _make_t(text, tag='w:t'):
t = OxmlElement(tag)
t.text = text or ''
if text and (text[0] in (' ', '\t') or text[-1] in (' ', '\t')):
t.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve')
return t
def _make_run(text):
r = OxmlElement('w:r')
r.append(_make_t(text))
return r
new_tr = OxmlElement('w:tr')
# trPr: tracked row insertion
trPr = OxmlElement('w:trPr')
tr_ins = OxmlElement('w:ins')
for k, v in _ins_attr().items():
tr_ins.set(k, v)
trPr.append(tr_ins)
new_tr.append(trPr)
for cd in cells_data:
tc = OxmlElement('w:tc')
tcPr = OxmlElement('w:tcPr')
tcW = OxmlElement('w:tcW')
if cd.get('width'):
tcW.set(qn('w:w'), str(cd['width']))
tcW.set(qn('w:type'), 'dxa')
tcPr.append(tcW)
if cd.get('vmerge'):
vm = OxmlElement('w:vMerge')
tcPr.append(vm)
tc.append(tcPr)
p = OxmlElement('w:p')
pPr = OxmlElement('w:pPr')
if cd.get('style'):
pStyle = OxmlElement('w:pStyle')
pStyle.set(qn('w:val'), cd['style'])
pPr.append(pStyle)
rPr_para = OxmlElement('w:rPr')
pm_ins = OxmlElement('w:ins')
for k, v in _ins_attr().items():
pm_ins.set(k, v)
rPr_para.append(pm_ins)
pPr.append(rPr_para)
p.append(pPr)
if cd.get('text') and not cd.get('vmerge'):
ins_el = OxmlElement('w:ins')
for k, v in _ins_attr().items():
ins_el.set(k, v)
ins_el.append(_make_run(cd['text']))
p.append(ins_el)
tc.append(p)
new_tr.append(tc)
return new_tr
def _insert_vmerge_row(tbl, after_row_idx, cells_data, rev, author, date):
"""
Insert a tracked row after row[after_row_idx].
cells_data: list of dicts with keys: text, width, vmerge, style.
Returns the inserted <w:tr> element.
"""
new_tr = _build_new_tr(cells_data, rev, author, date)
ref_tr = tbl.rows[after_row_idx]._tr
ref_tr.addnext(new_tr)
return new_tr
# ── Section replace (direct XML transplant) ───────────────────────────────────
def _apply_section_replace(doc, change, rev, author, date, log):
"""
Transplant a block of CR elements (del section + ins section) directly into
the TS, replacing the old heading+table at the matching location.
This mirrors what Word does on copy-paste: the exact XML from the CR is
cloned into the TS, with only the tracked-change revision IDs remapped to
avoid conflicts.
"""
from lxml import etree
import copy
loc = change['location']
del_heading = loc.get('del_heading', '')
has_del_table = loc.get('has_del_table', False)
elements_xml = change.get('elements_xml', [])
if not elements_xml:
log.append(' SKIP section_replace: no elements in manifest')
return False
# ── Find the TS paragraph that matches the deleted heading ─────────────────
ts_para_elem = None
if del_heading:
for para in doc.paragraphs:
pt = para.text
if del_heading in pt or _norm(del_heading) in _norm(pt):
ts_para_elem = para._element
break
if ts_para_elem is None:
# Fallback: include paragraphs whose XML text (inc. del runs) matches
for para in doc.paragraphs:
if del_heading in _full_para_text(para):
ts_para_elem = para._element
break
if ts_para_elem is None:
log.append(f' SKIP section_replace: del_heading {del_heading!r} not found in TS')
return False
ts_body = ts_para_elem.getparent()
# ── Find the table immediately after the heading (if applicable) ───────────
ts_tbl_elem = None
if has_del_table:
found_para = False
for sib in ts_body:
if sib is ts_para_elem:
found_para = True
continue
if not found_para:
continue
sib_tag = sib.tag.split('}')[-1] if '}' in sib.tag else sib.tag
if sib_tag == 'p':
# Allow empty paragraphs between heading and table
if not (''.join(t.text or '' for t in sib.findall('.//' + qn('w:t')))).strip():
continue
break # non-empty paragraph before table β†’ no table to remove
elif sib_tag == 'tbl':
ts_tbl_elem = sib
break
else:
break
# ── Clone and remap IDs on the CR elements ─────────────────────────────────
cloned = []
for xml_str in elements_xml:
elem = etree.fromstring(xml_str)
cloned_elem = copy.deepcopy(elem)
# Remap w:id in all tracked-change elements (must be unique per document)
for el in cloned_elem.iter():
if el.get(qn('w:id')) is not None:
el.set(qn('w:id'), rev.next())
cloned.append(cloned_elem)
# ── Insert cloned elements before the TS heading paragraph ────────────────
insert_idx = list(ts_body).index(ts_para_elem)
for i, elem in enumerate(cloned):
ts_body.insert(insert_idx + i, elem)
# ── Remove the now-replaced TS elements ───────────────────────────────────
ts_body.remove(ts_para_elem)
if ts_tbl_elem is not None:
ts_body.remove(ts_tbl_elem)
n_del = sum(1 for x in elements_xml if 'w:del' in x[:200])
log.append(
f' OK section_replace: {del_heading!r} β†’ {len(elements_xml)} element(s) spliced in'
f' (removed heading{"+ table" if has_del_table else ""})'
)
return True
# ── Per-change-type applicators ───────────────────────────────────────────────
def _apply_text_replace(doc, change, rev, author, date, log):
loc = change['location']
old = change['old']
new = change['new']
if loc['kind'] == 'table_cell':
tbl, t_conf = _find_table(doc, loc['table_header'])
if tbl is None:
log.append(f" SKIP text_replace: table not found {loc['table_header'][:2]!r}")
return False
col_idx = loc['col_idx']
row_anchor = loc['row_anchor']
if row_anchor:
row_idx, r_conf = _find_row(tbl, row_anchor)
if row_idx < 0:
log.append(f" SKIP text_replace: row anchor not found {row_anchor!r}")
return False
row = tbl.rows[row_idx]
if col_idx >= len(row.cells):
log.append(f" SKIP text_replace: col_idx {col_idx} out of range")
return False
cell = row.cells[col_idx]
for para in cell.paragraphs:
if old in para.text:
tracked_modify_para(para, old, new, rev, author, date)
log.append(f" OK text_replace (table_cell row={row_idx} col={col_idx}): {old!r} β†’ {new!r}")
return True
log.append(f" SKIP text_replace: old text {old!r} not in cell (row={row_idx} col={col_idx})")
return False
else:
# Empty row anchor: scan all rows in col_idx.
# Prefer the table that follows the section heading (e.g. "Thirty fifth byte:")
# because all-empty table headers match any table.
section_heading = loc.get('section_heading', '')
tbl_by_section, _ = _find_table_by_section(doc, section_heading)
if tbl_by_section is not None:
tables_to_try = [tbl_by_section] + [t for t in doc.tables if t is not tbl_by_section]
else:
tables_to_try = [tbl] + [t for t in doc.tables if t is not tbl]
for search_tbl in tables_to_try:
for r_idx, row in enumerate(search_tbl.rows):
if col_idx >= len(row.cells):
continue
cell = row.cells[col_idx]
for para in cell.paragraphs:
if old in para.text:
tracked_modify_para(para, old, new, rev, author, date)
log.append(f" OK text_replace (table_cell scan row={r_idx} col={col_idx}): {old!r} β†’ {new!r}")
return True
# Final fallback: scan ALL columns of ALL tables
_all_start = tbl_by_section if tbl_by_section is not None else tbl
for search_tbl in [_all_start] + [t for t in doc.tables if t is not _all_start]:
for r_idx, row in enumerate(search_tbl.rows):
for c_idx, cell in enumerate(row.cells):
for para in cell.paragraphs:
if old in para.text:
tracked_modify_para(para, old, new, rev, author, date)
log.append(f" OK text_replace (table_cell any_col row={r_idx} col={c_idx}): {old!r} β†’ {new!r}")
return True
log.append(f" SKIP text_replace: old text {old!r} not found in any table column")
return False
elif loc['kind'] == 'body_para':
ctx = loc.get('para_context', '')
# Try to find the paragraph by old text first
para, conf = _find_para(doc, old, prefer_not_in_table=True)
if para is None:
# Fall back: find by paragraph context
para, conf = _find_para(doc, ctx, prefer_not_in_table=True)
if para is None:
log.append(f" SKIP text_replace: old text {old!r} not found in TS")
return False
if old in para.text:
tracked_modify_para(para, old, new, rev, author, date)
log.append(f" OK text_replace (body_para conf={conf:.1f}): {old!r} β†’ {new!r}")
return True
log.append(f" SKIP text_replace: old text {old!r} not in resolved paragraph")
return False
log.append(f" SKIP text_replace: unknown kind {loc['kind']!r}")
return False
def _apply_para_insert(doc, change, rev, author, date, log):
anchor_text = change['location'].get('anchor_text', '')
paras_data = change.get('paragraphs', [])
if not paras_data:
return True
anchor_para, conf = _find_para(doc, anchor_text)
if anchor_para is None:
log.append(f" SKIP para_insert: anchor not found {anchor_text[:60]!r}")
return False
items = [(p['text'], p['style'] or 'Normal') for p in paras_data]
tracked_insert_paras_after(anchor_para, items, rev, author, date)
first_text = paras_data[0]['text'][:50] if paras_data else ''
log.append(f" OK para_insert ({len(paras_data)} para(s) after anchor conf={conf:.1f}): {first_text!r}...")
return True
def _apply_row_insert(doc, change, rev, author, date, log, last_inserted=None):
loc = change['location']
# Prefer table located by section heading (handles ambiguous all-empty headers)
section_heading = loc.get('section_heading', '')
tbl_by_section, _ = _find_table_by_section(doc, section_heading)
if tbl_by_section is not None:
tbl = tbl_by_section
else:
tbl, t_conf = _find_table(doc, loc['table_header'])
if tbl is None:
log.append(f" SKIP row_insert: table not found {loc['table_header'][:2]!r}")
return False
after_anchor = loc.get('after_row_anchor', '')
row_idx, r_conf = _find_row(tbl, after_anchor)
if row_idx < 0:
log.append(f" SKIP row_insert: anchor row not found {after_anchor!r}")
return False
cells_data = change.get('cells', [])
# Fix insertion ordering: when multiple rows target the same (tbl, row_idx),
# each new row should go AFTER the previously inserted one, not after row_idx.
# last_inserted maps (tbl._tbl id, row_idx) β†’ last w:tr element inserted there.
key = (id(tbl._tbl), row_idx)
if last_inserted is not None and key in last_inserted:
# Insert after the previously inserted row to maintain forward order
prev_tr = last_inserted[key]
new_tr = _build_new_tr(cells_data, rev, author, date)
prev_tr.addnext(new_tr)
last_inserted[key] = new_tr
else:
new_tr = _insert_vmerge_row(tbl, row_idx, cells_data, rev, author, date)
if last_inserted is not None:
last_inserted[key] = new_tr
desc = cells_data[1]['text'] if len(cells_data) > 1 else '?'
log.append(f" OK row_insert after row[{row_idx}] ({after_anchor!r}): {desc!r}")
return True
# ── Manifest pre-processing ───────────────────────────────────────────────────
def _merge_para_inserts(manifest):
"""
Merge consecutive para_insert entries that share the same anchor_text.
When the CR parser emits multiple para_insert entries for the same anchor
(because [...] context markers were transparent and kept prev_stable_text
unchanged), each would call tracked_insert_paras_after independently.
Since each call starts from the same anchor element and uses addnext(),
later groups push earlier groups down β€” producing reversed order.
Merging them into one entry ensures a single tracked_insert_paras_after
call that inserts all paragraphs in the correct forward order.
"""
result = []
for change in manifest:
if (change.get('type') == 'para_insert'
and result
and result[-1].get('type') == 'para_insert'
and result[-1]['location']['anchor_text'] == change['location']['anchor_text']):
result[-1]['paragraphs'].extend(change['paragraphs'])
else:
merged = dict(change)
if change.get('type') == 'para_insert':
merged['paragraphs'] = list(change['paragraphs'])
result.append(merged)
return result
# ── Main apply function ───────────────────────────────────────────────────────
def apply_manifest(ts_path, manifest, out_path, author=DEFAULT_AUTHOR, date=DEFAULT_DATE):
"""
Apply all changes in manifest to ts_path, save to out_path.
Returns (n_ok, n_skipped, log_lines).
"""
doc = docx.Document(str(ts_path))
rev = RevCounter(doc)
log = []
n_ok = 0
n_skip = 0
manifest = _merge_para_inserts(manifest)
# Track last inserted <w:tr> per (tbl_id, anchor_row_idx) to maintain
# forward insertion order when multiple row_inserts target the same anchor.
last_inserted = {}
for change in manifest:
ctype = change.get('type')
ok = False
if ctype == 'section_replace':
ok = _apply_section_replace(doc, change, rev, author, date, log)
elif ctype == 'text_replace':
ok = _apply_text_replace(doc, change, rev, author, date, log)
elif ctype == 'para_insert':
ok = _apply_para_insert(doc, change, rev, author, date, log)
elif ctype == 'row_insert':
ok = _apply_row_insert(doc, change, rev, author, date, log, last_inserted=last_inserted)
else:
log.append(f" SKIP unknown change type: {ctype!r}")
if ok:
n_ok += 1
else:
n_skip += 1
doc.save(str(out_path))
return n_ok, n_skip, log
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
ap = argparse.ArgumentParser(description='Apply CR manifest to TS DOCX as tracked changes.')
ap.add_argument('ts_docx', help='Target TS DOCX file')
ap.add_argument('manifest', help='JSON manifest from cr_parser.py')
ap.add_argument('--author', default=DEFAULT_AUTHOR, help='Tracked change author')
ap.add_argument('--output', default=None, help='Output path (default: <ts>_applied.docx)')
args = ap.parse_args()
ts_path = Path(args.ts_docx)
out_path = Path(args.output) if args.output else ts_path.parent / (ts_path.stem + '_applied.docx')
with open(args.manifest, encoding='utf-8') as f:
manifest = json.load(f)
print(f'Applying {len(manifest)} change(s) from manifest to {ts_path.name}...')
n_ok, n_skip, log = apply_manifest(ts_path, manifest, out_path, author=args.author)
for line in log:
print(line)
print(f'\nResult: {n_ok} applied, {n_skip} skipped')
print(f'Output: {out_path}')
if __name__ == '__main__':
main()