| import csv |
| from datetime import datetime |
| import subprocess |
| from subprocess import CompletedProcess |
| from typing import Literal |
| import re |
| import time |
| import difflib |
| from functools import wraps |
| from pathlib import Path |
|
|
| import textdistance |
| import numpy as np |
| import soundfile as sf |
|
|
def timer(func):
    """Decorator that prints the wall-clock duration of every call to ``func``.

    The wrapped function's return value is passed through unchanged and its
    metadata (name, docstring, ...) is preserved via ``functools.wraps``.
    If ``func`` raises, the exception propagates and nothing is printed.
    """

    @wraps(func)
    def timed(*args, **kwargs):
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        print(f"函数 {func.__name__!r} 执行耗时: {elapsed:.4f} 秒")
        return outcome

    return timed
|
|
class Timer:
    """Context manager that measures and prints the wall-clock time of a block.

    Usage::

        with Timer("step") as t:
            ...
        print(t.duration)

    Attributes:
        log: Label printed in front of the timing message.
        start: ``time.perf_counter()`` reading taken on entry (0.0 before
            the block has been entered).
        duration: Elapsed seconds, set on exit (0.0 until the block exits).
    """

    def __init__(self, log=""):
        self.log = log
        # Initialise the timing attributes up front so reading them before the
        # block has exited yields 0.0 rather than raising AttributeError.
        self.start = 0.0
        self.duration = 0.0

    def __enter__(self):
        """Record the start time and return the timer itself."""
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Store and print the elapsed time; exceptions are not suppressed."""
        self.duration = time.perf_counter() - self.start
        print(f"{self.log} cost: {self.duration:.4f} 秒")
|
|
def get_time_str(level: Literal["d", "s", "ms"] = "d") -> str:
    """Format the current local time at the requested granularity.

    Args:
        level: ``"d"`` gives the date as ``YYYY-MM-DD``; ``"s"`` gives the
            time of day as ``HHMMSS``; ``"ms"`` gives ``HHMMSS.ffffff``
            (``%f`` emits six fractional digits, i.e. microseconds).

    Returns:
        The formatted timestamp string.

    Raises:
        ValueError: If ``level`` is not one of the supported values.
    """
    formats = {
        "d": "%Y-%m-%d",
        "s": "%H%M%S",
        "ms": "%H%M%S.%f",
    }
    try:
        fmt = formats[level]
    except KeyError:
        # Fail loudly instead of silently returning None for a bad level.
        raise ValueError(
            f"unsupported level {level!r}; expected one of {sorted(formats)}"
        ) from None
    # The local is deliberately not called ``time`` so it does not shadow the
    # ``time`` module imported by this file.
    return datetime.now().strftime(fmt)
|
|
|
|
def save_csv(file_path, header, rows):
    """Write ``rows`` to ``file_path`` as UTF-8 CSV, overwriting any existing file.

    Args:
        file_path: Destination path passed to ``open``.
        header: Optional header row; skipped when falsy (``None`` or empty).
        rows: Iterable of rows handed to ``csv.writer.writerows``.
    """
    # newline="" lets the csv module control line endings itself.
    with open(file_path, "w", encoding="utf-8", newline="") as handle:
        out = csv.writer(handle)
        if header:
            out.writerow(header)
        out.writerows(rows)
    print(f"write csv to {file_path}")
|
|
def cmd(command: str, check=True, capture_output=False) -> CompletedProcess:
    """Echo ``command`` and run it through the shell.

    Args:
        command: Shell command line. It is executed with ``shell=True``, so it
            must come from a trusted source; never interpolate untrusted
            input into it.
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.
        capture_output: When true, stdout and stderr are merged, captured as
            text, printed after the command finishes and made available as
            ``ret.stdout``. When false, the child writes straight to the
            parent's streams and ``ret.stdout`` is ``None``.

    Returns:
        The ``CompletedProcess`` returned by ``subprocess.run``.
    """
    print(command)
    if not capture_output:
        # Nothing was captured, so there is no output to echo; printing
        # ``ret.stdout`` here would only emit a literal "None".
        return subprocess.run(command, shell=True, check=check)

    ret = subprocess.run(
        command,
        shell=True,
        check=check,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold stderr into the captured stdout
        universal_newlines=True,
    )
    print(ret.stdout)
    return ret
|
|
| import cn2an |
def clean_text_for_comparison_zh(text):
    """Normalise Chinese text for comparison.

    Removes spaces, newlines and common Chinese/ASCII punctuation, lower-cases
    the result, and, if any digit remains, converts Arabic numerals to their
    Chinese form via ``cn2an.transform(text, "an2cn")``.

    Args:
        text: Input string.

    Returns:
        The cleaned string.
    """
    # The hyphen is escaped so it is a literal "-". Unescaped between "’" and
    # "《" it formed the range U+2019..U+300A, which stripped unrelated symbols
    # and left real hyphens in place. The quotes “ ” and the ellipsis … were
    # only covered by that accidental range, so they are listed explicitly to
    # keep removing them.
    symbol_pattern = r"[ ,。、!?::‘’“”\-《》!?;,…\n]"
    text = re.sub(symbol_pattern, "", text).lower()
    if re.search(r"\d", text):
        text = cn2an.transform(text, "an2cn")
    return text
|
|
def clean_text_for_comparison_en(text):
    """Drop commas, periods and newlines from ``text`` and lower-case the rest."""
    stripped = re.sub("[,.\n]", "", text)
    return stripped.lower()
|
|
|
|
def run_textdistance(text1, text2):
    """Compute the Levenshtein distance between two texts.

    Args:
        text1: Reference text; its length normalises the distance.
        text2: Text compared against ``text1``.

    Returns:
        A tuple ``(d, nd)`` where ``d`` is the Levenshtein distance and ``nd``
        is ``d`` divided by ``len(text1)``. When ``text1`` is empty the
        divisor is clamped to 1, so ``nd`` equals ``d``: 0 for two empty
        inputs, otherwise the number of inserted items.
    """
    d = textdistance.levenshtein.distance(text1, text2)
    # Clamp the divisor so an empty reference does not raise ZeroDivisionError.
    nd = d / max(len(text1), 1)
    return d, nd
|
|
def highlight_diff(a, b, spliter=""):
    """Render the difference between ``a`` and ``b`` as inline markup.

    Unchanged parts are emitted as-is, parts present only in ``a`` are wrapped
    as ``[-...-]`` and parts present only in ``b`` as ``{+...+}``.

    Args:
        a: Original text.
        b: Revised text.
        spliter: Token separator. When non-empty both texts are split on it
            and compared token by token; when empty the comparison is done
            character by character. It is also used to join the output.

    Returns:
        The annotated diff string.
    """
    if spliter:
        a, b = a.split(spliter), b.split(spliter)

    pieces = []
    opcodes = difflib.SequenceMatcher(None, a, b).get_opcodes()
    for tag, a_lo, a_hi, b_lo, b_hi in opcodes:
        if tag == "equal":
            pieces.append(spliter.join(a[a_lo:a_hi]))
            continue

        marked = ""
        if tag in ("delete", "replace"):
            removed = spliter.join(a[a_lo:a_hi])
            marked += f"[-{removed}-]"
        if tag in ("insert", "replace"):
            added = spliter.join(b[b_lo:b_hi])
            marked += f"{{+{added}+}}"
        if marked:
            pieces.append(marked)

    return spliter.join(pieces)
|
|
def time_to_float(s: str):
    """Parse a duration such as ``"1.5s"`` into a float.

    Every ``"s"`` character is removed before conversion; if nothing is left
    the result is ``0.0``. ``float`` raises ``ValueError`` when the remainder
    is not a valid number.
    """
    digits = s.replace("s", "")
    return float(digits) if digits else 0.0
|
|
def read_audio(file: Path) -> np.ndarray:
    """Load an audio file with ``soundfile`` and return it as float32 samples.

    Args:
        file: Path of the audio file to read.

    Returns:
        The decoded samples converted to ``np.float32``.

    Raises:
        ValueError: If the file's sample rate is not 16000 Hz.
    """
    samples, sample_rate = sf.read(file)
    if sample_rate == 16000:
        return samples.astype(np.float32)
    raise ValueError(f"只支持 16k 采样率的音频,当前采样率为 {sample_rate}")
|
|
def write_audio(file: Path, audio: np.ndarray, sr=16000):
    """Write ``audio`` to ``file`` with ``soundfile`` and report the path.

    Args:
        file: Destination path.
        audio: Samples to write.
        sr: Sample rate in Hz passed to ``soundfile.write`` (default 16000).
    """
    sf.write(file, audio, samplerate=sr)
    print(f"写入音频文件 {file}")
|
|
if __name__ == '__main__':
    # Small demo of the Timer context manager: run two timed sleeps and print
    # the measured duration after each block has exited.
    demo_steps = (
        ("", "开始操作 B...", 0.4),
        ("C", "开始操作 C...", 0.5),
    )
    for label, message, pause in demo_steps:
        with Timer(label) as stopwatch:
            print(message)
            time.sleep(pause)
        # ``duration`` is only final once the with-block has been left.
        print(stopwatch.duration)