Spaces:
Running
Running
| #!/usr/bin/env python3 | |
"""
orchestrate_cr.py — Fully automated CR application pipeline.

Reads an Excel contribution list, downloads all Accepted CRs and their target
TSs, parses tracked changes from each CR, applies them to the TS, and
finalises the document metadata — all without any per-CR manual scripting.

Usage:
    python3 orchestrate_cr.py <excel_path> [person_name] [--output-dir DIR] [--author NAME]
    python3 orchestrate_cr.py --retry-mode [--output-dir DIR] [--author NAME]

Arguments:
    excel_path    Path to .xls or .xlsx contribution list (Windows paths OK);
                  not required with --retry-mode
    person_name   Name to match in SubmittedBy column (default: "Ly Thanh PHAN")

Options:
    --output-dir  Base output folder (default: ~/CR_Processing)
    --author      Tracked-change author name (default: "CR Application")
    --retry-mode  Skip steps 1-4; apply CRs to TSs listed in failed_ts.json
                  that now have their DOCX on disk

Environment:
    EOL_USER, EOL_PASSWORD  Both must be set; the script exits otherwise.
"""
| import argparse | |
| import contextlib | |
| import datetime | |
| import io | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import docx as docx_lib | |
# ── sys.path setup ────────────────────────────────────────────────────────────
# Directory holding this script, and the sibling 'fetch-crs/scripts' folder
# located two levels up from it.
SCRIPT_DIR = Path(__file__).parent
FETCH_SCRIPTS = SCRIPT_DIR.parent.parent / 'fetch-crs' / 'scripts'
# Prepend both so the imports below resolve; FETCH_SCRIPTS ends up first on
# sys.path, followed by SCRIPT_DIR.
sys.path[:0] = [str(FETCH_SCRIPTS), str(SCRIPT_DIR)]
| from fetch_crs import parse_excel, download_cr, parse_cr_cover, download_ts, wsl_path | |
| from cr_parser import parse_cr | |
| from ts_applicator import apply_manifest | |
| from finalize_ts import ( | |
| extract_cr_metadata, | |
| compute_pub_date, | |
| derive_new_version, | |
| update_change_history_table, | |
| update_history_table, | |
| update_title_para, | |
| NoChangeHistoryTable, | |
| ) | |
| from docx_helpers import RevCounter, AUTHOR as DEFAULT_AUTHOR, DATE as DEFAULT_DATE | |
# ── Display / logging helpers ─────────────────────────────────────────────────
def _section(title):
    """Print a blank line, then *title* framed above and below by a 60-char '=' rule."""
    rule = '=' * 60
    # One print call; sep='\n' yields the same three lines the banner needs.
    print(f'\n{rule}', f' {title}', rule, sep='\n')
class _TeeWriter:
    """Writes to both the real stdout and a buffer simultaneously.

    Intended as the target of ``contextlib.redirect_stdout`` so that output
    stays visible on the console while also being captured for a log file.

    Args:
        real: The stream that normally receives output (e.g. ``sys.stdout``).
        buf: A second writable text stream (e.g. ``io.StringIO``) receiving
            a copy of everything written.
    """

    def __init__(self, real, buf):
        self._real = real
        self._buf = buf

    def write(self, s):
        """Write *s* to both streams and return the number of characters written."""
        self._real.write(s)
        self._buf.write(s)
        # Text streams are expected to report how much they wrote.
        return len(s)

    def flush(self):
        """Flush the real stream (the buffer copy is left as-is)."""
        self._real.flush()

    def __getattr__(self, name):
        # Only reached for attributes not defined on the tee itself; forward
        # them (isatty, encoding, ...) to the real stream so code that probes
        # sys.stdout keeps working while redirected.
        if name in ('_real', '_buf'):
            # Avoid infinite recursion if accessed before __init__ has run.
            raise AttributeError(name)
        return getattr(self._real, name)
# ── Main ──────────────────────────────────────────────────────────────────────
# Number of download attempts per TS and the pause between attempts (seconds).
_TS_DOWNLOAD_ATTEMPTS = 3
_TS_RETRY_DELAY_S = 5


def _build_arg_parser():
    """Return the argparse parser describing the command-line interface."""
    ap = argparse.ArgumentParser(
        description='Fully automated CR application pipeline.',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument(
        'excel_path',
        nargs='?',
        default=None,
        help='Path to .xls or .xlsx contribution list (not required in --retry-mode)',
    )
    ap.add_argument(
        'person_name',
        nargs='?',
        default='Ly Thanh PHAN',
        help='Name to match in SubmittedBy column (default: "Ly Thanh PHAN")',
    )
    ap.add_argument(
        '--output-dir',
        default=str(Path.home() / 'CR_Processing'),
        help='Base output directory (default: ~/CR_Processing)',
    )
    ap.add_argument(
        '--author',
        default=DEFAULT_AUTHOR,
        help=f'Tracked change author name (default: "{DEFAULT_AUTHOR}")',
    )
    ap.add_argument(
        '--retry-mode',
        action='store_true',
        help='Skip steps 1-4; apply CRs to TSs listed in failed_ts.json that now have their DOCX on disk',
    )
    return ap


def _tally(report):
    """Return (fully_ok, with_warnings, failed) counts for a list of report rows."""
    n_success = sum(1 for r in report if r[4] is not None and not r[6])
    n_partial = sum(1 for r in report if r[4] is not None and r[6])
    n_failed = sum(1 for r in report if r[4] is None)
    return n_success, n_partial, n_failed


def _status_tag(out_path, errors, fail_label):
    """Return 'OK', 'WARN' or *fail_label* for one report row."""
    if out_path and not errors:
        return 'OK'
    return 'WARN' if out_path else fail_label


def _parse_manifests(uids, cr_paths, missing_msg, errors):
    """Parse every CR in *uids* that has a DOCX and merge the resulting changes.

    Problems are appended to *errors* instead of being raised.

    Returns:
        (combined_manifest, participating_uids): all parsed changes in CR
        order, and the UIDs whose parse succeeded.
    """
    combined_manifest = []
    participating_uids = []
    for uid in uids:
        if uid not in cr_paths:
            errors.append(f'[{uid}] {missing_msg}')
            continue
        print(f' Parsing {uid}... ', end='', flush=True)
        try:
            changes = parse_cr(cr_paths[uid])
            combined_manifest.extend(changes)
            participating_uids.append(uid)
            print(f'{len(changes)} change(s)')
        except Exception as e:  # one bad CR must not abort the whole TS
            errors.append(f'[{uid}] parse ERROR: {e}')
            print(f'ERROR: {e}')
    return combined_manifest, participating_uids


def _finalise_metadata(ts_applied, ts_final, old_v, new_v, participating_uids,
                       cr_paths, author, tc_date, errors):
    """Step 6: update Change History, History and title, then save *ts_final*.

    Failures of an individual table/title update are recorded in *errors* and
    processing continues; failures opening or saving the document propagate
    to the caller.
    """
    ts_doc = docx_lib.Document(str(ts_applied))
    rev = RevCounter(ts_doc)
    pub_ym, pub_month_year = compute_pub_date()

    # Extract the old "(YYYY-MM)" date string from the first paragraph.
    title_text = ts_doc.paragraphs[0].text
    date_match = re.search(r'\((\d{4}-\d{2})\)', title_text)
    old_date_str = date_match.group(1) if date_match else ''

    print(f' Version: {old_v} -> {new_v}')
    print(f' Publication: {pub_month_year} ({pub_ym})')

    # One Change History row per CR.
    for uid in participating_uids:
        try:
            meta = extract_cr_metadata(str(cr_paths[uid]))
            ch_cells = update_change_history_table(
                ts_doc, meta, pub_ym, old_v, new_v, rev, author, tc_date
            )
            print(f' [Change History] {uid}: {ch_cells}')
        except NoChangeHistoryTable:
            print(f' [Change History] {uid}: NOT PRESENT — this document has no Change History table (History table only)')
        except Exception as e:
            errors.append(f'[{uid}] Change History ERROR: {e}')
            print(f' [Change History] {uid}: ERROR — {e}')

    # One History row for the whole TS.
    try:
        h_cells = update_history_table(
            ts_doc, new_v, pub_month_year, rev, author, tc_date
        )
        print(f' [History] {h_cells}')
    except Exception as e:
        errors.append(f'History table ERROR: {e}')
        print(f' [History] ERROR — {e}')

    # Title paragraph version + date (only when an old date was found).
    if old_date_str:
        try:
            update_title_para(
                ts_doc, old_v, new_v, old_date_str, pub_ym, rev, author, tc_date
            )
            print(f' [Title] V{old_v} -> V{new_v}, ({old_date_str}) -> ({pub_ym})')
        except Exception as e:
            errors.append(f'Title update ERROR: {e}')
            print(f' [Title] ERROR — {e}')
    else:
        print(f' [Title] SKIP — no (YYYY-MM) pattern in: {title_text!r}')

    ts_doc.save(str(ts_final))


def _process_one_ts(ts_in, ts_applied, ts_final, log_path, spec_compact,
                    version, new_v, uids, cr_paths, author, tc_date,
                    missing_msg):
    """Steps 5 & 6 for a single TS.

    Returns:
        (n_ok, n_skip, out_path, errors) where *out_path* is the finalised
        DOCX, the merely-applied DOCX if finalisation failed, or None if
        nothing could be applied.
    """
    errors = []

    # 5a. Parse all CR manifests and combine.
    combined_manifest, participating_uids = _parse_manifests(
        uids, cr_paths, missing_msg, errors
    )
    if not combined_manifest:
        print(' No changes parsed — skipping apply step.')
        errors.append('No changes parsed')
        return 0, 0, None, errors

    # 5b. Apply manifest to TS.
    print(f' Applying {len(combined_manifest)} change(s) to {ts_in.name}...')
    try:
        n_ok, n_skip, log_lines = apply_manifest(
            ts_in, combined_manifest, ts_applied, author=author, date=tc_date
        )
    except Exception as e:
        errors.append(f'apply_manifest ERROR: {e}')
        print(f' ERROR: {e}')
        return 0, 0, None, errors

    for line in log_lines:
        print(f' {line}')
    # Bubble every un-applied change into the warnings list.
    errors.extend(
        line.strip() for line in log_lines if line.strip().startswith('ERROR')
    )
    print(f' -> Applied: {n_ok} Skipped: {n_skip}')

    # 6. Finalise metadata (Change History, History, title paragraph).
    print(' Finalising metadata...')
    try:
        _finalise_metadata(
            ts_applied, ts_final, version, new_v, participating_uids,
            cr_paths, author, tc_date, errors,
        )
    except Exception as e:
        errors.append(f'Finalisation ERROR: {e}')
        print(f' Finalisation ERROR: {e}')
        # The applied (un-finalised) document is still a usable output.
        return n_ok, n_skip, ts_applied, errors

    print(f' Saved: {spec_compact}/{ts_final.name}')
    print(f' Log: {spec_compact}/{log_path.name}')
    return n_ok, n_skip, ts_final, errors


def _apply_and_finalise(ts_groups, ts_paths, spec_dirs, cr_paths, ts_dir,
                        author, tc_date, retry=False):
    """Run steps 5 & 6 for every TS group and return the report rows.

    Shared by normal and retry mode; *retry* only selects the wording of the
    log header and of the "missing input" messages.

    Returns:
        list of (ts_key, n_ok, n_skip, n_crs, out_path, log_path, errors).
    """
    if retry:
        log_title = 'Pipeline Log (retry)'
        ts_missing_msg = 'TS DOCX not on disk — skipping'
        cr_missing_msg = 'CR DOCX not found — skipped'
    else:
        log_title = 'Pipeline Log'
        ts_missing_msg = 'TS download failed — skipping'
        cr_missing_msg = 'CR download had failed — skipped'

    report = []
    for (spec_number, version), uids in ts_groups.items():
        ts_key = f'TS {spec_number} v{version}'
        spec_compact = spec_number.replace(' ', '')
        spec_dir = spec_dirs.get((spec_number, version), ts_dir / spec_compact)
        spec_dir.mkdir(parents=True, exist_ok=True)

        # Derive new version early so filenames are known upfront.
        new_v = derive_new_version(version)
        stem = f'ts_{spec_compact}_v{new_v}_was_v{version}'
        ts_applied = spec_dir / f'ts_{spec_compact}_v{version}_applied.docx'
        ts_final = spec_dir / f'{stem}.docx'
        log_path = spec_dir / f'{stem}.log'

        print(f'\n-- {ts_key} ({len(uids)} CR(s): {", ".join(uids)}) --')

        if (spec_number, version) not in ts_paths:
            print(f' SKIP: {ts_missing_msg}')
            report.append((ts_key, 0, 0, len(uids), None, log_path, [ts_missing_msg]))
            continue
        ts_in = ts_paths[(spec_number, version)]

        # All per-TS output is captured to log_buf (tee: stdout + file).
        log_buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(_TeeWriter(sys.stdout, log_buf)):
                log_header = (
                    f'{log_title}\n'
                    f'TS: {spec_number} v{version} -> v{new_v}\n'
                    f'CRs: {", ".join(uids)}\n'
                    f'Date: {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n'
                    f'{"=" * 60}\n'
                )
                print(log_header, end='')
                n_ok, n_skip, out_path, errors = _process_one_ts(
                    ts_in, ts_applied, ts_final, log_path, spec_compact,
                    version, new_v, uids, cr_paths, author, tc_date,
                    cr_missing_msg,
                )
        finally:
            # Persist whatever was captured, even on an unexpected exit.
            log_path.write_text(log_buf.getvalue(), encoding='utf-8')
        report.append((ts_key, n_ok, n_skip, len(uids), out_path, log_path, errors))
    return report


def _run_retry(output_dir, ts_dir, author, tc_date):
    """Retry mode: skip steps 1-4 and rebuild state from failed_ts.json."""
    failed_ts_path = output_dir / 'failed_ts.json'
    if not failed_ts_path.exists():
        sys.exit('ERROR: failed_ts.json not found in output directory')
    failed_ts_entries = json.loads(failed_ts_path.read_text(encoding='utf-8'))
    if not failed_ts_entries:
        print('No failed TSs in failed_ts.json — nothing to retry.')
        return

    _section('Retry mode — Steps 5 & 6 only')
    print(f'Retrying {len(failed_ts_entries)} TS(s) from failed_ts.json')

    ts_groups = {}
    spec_dirs = {}
    ts_paths = {}
    cr_paths = {}
    for entry in failed_ts_entries:
        spec_number = entry['spec_number']
        version = entry['version']
        key = (spec_number, version)
        ts_groups[key] = entry['cr_uids']
        spec_dir = Path(entry['spec_dir'])
        spec_dirs[key] = spec_dir
        expected = spec_dir / entry['expected_filename']
        if expected.exists():
            ts_paths[key] = expected
            print(f' [TS {spec_number} v{version}] DOCX found — will apply')
        else:
            print(f' [TS {spec_number} v{version}] DOCX missing — skipping')

        # Reconstruct cr_paths for each UID, preferring the extracted DOCX.
        cr_entry_dir = Path(entry['cr_dir'])
        for uid in entry['cr_uids']:
            extracted = cr_entry_dir / f'{uid}_extracted.docx'
            plain = cr_entry_dir / f'{uid}.docx'
            if extracted.exists():
                cr_paths[uid] = extracted
            elif plain.exists():
                cr_paths[uid] = plain

    report = _apply_and_finalise(
        ts_groups, ts_paths, spec_dirs, cr_paths, ts_dir, author, tc_date,
        retry=True,
    )

    # Update failed_ts.json — remove entries that are now resolved
    # (i.e. whose expected TS DOCX is present on disk).
    still_failed = [
        e for e in failed_ts_entries
        if not (Path(e['spec_dir']) / e['expected_filename']).exists()
    ]
    failed_ts_path.write_text(json.dumps(still_failed, indent=2), encoding='utf-8')

    _section('Retry Summary')
    n_success, n_partial, n_failed = _tally(report)
    print(f'TSs processed: {n_success} fully OK, {n_partial} with warnings, {n_failed} skipped/failed')
    for ts_key, _n_ok, _n_skip, _n_crs, out_path, _log_path, errors in report:
        print(f' [{_status_tag(out_path, errors, "SKIP")}] {ts_key}')
        for err in errors:
            print(f' ! {err}')


def main():
    """Command-line entry point for the CR application pipeline.

    Normal mode runs steps 1-6: parse the Excel list, download the CRs, group
    them by target TS via their cover pages, download the TSs, apply the CRs
    and finalise metadata, then print a report. TSs that could not be
    downloaded are recorded in ``failed_ts.json`` in the output directory.
    With ``--retry-mode`` only steps 5 & 6 run, driven by that file.

    Exits via ``sys.exit`` when EOL_USER/EOL_PASSWORD are unset, when the
    Excel file cannot be parsed, or (retry mode) when failed_ts.json is absent.
    """
    ap = _build_arg_parser()
    args = ap.parse_args()
    if not args.retry_mode and not args.excel_path:
        ap.error('excel_path is required when not in --retry-mode')

    eol_user = os.environ.get("EOL_USER", "")
    eol_password = os.environ.get("EOL_PASSWORD", "")
    if not eol_user or not eol_password:
        sys.exit("ERROR: EOL_USER and EOL_PASSWORD must be set")

    output_dir = Path(wsl_path(args.output_dir)).expanduser()
    cr_dir = output_dir / 'CRs'
    ts_dir = output_dir / 'TS'  # spec subfolders created per-TS below
    cr_dir.mkdir(parents=True, exist_ok=True)
    ts_dir.mkdir(parents=True, exist_ok=True)
    author = args.author
    tc_date = DEFAULT_DATE

    # ── Retry mode — skip steps 1-4, reconstruct state from failed_ts.json ──
    if args.retry_mode:
        _run_retry(output_dir, ts_dir, author, tc_date)
        return

    excel_path = wsl_path(args.excel_path)

    # ── Step 1: Parse Excel ──────────────────────────────────────────────────
    _section('Step 1 — Parsing Excel')
    print(f'Excel: {excel_path}')
    print(f'Person: {args.person_name!r}')
    try:
        cr_list = parse_excel(excel_path, args.person_name)
    except Exception as e:
        sys.exit(f'ERROR parsing Excel: {e}')
    print(f'Found {len(cr_list)} Accepted CR(s)')
    if not cr_list:
        print('Nothing to process.')
        return

    # ── Step 2: Download CR DOCXs ────────────────────────────────────────────
    _section('Step 2 — Downloading CR DOCXs')
    cr_paths = {}  # uid -> Path
    cr_download_failures = []  # (uid, note) for every CR that did not download
    for uid, _ in cr_list:
        docx_path, note = download_cr(uid, cr_dir, eol_user, eol_password)
        if docx_path:
            cr_paths[uid] = docx_path
            print(f' [{uid}] OK ({note}) — {docx_path.name}')
        else:
            # Keep the reason: these UIDs never reach a TS group, so this is
            # the only place the failure can be surfaced.
            cr_download_failures.append((uid, note))
            print(f' [{uid}] FAILED — {note}')
    n_cr_failed = len(cr_list) - len(cr_paths)
    if n_cr_failed:
        print(f' {len(cr_paths)}/{len(cr_list)} downloaded ({n_cr_failed} failed — details in warnings)')
    else:
        print(f' All {len(cr_list)} CR(s) downloaded successfully')

    # ── Step 3: Parse cover pages → group by target TS ───────────────────────
    _section('Step 3 — Parsing CR cover pages')
    ts_groups = {}  # (spec_number, version) -> [uid, ...]
    uid_cover_failed = []
    for uid, cr_path in cr_paths.items():
        spec_number, version = parse_cr_cover(cr_path)
        if spec_number and version:
            ts_groups.setdefault((spec_number, version), []).append(uid)
            print(f' [{uid}] -> TS {spec_number} v{version}')
        else:
            uid_cover_failed.append(uid)
            print(f' [{uid}] WARNING: could not parse cover page — skipping')
    if not ts_groups:
        print('\nNo TSs identified. Nothing to apply.')
        return

    # ── Step 4: Download TSs ─────────────────────────────────────────────────
    _section('Step 4 — Downloading TSs')
    ts_paths = {}   # (spec_number, version) -> Path
    spec_dirs = {}  # (spec_number, version) -> Path (per-spec subfolder)
    for (spec_number, version) in ts_groups:
        spec_compact = spec_number.replace(' ', '')
        spec_dir = ts_dir / spec_compact
        spec_dir.mkdir(parents=True, exist_ok=True)
        spec_dirs[(spec_number, version)] = spec_dir
        print(f' [TS {spec_number} v{version}] ', end='', flush=True)

        filename, note = None, "not attempted"
        for attempt in range(1, _TS_DOWNLOAD_ATTEMPTS + 1):
            filename, note = download_ts(spec_number, version, spec_dir, eol_user, eol_password)
            if filename:
                break
            if attempt < _TS_DOWNLOAD_ATTEMPTS:
                print(f'\n [attempt {attempt}/{_TS_DOWNLOAD_ATTEMPTS} failed — retrying in {_TS_RETRY_DELAY_S}s: {note}]', flush=True)
                print(f' [TS {spec_number} v{version}] ', end='', flush=True)
                time.sleep(_TS_RETRY_DELAY_S)
            else:
                print(f'\n [all {_TS_DOWNLOAD_ATTEMPTS} attempts failed]', flush=True)

        if filename:
            ts_paths[(spec_number, version)] = spec_dir / filename
            print(f'OK ({note}) — {spec_compact}/{filename}')
        else:
            print(f'FAILED — {note}')

    # Write failed_ts.json (even when empty so app.py can detect "no failures").
    failed_ts_entries = [
        {
            "spec_number": spec_number,
            "version": version,
            "spec_compact": spec_number.replace(' ', ''),
            "spec_dir": str(spec_dirs[(spec_number, version)]),
            "expected_filename": f"ts_{spec_number.replace(' ', '')}_v{version}.docx",
            "cr_uids": ts_groups[(spec_number, version)],
            "cr_dir": str(cr_dir),
        }
        for (spec_number, version) in ts_groups
        if (spec_number, version) not in ts_paths
    ]
    (output_dir / "failed_ts.json").write_text(
        json.dumps(failed_ts_entries, indent=2), encoding='utf-8'
    )

    # ── Steps 5 & 6: Apply CRs + Finalise each TS ────────────────────────────
    _section('Steps 5 & 6 — Applying CRs and Finalising Metadata')
    report = _apply_and_finalise(
        ts_groups, ts_paths, spec_dirs, cr_paths, ts_dir, author, tc_date
    )

    # ── Final Report ─────────────────────────────────────────────────────────
    _section('Final Report')
    n_success, n_partial, n_failed = _tally(report)
    print(f'Person: {args.person_name}')
    print(f'Excel: {excel_path}')
    print(f'CRs found: {len(cr_list)}')
    print(f'TSs updated: {n_success} fully OK, {n_partial} with warnings, {n_failed} failed')
    print()
    for ts_key, n_ok, n_skip, n_crs, out_path, log_path, errors in report:
        print(f' [{_status_tag(out_path, errors, "FAIL")}] {ts_key}')
        print(f' CRs: {n_crs} | Body changes applied: {n_ok} | Skipped: {n_skip}')
        if out_path:
            print(f' Output: {out_path.parent.name}/{out_path.name}')
        if log_path and log_path.exists():
            print(f' Log: {log_path.parent.name}/{log_path.name}')
        for err in errors:
            print(f' ! {err}')
        print()

    # CRs that dropped out before reaching any TS group.
    if cr_download_failures:
        print('CRs not downloaded:')
        for uid, note in cr_download_failures:
            print(f' ! [{uid}] {note}')
    if uid_cover_failed:
        print(f'CRs with unparseable cover page: {", ".join(uid_cover_failed)}')

    print(f'Output directory: {output_dir}/')


if __name__ == '__main__':
    main()