#!/usr/bin/env python3 """ ts_applicator.py — Apply a CR change manifest to a TS DOCX as tracked changes. Reads a JSON manifest produced by cr_parser.py and applies every change to the target TS using docx_helpers tracked-change primitives. Usage: python3 ts_applicator.py [--author NAME] [--output path] # or import: from ts_applicator import apply_manifest """ import argparse import json import re import sys from pathlib import Path import docx from docx.oxml import OxmlElement from docx.oxml.ns import qn sys.path.insert(0, str(Path(__file__).parent)) from docx_helpers import ( RevCounter, tracked_modify_para, tracked_insert_paras_after, AUTHOR as DEFAULT_AUTHOR, DATE as DEFAULT_DATE, ) # ── Text normalisation ──────────────────────────────────────────────────────── def _norm(text): """Normalise non-breaking spaces and common Unicode dashes for comparison.""" return (text .replace('\xa0', ' ') .replace('\u2013', '-') .replace('\u2014', '-') .strip()) def _norm_ws(text): """ Strip all whitespace for structural matching. ETSI TS files store structured paragraphs (references, abbreviations, headings) with a TAB between the code and the body text, e.g.: '[27]\\tGlobalPlatform: ...' 'CLT\\tContactLess Tunnelling' '8.3\\tRAM implementation over HTTPS' The CR's text extraction concatenates runs directly, losing the tab: '[27]GlobalPlatform: ...' 'CLTContactLess Tunnelling' '8.3RAM implementation over HTTPS' Removing all whitespace from both sides before comparing solves this. Used as a third-level fallback (confidence 0.8) after exact and NBSP-norm. """ base = (text .replace('\xa0', '') .replace('\u2013', '-') .replace('\u2014', '-')) return re.sub(r'\s+', '', base) # ── Document search helpers ─────────────────────────────────────────────────── def _full_para_text(para): """All text content including w:t (normal/inserted) and w:delText (deleted runs).""" el = para._element return ''.join(t.text or '' for t in el.findall('.//' + qn('w:t'))) + \ ''.join(t.text or '' for t in el.findall('.//' + qn('w:delText'))) def _find_para(doc, search_text, prefer_not_in_table=False): """ Find the first paragraph containing search_text. Four levels of matching, in order of confidence: 1.0 — exact substring match 0.9 — NBSP/dash-normalised match (_norm) 0.8 — whitespace-stripped match (_norm_ws) handles tab vs nothing in structured paragraphs (refs '[27]\\t...', abbrevs 'CLT\\t...', headings '8.3\\t...') 0.6 — full XML text (including w:del content): handles anchors that were previously deleted by tracked_modify_para in an earlier apply step Returns (para, confidence) or (None, 0.0). """ norm_search = _norm(search_text) ws_search = _norm_ws(search_text) candidates_exact = [] candidates_norm = [] candidates_ws = [] candidates_del = [] for para in doc.paragraphs: pt = para.text if search_text in pt: candidates_exact.append(para) elif norm_search and norm_search in _norm(pt): candidates_norm.append(para) elif ws_search and ws_search in _norm_ws(pt): candidates_ws.append(para) else: # Level 4: check full XML text (catches deleted-but-still-present paragraphs) full_pt = _full_para_text(para) if search_text in full_pt: candidates_del.append(para) elif ws_search and ws_search in _norm_ws(full_pt): candidates_del.append(para) def _in_table(para): p = para._element return any(a.tag == qn('w:tc') for a in p.iterancestors()) for pool, conf in [(candidates_exact, 1.0), (candidates_norm, 0.9), (candidates_ws, 0.8), (candidates_del, 0.6)]: if not pool: continue if prefer_not_in_table: body_only = [p for p in pool if not _in_table(p)] if body_only: return body_only[0], conf return pool[0], conf return None, 0.0 def _find_table_by_section(doc, section_heading): """ Find the table immediately following a paragraph that contains section_heading. Checks both w:t (plain/inserted) and w:delText (tracked-deleted) so the match survives even after the heading was wrapped in a tracked deletion. Empty paragraphs between the heading and the table are tolerated. Returns (table, confidence) or (None, 0.0). """ if not section_heading: return None, 0.0 norm_h = _norm(section_heading) ws_h = _norm_ws(section_heading) heading_seen = False for element in doc.element.body: tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag if tag == 'p': t_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:t'))) d_text = ''.join(t.text or '' for t in element.findall('.//' + qn('w:delText'))) full = (t_text + d_text).strip() if not full: continue # skip empty paras, keep heading_seen state if (section_heading in full or norm_h in _norm(full) or ws_h in _norm_ws(full)): heading_seen = True else: heading_seen = False # non-matching non-empty para resets elif tag == 'tbl': if heading_seen: for tbl in doc.tables: if tbl._tbl is element: return tbl, 1.0 heading_seen = False return None, 0.0 def _find_table(doc, header_key): """ Find a table whose first row cell texts start with header_key. Returns (table, confidence) or (None, 0.0). """ norm_key = [_norm(h) for h in header_key] for tbl in doc.tables: if not tbl.rows: continue first_row_texts = [_norm(c.text) for c in tbl.rows[0].cells] # Match by prefix (header_key may have fewer columns) match = all( i < len(first_row_texts) and norm_key[i] in first_row_texts[i] for i in range(len(norm_key)) ) if match: return tbl, 1.0 return None, 0.0 def _find_row(tbl, anchor_text): """ Find first row in tbl where col-0 cell text contains anchor_text. Returns (row_idx, confidence) or (-1, 0.0). Three confidence levels: 1.0 exact, 0.9 norm, 0.8 whitespace-stripped. """ norm_anchor = _norm(anchor_text) ws_anchor = _norm_ws(anchor_text) best = (-1, 0.0) for idx, row in enumerate(tbl.rows): cell0 = row.cells[0].text if row.cells else '' if anchor_text in cell0: return idx, 1.0 if norm_anchor and norm_anchor in _norm(cell0) and best[1] < 0.9: best = (idx, 0.9) elif ws_anchor and ws_anchor in _norm_ws(cell0) and best[1] < 0.8: best = (idx, 0.8) return best # ── vMerge row insertion ────────────────────────────────────────────────────── def _build_new_tr(cells_data, rev, author, date): """ Build and return a new tracked-insert element (does NOT insert it). cells_data: list of dicts with keys: text, width, vmerge, style. """ def _ins_attr(): return {qn('w:id'): rev.next(), qn('w:author'): author, qn('w:date'): date} def _make_t(text, tag='w:t'): t = OxmlElement(tag) t.text = text or '' if text and (text[0] in (' ', '\t') or text[-1] in (' ', '\t')): t.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve') return t def _make_run(text): r = OxmlElement('w:r') r.append(_make_t(text)) return r new_tr = OxmlElement('w:tr') # trPr: tracked row insertion trPr = OxmlElement('w:trPr') tr_ins = OxmlElement('w:ins') for k, v in _ins_attr().items(): tr_ins.set(k, v) trPr.append(tr_ins) new_tr.append(trPr) for cd in cells_data: tc = OxmlElement('w:tc') tcPr = OxmlElement('w:tcPr') tcW = OxmlElement('w:tcW') if cd.get('width'): tcW.set(qn('w:w'), str(cd['width'])) tcW.set(qn('w:type'), 'dxa') tcPr.append(tcW) if cd.get('vmerge'): vm = OxmlElement('w:vMerge') tcPr.append(vm) tc.append(tcPr) p = OxmlElement('w:p') pPr = OxmlElement('w:pPr') if cd.get('style'): pStyle = OxmlElement('w:pStyle') pStyle.set(qn('w:val'), cd['style']) pPr.append(pStyle) rPr_para = OxmlElement('w:rPr') pm_ins = OxmlElement('w:ins') for k, v in _ins_attr().items(): pm_ins.set(k, v) rPr_para.append(pm_ins) pPr.append(rPr_para) p.append(pPr) if cd.get('text') and not cd.get('vmerge'): ins_el = OxmlElement('w:ins') for k, v in _ins_attr().items(): ins_el.set(k, v) ins_el.append(_make_run(cd['text'])) p.append(ins_el) tc.append(p) new_tr.append(tc) return new_tr def _insert_vmerge_row(tbl, after_row_idx, cells_data, rev, author, date): """ Insert a tracked row after row[after_row_idx]. cells_data: list of dicts with keys: text, width, vmerge, style. Returns the inserted element. """ new_tr = _build_new_tr(cells_data, rev, author, date) ref_tr = tbl.rows[after_row_idx]._tr ref_tr.addnext(new_tr) return new_tr # ── Section replace (direct XML transplant) ─────────────────────────────────── def _apply_section_replace(doc, change, rev, author, date, log): """ Transplant a block of CR elements (del section + ins section) directly into the TS, replacing the old heading+table at the matching location. This mirrors what Word does on copy-paste: the exact XML from the CR is cloned into the TS, with only the tracked-change revision IDs remapped to avoid conflicts. """ from lxml import etree import copy loc = change['location'] del_heading = loc.get('del_heading', '') has_del_table = loc.get('has_del_table', False) elements_xml = change.get('elements_xml', []) if not elements_xml: log.append(' SKIP section_replace: no elements in manifest') return False # ── Find the TS paragraph that matches the deleted heading ───────────────── ts_para_elem = None if del_heading: for para in doc.paragraphs: pt = para.text if del_heading in pt or _norm(del_heading) in _norm(pt): ts_para_elem = para._element break if ts_para_elem is None: # Fallback: include paragraphs whose XML text (inc. del runs) matches for para in doc.paragraphs: if del_heading in _full_para_text(para): ts_para_elem = para._element break if ts_para_elem is None: log.append(f' SKIP section_replace: del_heading {del_heading!r} not found in TS') return False ts_body = ts_para_elem.getparent() # ── Find the table immediately after the heading (if applicable) ─────────── ts_tbl_elem = None if has_del_table: found_para = False for sib in ts_body: if sib is ts_para_elem: found_para = True continue if not found_para: continue sib_tag = sib.tag.split('}')[-1] if '}' in sib.tag else sib.tag if sib_tag == 'p': # Allow empty paragraphs between heading and table if not (''.join(t.text or '' for t in sib.findall('.//' + qn('w:t')))).strip(): continue break # non-empty paragraph before table → no table to remove elif sib_tag == 'tbl': ts_tbl_elem = sib break else: break # ── Clone and remap IDs on the CR elements ───────────────────────────────── cloned = [] for xml_str in elements_xml: elem = etree.fromstring(xml_str) cloned_elem = copy.deepcopy(elem) # Remap w:id in all tracked-change elements (must be unique per document) for el in cloned_elem.iter(): if el.get(qn('w:id')) is not None: el.set(qn('w:id'), rev.next()) cloned.append(cloned_elem) # ── Insert cloned elements before the TS heading paragraph ──────────────── insert_idx = list(ts_body).index(ts_para_elem) for i, elem in enumerate(cloned): ts_body.insert(insert_idx + i, elem) # ── Remove the now-replaced TS elements ─────────────────────────────────── ts_body.remove(ts_para_elem) if ts_tbl_elem is not None: ts_body.remove(ts_tbl_elem) n_del = sum(1 for x in elements_xml if 'w:del' in x[:200]) log.append( f' OK section_replace: {del_heading!r} → {len(elements_xml)} element(s) spliced in' f' (removed heading{"+ table" if has_del_table else ""})' ) return True # ── Per-change-type applicators ─────────────────────────────────────────────── def _apply_text_replace(doc, change, rev, author, date, log): loc = change['location'] old = change['old'] new = change['new'] if loc['kind'] == 'table_cell': tbl, t_conf = _find_table(doc, loc['table_header']) if tbl is None: log.append(f" SKIP text_replace: table not found {loc['table_header'][:2]!r}") return False col_idx = loc['col_idx'] row_anchor = loc['row_anchor'] if row_anchor: row_idx, r_conf = _find_row(tbl, row_anchor) if row_idx < 0: log.append(f" SKIP text_replace: row anchor not found {row_anchor!r}") return False row = tbl.rows[row_idx] if col_idx >= len(row.cells): log.append(f" SKIP text_replace: col_idx {col_idx} out of range") return False cell = row.cells[col_idx] for para in cell.paragraphs: if old in para.text: tracked_modify_para(para, old, new, rev, author, date) log.append(f" OK text_replace (table_cell row={row_idx} col={col_idx}): {old!r} → {new!r}") return True log.append(f" SKIP text_replace: old text {old!r} not in cell (row={row_idx} col={col_idx})") return False else: # Empty row anchor: scan all rows in col_idx. # Prefer the table that follows the section heading (e.g. "Thirty fifth byte:") # because all-empty table headers match any table. section_heading = loc.get('section_heading', '') tbl_by_section, _ = _find_table_by_section(doc, section_heading) if tbl_by_section is not None: tables_to_try = [tbl_by_section] + [t for t in doc.tables if t is not tbl_by_section] else: tables_to_try = [tbl] + [t for t in doc.tables if t is not tbl] for search_tbl in tables_to_try: for r_idx, row in enumerate(search_tbl.rows): if col_idx >= len(row.cells): continue cell = row.cells[col_idx] for para in cell.paragraphs: if old in para.text: tracked_modify_para(para, old, new, rev, author, date) log.append(f" OK text_replace (table_cell scan row={r_idx} col={col_idx}): {old!r} → {new!r}") return True # Final fallback: scan ALL columns of ALL tables _all_start = tbl_by_section if tbl_by_section is not None else tbl for search_tbl in [_all_start] + [t for t in doc.tables if t is not _all_start]: for r_idx, row in enumerate(search_tbl.rows): for c_idx, cell in enumerate(row.cells): for para in cell.paragraphs: if old in para.text: tracked_modify_para(para, old, new, rev, author, date) log.append(f" OK text_replace (table_cell any_col row={r_idx} col={c_idx}): {old!r} → {new!r}") return True log.append(f" SKIP text_replace: old text {old!r} not found in any table column") return False elif loc['kind'] == 'body_para': ctx = loc.get('para_context', '') # Try to find the paragraph by old text first para, conf = _find_para(doc, old, prefer_not_in_table=True) if para is None: # Fall back: find by paragraph context para, conf = _find_para(doc, ctx, prefer_not_in_table=True) if para is None: log.append(f" SKIP text_replace: old text {old!r} not found in TS") return False if old in para.text: tracked_modify_para(para, old, new, rev, author, date) log.append(f" OK text_replace (body_para conf={conf:.1f}): {old!r} → {new!r}") return True log.append(f" SKIP text_replace: old text {old!r} not in resolved paragraph") return False log.append(f" SKIP text_replace: unknown kind {loc['kind']!r}") return False def _apply_para_insert(doc, change, rev, author, date, log): anchor_text = change['location'].get('anchor_text', '') paras_data = change.get('paragraphs', []) if not paras_data: return True anchor_para, conf = _find_para(doc, anchor_text) if anchor_para is None: log.append(f" SKIP para_insert: anchor not found {anchor_text[:60]!r}") return False items = [(p['text'], p['style'] or 'Normal') for p in paras_data] tracked_insert_paras_after(anchor_para, items, rev, author, date) first_text = paras_data[0]['text'][:50] if paras_data else '' log.append(f" OK para_insert ({len(paras_data)} para(s) after anchor conf={conf:.1f}): {first_text!r}...") return True def _apply_row_insert(doc, change, rev, author, date, log, last_inserted=None): loc = change['location'] # Prefer table located by section heading (handles ambiguous all-empty headers) section_heading = loc.get('section_heading', '') tbl_by_section, _ = _find_table_by_section(doc, section_heading) if tbl_by_section is not None: tbl = tbl_by_section else: tbl, t_conf = _find_table(doc, loc['table_header']) if tbl is None: log.append(f" SKIP row_insert: table not found {loc['table_header'][:2]!r}") return False after_anchor = loc.get('after_row_anchor', '') row_idx, r_conf = _find_row(tbl, after_anchor) if row_idx < 0: log.append(f" SKIP row_insert: anchor row not found {after_anchor!r}") return False cells_data = change.get('cells', []) # Fix insertion ordering: when multiple rows target the same (tbl, row_idx), # each new row should go AFTER the previously inserted one, not after row_idx. # last_inserted maps (tbl._tbl id, row_idx) → last w:tr element inserted there. key = (id(tbl._tbl), row_idx) if last_inserted is not None and key in last_inserted: # Insert after the previously inserted row to maintain forward order prev_tr = last_inserted[key] new_tr = _build_new_tr(cells_data, rev, author, date) prev_tr.addnext(new_tr) last_inserted[key] = new_tr else: new_tr = _insert_vmerge_row(tbl, row_idx, cells_data, rev, author, date) if last_inserted is not None: last_inserted[key] = new_tr desc = cells_data[1]['text'] if len(cells_data) > 1 else '?' log.append(f" OK row_insert after row[{row_idx}] ({after_anchor!r}): {desc!r}") return True # ── Manifest pre-processing ─────────────────────────────────────────────────── def _merge_para_inserts(manifest): """ Merge consecutive para_insert entries that share the same anchor_text. When the CR parser emits multiple para_insert entries for the same anchor (because [...] context markers were transparent and kept prev_stable_text unchanged), each would call tracked_insert_paras_after independently. Since each call starts from the same anchor element and uses addnext(), later groups push earlier groups down — producing reversed order. Merging them into one entry ensures a single tracked_insert_paras_after call that inserts all paragraphs in the correct forward order. """ result = [] for change in manifest: if (change.get('type') == 'para_insert' and result and result[-1].get('type') == 'para_insert' and result[-1]['location']['anchor_text'] == change['location']['anchor_text']): result[-1]['paragraphs'].extend(change['paragraphs']) else: merged = dict(change) if change.get('type') == 'para_insert': merged['paragraphs'] = list(change['paragraphs']) result.append(merged) return result # ── Main apply function ─────────────────────────────────────────────────────── def apply_manifest(ts_path, manifest, out_path, author=DEFAULT_AUTHOR, date=DEFAULT_DATE): """ Apply all changes in manifest to ts_path, save to out_path. Returns (n_ok, n_skipped, log_lines). """ doc = docx.Document(str(ts_path)) rev = RevCounter(doc) log = [] n_ok = 0 n_skip = 0 manifest = _merge_para_inserts(manifest) # Track last inserted per (tbl_id, anchor_row_idx) to maintain # forward insertion order when multiple row_inserts target the same anchor. last_inserted = {} for change in manifest: ctype = change.get('type') ok = False if ctype == 'section_replace': ok = _apply_section_replace(doc, change, rev, author, date, log) elif ctype == 'text_replace': ok = _apply_text_replace(doc, change, rev, author, date, log) elif ctype == 'para_insert': ok = _apply_para_insert(doc, change, rev, author, date, log) elif ctype == 'row_insert': ok = _apply_row_insert(doc, change, rev, author, date, log, last_inserted=last_inserted) else: log.append(f" SKIP unknown change type: {ctype!r}") if ok: n_ok += 1 else: n_skip += 1 doc.save(str(out_path)) return n_ok, n_skip, log # ── CLI ─────────────────────────────────────────────────────────────────────── def main(): ap = argparse.ArgumentParser(description='Apply CR manifest to TS DOCX as tracked changes.') ap.add_argument('ts_docx', help='Target TS DOCX file') ap.add_argument('manifest', help='JSON manifest from cr_parser.py') ap.add_argument('--author', default=DEFAULT_AUTHOR, help='Tracked change author') ap.add_argument('--output', default=None, help='Output path (default: _applied.docx)') args = ap.parse_args() ts_path = Path(args.ts_docx) out_path = Path(args.output) if args.output else ts_path.parent / (ts_path.stem + '_applied.docx') with open(args.manifest, encoding='utf-8') as f: manifest = json.load(f) print(f'Applying {len(manifest)} change(s) from manifest to {ts_path.name}...') n_ok, n_skip, log = apply_manifest(ts_path, manifest, out_path, author=args.author) for line in log: print(line) print(f'\nResult: {n_ok} applied, {n_skip} skipped') print(f'Output: {out_path}') if __name__ == '__main__': main()