| import os |
| import re |
| import json |
| import logging |
| import warnings |
| from pathlib import Path |
| from typing import List, Dict, Optional, Tuple |
| from dataclasses import dataclass, field |
| from enum import Enum |
|
|
| import numpy as np |
| import pandas as pd |
| import torch |
| import stanza |
| import pyarabic.araby as araby |
| from sentence_transformers import SentenceTransformer, util |
| from fastapi import FastAPI, HTTPException, Query |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, Field |
|
|
# Ignore every warning category and set up the module-wide logger.
warnings.filterwarnings(action="ignore")
logging.basicConfig(
    format="%(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("ArabicSignNLP")
|
|
|
|
| |
class Config:
    """Static settings for the service; some values can be overridden via env vars."""

    # --- Data locations -------------------------------------------------
    # CSV whose label column provides the known word signs.
    CSV_PATH: str = os.environ.get("CSV_PATH", "arabic_sign_lang_features.csv")
    # Name of the CSV column that holds the sign labels.
    CSV_LABEL_COLUMN: str = "label"
    # Folder checked for "<identifier>.npy" keypoint files.
    KEYPOINTS_FOLDER: str = os.environ.get("KEYPOINTS_FOLDER", "keypoints")
    # File the sequence writer writes one identifier per line to.
    SEQUENCE_OUTPUT_PATH: str = "/tmp/sequence.txt"

    # --- Matching -------------------------------------------------------
    EMBEDDING_MODEL: str = "aubmindlab/bert-base-arabertv2"
    # Minimum cosine similarity for a semantic match to count as found.
    SIMILARITY_THRESHOLD: float = float(os.environ.get("SIMILARITY_THRESHOLD", "0.72"))
    # When False, the preposition list is added to the skipped words.
    INCLUDE_PREPOSITION_WORDS: bool = False

    # --- HTTP server ----------------------------------------------------
    API_HOST: str = "0.0.0.0"
    API_PORT: int = 7860
|
|
|
|
| |
# Maps a single Arabic letter (or the lam-alef pair) to the label used for
# its fingerspelling sign.
ARABIC_LETTER_TO_LABEL: Dict[str, str] = {
    # Every alef spelling shares one label.
    **dict.fromkeys(("ا", "أ", "إ", "آ"), "Alef"),
    "ب": "Beh",
    "ت": "Teh",
    "ة": "Teh_Marbuta",
    "ث": "Theh",
    "ج": "Jeem",
    "ح": "Hah",
    "خ": "Khah",
    "د": "Dal",
    "ذ": "Thal",
    "ر": "Reh",
    "ز": "Zain",
    "س": "Seen",
    "ش": "Sheen",
    "ص": "Sad",
    "ض": "Dad",
    "ط": "Tah",
    "ظ": "Zah",
    "ع": "Ain",
    "غ": "Ghain",
    "ف": "Feh",
    "ق": "Qaf",
    "ك": "Kaf",
    "ل": "Lam",
    "م": "Meem",
    "ن": "Noon",
    "ه": "Heh",
    "و": "Waw",
    # Both yeh and alef maqsura use the same label.
    **dict.fromkeys(("ي", "ى"), "Yeh"),
    "لا": "Laa",
}
|
|
|
|
| |
class ArabicTextNormalizer:
    """Canonicalise raw Arabic input before NLP processing and sign lookup.

    ``normalize`` strips diacritics and tatweel, rewrites known dialect words
    and phrases to the replacement listed in ``DIALECT_TO_FUSA``, unifies
    alef/hamza spellings, folds word-final alef maqsura and teh marbuta,
    replaces every character outside U+0600-U+06FF with a space and
    collapses whitespace.
    """

    # Dialect word or phrase -> replacement. Keys are written without
    # diacritics and may span several whitespace-separated words.
    DIALECT_TO_FUSA: Dict[str, str] = {
        "مش": "لا", "مو": "لا", "ماش": "لا",
        "عايز": "يريد", "عاوز": "يريد", "بدي": "يريد", "بدك": "يريد", "بده": "يريد",
        "حابب": "يحب", "بحب": "يحب", "باحب": "يحب", "بتحب": "يحب",
        "فين": "اين", "وين": "اين", "منين": "من اين", "منيين": "من اين",
        "ايه": "ماذا", "ايش": "ماذا", "شو": "ماذا", "وش": "ماذا",
        "كيفك": "كيف حالك", "كيفكم": "كيف حالكم", "عامل ايه": "كيف حالك",
        "تعال": "اقبل", "تعالى": "اقبل",
        "هيك": "هكذا", "كده": "هكذا", "كدا": "هكذا", "هكيه": "هكذا",
        "دلوقتي": "الان", "دلوقت": "الان", "هلا": "الان", "هلق": "الان", "هسه": "الان",
        "بكره": "غدا", "بكرا": "غدا", "بكرة": "غدا",
        "امبارح": "امس", "مبارح": "امس",
        "ليش": "لماذا", "ليه": "لماذا", "علاش": "لماذا",
        "تمام": "جيد", "ماشي": "جيد", "عادي": "جيد",
        "روح": "يذهب", "اروح": "يذهب", "يروح": "يذهب", "رايح": "يذهب",
        "جاي": "يأتي", "جاية": "يأتي", "جاييين": "يأتي",
        "اشتري": "يشتري", "اشترى": "يشتري", "بشتري": "يشتري", "بيشتري": "يشتري",
        "باكل": "ياكل", "بياكل": "ياكل",
        "بشرب": "يشرب", "بيشرب": "يشرب",
        "عارف": "يعرف", "عارفة": "يعرف", "بعرف": "يعرف",
        "شغل": "عمل", "بشتغل": "يعمل", "بيشتغل": "يعمل",
    }

    # Suffixes tried, in this order, when a word itself is not in the table.
    _SUFFIXES = ["ين", "ون", "ات", "ة", "ها", "هم", "هن", "كم", "كن", "نا", "وا", "ا"]

    def __init__(self):
        # All patterns are compiled once per instance instead of per call.
        self._non_arabic_pattern = re.compile(r"[^\u0600-\u06FF\s]")
        self._multi_space_pattern = re.compile(r"\s+")
        self._tatweel_pattern = re.compile(r"\u0640+")
        self._alef_variants_pattern = re.compile(r"[\u0625\u0623\u0622]")
        self._final_alef_maqsura_pattern = re.compile(r"\u0649(?=\s|$)")
        self._final_teh_marbuta_pattern = re.compile(r"\u0629(?=\s|$)")
        # Hamza on waw folds to waw; hamza on yeh folds to yeh (its carrier).
        self._hamza_carrier_table = str.maketrans({"\u0624": "\u0648", "\u0626": "\u064a"})
        # Longest table key measured in words; bounds the phrase look-ahead.
        self._max_phrase_words = max(len(key.split()) for key in self.DIALECT_TO_FUSA)

    def normalize(self, text: str) -> str:
        """Return the canonical form of *text*.

        Raises:
            ValueError: if *text* is empty / not a string, or if nothing is
                left after normalization.
        """
        if not text or not isinstance(text, str):
            raise ValueError("Input text must be a non-empty string.")
        text = text.strip()
        # Diacritics and tatweel go first so decorated dialect words still
        # match the (undecorated) table keys.
        text = araby.strip_tashkeel(text)
        text = self._tatweel_pattern.sub("", text)
        # Dialect mapping must precede the final-letter folding below because
        # some keys end in teh marbuta / alef maqsura.
        text = self._apply_dialect_mapping(text)
        text = self._alef_variants_pattern.sub("\u0627", text)
        text = text.translate(self._hamza_carrier_table)
        text = self._final_alef_maqsura_pattern.sub("\u064a", text)
        text = self._final_teh_marbuta_pattern.sub("\u0647", text)
        text = self._non_arabic_pattern.sub(" ", text)
        text = self._multi_space_pattern.sub(" ", text).strip()
        if not text:
            raise ValueError("Text became empty after normalization.")
        return text

    def _apply_dialect_mapping(self, text: str) -> str:
        """Replace dialect words and multi-word phrases found in the table.

        At each position the longest multi-word key is tried first; if none
        matches, the single word is mapped directly or after removing one
        known suffix. Unknown words are kept unchanged.
        """
        words = text.split()
        mapped = []
        position = 0
        while position < len(words):
            longest = min(self._max_phrase_words, len(words) - position)
            for span in range(longest, 1, -1):
                phrase = " ".join(words[position:position + span])
                replacement = self.DIALECT_TO_FUSA.get(phrase)
                if replacement is not None:
                    mapped.append(replacement)
                    position += span
                    break
            else:
                mapped.append(self._map_single_word(words[position]))
                position += 1
        return " ".join(mapped)

    def _map_single_word(self, word: str) -> str:
        """Map one word via the table, directly or after stripping a suffix."""
        direct = self.DIALECT_TO_FUSA.get(word)
        if direct is not None:
            return direct
        for suffix in self._SUFFIXES:
            # Require at least two characters to remain after the suffix.
            if word.endswith(suffix) and len(word) > len(suffix) + 1:
                root = word[: -len(suffix)]
                if root in self.DIALECT_TO_FUSA:
                    return self.DIALECT_TO_FUSA[root]
        return word

    def normalize_label(self, label: str) -> str:
        """Normalize *label*, returning it unchanged if normalization rejects it."""
        try:
            return self.normalize(label)
        except ValueError:
            return label
|
|
|
|
| |
@dataclass
class ProcessedWord:
    """One word kept by the NLP processor, with its linguistic annotations."""

    # Surface text of the word as produced by the pipeline.
    original: str
    # Surface text after the definite-article stripping step.
    normalized: str
    # Lemma reported by the pipeline, or the surface text when absent.
    lemma: str
    # Part-of-speech tag.
    pos: str
    # Named-entity flags derived from the token's NER tag.
    is_person_name: bool
    is_place_name: bool
|
|
|
|
class ArabicNLPProcessor:
    """Runs a Stanza Arabic pipeline and keeps only the words worth signing."""

    # Words that are always dropped.
    SKIP_WORDS_CORE = {"و", "ف", "ب", "ل", "ك", "ال", "قد", "لقد", "سوف", "ان", "إن", "لان", "حتى", "كي"}
    # Dropped only while Config.INCLUDE_PREPOSITION_WORDS is False.
    SKIP_WORDS_PREPOSITIONS = {"في", "من", "الى", "على", "عن", "مع", "عند", "لدى"}
    # Words starting with "ال" that must keep their prefix.
    _AL_WHITELIST = {"الان", "الله", "الذي", "التي", "اللذين", "اللتين"}

    def __init__(self):
        # Created by load(); process() refuses to run while this is None.
        self._pipeline = None

    def _active_skip_words(self) -> set:
        """Return a fresh set of the words to skip under the current config."""
        if Config.INCLUDE_PREPOSITION_WORDS:
            return set(self.SKIP_WORDS_CORE)
        return self.SKIP_WORDS_CORE | self.SKIP_WORDS_PREPOSITIONS

    def load(self):
        """Download the Stanza Arabic models and build the pipeline."""
        logger.info("Downloading Stanza Arabic models...")
        stanza.download("ar", verbose=False)
        self._pipeline = stanza.Pipeline(
            lang="ar",
            processors="tokenize,mwt,pos,lemma,ner",
            verbose=False,
        )
        logger.info("Stanza Arabic pipeline ready.")

    def _strip_al(self, word: str) -> str:
        """Drop a leading "ال" from words longer than three characters.

        Whitelisted words are returned untouched.
        """
        is_whitelisted = word in self._AL_WHITELIST
        if not is_whitelisted and word.startswith("ال") and len(word) > 3:
            return word[2:]
        return word

    def process(self, normalized_text: str) -> List[ProcessedWord]:
        """Analyse *normalized_text* and return the retained words in order.

        Raises:
            RuntimeError: if load() has not been called yet.
        """
        if self._pipeline is None:
            raise RuntimeError("Call load() before process().")

        document = self._pipeline(normalized_text)
        ignored_words = self._active_skip_words()
        ignored_pos = {"PUNCT", "SYM", "X", "DET", "CCONJ", "SCONJ"}

        kept: List[ProcessedWord] = []
        for sentence in document.sentences:
            for entry in sentence.words:
                surface = entry.text
                # Skip stop words, function-word POS tags and single characters.
                if surface in ignored_words or entry.pos in ignored_pos or len(surface) <= 1:
                    continue
                # The NER tag lives on the parent token; "O" when there is none.
                entity_tag = entry.parent.ner if entry.parent else "O"
                kept.append(
                    ProcessedWord(
                        original=surface,
                        normalized=self._strip_al(surface),
                        lemma=entry.lemma or surface,
                        pos=entry.pos or "NOUN",
                        is_person_name=any(mark in entity_tag for mark in ("PER", "PERS")),
                        is_place_name=any(mark in entity_tag for mark in ("LOC", "GPE")),
                    )
                )
        return kept
|
|
|
|
| |
@dataclass
class SignMatch:
    """Outcome of looking one word up in the sign database."""

    # True when a sign was accepted for the word.
    found: bool
    # Raw database label of the best sign ("" when there is no candidate).
    sign_label: str
    # 1.0 for exact, 0.95 for lemma, cosine similarity for semantic lookups.
    confidence: float
    # One of "exact", "lemma", "semantic" or "none".
    method: str
|
|
|
|
class SemanticSignMatcher:
    """Finds the database sign for a word: exact, then lemma, then embedding match.

    Labels are read from a CSV at construction time. Call ``set_normalizer``
    and then ``load_model`` before using ``find_sign`` for semantic lookups.
    """

    def __init__(self, csv_path: str, label_column: str, threshold: float):
        self.threshold = threshold
        # Normalized labels used for matching; parallel to _raw_labels.
        self._word_signs: List[str] = []
        # Labels exactly as they appear in the CSV (returned to callers).
        self._raw_labels: List[str] = []
        # Normalized label -> first position in _word_signs (O(1) lookups).
        self._label_index: Dict[str, int] = {}
        self._sign_embeddings = None
        self._model: Optional[SentenceTransformer] = None
        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        self._normalizer: Optional[ArabicTextNormalizer] = None
        self._load_database(csv_path, label_column)

    def set_normalizer(self, normalizer: ArabicTextNormalizer):
        """Attach the normalizer applied to labels and to looked-up words."""
        self._normalizer = normalizer

    def _normalize_label(self, label: str) -> str:
        """Normalize *label* when a normalizer is attached, else return it as is."""
        if self._normalizer:
            return self._normalizer.normalize_label(label)
        return label

    def _rebuild_index(self):
        """Recompute the label -> position map after _word_signs changes."""
        index: Dict[str, int] = {}
        for position, label in enumerate(self._word_signs):
            # Keep the first occurrence, matching list.index() semantics.
            index.setdefault(label, position)
        self._label_index = index

    def _load_database(self, csv_path: str, label_column: str):
        """Load the Arabic labels from *csv_path*, downloading the CSV if absent.

        A failed download is logged and leaves the matcher empty.

        Raises:
            ValueError: if *label_column* is not a column of the CSV.
        """
        if not os.path.exists(csv_path):
            logger.info("CSV not found locally. Downloading from Hugging Face...")
            import urllib.request
            url = "https://huggingface.co/spaces/SondosM/avatarAPI/resolve/main/arabic_sign_lang_features.csv"
            # Download next to the target and rename on success, so an
            # interrupted transfer never leaves a truncated CSV at csv_path.
            partial_path = f"{csv_path}.part"
            try:
                urllib.request.urlretrieve(url, partial_path)
                os.replace(partial_path, csv_path)
                logger.info("CSV downloaded successfully.")
            except Exception as e:
                logger.warning(f"Failed to download CSV: {e}. No word signs loaded.")
                try:
                    os.remove(partial_path)
                except OSError:
                    # Nothing was written, or it cannot be removed; either
                    # way csv_path itself is untouched.
                    pass
                return

        df = pd.read_csv(csv_path, low_memory=False)
        if label_column not in df.columns:
            raise ValueError(f"Column '{label_column}' not found. Available: {list(df.columns)}")
        all_labels = df[label_column].dropna().unique().tolist()
        # Keep only string labels containing at least one Arabic-block character.
        arabic_labels = [
            label for label in all_labels
            if isinstance(label, str) and any("\u0600" <= c <= "\u06ff" for c in label)
        ]
        self._raw_labels = arabic_labels
        self._word_signs = arabic_labels.copy()
        self._rebuild_index()
        logger.info(f"Database: {len(arabic_labels)} Arabic word labels loaded.")

    def _finalize_labels(self):
        """Re-derive the matching labels through the attached normalizer."""
        if self._normalizer and self._raw_labels:
            self._word_signs = [self._normalize_label(label) for label in self._raw_labels]
            self._rebuild_index()

    def load_model(self):
        """Load the embedding model and encode every label; no-op without labels."""
        self._finalize_labels()
        if not self._word_signs:
            logger.warning("No Arabic words to encode. Skipping model load.")
            return
        logger.info(f"Loading {Config.EMBEDDING_MODEL} on {self._device} ...")
        self._model = SentenceTransformer(Config.EMBEDDING_MODEL, device=self._device)
        logger.info(f"Encoding {len(self._word_signs)} labels...")
        self._sign_embeddings = self._model.encode(
            self._word_signs, convert_to_tensor=True, device=self._device,
            show_progress_bar=True, batch_size=64,
        )
        logger.info("Sign matcher ready.")

    def find_sign(self, word_text: str, lemma: str) -> SignMatch:
        """Look up a sign for *word_text*, falling back to *lemma* and embeddings.

        Returns a SignMatch whose ``method`` is "exact", "lemma", "semantic"
        or "none". For a rejected semantic candidate the best label and its
        score are still reported with ``found=False``.
        """
        if not self._word_signs:
            return SignMatch(found=False, sign_label="", confidence=0.0, method="none")
        norm_word = self._normalize_label(word_text)
        norm_lemma = self._normalize_label(lemma) if lemma else ""

        idx = self._label_index.get(norm_word)
        if idx is not None:
            return SignMatch(True, self._raw_labels[idx], 1.0, "exact")
        if norm_lemma and norm_lemma != norm_word:
            idx = self._label_index.get(norm_lemma)
            if idx is not None:
                return SignMatch(True, self._raw_labels[idx], 0.95, "lemma")

        if self._model is None or self._sign_embeddings is None:
            return SignMatch(False, "", 0.0, "none")
        # De-duplicate while dropping empty strings; with nothing left there
        # is nothing to encode (and batch_size would be 0).
        candidates = [text for text in dict.fromkeys((norm_word, norm_lemma)) if text]
        if not candidates:
            return SignMatch(False, "", 0.0, "none")
        embs = self._model.encode(
            candidates, convert_to_tensor=True, device=self._device, batch_size=len(candidates),
        )
        scores = util.cos_sim(embs, self._sign_embeddings)
        best_val = float(scores.max())
        # argmax() is over the flattened (candidate, label) grid; the modulo
        # recovers the label column.
        best_idx = int(scores.argmax()) % len(self._word_signs)
        if best_val >= self.threshold:
            return SignMatch(True, self._raw_labels[best_idx], best_val, "semantic")
        return SignMatch(False, self._raw_labels[best_idx] if self._raw_labels else "", best_val, "none")

    def letter_to_label(self, arabic_letter: str) -> Optional[str]:
        """Return the fingerspelling label for *arabic_letter*, or None."""
        return ARABIC_LETTER_TO_LABEL.get(arabic_letter)

    @property
    def available_signs(self) -> List[str]:
        """A copy of the raw database labels."""
        return self._raw_labels.copy()
|
|
|
|
| |
class ActionType(str, Enum):
    """Kind of animation step: a whole-word sign or a fingerspelled letter."""

    SIGN = "SIGN"      # a word sign taken from the database
    LETTER = "LETTER"  # one fingerspelled letter
|
|
|
|
@dataclass
class ExecutionStep:
    """One entry of the animation sequence."""

    # Whether this step plays a word sign or a single letter.
    action_type: ActionType
    # Sign label or letter label; also the stem of the keypoint file name.
    identifier: str
    # Word of the input that produced this step.
    source_word: str
    # Match confidence (1.0 for fingerspelled letters).
    confidence: float
    # How the step was obtained, e.g. "exact", "semantic", "fingerspell".
    match_method: str
|
|
|
|
class ExecutionPlanBuilder:
    """Turns raw text into an ordered list of sign / letter steps."""

    def __init__(self, normalizer: ArabicTextNormalizer, nlp_proc: ArabicNLPProcessor, matcher: SemanticSignMatcher):
        self.normalizer = normalizer
        self.nlp_proc = nlp_proc
        self.matcher = matcher

    def build(self, raw_text: str) -> List[ExecutionStep]:
        """Normalize, analyse and match *raw_text*; return the resulting steps.

        Person and place names are always fingerspelled; other words are
        fingerspelled only when no sign is found for them.
        """
        cleaned = self.normalizer.normalize(raw_text)
        steps: List[ExecutionStep] = []
        for token in self.nlp_proc.process(cleaned):
            is_named_entity = token.is_person_name or token.is_place_name
            if not is_named_entity:
                hit = self.matcher.find_sign(token.normalized, token.lemma)
                if hit.found:
                    steps.append(
                        ExecutionStep(
                            action_type=ActionType.SIGN,
                            identifier=hit.sign_label,
                            source_word=token.original,
                            confidence=hit.confidence,
                            match_method=hit.method,
                        )
                    )
                    continue
            # Named entity, or no sign available: spell the word out.
            steps.extend(self._fingerspell(token.original))
        return steps

    def _fingerspell(self, word: str) -> List[ExecutionStep]:
        """Spell *word* letter by letter, treating "لا" as a single unit.

        Characters without an entry in the letter table produce no step.
        """
        ligature = "لا"
        spelled = []
        cursor = 0
        while cursor < len(word):
            if word.startswith(ligature, cursor):
                symbol, advance = ligature, 2
            else:
                symbol, advance = word[cursor], 1
            sign_name = ARABIC_LETTER_TO_LABEL.get(symbol)
            if sign_name:
                spelled.append(ExecutionStep(ActionType.LETTER, sign_name, word, 1.0, "fingerspell"))
            cursor += advance
        return spelled
|
|
|
|
| |
class BlenderSequenceWriter:
    """Writes a plan's identifiers to a text file and reports on the result."""

    def __init__(self, output_path: str, keypoints_folder: str):
        self.output_path = output_path
        self.keypoints_folder = keypoints_folder

    def write(self, plan: List[ExecutionStep]) -> Dict:
        """Write one identifier per line to ``output_path`` and return a report.

        Raises:
            ValueError: if *plan* is empty.
        """
        if not plan:
            raise ValueError("Execution plan is empty.")

        # Make sure the target directory exists before writing.
        Path(self.output_path).parent.mkdir(parents=True, exist_ok=True)

        sequence = [step.identifier for step in plan]
        missing = self._check_missing_keypoints(plan)
        with open(self.output_path, "w", encoding="utf-8") as handle:
            handle.write("\n".join(sequence))

        detailed = []
        for number, step in enumerate(plan, start=1):
            detailed.append({
                "step": number,
                "type": step.action_type.value,
                "identifier": step.identifier,
                "source_word": step.source_word,
                "confidence": round(step.confidence, 3),
                "method": step.match_method,
            })

        return {
            "output_file": self.output_path,
            "total_steps": len(plan),
            "sign_count": sum(1 for step in plan if step.action_type == ActionType.SIGN),
            "letter_count": sum(1 for step in plan if step.action_type == ActionType.LETTER),
            "missing_keypoint_files": missing,
            "sequence": sequence,
            "detailed_plan": detailed,
        }

    def _check_missing_keypoints(self, plan: List[ExecutionStep]) -> List[str]:
        """Return "<identifier>.npy" for every step whose keypoint file is absent."""
        return [
            f"{step.identifier}.npy"
            for step in plan
            if not os.path.exists(os.path.join(self.keypoints_folder, f"{step.identifier}.npy"))
        ]
|
|
|
|
| |
class ArabicSignTranslator:
    """Facade combining plan building with optional sequence-file output."""

    def __init__(self, plan_builder: ExecutionPlanBuilder, writer: BlenderSequenceWriter):
        self.builder = plan_builder
        self.writer = writer

    def translate(self, text: str, save_to_file: bool = True) -> Dict:
        """Translate *text* into a sign sequence report.

        Args:
            text: raw Arabic input.
            save_to_file: when True the sequence is also written through the
                writer and the writer's report (including ``output_file``)
                is merged into the result.

        Returns:
            A dict with ``status`` "success" plus the sequence details, or
            ``status`` "error" when the plan is empty.

        Raises:
            Whatever the plan builder or writer raise (e.g. ValueError for
            text that is rejected by normalization).
        """
        plan = self.builder.build(text)
        if not plan:
            return {"status": "error", "message": "No translatable content found.", "input": text}

        result = {"status": "success", "input": text}
        if save_to_file:
            result.update(self.writer.write(plan))
            return result

        # Same report as the save path, minus the file write: the keypoint
        # check is performed for real and each step carries its 1-based index.
        result.update({
            "sequence": [step.identifier for step in plan],
            "total_steps": len(plan),
            "sign_count": sum(1 for s in plan if s.action_type == ActionType.SIGN),
            "letter_count": sum(1 for s in plan if s.action_type == ActionType.LETTER),
            "missing_keypoint_files": self.writer._check_missing_keypoints(plan),
            "detailed_plan": [
                {"step": number, "type": s.action_type.value, "identifier": s.identifier,
                 "source_word": s.source_word, "confidence": round(s.confidence, 3), "method": s.match_method}
                for number, s in enumerate(plan, start=1)
            ],
        })
        return result
|
|
|
|
| |
# Build every pipeline component once, at import time. The Stanza pipeline
# and the embedding model are loaded here, before the app starts serving.
logger.info("Initializing pipeline components...")

normalizer = ArabicTextNormalizer()

nlp_processor = ArabicNLPProcessor()
nlp_processor.load()

sign_matcher = SemanticSignMatcher(
    Config.CSV_PATH,
    Config.CSV_LABEL_COLUMN,
    Config.SIMILARITY_THRESHOLD,
)
# The normalizer must be attached before load_model() so labels are
# normalized prior to being encoded.
sign_matcher.set_normalizer(normalizer)
sign_matcher.load_model()

writer = BlenderSequenceWriter(
    output_path=Config.SEQUENCE_OUTPUT_PATH,
    keypoints_folder=Config.KEYPOINTS_FOLDER,
)
plan_builder = ExecutionPlanBuilder(
    normalizer=normalizer,
    nlp_proc=nlp_processor,
    matcher=sign_matcher,
)
translator = ArabicSignTranslator(plan_builder=plan_builder, writer=writer)

logger.info("All components ready.")
|
|
|
|
| |
class TranslateRequest(BaseModel):
    """Request body accepted by POST /translate."""

    # Text to translate; must be 1 to 4000 characters long.
    text: str = Field(
        min_length=1,
        max_length=4000,
        description="Arabic input text (Fus-ha or Ammiya)",
        examples=["انا عايز اروح المدرسة"],
    )
    # Whether the sequence should also be written to the sequence file.
    save_sequence: bool = Field(
        default=False,
        description="Save sequence file to /tmp/sequence.txt",
    )
|
|
|
|
class StepDetail(BaseModel):
    """One step of the plan as exposed in the API response."""

    type: str          # action type value, "SIGN" or "LETTER"
    identifier: str    # sign or letter label
    source_word: str   # input word that produced the step
    confidence: float  # match confidence
    method: str        # how the step was matched
|
|
|
|
class TranslateResponse(BaseModel):
    """Response body returned by the /translate endpoints."""

    status: str
    # The text exactly as received in the request.
    input_text: str
    # Identifiers in playback order.
    sequence: List[str]
    # Step counters: total, word signs, fingerspelled letters.
    total_steps: int
    sign_count: int
    letter_count: int
    # Keypoint file names reported as missing.
    missing_keypoint_files: List[str]
    # Per-step breakdown, in the same order as `sequence`.
    detailed_plan: List[StepDetail]
|
|
|
|
# The ASGI application object served by uvicorn.
app = FastAPI(
    version="1.0.0",
    title="Arabic Sign Language NLP API",
    description="Translates Arabic text (Fus-ha and Ammiya) into sign animation sequences.",
)

# CORS is left fully open: any origin, method and header is accepted.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
|
|
|
@app.get("/")
def health_check():
    """Report that the service is up, with the model name and database size."""
    sign_total = len(sign_matcher.available_signs)
    return dict(
        status="running",
        model=Config.EMBEDDING_MODEL,
        signs_in_database=sign_total,
    )
|
|
|
|
@app.post("/translate", response_model=TranslateResponse)
def translate_post(request: TranslateRequest):
    """Translate the request text into a sign sequence.

    Raises:
        HTTPException 422: the text is rejected as input (a ValueError from
            the pipeline, e.g. nothing is left after normalization) or it
            yields no translatable content.
        HTTPException 500: any other failure inside the pipeline.
    """
    try:
        result = translator.translate(request.text, save_to_file=request.save_sequence)
    except ValueError as exc:
        # The pipeline signals unusable input with ValueError; that is a
        # client error, not a server fault.
        raise HTTPException(status_code=422, detail=str(exc)) from exc
    except Exception as exc:
        # Keep the traceback in the server log; the client only gets the message.
        logger.exception("Translation failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    if result["status"] == "error":
        raise HTTPException(status_code=422, detail=result["message"])

    return TranslateResponse(
        status=result["status"],
        input_text=request.text,
        sequence=result.get("sequence", []),
        total_steps=result.get("total_steps", 0),
        sign_count=result.get("sign_count", 0),
        letter_count=result.get("letter_count", 0),
        missing_keypoint_files=result.get("missing_keypoint_files", []),
        detailed_plan=[
            StepDetail(type=s["type"], identifier=s["identifier"], source_word=s["source_word"],
                       confidence=s["confidence"], method=s["method"])
            for s in result.get("detailed_plan", [])
        ],
    )
|
|
|
|
@app.get("/translate", response_model=TranslateResponse)
def translate_get(
    # Same length limits as TranslateRequest.text, so invalid input is
    # rejected by request validation instead of failing inside the handler
    # when the request model is constructed.
    text: str = Query(description="Arabic text to translate", min_length=1, max_length=4000),
    save_sequence: bool = Query(default=False),
):
    """GET variant of /translate; delegates to the POST handler."""
    return translate_post(TranslateRequest(text=text, save_sequence=save_sequence))
|
|
@app.get("/sign/{word}")
def get_single_sign(word: str):
    """Look up the sign for one word, using the word itself as its lemma."""
    lookup = sign_matcher.find_sign(word, word)
    if not lookup.found:
        return {
            "status": "not_found",
            "word": word,
            "message": "الكلمة مش موجودة — هيتم التهجئة حرف حرف",
        }
    return {
        "status": "success",
        "word": word,
        "identifier": lookup.sign_label,
        "confidence": lookup.confidence,
        "method": lookup.method,
    }
@app.get("/signs")
def list_signs():
    """Return every sign label in the database together with the total count."""
    labels = sign_matcher.available_signs
    return {"total": len(labels), "signs": labels}
|
|
|
|
@app.get("/sequence-file")
def read_sequence_file():
    """Return the non-blank lines of the last written sequence file.

    Raises:
        HTTPException 404: the sequence file does not exist.
    """
    path = Config.SEQUENCE_OUTPUT_PATH
    # Open directly instead of checking existence first: the file could
    # disappear between the check and the open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            lines = [line.strip() for line in f if line.strip()]
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=404,
            detail="Sequence file not found. Run a translation first.",
        ) from exc
    return {"file_path": path, "sequence": lines, "count": len(lines)}
|
|
|
|
if __name__ == "__main__":
    # Run the API directly when this file is executed as a script.
    import uvicorn

    uvicorn.run(app, port=Config.API_PORT, host=Config.API_HOST)