Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| fetch_crs.py β Download CRs and TSs from a 3GPP/ETSI Excel contribution list. | |
| Usage: | |
| python3 fetch_crs.py <excel_path> <person_name> [--output-dir DIR] | |
| Steps: | |
| 1. Parse Excel, filter Accepted CRs by person name | |
| 2. Download CR DOCXs via docfinder /find/tdoc/download | |
| 3. Parse CR cover pages to extract target TS spec + version | |
| 4. Download TS DOCXs via docfinder /find/docx | |
| 5. Print summary report | |
| """ | |
| import argparse | |
| import os | |
| import re | |
| import sys | |
| import time | |
| import zipfile | |
| from pathlib import Path | |
| import requests | |
# Base URL of the docfinder service (a Hugging Face Space) used for all downloads.
BASE_URL = "https://organizedprogrammers-docfinder.hf.space"
# Honor the standard lowercase proxy environment variables; a value of None
# (variable unset or empty) tells requests to connect directly.
_proxy = os.environ.get("http_proxy") or None
PROXIES = {"http": _proxy, "https": os.environ.get("https_proxy") or None}
| # --------------------------------------------------------------------------- | |
| # Path helpers | |
| # --------------------------------------------------------------------------- | |
def wsl_path(p: str) -> str:
    """Convert Windows path (C:\\...) to WSL path (/mnt/c/...) if needed."""
    candidate = p.strip()
    # A Windows path is recognized by a leading drive letter + colon.
    looks_windows = len(candidate) >= 2 and candidate[0].isalpha() and candidate[1] == ":"
    if not looks_windows:
        return candidate
    drive_letter = candidate[0].lower()
    tail = candidate[2:].replace("\\", "/")
    return f"/mnt/{drive_letter}{tail}"
| # --------------------------------------------------------------------------- | |
| # Step 1 β Parse Excel | |
| # --------------------------------------------------------------------------- | |
def parse_excel(excel_path: str, person_name: str):
    """
    Return list of (uid, title) for Accepted CRs matching person_name.
    Handles both .xls and .xlsx, dispatching on the file extension.
    """
    path = Path(wsl_path(excel_path))
    suffix = path.suffix.lower()
    parsers = {".xls": _parse_xls, ".xlsx": _parse_xlsx}
    parser = parsers.get(suffix)
    if parser is None:
        raise ValueError(f"Unsupported file extension: {suffix!r}. Expected .xls or .xlsx")
    return parser(path, person_name)
| def _name_pattern(name: str) -> re.Pattern: | |
| return re.compile(r"\b" + re.escape(name) + r"\b", re.IGNORECASE) | |
def _parse_xls(path: Path, person_name: str):
    """
    Parse a legacy .xls contribution list with xlrd.

    Returns a list of (uid, title) tuples for rows where Type == "CR",
    Status == "Accepted" and person_name appears (whole-word,
    case-insensitive) in the SubmittedBy column.

    Raises ValueError if a required column header is missing.
    """
    try:
        import xlrd
    except ImportError:
        sys.exit("ERROR: xlrd is not installed. Run: pip install xlrd")
    wb = xlrd.open_workbook(str(path))
    # Try "Contributions" sheet first, fall back to first sheet
    try:
        ws = wb.sheet_by_name("Contributions")
    except xlrd.XLRDError:
        ws = wb.sheet_by_index(0)
    # Row 0 is headers; row 1 is an empty duplicate -- skip it
    headers = [str(ws.cell_value(0, c)).strip() for c in range(ws.ncols)]
    col = {h: i for i, h in enumerate(headers)}

    def find_col(*names):
        # BUG FIX: `col.get(a) or col.get(b)` wrongly skipped a column at
        # index 0 (0 is falsy), so membership must be tested explicitly.
        for n in names:
            if n in col:
                return col[n]
        return None

    uid_col = find_col("Uid", "UID", "uid")
    type_col = find_col("Type", "type")
    status_col = find_col("Status", "status")
    by_col = find_col("SubmittedBy", "Submitted By", "submittedby")
    title_col = find_col("Title", "title")
    for name, c in [("Uid", uid_col), ("Type", type_col),
                    ("Status", status_col), ("SubmittedBy", by_col)]:
        if c is None:
            raise ValueError(f"Column {name!r} not found. Available: {list(col.keys())}")
    pattern = _name_pattern(person_name)
    results = []
    for r in range(2, ws.nrows):  # skip header + empty duplicate
        uid = str(ws.cell_value(r, uid_col)).strip()
        doc_type = str(ws.cell_value(r, type_col)).strip()
        status = str(ws.cell_value(r, status_col)).strip()
        submitted_by = str(ws.cell_value(r, by_col)).strip()
        title = str(ws.cell_value(r, title_col)).strip() if title_col is not None else ""
        if doc_type != "CR":
            continue
        if status != "Accepted":
            continue
        if not pattern.search(submitted_by):
            continue
        results.append((uid, title))
    return results
def _parse_xlsx(path: Path, person_name: str):
    """
    Parse a modern .xlsx contribution list with openpyxl.

    Returns a list of (uid, title) tuples for rows where Type == "CR",
    Status == "Accepted" and person_name appears (whole-word,
    case-insensitive) in the SubmittedBy column. Rows with an empty Uid
    are skipped (read-only mode can yield trailing blank rows).

    Raises ValueError if a required column header is missing.
    """
    try:
        import openpyxl
    except ImportError:
        sys.exit("ERROR: openpyxl is not installed. Run: pip install openpyxl")
    wb = openpyxl.load_workbook(str(path), read_only=True, data_only=True)
    ws = wb["Contributions"] if "Contributions" in wb.sheetnames else wb.active
    rows = iter(ws.iter_rows(values_only=True))
    # Row 0: headers
    header_row = next(rows)
    headers = [str(h).strip() if h is not None else "" for h in header_row]
    col = {h: i for i, h in enumerate(headers)}
    # Row 1: empty duplicate -- skip
    next(rows, None)

    def find_col(*names):
        # BUG FIX: `col.get(a) or col.get(b)` wrongly skipped a column at
        # index 0 (0 is falsy), so membership must be tested explicitly.
        for n in names:
            if n in col:
                return col[n]
        return None

    uid_col = find_col("Uid", "UID", "uid")
    type_col = find_col("Type", "type")
    status_col = find_col("Status", "status")
    by_col = find_col("SubmittedBy", "Submitted By", "submittedby")
    title_col = find_col("Title", "title")
    for name, c in [("Uid", uid_col), ("Type", type_col),
                    ("Status", status_col), ("SubmittedBy", by_col)]:
        if c is None:
            raise ValueError(f"Column {name!r} not found. Available: {list(col.keys())}")
    pattern = _name_pattern(person_name)
    results = []
    for row in rows:
        def cell(c):
            # Guard against short rows returned by read_only iteration.
            v = row[c] if c < len(row) else None
            return str(v).strip() if v is not None else ""
        uid = cell(uid_col)
        doc_type = cell(type_col)
        status = cell(status_col)
        submitted_by = cell(by_col)
        title = cell(title_col) if title_col is not None else ""
        if not uid:
            continue
        if doc_type != "CR":
            continue
        if status != "Accepted":
            continue
        if not pattern.search(submitted_by):
            continue
        results.append((uid, title))
    return results
| # --------------------------------------------------------------------------- | |
| # Step 2 β Download CR DOCXs | |
| # --------------------------------------------------------------------------- | |
def download_cr(uid: str, cr_dir: Path):
    """
    Download the CR DOCX for the given UID into *cr_dir*.

    Returns:
        (docx_path, note) -- docx_path is the file to use for parsing and
        note a human-readable string for the summary.
        Returns (None, error_msg) on failure.
    """
    target = cr_dir / f"{uid}.docx"
    if target.exists():
        return target, "already existed"
    try:
        response = requests.post(
            f"{BASE_URL}/find/tdoc/download",
            json={"doc_id": uid},
            proxies=PROXIES,
            timeout=60,
        )
    except requests.RequestException as exc:
        return None, f"network error: {exc}"
    if not response.ok:
        return None, f"HTTP {response.status_code}"
    payload = response.content
    if not payload:
        return None, "empty response"
    target.write_bytes(payload)
    if payload.startswith(b"PK\x03\x04"):
        # The endpoint sometimes wraps the DOCX inside a ZIP; extract the
        # first .docx member when present.
        try:
            with zipfile.ZipFile(target) as archive:
                inner = next((n for n in archive.namelist() if n.endswith(".docx")), None)
                if inner is not None:
                    unpacked = cr_dir / f"{uid}_extracted.docx"
                    unpacked.write_bytes(archive.read(inner))
                    return unpacked, "extracted from ZIP"
        except zipfile.BadZipFile:
            pass  # magic bytes lied -- keep the raw download as a DOCX
    return target, "downloaded"
| # --------------------------------------------------------------------------- | |
| # Step 3 β Parse CR Cover Pages | |
| # --------------------------------------------------------------------------- | |
# Spec number as it appears on a CR cover page, e.g. "102 221":
# exactly three digits, one whitespace character, three digits.
SPEC_PATTERN = re.compile(r"^\d{3}\s\d{3}$")
# Spec version string, e.g. "18.3.0": three dot-separated integers.
VERSION_PATTERN = re.compile(r"^\d+\.\d+\.\d+$")
def parse_cr_cover(docx_path: Path):
    """
    Parse the CR cover table (tables[0]) to extract (spec_number, version).

    spec_number is the first cell matching "NNN NNN" (e.g. "102 221");
    version is the cell immediately following the first "Current version"
    label, with or without a trailing colon, that matches "N.N.N"
    (e.g. "18.3.0").

    Returns (spec_number, version), or (None, None) if parsing fails.
    """
    try:
        from docx import Document
    except ImportError:
        sys.exit("ERROR: python-docx is not installed. Run: pip install python-docx")
    try:
        doc = Document(str(docx_path))
    except Exception:
        return None, None
    if not doc.tables:
        return None, None
    table = doc.tables[0]
    # Collect all non-empty cell texts in order
    cells = []
    for row in table.rows:
        for cell in row.cells:
            text = cell.text.strip()
            if text:
                cells.append(text)
    spec_number = None
    version = None
    for i, text in enumerate(cells):
        # Spec number: first cell matching the "NNN NNN" pattern.
        if spec_number is None and SPEC_PATTERN.match(text):
            spec_number = text
        # Version: cell after the first "Current version" label. The
        # original had two overlapping branches, one of which let a later
        # occurrence silently overwrite an already-found version; keep
        # first-occurrence semantics only.
        if version is None and text in ("Current version:", "Current version"):
            if i + 1 < len(cells) and VERSION_PATTERN.match(cells[i + 1]):
                version = cells[i + 1]
    return spec_number, version
| # --------------------------------------------------------------------------- | |
| # Step 4 β Download TS DOCXs | |
| # --------------------------------------------------------------------------- | |
def _is_html(resp: requests.Response) -> bool:
    """Return True if the response body is an HTML page (e.g. HF Space loading page)."""
    content_type = resp.headers.get("content-type", "")
    if "text/html" in content_type:
        return True
    # Fall back to sniffing the first bytes of the body.
    head = resp.content[:5].lower()
    return head in (b"<!doc", b"<html")
def download_ts(spec_number: str, version: str, ts_dir: Path,
                max_retries: int = 3, retry_delay: int = 10):
    """
    Download TS DOCX for spec_number (e.g. "102 221") and version (e.g. "18.3.0").

    Retries up to max_retries times when the HF Space returns an HTML loading
    page instead of the DOCX binary (happens on cold-start / brief restarts),
    sleeping retry_delay seconds between attempts.

    Returns (filename, note) or (None, error_msg).
    """
    spec_no_space = spec_number.replace(" ", "")
    filename = f"ts_{spec_no_space}_v{version}.docx"
    dest = ts_dir / filename
    if dest.exists():
        return filename, "already existed"
    last_error = "no attempts made"
    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.post(
                f"{BASE_URL}/find/docx",
                json={"doc_id": spec_number, "version": version},
                proxies=PROXIES,
                timeout=120,
            )
        except requests.RequestException as e:
            return None, f"network error: {e}"
        if not resp.ok:
            return None, f"HTTP {resp.status_code}"
        content = resp.content
        if not content:
            return None, "empty response"
        # Detect HTML splash page (HF Space cold-start) -- retry after a delay
        if _is_html(resp):
            last_error = f"got HTML instead of DOCX (attempt {attempt}/{max_retries})"
            if attempt < max_retries:
                print(f"\n [retry in {retry_delay}s β HF Space loadingβ¦]", flush=True)
                time.sleep(retry_delay)
                continue
            # BUG FIX: previously reported "invalid file (not a ZIP/DOCX,
            # starts with ...)" here, misdescribing the HTML splash-page
            # condition; report the actual cause instead.
            return None, last_error
        # Good binary response
        dest.write_bytes(content)
        if content[:2] != b"PK":
            dest.unlink()
            return None, f"invalid file (not a ZIP/DOCX, starts with {content[:4]!r})"
        # Verify the TS contains the expected spec number in its first paragraph
        try:
            import docx as _docx
            _doc = _docx.Document(dest)
            first_para = _doc.paragraphs[0].text if _doc.paragraphs else ''
            if spec_no_space not in first_para.replace(' ', ''):
                dest.unlink()
                return None, f"wrong TS returned by API: got {first_para[:80]!r} (expected spec {spec_no_space})"
        except Exception:
            pass  # best-effort check; trust the ZIP magic check above
        note = "downloaded" if attempt == 1 else f"downloaded (after {attempt} attempts)"
        return filename, note
    return None, last_error  # defensive: every loop path already returns
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def main():
    """CLI entry point: parse args, run steps 1-4, then print a summary."""
    parser = argparse.ArgumentParser(
        description="Download CRs and TSs from a 3GPP/ETSI Excel contribution list."
    )
    parser.add_argument("excel_path", help="Path to .xls or .xlsx contribution list")
    parser.add_argument("person_name", help="Name to search for in SubmittedBy column")
    parser.add_argument(
        "--output-dir",
        default=str(Path.home() / "CR_Processing"),
        help="Base output directory (default: ~/CR_Processing)",
    )
    args = parser.parse_args()
    excel_path = wsl_path(args.excel_path)
    person_name = args.person_name
    output_dir = Path(wsl_path(args.output_dir)).expanduser()
    cr_dir = output_dir / "CRs"
    ts_dir = output_dir / "TS"
    cr_dir.mkdir(parents=True, exist_ok=True)
    ts_dir.mkdir(parents=True, exist_ok=True)
    # --- Step 1: Parse Excel ---
    print(f"Parsing Excel: {excel_path}")
    print(f"Filtering for: {person_name!r} | Type=CR | Status=Accepted\n")
    try:
        cr_list = parse_excel(excel_path, person_name)
    except Exception as e:
        sys.exit(f"ERROR parsing Excel: {e}")
    print(f"Found {len(cr_list)} matching CR(s).\n")
    if not cr_list:
        print("Nothing to download.")
        return
    # --- Step 2: Download CR DOCXs ---
    print("Downloading CRs...")
    cr_results = []  # list of (uid, docx_path_or_None, note)
    for uid, title in cr_list:
        print(f" [{uid}] ", end="", flush=True)
        docx_path, note = download_cr(uid, cr_dir)
        cr_results.append((uid, docx_path, note))
        if docx_path:
            print(f"OK ({note}) β {docx_path.name}")
        else:
            print(f"FAILED β {note}")
    print()
    # --- Step 3: Parse cover pages ---
    print("Parsing CR cover pages...")
    ts_targets = {}  # (spec_number, version) -> list of uids
    for uid, docx_path, note in cr_results:
        if docx_path is None:
            continue
        spec_number, version = parse_cr_cover(docx_path)
        if spec_number and version:
            key = (spec_number, version)
            ts_targets.setdefault(key, []).append(uid)
            print(f" [{uid}] β TS {spec_number} v{version}")
        else:
            print(f" [{uid}] WARNING: could not parse cover page (spec/version not found)")
    print()
    # --- Step 4: Download TSs ---
    print("Downloading TSs...")
    ts_results = []  # list of (spec_number, version, filename_or_None, note)
    for (spec_number, version), uids in ts_targets.items():
        print(f" [TS {spec_number} v{version}] ", end="", flush=True)
        filename, note = download_ts(spec_number, version, ts_dir)
        ts_results.append((spec_number, version, filename, note))
        if filename:
            # BUG FIX: previously printed the literal "(unknown)" instead of
            # the filename returned by download_ts().
            print(f"OK ({note}) β {filename}")
        else:
            print(f"FAILED β {note}")
    print()
    # --- Step 5: Summary ---
    print("=" * 50)
    print("=== fetch-crs summary ===")
    print(f"Person: {person_name}")
    print(f"Excel: {excel_path}")
    print(f"CRs found: {len(cr_list)} (Accepted, Type=CR)")
    print()
    print("CRs downloaded:")
    for uid, docx_path, note in cr_results:
        if docx_path:
            print(f" β {docx_path.name} [{note}]")
        else:
            print(f" β {uid} β {note}")
    print()
    print("TSs downloaded:")
    for spec_number, version, filename, note in ts_results:
        if filename:
            # BUG FIX: same "(unknown)" placeholder replaced with filename.
            print(f" β {filename} [{note}]")
        else:
            print(f" β ts_{spec_number.replace(' ', '')} v{version} β {note}")
    print()
    print(f"Output: {output_dir}/")
if __name__ == "__main__":
    main()