""" ========================================== Table-Driven DFA Tokenizer ========================================== """ from __future__ import annotations import json import os from dataclasses import dataclass, field from typing import Optional # --------------------------------------------------------------------------- # Schema loading and validation # --------------------------------------------------------------------------- class SchemaError(ValueError): """Raised when a language schema JSON is malformed or incomplete.""" @dataclass class LanguageSchema: language: str grammar_notation: str unicode_blocks: list[tuple[int, int]] char_classes: dict[str, set[int]] transitions: dict[str, dict[str, Optional[str]]] start_state: str accept_states: set[str] emit_states: set[str] def get_regex(self) -> str: parts = [] for cps in self.char_classes.values(): for cp in cps: parts.append(chr(cp)) if not parts: return "" safe_parts = [] for p in parts: if p in ('-', ']', '\\', '^'): safe_parts.append('\\' + p) else: safe_parts.append(p) char_set = "".join(set(safe_parts)) return f"[{char_set}]+" class SchemaLoader: def load(self, path: str) -> LanguageSchema: with open(path, "r", encoding="utf-8") as fh: raw = json.load(fh) language = raw.get("language", "unknown") grammar = raw.get("grammar_notation", "") if "char_classes" not in raw: raise SchemaError(f"[{path}] Missing 'char_classes' key.") if "dfa" not in raw: raise SchemaError(f"[{path}] Missing 'dfa' key.") unicode_blocks = [] for rng in raw.get("unicode_blocks", []): unicode_blocks.append((int(rng[0], 16), int(rng[1], 16))) char_classes: dict[str, set[int]] = {} for label, definition in raw["char_classes"].items(): if label.startswith("_"): continue cps: set[int] = set() for rng in definition.get("ranges", []): lo, hi = int(rng[0], 16), int(rng[1], 16) cps.update(range(lo, hi + 1)) for cp_hex in definition.get("codepoints", []): cps.add(int(cp_hex, 16)) char_classes[label] = cps dfa_raw = raw["dfa"] start_state = dfa_raw.get("start", "START") accept_states = set(dfa_raw.get("accept_states", [])) emit_states = set(dfa_raw.get("emit_states", [])) transitions = dfa_raw.get("transitions", {}) return LanguageSchema( language=language, grammar_notation=grammar, unicode_blocks=unicode_blocks, char_classes=char_classes, transitions=transitions, start_state=start_state, accept_states=accept_states, emit_states=emit_states, ) # --------------------------------------------------------------------------- # Codepoint classifier # --------------------------------------------------------------------------- class CharClassifier: def __init__(self, schema: LanguageSchema): self._table: dict[int, str] = {} for label, cps in schema.char_classes.items(): for cp in cps: if cp in self._table: continue self._table[cp] = label def classify(self, ch: str) -> str: return self._table.get(ord(ch), "O") # --------------------------------------------------------------------------- # DFA Tokenizer # --------------------------------------------------------------------------- class LinguisTrie: def __init__(self, schema: LanguageSchema): self._schema = schema self._classifier = CharClassifier(schema) self._transitions = schema.transitions self._start = schema.start_state self._accept = schema.accept_states self._emit = schema.emit_states def tokenize(self, text: str, leading_space: bool = False) -> list[str]: tokens: list[str] = [] n = len(text) pos = 0 pending_space = " " if leading_space and text and text[0] not in (" ", "\t", "\n", "\r") else "" while pos < n: ch = 

# ---------------------------------------------------------------------------
# Codepoint classifier
# ---------------------------------------------------------------------------

class CharClassifier:
    def __init__(self, schema: LanguageSchema):
        # Flat codepoint -> class-label lookup. On overlap, the first label
        # encountered wins.
        self._table: dict[int, str] = {}
        for label, cps in schema.char_classes.items():
            for cp in cps:
                if cp in self._table:
                    continue
                self._table[cp] = label

    def classify(self, ch: str) -> str:
        # "O" is the catch-all class for codepoints outside the schema.
        return self._table.get(ord(ch), "O")


# ---------------------------------------------------------------------------
# DFA Tokenizer
# ---------------------------------------------------------------------------

class LinguisTrie:
    def __init__(self, schema: LanguageSchema):
        self._schema = schema
        self._classifier = CharClassifier(schema)
        self._transitions = schema.transitions
        self._start = schema.start_state
        self._accept = schema.accept_states
        self._emit = schema.emit_states

    def tokenize(self, text: str, leading_space: bool = False) -> list[str]:
        tokens: list[str] = []
        n = len(text)
        pos = 0
        pending_space = (
            " "
            if leading_space and text and text[0] not in (" ", "\t", "\n", "\r")
            else ""
        )

        while pos < n:
            ch = text[pos]

            # ─── Whitespace handling (leading-space mode) ────────────
            if leading_space and ch in (" ", "\t", "\n", "\r"):
                ws_buffer = ""
                while pos < n and text[pos] in (" ", "\t", "\n", "\r"):
                    ws_buffer += text[pos]
                    pos += 1
                if ws_buffer.endswith(" "):
                    # Defer the final space; it is glued onto the next token.
                    for ws_char in ws_buffer[:-1]:
                        tokens.append(ws_char)
                    pending_space = " "
                else:
                    for ws_char in ws_buffer:
                        tokens.append(ws_char)
                    pending_space = ""
                continue

            # ─── DFA syllable recognition ────────────────────
            cls = self._classifier.classify(ch)
            init_next = self._transitions.get(self._start, {}).get(cls)

            if init_next is None:
                # No transition out of the start state: emit the character alone.
                if pending_space:
                    tokens.append(pending_space + ch)
                    pending_space = ""
                else:
                    tokens.append(ch)
                pos += 1
                continue

            if init_next in self._emit:
                # Emit states yield a single-character token immediately.
                tokens.append(pending_space + ch)
                pending_space = ""
                pos += 1
                continue

            # Longest-match scan: remember the last position where the DFA
            # stood in an accepting state, and backtrack to it on failure.
            span_start = pos
            state = init_next
            pos += 1
            last_accept_pos = pos if state in self._accept else -1

            while pos < n:
                ch2 = text[pos]
                cls2 = self._classifier.classify(ch2)
                next_state = self._transitions.get(state, {}).get(cls2)
                if next_state is None:
                    break
                state = next_state
                pos += 1
                if state in self._accept:
                    last_accept_pos = pos
                elif state in self._emit:
                    # Emit states also terminate the scan immediately.
                    last_accept_pos = pos
                    break

            if last_accept_pos > span_start:
                emit_end = last_accept_pos
            else:
                # Fallback: emit only the first character as an ORPHAN.
                emit_end = span_start + 1

            tokens.append(pending_space + text[span_start:emit_end])
            pending_space = ""
            pos = emit_end

        if pending_space:
            tokens.append(pending_space)
        return tokens

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @property
    def language(self) -> str:
        return self._schema.language

    @property
    def unicode_blocks(self) -> list[tuple[int, int]]:
        return self._schema.unicode_blocks

    @property
    def regex(self) -> str:
        return self._schema.get_regex()

    @property
    def grammar(self) -> str:
        return self._schema.grammar_notation
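
# A minimal in-memory usage sketch, bypassing JSON entirely. Everything in it
# is illustrative: the two-class toy grammar ("C" starts a token, "V" extends
# it), the state names, and the demo string are invented for this sketch and
# do not come from any shipped language schema.
def _demo_minimal_schema() -> list[str]:
    """Drive the DFA with a hand-built schema instead of a schema file."""
    schema = LanguageSchema(
        language="demo",
        grammar_notation="C V*",
        unicode_blocks=[(0x61, 0x7A)],
        char_classes={"C": {ord("b")}, "V": {ord("a")}},
        transitions={"START": {"C": "S1"}, "S1": {"V": "S1"}},
        start_state="START",
        accept_states={"S1"},
        emit_states=set(),
    )
    trie = LinguisTrie(schema)
    # With leading_space=True the first word also carries the space marker,
    # so this returns [" baa", " ba"].
    return trie.tokenize("baa ba", leading_space=True)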

# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------

_SCHEMA_DIR = os.path.join(os.path.dirname(__file__), "schemas")
_schema_loader = SchemaLoader()
_dfa_cache: dict[str, LinguisTrie] = {}


def build_linguis_trie(schema_path: str) -> LinguisTrie:
    if schema_path not in _dfa_cache:
        schema = _schema_loader.load(schema_path)
        _dfa_cache[schema_path] = LinguisTrie(schema)
    return _dfa_cache[schema_path]


def load_dfa_map(script_mode: str) -> dict[str, LinguisTrie]:
    import glob

    dfa_map = {}
    pattern = os.path.join(_SCHEMA_DIR, "*.json")
    for file in glob.glob(pattern):
        try:
            trie = build_linguis_trie(file)
            if script_mode in ("mixed", "all") or script_mode == trie.language:
                dfa_map[trie.language] = trie
        except Exception as e:
            print(f"Warning: Failed to load schema {file}: {e}")
    return dfa_map


# ---------------------------------------------------------------------------
# Self-test
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    import sys

    print("=" * 65)
    print("DFA Tokenizer — self-test")
    print("=" * 65)

    # --- Load All Schemas ---
    dfas = load_dfa_map("all")

    sinhala_dfa = dfas.get("sinhala")
    if sinhala_dfa:
        print(f"\n[Sinhala DFA] grammar: {sinhala_dfa.grammar}\n")
        sinhala_tests = [
            "ශ්‍රී ලංකා ද්වීපයේ ස්වෛරීභාවය සහ ත්‍රිවිධ හමුදාව.",
            "භාෂාවේ ප්‍රෞඪත්වය විදහාපායි",
            "ආචාර්යවරයාගේ වෛද්‍ය විද්‍යා පර්යේෂණය සාර්ථකයි.",
            "චන්ද්‍රයාගේ ආලෝකය පෘථිවියට ක්ෂණිකව ලැබේ.",
            "මම ක්‍ෂණිකව ගඟට පැන්නා",
            "සඤ්ඤක ක්ෂමතාවය ක්‍රමය සහ ඥානය",
            "ද්වී ත්වේ ලං කඃ",
            "2026 වසරේ AI තාක්ෂණය 60% දියුණුයි!",
        ]
        for text in sinhala_tests:
            toks = sinhala_dfa.tokenize(text, leading_space=True)
            print(f"  Input    : {text}")
            print(f"  Syllables: {toks}")
            print(f"  Count    : {len(toks)}")
            print("-" * 65)

    # --- Devanagari ---
    deva_dfa = dfas.get("devanagari")
    if deva_dfa:
        print(f"\n[Devanagari DFA] grammar: {deva_dfa.grammar}\n")
        deva_tests = [
            "नमस्ते",
            "भारत",
            "हिन्दी",
            "संस्कृत",
            "क़िला",
            "ज़िंदगी",
            "प्रेम",
            "द्वारा",
            "श्रीमान्",
            "हिन्दुस्तान",
            "नमस्कार दुनिया",
            "मैं ठीक हूँ",
            "विद्यालय में पढ़ाई होती है।",
        ]
        for text in deva_tests:
            toks = deva_dfa.tokenize(text, leading_space=True)
            print(f"  Input    : {text}")
            print(f"  Syllables: {toks}")
            print(f"  Count    : {len(toks)}")
            print("-" * 65)

    print("\nAll self-tests complete.")
    sys.exit(0)
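
# Typical embedding in a host application (a sketch; the module name
# `dfa_tokenizer` is an assumption -- substitute whatever this file is
# saved as):
#
#   from dfa_tokenizer import load_dfa_map
#
#   dfas = load_dfa_map("mixed")   # "mixed"/"all" load every schema;
#   trie = dfas["sinhala"]         # any other value selects one language
#   tokens = trie.tokenize("ශ්‍රී ලංකා", leading_space=True)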