import re
import random
from typing import Dict, List, Optional

from tqdm import tqdm

from utils.text_utils import TextUtils
from config import Config


class TextProcessor:
    """Processor for large-scale text."""

    def __init__(self):
        self.text_utils = TextUtils()
    def chunk_text(self, text: str, chunk_size: Optional[int] = None,
                   overlap: Optional[int] = None) -> List[Dict]:
        """Split a long text into chunks while preserving semantic boundaries.

        Args:
            text: Input text.
            chunk_size: Maximum number of characters per chunk.
            overlap: Number of overlapping characters between chunks.

        Returns:
            List of chunk dicts, each containing text, start, end, chunk_id.
        """
        if chunk_size is None:
            chunk_size = Config.MAX_CHUNK_SIZE
        if overlap is None:
            overlap = Config.CHUNK_OVERLAP

        # Split on blank lines so chunk boundaries fall between paragraphs.
        paragraphs = text.split('\n\n')

        chunks = []
        current_chunk = ""
        current_start = 0
        total_processed = 0

        print(f"Starting chunking (chunk size: {chunk_size}, overlap: {overlap})...")

        for para in tqdm(paragraphs, desc="Chunking"):
            para = para.strip()
            if not para:
                continue

            # Flush the current chunk if adding this paragraph would overflow it.
            # Note: a single paragraph longer than chunk_size still becomes one
            # oversized chunk; paragraphs are never split internally.
            if len(current_chunk) + len(para) + 2 > chunk_size:
                if current_chunk:
                    chunks.append({
                        'text': current_chunk.strip(),
                        'start': current_start,
                        'end': current_start + len(current_chunk),
                        'chunk_id': len(chunks)
                    })

                    # Carry the tail of the flushed chunk over as overlap,
                    # trimmed back to whole sentences where possible.
                    if len(current_chunk) > overlap:
                        overlap_text = current_chunk[-overlap:]
                        sentences = self.text_utils.split_into_sentences(overlap_text)
                        if sentences:
                            overlap_text = sentences[-1] if len(sentences) == 1 else ' '.join(sentences[-2:])
                    else:
                        overlap_text = current_chunk

                    # Advance the character offset past the non-overlapping part.
                    total_processed += len(current_chunk) - len(overlap_text)
                    current_start = total_processed

                    current_chunk = overlap_text + "\n\n" + para
                else:
                    current_chunk = para
                    current_start = total_processed
            else:
                if current_chunk:
                    current_chunk += "\n\n" + para
                else:
                    current_chunk = para

        # Flush the final partial chunk.
        if current_chunk:
            chunks.append({
                'text': current_chunk.strip(),
                'start': current_start,
                'end': current_start + len(current_chunk),
                'chunk_id': len(chunks)
            })

        print(f"✓ Text chunking complete: {len(chunks)} chunks total")
        return chunks
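
    # Illustrative example (hypothetical values): with chunk_size=2000 and
    # overlap=200, chunk_text(novel_text) yields dicts such as
    #   {'text': '...', 'start': 0, 'end': 1987, 'chunk_id': 0}
    # where start/end are approximate character offsets into the input, since
    # the overlap tail is re-trimmed to sentence boundaries.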

    def chunk_text_by_tokens(self, text: str, max_tokens: int = 1500,
                             overlap_tokens: int = 150) -> List[Dict]:
        """Chunk by token count (more precise, but slower).

        Args:
            text: Input text.
            max_tokens: Maximum number of tokens per chunk.
            overlap_tokens: Number of overlapping tokens between chunks.

        Returns:
            List of chunk dicts.
        """
        sentences = self.text_utils.split_into_sentences(text)

        chunks = []
        current_chunk = []
        current_tokens = 0
        current_start = 0

        print(f"Chunking by tokens (max: {max_tokens} tokens)...")

        for sentence in tqdm(sentences, desc="Processing sentences"):
            sentence_tokens = self.text_utils.count_tokens(sentence)

            if current_tokens + sentence_tokens > max_tokens and current_chunk:
                # Flush the current chunk.
                chunk_text = ' '.join(current_chunk)
                chunks.append({
                    'text': chunk_text,
                    'start': current_start,
                    'end': current_start + len(chunk_text),
                    'chunk_id': len(chunks),
                    'token_count': current_tokens
                })

                # Walk backwards over the flushed sentences to build an
                # overlap of up to overlap_tokens whole sentences.
                overlap_chunk = []
                overlap_tokens_count = 0
                for s in reversed(current_chunk):
                    s_tokens = self.text_utils.count_tokens(s)
                    if overlap_tokens_count + s_tokens <= overlap_tokens:
                        overlap_chunk.insert(0, s)
                        overlap_tokens_count += s_tokens
                    else:
                        break

                current_chunk = overlap_chunk + [sentence]
                current_tokens = overlap_tokens_count + sentence_tokens
                # Approximate character offset: advance past the
                # non-overlapping prefix of the flushed chunk.
                current_start += len(chunk_text) - len(' '.join(overlap_chunk))
            else:
                current_chunk.append(sentence)
                current_tokens += sentence_tokens

        # Flush the final partial chunk.
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunks.append({
                'text': chunk_text,
                'start': current_start,
                'end': current_start + len(chunk_text),
                'chunk_id': len(chunks),
                'token_count': current_tokens
            })

        print(f"✓ Token chunking complete: {len(chunks)} chunks total")
        return chunks
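
    # Illustrative note: chunk_text_by_tokens sizes chunks with
    # TextUtils.count_tokens instead of len(), which is the right bound when a
    # downstream consumer (e.g. an LLM context window) is limited in tokens
    # rather than characters; each returned dict additionally carries
    # 'token_count'.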

    def extract_dialogues(self, text: str) -> List[Dict]:
        """Extract dialogue fragments.

        Args:
            text: Input text.

        Returns:
            List of dialogues, each containing content, attribution, position.
        """
        # Pick quote patterns based on the detected language.
        language = self.text_utils.detect_language(text)

        dialogues = []

        if language == "zh":
            # Chinese dialogue: curly quotes or corner brackets, optionally
            # followed by a speech verb (说/道/讲/...).
            patterns = [
                (r'“([^”]+)”[，,]?\s*([^说道讲告诉问答叫喊]*(?:说|道|讲|告诉|问|答|叫|喊))', 'chinese_quote'),
                (r'「([^」]+)」[，,]?\s*([^说道讲]*(?:说|道|讲))', 'chinese_bracket'),
                (r'“([^”]+)”', 'simple_quote'),
            ]
        else:
            # English dialogue: quoted speech optionally followed by an
            # attribution such as "John said".
            patterns = [
                (r'"([^"]+)",?\s+([A-Z][a-z]+\s+(?:said|asked|replied|shouted|whispered|muttered|exclaimed))', 'english_quote_said'),
                (r'"([^"]+)"', 'simple_quote'),
                (r"'([^']+)',?\s+([A-Z][a-z]+\s+said)", 'english_single_quote'),
            ]

        # The simple patterns re-match spans already caught by the attributed
        # patterns (which run first), so deduplicate by match position.
        seen_positions = set()
        for pattern, pattern_type in patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                dialogue = {
                    'content': match.group(1).strip(),
                    'attribution': match.group(2).strip() if len(match.groups()) > 1 else '',
                    'position': match.start(),
                    'type': pattern_type
                }
                # Drop trivially short matches and duplicates.
                if len(dialogue['content']) > 5 and dialogue['position'] not in seen_positions:
                    seen_positions.add(dialogue['position'])
                    dialogues.append(dialogue)

        # Return dialogues in document order.
        dialogues.sort(key=lambda x: x['position'])

        return dialogues
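
    # Illustrative example (hypothetical input): given
    #   "Hello there, old friend," John said.
    # the english_quote_said pattern yields
    #   {'content': 'Hello there, old friend,', 'attribution': 'John said',
    #    'position': 0, 'type': 'english_quote_said'}.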

    def split_by_chapters(self, text: str) -> List[Dict]:
        """Split text by chapters.

        Args:
            text: Input text.

        Returns:
            List of chapters, each containing title, content, chapter_num.
        """
        # Try heading styles in order: "Chapter 1", "第一章", "CHAPTER IV".
        chapter_patterns = [
            r'Chapter\s+(\d+)[:\s]*([^\n]*)',
            r'第([一二三四五六七八九十百千零\d]+)章[:\s]*([^\n]*)',
            r'CHAPTER\s+([IVXLCDM]+)[:\s]*([^\n]*)',
        ]

        chapters = []

        for pattern in chapter_patterns:
            matches = list(re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE))

            if matches:
                # Each chapter runs from its heading to the next heading (or
                # to the end of the text). Text before the first heading,
                # such as a preface, is not included.
                for i, match in enumerate(matches):
                    start = match.start()
                    end = matches[i + 1].start() if i + 1 < len(matches) else len(text)

                    chapters.append({
                        'chapter_num': match.group(1),
                        'title': match.group(2).strip() if len(match.groups()) > 1 else '',
                        'content': text[start:end].strip(),
                        'start': start,
                        'end': end
                    })
                break

        # Fall back to treating the whole text as a single chapter.
        if not chapters:
            chapters.append({
                'chapter_num': '1',
                'title': 'Full Text',
                'content': text,
                'start': 0,
                'end': len(text)
            })

        return chapters
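
    # Illustrative example (hypothetical input): a text with headings
    # "Chapter 1: The Road" and "Chapter 2: The River" yields two dicts,
    # the first roughly {'chapter_num': '1', 'title': 'The Road',
    # 'content': 'Chapter 1: The Road ...', 'start': 0, 'end': ...}.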

    def get_statistics(self, text: str) -> Dict:
        """Compute text statistics.

        Args:
            text: Input text.

        Returns:
            Dictionary of statistics.
        """
        total_length = len(text)
        total_tokens = self.text_utils.count_tokens(text)

        # Paragraphs are blank-line separated.
        paragraphs = [p for p in text.split('\n\n') if p.strip()]
        paragraph_count = len(paragraphs)

        sentences = self.text_utils.split_into_sentences(text)
        sentence_count = len(sentences)

        # Word count via simple \b\w+\b tokenization (approximate for CJK text).
        words = re.findall(r'\b\w+\b', text)
        word_count = len(words)

        language = self.text_utils.detect_language(text)

        # Dialogue extraction is expensive, so sample only the first
        # 10,000 characters.
        dialogues = self.extract_dialogues(text[:10000])
        dialogue_count = len(dialogues)

        chapters = self.split_by_chapters(text)
        chapter_count = len(chapters)

        return {
            'total_length': total_length,
            'total_tokens': total_tokens,
            'paragraphs': paragraph_count,
            'sentences': sentence_count,
            'words': word_count,
            'language': language,
            'dialogues': dialogue_count,
            'chapters': chapter_count,
            'avg_paragraph_length': total_length // paragraph_count if paragraph_count > 0 else 0,
            'avg_sentence_length': total_length // sentence_count if sentence_count > 0 else 0,
        }
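
    # Illustrative shape of the result (hypothetical numbers):
    #   {'total_length': 52340, 'total_tokens': 13100, 'paragraphs': 412,
    #    'sentences': 2890, 'words': 9800, 'language': 'en', 'dialogues': 37,
    #    'chapters': 12, 'avg_paragraph_length': 127, 'avg_sentence_length': 18}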

    def clean_text(self, text: str,
                   remove_extra_whitespace: bool = True,
                   normalize_quotes: bool = True) -> str:
        """Clean text.

        Args:
            text: Input text.
            remove_extra_whitespace: Whether to remove extra whitespace.
            normalize_quotes: Whether to normalize quotation marks.

        Returns:
            Cleaned text.
        """
        cleaned = text

        if remove_extra_whitespace:
            # Strip leading/trailing spaces on each line.
            cleaned = '\n'.join(line.strip() for line in cleaned.split('\n'))
            # Collapse runs of 3+ newlines down to a single blank line.
            cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
            # Replace tabs and collapse repeated spaces.
            cleaned = cleaned.replace('\t', ' ')
            cleaned = re.sub(r' {2,}', ' ', cleaned)

        if normalize_quotes:
            # CJK corner brackets -> straight double quotes.
            cleaned = cleaned.replace('『', '"').replace('』', '"')
            cleaned = cleaned.replace('「', '"').replace('」', '"')
            # Curly quotes -> straight quotes.
            cleaned = cleaned.replace('“', '"').replace('”', '"')
            cleaned = cleaned.replace('‘', "'").replace('’', "'")

        return cleaned
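
    # Illustrative example:
    #   clean_text('「你好」\t\t“Hi”')  ->  '"你好" "Hi"'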

    def extract_metadata(self, text: str) -> Dict:
        """Extract text metadata (title, author, etc.).

        Args:
            text: Input text.

        Returns:
            Metadata dictionary.
        """
        metadata = {
            'title': None,
            'author': None,
            'year': None,
        }

        # Metadata usually appears on the first page; scan the first 20 lines.
        lines = text.split('\n')[:20]

        for line in lines:
            line = line.strip()

            # Title heuristic: a short ALL-CAPS or Title Case line.
            if not metadata['title'] and 5 < len(line) < 100:
                if line.isupper() or line.istitle():
                    metadata['title'] = line

            # Author heuristic: "by ...", "作者: ...", or "Author: ...".
            if not metadata['author']:
                author_patterns = [
                    r'by\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)',
                    r'作者[:：]\s*(.+)',
                    r'Author[:\s]+(.+)',
                ]
                for pattern in author_patterns:
                    match = re.search(pattern, line, re.IGNORECASE)
                    if match:
                        metadata['author'] = match.group(1).strip()
                        break

            # Year heuristic: a four-digit year in 1900-2099.
            if not metadata['year']:
                year_match = re.search(r'\b(19|20)\d{2}\b', line)
                if year_match:
                    metadata['year'] = year_match.group(0)

        return metadata
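
    # Illustrative example (hypothetical header): for
    #   THE LONG ROAD
    #   by Jane Doe, 1998
    # extract_metadata returns
    #   {'title': 'THE LONG ROAD', 'author': 'Jane Doe', 'year': '1998'}.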

    def sample_text(self, text: str, sample_size: int = 1000,
                    strategy: str = 'random') -> str:
        """Sample from the text.

        Args:
            text: Input text.
            sample_size: Sample size in characters.
            strategy: Sampling strategy ('start', 'random', 'distributed').

        Returns:
            Sampled text.
        """
        if len(text) <= sample_size:
            return text

        if strategy == 'start':
            # Take the beginning of the text.
            return text[:sample_size]

        elif strategy == 'random':
            # Take a contiguous window starting at a random offset.
            start = random.randint(0, len(text) - sample_size)
            return text[start:start + sample_size]

        elif strategy == 'distributed':
            # Take three evenly spaced windows and join them with ellipses.
            num_samples = 3
            sample_per_part = sample_size // num_samples
            samples = []

            for i in range(num_samples):
                start = (len(text) // num_samples) * i
                end = min(start + sample_per_part, len(text))
                samples.append(text[start:end])

            return '\n...\n'.join(samples)

        else:
            # Unknown strategy: fall back to the start of the text.
            return text[:sample_size]
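

# Minimal usage sketch (hedged): assumes the imported utils.text_utils.TextUtils
# and config.Config (with MAX_CHUNK_SIZE / CHUNK_OVERLAP) behave as used above;
# the sample text and parameter values below are illustrative only.
if __name__ == "__main__":
    processor = TextProcessor()

    sample = (
        "Chapter 1: A Beginning\n\n"
        '"Hello there," John said. It was a quiet morning.\n\n'
        "Chapter 2: A Turn\n\n"
        "The road bent east, and nobody spoke for a long while."
    )

    cleaned = processor.clean_text(sample)
    stats = processor.get_statistics(cleaned)
    print(f"Language: {stats['language']}, chapters: {stats['chapters']}, "
          f"words: {stats['words']}")

    for chunk in processor.chunk_text(cleaned, chunk_size=120, overlap=20):
        print(chunk['chunk_id'], repr(chunk['text'][:40]))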