Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| cha_json.py — 將單一 CLAN .cha 轉成 JSON(強化 %mor/%wor/%gra 對齊) | |
| 用法: | |
| # CLI | |
| python3 cha_json.py --input /path/to/input.cha --output /path/to/output.json | |
| 程式化呼叫(供 pipeline 使用): | |
| from cha_json import cha_to_json_file, cha_to_dict | |
| out_path, data = cha_to_json_file("/path/in.cha", "/path/out.json") | |
| data2 = cha_to_dict("/path/in.cha") | |
| """ | |
| from __future__ import annotations | |
| import re | |
| import json | |
| import sys | |
| import argparse | |
| from pathlib import Path | |
| from collections import defaultdict | |
| from typing import List, Dict, Any, Tuple, Optional | |
# Line prefixes that end a multi-line %mor/%wor/%gra tier when collecting
# continuation lines.
TAG_PREFIXES = ("*PAR", "*INV", "%mor:", "%gra:", "%wor:", "@")

# First run of ASCII letters/digits; used to build alignment keys.
WORD_RE = re.compile(r"[A-Za-z0-9]+")

# Patient role inside an @ID header: PAR / PAR0 / PAR1 / ...
ID_PAR_RE = re.compile(r"\|PAR\d*\|")

# Main (utterance) tier: *INV: or *PAR: / *PAR0: / *PAR1: / ...
# The digit suffix is optional so that the plain "*PAR:" speaker code is
# accepted, in line with ID_PAR_RE and the "*PAR" entry of TAG_PREFIXES.
UTTER_RE = re.compile(r"^\*(INV|PAR\d*):")

# ────────── Synonym sets (tolerate inflection when aligning) ──────────
SYN_SETS = [
    {"be", "am", "is", "are", "was", "were", "been", "being"},
    {"have", "has", "had"},
    {"do", "does", "did", "done", "doing"},
    {"go", "goes", "going", "went", "gone"},
    {"run", "runs", "running", "ran"},
    {"see", "sees", "seeing", "saw", "seen"},
    {"get", "gets", "getting", "got", "gotten"},
    {"drop", "drops", "dropping", "dropped"},
    {"swim", "swims", "swimming", "swam", "swum"},
]
def same_syn(a: str, b: str) -> bool:
    """Tell whether ``a`` and ``b`` belong to the same entry of ``SYN_SETS``.

    Returns ``False`` when either argument is empty/falsy, or when no single
    synonym set contains both strings.
    """
    if a and b:
        return any(a in group and b in group for group in SYN_SETS)
    return False
def canonical(txt: str) -> str:
    """Reduce a token or word to the lowercase key used for alignment.

    Everything from the first ``~``, ``-``, ``&`` or ``|`` onward is dropped;
    the first ``WORD_RE`` match in what remains is returned in lowercase.

    Returns:
        The lowercase key, or ``""`` when the remaining text has no match.
    """
    # maxsplit is passed by keyword: supplying it positionally to re.split
    # is deprecated since Python 3.13.
    head = re.split(r"[~\-\&|]", txt, maxsplit=1)[0]
    m = WORD_RE.search(head)
    return m.group(0).lower() if m else ""
def merge_multiline(block_lines: List[str]) -> str:
    """Fold continuation lines into the dependent tier they belong to.

    A line whose first non-blank character is ``%`` and which contains a
    ``:`` opens a new tier. Every following non-blank line is stripped and
    appended to that tier with a single space. Lines seen before any tier is
    open, and blank lines, are emitted unchanged at the point they occur
    (so a blank line inside a tier ends up ahead of that tier in the output).
    The trailing newline and the ``\\x15`` control character are removed from
    every line first.

    Returns:
        The resulting lines joined with ``"\\n"``.
    """
    out: List[str] = []
    current = None  # tier being accumulated, if any
    for raw in block_lines:
        text = raw.rstrip("\n").replace("\x15", "")  # drop the CLAN control char
        opens_tier = ":" in text and text.lstrip().startswith("%")
        if opens_tier:
            if current:
                out.append(current)
            current = text
            continue
        piece = text.strip()
        if current and piece:
            current = current + " " + piece
        else:
            out.append(text)
    if current:
        out.append(current)
    return "\n".join(out)
def _intern_id(table: Dict[str, int], key: str, first_id: int) -> int:
    """Return the id of ``key`` in ``table``, assigning the next sequential id on first use."""
    if key not in table:
        table[key] = len(table) + first_id
    return table[key]


def _new_utterance() -> Dict[str, Any]:
    """Return an empty utterance record; the dependent tiers fill it in later."""
    return {"tokens": [], "word_pos_ids": [], "word_grammar_ids": [], "word_durations": [], "utterance_text": ""}


def _collect_tier(lines: List[str], start: int, tag: str) -> Tuple[str, int]:
    """Merge the tier starting at ``lines[start]`` with its continuation lines.

    Returns the merged text with ``tag`` removed and stripped, plus the index
    of the first line that was not consumed.
    """
    end = start + 1
    while end < len(lines) and not lines[end].lstrip().startswith(TAG_PREFIXES):
        end += 1
    text = merge_multiline(lines[start:end]).replace(tag, "").strip()
    return text, end


def _latest_utterance(sent: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the most recently opened utterance of ``sent``, or ``None`` if there is none."""
    if not sent["dialogues"]:
        return None
    turn = sent["dialogues"][-1]
    # Within one turn every INV utterance precedes every PAR utterance, so
    # the newest one is the last PAR when present, otherwise the last INV.
    if turn["PAR"]:
        return turn["PAR"][-1]
    if turn["INV"]:
        return turn["INV"][-1]
    return None


def _align_durations(tokens: List[str], wor: List[Tuple[str, int, int]]) -> List[List[Any]]:
    """Pair each %mor token with the duration (end - start) of a %wor word.

    Words are consumed left to right; a token with no matching word gets 0.
    """
    wor_keys = [canonical(word) for word, _, _ in wor]
    aligned: List[List[Any]] = []
    cursor = 0
    for tok in tokens:
        tok_key = canonical(tok)
        dur = 0
        for k in range(cursor, len(wor)):
            word, start, end = wor[k]
            w_key = wor_keys[k]
            if tok_key and w_key:
                hit = (
                    tok_key == w_key
                    or w_key.startswith(tok_key)
                    or tok_key.startswith(w_key)
                    or same_syn(tok_key, w_key)
                )
            else:
                # An empty key is a prefix of every string, so prefix
                # matching would pair unrelated items (e.g. a "&-um" filler
                # with the next token). Compare the raw text instead.
                hit = tok == word
            if hit:
                dur = end - start
                cursor = k + 1
                break
        aligned.append([tok, dur])
    return aligned


def _close_sentence(sent: Optional[Dict[str, Any]], data: List[Dict[str, Any]],
                    aphasia_map: Dict[str, int]) -> None:
    """Append ``sent`` to ``data`` if it holds dialogue, defaulting its type to UNKNOWN."""
    if sent and sent["dialogues"]:
        if not sent.get("aphasia_type"):
            sent["aphasia_type"] = "UNKNOWN"
            _intern_id(aphasia_map, "UNKNOWN", 0)
        data.append(sent)


def cha_to_json(lines: List[str]) -> Dict[str, Any]:
    """Convert the lines of a .cha file into a JSON-serialisable structure.

    Args:
        lines: The file content, one string per line (trailing newlines are
            tolerated).

    Returns:
        A dict of the form::

            {
                "sentences": [...],
                "pos_mapping": {...},      # %mor POS tag -> id, ids start at 1
                "grammar_mapping": {...},  # %gra relation -> id, ids start at 1
                "aphasia_types": {...},    # type label -> id, ids start at 0
                "text_all": "..."          # PAR utterance texts joined by newlines
            }

    A %mor/%wor/%gra tier that appears before any recognised utterance is
    consumed and ignored.
    """
    pos_map: Dict[str, int] = {}
    gra_map: Dict[str, int] = {}
    aphasia_map: Dict[str, int] = {}

    data: List[Dict[str, Any]] = []
    sent: Optional[Dict[str, Any]] = None

    i = 0
    while i < len(lines):
        line = lines[i].rstrip("\n")

        # Start of a transcript
        if line.startswith("@Begin"):
            sent = {
                "sentence_id": f"S{len(data)+1}",
                "sentence_pid": None,
                "aphasia_type": None,  # set to UNKNOWN on close if still missing
                "dialogues": [],       # [ { "INV": [...], "PAR": [...] }, ... ]
            }
            i += 1
            continue

        # End of a transcript
        if line.startswith("@End"):
            _close_sentence(sent, data, aphasia_map)
            sent = None
            i += 1
            continue

        # Everything below only applies inside an open transcript.
        if sent is None:
            i += 1
            continue

        # Transcript attributes
        if line.startswith("@PID:"):
            parts = line.split("\t")
            if len(parts) > 1:
                sent["sentence_pid"] = parts[1].strip()
            i += 1
            continue

        if line.startswith("@ID:"):
            # Only the patient's (PAR*) @ID header is considered. No aphasia
            # type is extracted from the header; the label is always UNKNOWN.
            if ID_PAR_RE.search(line):
                aph = "UNKNOWN"
                _intern_id(aphasia_map, aph, 0)
                sent["aphasia_type"] = aph
            i += 1
            continue

        # Main tier: *INV: or *PARx:
        speaker = UTTER_RE.match(line)
        if speaker:
            role = "INV" if speaker.group(1) == "INV" else "PAR"
            dialogues = sent["dialogues"]
            # Open a new turn for the very first utterance, or when INV
            # speaks after PAR has already spoken in the current turn.
            if not dialogues or (role == "INV" and dialogues[-1]["PAR"]):
                dialogues.append({"INV": [], "PAR": []})
            dialogues[-1][role].append(_new_utterance())
            i += 1
            continue

        # NOTE(review): main tiers of speakers not matched by UTTER_RE are
        # skipped, so their dependent tiers attach to the previous recognised
        # utterance - confirm whether that is intended.

        # %mor: POS-tagged tokens
        if line.startswith("%mor:"):
            text, i = _collect_tier(lines, i, "%mor:")
            tgt = _latest_utterance(sent)
            if tgt is not None:
                toks: List[str] = []
                pos_ids: List[int] = []
                for unit in text.split():
                    if "|" not in unit:
                        continue
                    pos, rest = unit.split("|", 1)
                    toks.append(rest.split("|", 1)[0])
                    pos_ids.append(_intern_id(pos_map, pos, 1))
                tgt["tokens"], tgt["word_pos_ids"] = toks, pos_ids
                # Plain text kept for downstream models.
                tgt["utterance_text"] = " ".join(toks).strip()
            continue

        # %wor: "<word> <start>_<end>" timing pairs
        if line.startswith("%wor:"):
            text, i = _collect_tier(lines, i, "%wor:")
            tgt = _latest_utterance(sent)
            if tgt is not None:
                wor = [(w, int(s), int(e)) for (w, s, e) in re.findall(r"(\S+)\s+(\d+)_(\d+)", text)]
                tgt["word_durations"] = _align_durations(tgt.get("tokens", []), wor)
            continue

        # %gra: units such as 1|2|DET
        if line.startswith("%gra:"):
            text, i = _collect_tier(lines, i, "%gra:")
            tgt = _latest_utterance(sent)
            if tgt is not None:
                triples = []
                for unit in text.split():
                    parts = unit.split("|")
                    if len(parts) != 3:
                        continue
                    first, second, rel = parts
                    if first.isdigit() and second.isdigit():
                        triples.append([int(first), int(second), _intern_id(gra_map, rel, 1)])
                tgt["word_grammar_ids"] = triples
            continue

        # Any other line
        i += 1

    # File ended without an @End line
    _close_sentence(sent, data, aphasia_map)

    # text_all: every non-empty PAR utterance_text, in order
    par_texts: List[str] = []
    for s in data:
        for turn in s.get("dialogues", []):
            for par_ut in turn.get("PAR", []):
                if par_ut.get("utterance_text"):
                    par_texts.append(par_ut["utterance_text"])
    text_all = "\n".join(par_texts).strip()

    return {
        "sentences": data,
        "pos_mapping": dict(pos_map),
        "grammar_mapping": dict(gra_map),
        "aphasia_types": dict(aphasia_map),
        "text_all": text_all,
    }
| # ────────── 封裝:檔案 → dict / 檔案 → 檔案 ────────── | |
def cha_to_dict(cha_path: str) -> Dict[str, Any]:
    """Read a .cha file as UTF-8 and return its parsed dict without writing anything.

    Raises:
        FileNotFoundError: If ``cha_path`` does not exist.
    """
    source = Path(cha_path)
    if not source.exists():
        raise FileNotFoundError(f"找不到檔案: {cha_path}")
    with source.open("r", encoding="utf-8") as handle:
        content = handle.readlines()
    return cha_to_json(content)
def cha_to_json_file(cha_path: str, output_json: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
    """Convert a .cha file to JSON and write the result to disk.

    Args:
        cha_path: Path of the input .cha file.
        output_json: Destination path. When omitted (or empty), the input
            path with its suffix replaced by ``.json`` is used.

    Returns:
        ``(output_json_path, data_dict)``.
    """
    data = cha_to_dict(cha_path)

    if output_json:
        target = Path(output_json)
    else:
        target = Path(cha_path).with_suffix(".json")

    # Create missing parent directories before writing.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as sink:
        json.dump(data, sink, ensure_ascii=False, indent=4)

    return str(target), data
| # ────────── CLI ────────── | |
def parse_args():
    """Parse the command line; both ``--input/-i`` and ``--output/-o`` are required strings."""
    parser = argparse.ArgumentParser()
    option_specs = (
        ("--input", "-i", "輸入 .cha 檔"),
        ("--output", "-o", "輸出 .json 檔"),
    )
    for long_flag, short_flag, help_text in option_specs:
        parser.add_argument(long_flag, short_flag, type=str, required=True, help=help_text)
    return parser.parse_args()
def cha_to_json_path(cha_path: str, output_json: str | None = None) -> str:
    """Backward-compatible alias: run ``cha_to_json_file`` and return only the output path."""
    return cha_to_json_file(cha_path, output_json=output_json)[0]
def main():
    """CLI entry point: convert the --input .cha file and write JSON to --output."""
    args = parse_args()
    source, target = Path(args.input), Path(args.output)

    if not source.exists():
        sys.exit(f"❌ 找不到檔案: {source}")

    with source.open("r", encoding="utf-8") as reader:
        lines = reader.readlines()
    dataset = cha_to_json(lines)

    # Create missing parent directories before writing the result.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as writer:
        json.dump(dataset, writer, ensure_ascii=False, indent=4)

    n_sent = len(dataset['sentences'])
    n_pos = len(dataset['pos_mapping'])
    n_gra = len(dataset['grammar_mapping'])
    type_keys = list(dataset['aphasia_types'].keys())
    print(
        f"✅ 轉換完成 → {target}(句數 {n_sent},"
        f"pos={n_pos},gra={n_gra},"
        f"類型鍵={type_keys})"
    )


if __name__ == "__main__":
    main()