import asyncio, base64, copy, hashlib, io, json, os, re, tempfile, time, uuid, httpx, logging

from backend import lens_core as core
from http import HTTPStatus
from collections import OrderedDict
from threading import Lock, Semaphore
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.middleware.cors import CORSMiddleware


SERVER_MAX_WORKERS = int(os.environ.get('SERVER_MAX_WORKERS', '15'))
JOB_TTL_SEC = int(os.environ.get('JOB_TTL_SEC', '3600'))
HTTP_TIMEOUT_SEC = float(os.environ.get(
    'HTTP_TIMEOUT_SEC', str(getattr(core, 'AI_TIMEOUT_SEC', 120))))
SUPPORTED_MODES = {"lens_images", "lens_text"}
BUILD_ID = os.environ.get('TP_BUILD_ID', 'v9-backendfix-20260129')
TP_DEBUG = str(os.environ.get('TP_DEBUG', '')).strip().lower() in ('1', 'true', 'yes', 'on')

TP_PARA_MARKER_PREFIX = '<<TP_P'
TP_PARA_MARKER_SUFFIX = '>>'

TP_RESULT_CACHE_MAX = int(os.environ.get('TP_RESULT_CACHE_MAX', '24'))
TP_AI_RESULT_CACHE_MAX = int(os.environ.get('TP_AI_RESULT_CACHE_MAX', '16'))
TP_WARMUP_LANG = (os.environ.get('TP_WARMUP_LANG', 'th') or 'th').strip()

_result_cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
_ai_result_cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()
_jobs: Dict[str, Dict[str, Any]] = {}
_job_queue: asyncio.Queue = asyncio.Queue()
_result_cache_lock = Lock()
_ai_cache_lock = Lock()

HF_AI_MAX_CONCURRENCY = max(
    1, int(os.environ.get('HF_AI_MAX_CONCURRENCY', '1')))
HF_AI_MIN_INTERVAL_SEC = max(0.0, float(
    os.environ.get('HF_AI_MIN_INTERVAL_SEC', '5')))
HF_AI_MAX_RETRIES = max(1, int(os.environ.get('HF_AI_MAX_RETRIES', '6')))
HF_AI_RETRY_BASE_SEC = max(0.2, float(
    os.environ.get('HF_AI_RETRY_BASE_SEC', '2')))
_hf_ai_sem = Semaphore(HF_AI_MAX_CONCURRENCY)
_hf_ai_lock = Lock()
_hf_ai_last_ts = 0.0
_tp_marker_re = re.compile(r'<<TP_P\d+>>')

TP_ACCESS_LOG_MODE = (os.environ.get('TP_ACCESS_LOG_MODE', 'custom') or 'custom').strip().lower()
if TP_ACCESS_LOG_MODE in ('custom', 'tp', 'plain'):
    try:
        _uv = logging.getLogger('uvicorn.access')
        _uv.disabled = True
        _uv.propagate = False
        _uv.setLevel(logging.CRITICAL)
    except Exception:
        pass


def _dbg(tag: str, data=None) -> None:
    if not TP_DEBUG:
        return
    try:
        if data is None:
            print(f'[TextPhantom][dbg] {tag}')
        else:
            s = json.dumps(data, ensure_ascii=False)
            if len(s) > 2000:
                s = s[:2000] + '…'
            print(f'[TextPhantom][dbg] {tag} {s}')
    except Exception:
        try:
            print(f'[TextPhantom][dbg] {tag} {data}')
        except Exception:
            pass


def _tree_stats(tree) -> dict:
    if not isinstance(tree, dict):
        return {'paras': 0, 'items': 0, 'spans': 0}
    paras = tree.get('paragraphs') or []
    if not isinstance(paras, list):
        return {'paras': 0, 'items': 0, 'spans': 0}
    items = 0
    spans = 0
    for p in paras:
        if not isinstance(p, dict):
            continue
        its = p.get('items') or []
        if not isinstance(its, list):
            continue
        items += len(its)
        for it in its:
            if not isinstance(it, dict):
                continue
            sp = it.get('spans') or []
            if isinstance(sp, list):
                spans += len(sp)
    return {'paras': len(paras), 'items': items, 'spans': spans}


def _tree_to_paragraph_texts(tree: Any) -> List[str]:
    if not isinstance(tree, dict):
        return []
    paras = tree.get('paragraphs') or []
    if not isinstance(paras, list) or not paras:
        return []
    out: List[str] = []
    for p in paras:
        if not isinstance(p, dict):
            out.append('')
            continue
        t = str(p.get('text') or '').strip()
        if not t:
            items = p.get('items') or []
            if isinstance(items, list) and items:
                t = ' '.join(str(it.get('text') or '').strip() for it in items if isinstance(
                    it, dict) and str(it.get('text') or '').strip())
        out.append(t)
    return out


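# Paragraph markers (<<TP_P0>>, <<TP_P1>>, ...) are injected in front of each
# source paragraph so the AI response can later be split back into per-paragraph
# segments and matched against the decoded layout tree.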
def _apply_para_markers(paras: List[str]) -> str:
    if not paras:
        return ''
    parts: List[str] = []
    for i, t in enumerate(paras):
        parts.append(
            f"{TP_PARA_MARKER_PREFIX}{i}{TP_PARA_MARKER_SUFFIX}\n{(t or '').strip()}")
    return '\n\n'.join(parts)


def _clamp_runaway_repeats(s: str, max_repeat: int = 12) -> str:
    if not s:
        return ''
    pat = re.compile(r"(.)\1{" + str(max_repeat) + r",}")
    return pat.sub(lambda m: m.group(1) * max_repeat, s)


def _extract_marker_indices(s: str) -> set[int]:
    if not s:
        return set()
    out: set[int] = set()
    for m in re.finditer(r"<<TP_P(\d+)>>", s):
        try:
            out.add(int(m.group(1)))
        except Exception:
            continue
    return out


def _needs_ai_retry(ai_text_full: str, expected_paras: int) -> bool:
    if expected_paras <= 0:
        return False
    idx = _extract_marker_indices(ai_text_full)
    if len(idx) >= expected_paras:
        return False
    # Fewer markers came back than paragraphs were sent (this also covers a
    # truncated response whose last marker prefix has no closing '>>'), so the
    # AI call should be retried.
    return True


def _now() -> float:
    return time.time()


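# Small in-process LRU caches keyed by image hash plus request parameters; values
# are deep-copied on both set and get so callers cannot mutate cached results.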
def _lru_get(cache: OrderedDict, lock: Lock, key: str) -> Optional[Dict[str, Any]]:
    if not key:
        return None
    with lock:
        v = cache.get(key)
        if v is None:
            return None
        cache.move_to_end(key)
        return copy.deepcopy(v)


def _lru_set(cache: OrderedDict, lock: Lock, key: str, value: Dict[str, Any], max_items: int) -> None:
    if not key or not isinstance(value, dict) or max_items <= 0:
        return
    with lock:
        cache[key] = copy.deepcopy(value)
        cache.move_to_end(key)
        while len(cache) > max_items:
            cache.popitem(last=False)


def _sha256_hex(blob: bytes) -> str:
    return hashlib.sha256(blob).hexdigest() if blob else ''


def _ai_prompt_sig(s: str) -> str:
    t = (s or '').strip()
    if not t:
        return ''
    return hashlib.sha256(t.encode('utf-8')).hexdigest()[:12]


def _build_cache_key(img_hash: str, lang: str, mode: str, source: str, ai_cfg: Optional["AiConfig"]) -> str:
    parts = [img_hash, _normalize_lang(
        lang), (mode or '').strip(), (source or '').strip()]
    if ai_cfg and (source or '').strip().lower() == 'ai':
        parts.extend([
            (ai_cfg.provider or '').strip(),
            (ai_cfg.model or '').strip(),
            (ai_cfg.base_url or '').strip(),
            _ai_prompt_sig(ai_cfg.prompt_editable),
        ])
    return '|'.join([p for p in parts if p is not None])


def _b64_to_bytes(b64: str) -> bytes:
    pad = '=' * ((4 - (len(b64) % 4)) % 4)
    return base64.b64decode(b64 + pad)


def _datauri_to_bytes(data_uri: str) -> tuple[bytes, str]:
    s = (data_uri or '').strip()
    if not s.startswith('data:'):
        return b'', ''
    head, _, b64 = s.partition(',')
    mime = ''
    if ';' in head:
        mime = head[5:head.index(';')]
    return _b64_to_bytes(b64), mime or 'application/octet-stream'


def _bytes_to_datauri(blob: bytes, mime: str) -> str:
    b64 = base64.b64encode(blob).decode('ascii')
    return f"data:{mime};base64,{b64}"


def _download_bytes(url: str, referer: str = '') -> tuple[bytes, str]:
    u = (url or '').strip()
    if not u:
        return b'', ''
    headers = {
        'user-agent': 'Mozilla/5.0 (TextPhantomOCR; +https://huggingface.co/spaces)',
    }
    ref = (referer or '').strip()
    if ref:
        headers['referer'] = ref

    with httpx.Client(timeout=HTTP_TIMEOUT_SEC, follow_redirects=True, headers=headers) as client:
        r = client.get(u)
        r.raise_for_status()
        ct = (r.headers.get('content-type') or '').split(';')[0].strip()
        return r.content, ct


def _detect_provider_from_key(api_key: str) -> str:
    return core._canonical_provider(core._detect_ai_provider_from_key(api_key))


def _resolve_provider_defaults(provider: str) -> dict:
    return (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get(provider, {})


def _resolve_model(provider: str, model: str) -> str:
    return core._resolve_model(provider, model)


def _has_meaningful_text(s: str) -> bool:
    t = _tp_marker_re.sub('', str(s or ''))
    return bool(t.strip())


def _is_hf_provider(provider: str, base_url: str) -> bool:
    p = (provider or '').strip().lower()
    b = (base_url or '').strip().lower()
    return p == 'huggingface' or 'router.huggingface.co' in b


def _is_hf_rate_limited_error(msg: str) -> bool:
    t = (msg or '').lower()
    if 'rate limit' in t or 'ratelimit' in t or 'too many requests' in t:
        return True
    if 'http 429' in t or ' 429' in t:
        return True
    if 'http 503' in t or ' 503' in t or 'overloaded' in t or 'temporarily' in t:
        return True
    return False


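# Hugging Face router calls are serialized through a semaphore, spaced at least
# HF_AI_MIN_INTERVAL_SEC apart, and retried with exponential backoff (capped at
# 15s) when the error text looks like a 429/503 rate-limit response.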
def _hf_throttle_before_call() -> None:
    if HF_AI_MIN_INTERVAL_SEC <= 0:
        return
    global _hf_ai_last_ts
    with _hf_ai_lock:
        now = _now()
        dt = now - float(_hf_ai_last_ts or 0.0)
        wait = HF_AI_MIN_INTERVAL_SEC - dt
        if wait > 0:
            time.sleep(wait)
        _hf_ai_last_ts = _now()


def _openai_compat_generate_with_hf_backoff(api_key: str, base_url: str, model: str, system_text: str, user_parts: List[str]):
    last_err: Optional[Exception] = None
    for attempt in range(int(HF_AI_MAX_RETRIES)):
        try:
            with _hf_ai_sem:
                _hf_throttle_before_call()
                return core._openai_compat_generate_json(api_key, base_url, model, system_text, user_parts)
        except Exception as e:
            last_err = e
            if not _is_hf_rate_limited_error(str(e)):
                raise
            delay = min(15.0, max(float(HF_AI_MIN_INTERVAL_SEC), float(
                HF_AI_RETRY_BASE_SEC) * (2 ** min(attempt, 4))))
            _dbg('ai.hf.backoff', {
                'attempt': attempt + 1, 'delay_sec': round(delay, 2), 'err': str(e)[:240]})
            time.sleep(delay)
            continue
    if last_err is not None:
        raise last_err
    raise Exception('hf_backoff_failed')


def _normalize_lang(lang: str) -> str:
    return core._normalize_lang(lang)


@dataclass
class AiConfig:
    api_key: str
    model: str = 'auto'
    provider: str = 'auto'
    base_url: str = 'auto'
    prompt_editable: str = ''


def _collapse_ws(text: str) -> str:
    return re.sub(r"\s+", " ", str(text or "")).strip()


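# Normalizes an AI response that should contain <<TP_Pn>> markers: strips broken
# marker fragments, puts each marker on its own line, and re-emits the segments
# in ascending marker order with whitespace collapsed.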
def _sanitize_marked_text(marked_text: str) -> str:
    t = str(marked_text or "")
    if not t:
        return ""
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    t = re.sub(r"<<TP_P(?!\d+>>)[^\s>]*>?", "", t)
    t = re.sub(r"(?m)^\s*(<<TP_P\d+>>)\s*(\S)", r"\1\n\2", t)

    lines = t.split("\n")
    out0: List[str] = []
    for line in lines:
        if "<<TP_P" not in line:
            out0.append(line)
            continue
        m = re.match(r"^\s*(<<TP_P\d+>>)\s*$", line)
        if m:
            out0.append(m.group(1))
            continue
        m2 = re.match(r"^\s*(<<TP_P\d+>>)\s*(.*)$", line)
        if m2:
            out0.append(m2.group(1))
            rest = (m2.group(2) or "").strip()
            if rest:
                out0.append(rest)
            continue
        out0.append(re.sub(r"<<TP_P\d+>>", "", line))
    t = "\n".join(out0)

    indices = sorted(_extract_marker_indices(t))
    if not indices:
        return _collapse_ws(t)
    out_lines: List[str] = []
    for idx in indices:
        marker = f"<<TP_P{idx}>>"
        m = re.search(
            rf"{re.escape(marker)}\s*([\s\S]*?)(?=<<TP_P\d+>>|\Z)", t)
        seg = m.group(1) if m else ""
        seg = _collapse_ws(seg)
        out_lines.append(marker)
        out_lines.append(seg)
        out_lines.append("")
    return "\n".join(out_lines).strip("\n")


def _has_complete_marker_sequence(ai_text_full: str, expected_paras: int) -> bool:
    if expected_paras <= 0:
        return True
    t = str(ai_text_full or "")
    need = list(range(int(expected_paras)))
    idx = sorted(_extract_marker_indices(t))
    if len(idx) < len(need):
        return False
    if idx[:len(need)] != need:
        return False
    last = -1
    for i in need:
        m = f"<<TP_P{i}>>"
        p = t.find(m)
        if p < 0 or p <= last:
            return False
        last = p
    return True


def _build_ai_prompt_packet_custom(target_lang: str, original_text_full: str, prompt_editable: str, is_retry: bool = False) -> tuple[str, List[str]]:
    lang = _normalize_lang(target_lang)

    base = (getattr(core, "AI_PROMPT_SYSTEM_BASE", "") or "").strip()

    style = (prompt_editable or "").strip()
    if not style:
        style = (
            (getattr(core, "AI_LANG_STYLE", {}) or {}).get(lang)
            or (getattr(core, "AI_LANG_STYLE", {}) or {}).get("default")
            or ""
        ).strip()

    contract_parts: List[str] = [
        "Output ONLY the translated text (no JSON, no markdown, no extra commentary).",
        "Markers: Keep every paragraph marker like <<TP_P0>> unchanged and in order. Do not remove, rename, or add markers.",
        "For each marker, output the marker followed by that paragraph's translated text.",
    ]
    if is_retry:
        contract_parts.append(
            "Retry: You MUST output ALL markers from the first to the last marker in the input."
        )

    system_text = "\n\n".join(
        [p for p in [base, style, "\n".join(contract_parts)] if p]
    )

    user_parts: List[str] = ["Input:\n" + str(original_text_full or "")]
    return system_text, user_parts


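# Sends the marker-annotated source text to the configured provider (Gemini,
# Anthropic, or any OpenAI-compatible endpoint, with backoff when routed through
# the Hugging Face router) and returns the sanitized translation plus call metadata.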
def ai_translate_text(original_text_full: str, target_lang: str, ai: AiConfig, is_retry: bool = False) -> dict:
    if not _has_meaningful_text(original_text_full):
        return {
            'aiTextFull': '',
            'meta': {
                'skipped': True,
                'skipped_reason': 'no_text',
            },
        }

    api_key = (ai.api_key or '').strip()
    if not api_key:
        raise Exception('AI api_key is required')

    provider = core._canonical_provider((ai.provider or 'auto'))
    if provider in ('', 'auto'):
        provider = _detect_provider_from_key(api_key)

    preset = _resolve_provider_defaults(provider) or {}

    model = _resolve_model(provider, (ai.model or 'auto'))

    base_url = (ai.base_url or 'auto').strip()
    if base_url in ('', 'auto'):
        base_url = (preset.get('base_url') or '').strip()

    if provider not in ('gemini', 'anthropic'):
        if not base_url:
            base_url = (_resolve_provider_defaults('openai') or {}).get(
                'base_url') or 'https://api.openai.com/v1'

    system_text, user_parts = _build_ai_prompt_packet_custom(
        target_lang, original_text_full, ai.prompt_editable, is_retry=is_retry
    )

    started = _now()
    used_model = model
    if provider == 'gemini':
        raw = core._gemini_generate_json(
            api_key, model, system_text, user_parts)
    elif provider == 'anthropic':
        raw = core._anthropic_generate_json(
            api_key, model, system_text, user_parts)
    else:
        if _is_hf_provider(provider, base_url):
            raw, used_model = _openai_compat_generate_with_hf_backoff(
                api_key, base_url, model, system_text, user_parts)
        else:
            raw, used_model = core._openai_compat_generate_json(
                api_key, base_url, model, system_text, user_parts)

    ai_text_full = core._parse_ai_textfull_only(
        raw) if core.DO_AI_JSON else core._parse_ai_textfull_text_only(raw)

    ai_text_full = _sanitize_marked_text(ai_text_full)

    return {
        'aiTextFull': ai_text_full,
        'meta': {
            'model': used_model,
            'provider': provider,
            'base_url': base_url,
            'latency_sec': round(_now() - started, 3),
        },
    }


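# Full per-image pipeline: run the Lens OCR/translation call, pick fonts for the
# target language, decode the original/translated layout trees, optionally add an
# AI translation rebuilt onto the best available tree, and render HTML overlays
# plus a base image (optionally with the original text erased).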
def process_image_path(image_path: str, lang: str, mode: str, ai_cfg: Optional[AiConfig]) -> dict:
    mode_id = (mode or '').strip()
    if mode_id not in SUPPORTED_MODES:
        mode_id = 'lens_images'

    target_lang = _normalize_lang(lang)

    data = core.get_lens_data_from_image(
        image_path, getattr(core, 'FIREBASE_URL', ''), target_lang)
    img = core.Image.open(image_path).convert('RGB')
    W, H = img.size

    thai_font = getattr(core, 'FONT_THAI_PATH', 'NotoSansThai-Regular.ttf')
    latin_font = getattr(core, 'FONT_LATIN_PATH', 'NotoSans-Regular.ttf')

    if target_lang == 'ja':
        latin_font = getattr(core, 'FONT_JA_PATH', latin_font)
    elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'):
        latin_font = getattr(core, 'FONT_ZH_SC_PATH', latin_font)
    elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'):
        latin_font = getattr(core, 'FONT_ZH_TC_PATH', latin_font)

    if getattr(core, 'FONT_DOWNLOD', True):
        thai_font = core.ensure_font(
            thai_font, getattr(core, 'FONT_THAI_URLS', []))
        if target_lang == 'ja':
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_JA_URLS', []))
        elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'):
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_ZH_SC_URLS', []))
        elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'):
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_ZH_TC_URLS', []))
        else:
            latin_font = core.ensure_font(
                latin_font, getattr(core, 'FONT_LATIN_URLS', []))

    image_url = data.get('imageUrl') if isinstance(data, dict) else None

    out: Dict[str, Any] = {
        'mode': mode_id,
        'imageUrl': image_url,
        'imageDataUri': '',
        'originalContentLanguage': data.get('originalContentLanguage') if isinstance(data, dict) else None,
        'originalTextFull': data.get('originalTextFull') if isinstance(data, dict) else None,
        'translatedTextFull': data.get('translatedTextFull') if isinstance(data, dict) else None,
        'AiTextFull': '',
        'originalParagraphs': (data.get('originalParagraphs') or []) if isinstance(data, dict) else [],
        'translatedParagraphs': (data.get('translatedParagraphs') or []) if isinstance(data, dict) else [],
        'original': {},
        'translated': {},
        'Ai': {},
    }

    if mode_id == 'lens_images':
        if image_url:
            decoded = core.decode_imageurl_to_datauri(str(image_url))
            if decoded:
                out['imageDataUri'] = decoded
            elif isinstance(image_url, str) and image_url.startswith(('http://', 'https://')):
                blob, mime2 = _download_bytes(image_url)
                out['imageDataUri'] = _bytes_to_datauri(
                    blob, mime2 or 'image/jpeg')

        if not out.get('imageDataUri'):
            with open(image_path, 'rb') as f:
                blob = f.read()
            out['imageDataUri'] = _bytes_to_datauri(blob, 'image/jpeg')
        return out

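    # lens_text mode: decode the original/translated layout trees so the text can
    # be re-rendered as positioned HTML overlays on top of the (erased) base image.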
    original_span_tokens = None
    original_tree = None
    translated_tree = None

    def _base_img_for_overlay() -> core.Image.Image:
        if not (getattr(core, 'ERASE_OLD_TEXT_WITH_ORIGINAL_BOXES', True) and original_span_tokens):
            return img
        return core.erase_text_with_boxes(
            img,
            original_span_tokens,
            pad_px=getattr(core, 'ERASE_PADDING_PX', 2),
            sample_margin_px=getattr(core, 'ERASE_SAMPLE_MARGIN_PX', 6),
        )

    if getattr(core, 'DO_ORIGINAL', True):
        tree, _ = core.decode_tree(
            out.get('originalParagraphs') or [],
            out.get('originalTextFull') or '',
            'original',
            W,
            H,
            want_raw=False,
        )
        original_tree = tree
        original_span_tokens = core.flatten_tree_spans(tree)
        _dbg('tree.original', _tree_stats(original_tree))
        out['original'] = {
            'originalTree': tree,
            'originalTextFull': out.get('originalTextFull') or '',
        }

    if getattr(core, 'DO_TRANSLATED', True):
        tree, _ = core.decode_tree(
            out.get('translatedParagraphs') or [],
            out.get('translatedTextFull') or '',
            'translated',
            W,
            H,
            want_raw=False,
        )
        translated_tree = tree
        translated_span_tokens = core.flatten_tree_spans(tree)
        _dbg('tree.translated', _tree_stats(translated_tree))
        out['translated'] = {
            'translatedTree': tree,
            'translatedTextFull': out.get('translatedTextFull') or '',
        }

    def _tree_score(tree: Any) -> int:
        if not isinstance(tree, dict):
            return -1
        paragraphs = tree.get('paragraphs') or []
        if not isinstance(paragraphs, list) or not paragraphs:
            return -1

        para_count = len(paragraphs)
        item_count = 0
        span_count = 0
        for p in paragraphs:
            if not isinstance(p, dict):
                continue
            items = p.get('items') or []
            if not isinstance(items, list):
                continue
            item_count += len(items)
            for it in items:
                if not isinstance(it, dict):
                    continue
                spans = it.get('spans') or []
                if isinstance(spans, list):
                    span_count += len(spans)

        return item_count * 10000 + para_count * 100 + span_count

    def _pick_ai_template_tree() -> Optional[Dict[str, Any]]:
        tr_score = _tree_score(translated_tree)
        og_score = _tree_score(original_tree)

        if tr_score < 0 and og_score < 0:
            return None
        if og_score > tr_score:
            return original_tree
        return translated_tree or original_tree

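    # AI branch: translate the marker-annotated original paragraphs, repair any
    # missing <<TP_Pn>> segments using the Lens translation as a fallback, then
    # patch the result onto whichever decoded tree has the richest structure.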
    ai_tree = None
    if ai_cfg and (ai_cfg.api_key or '').strip() and getattr(core, 'DO_AI', True):
        src_paras = _tree_to_paragraph_texts(original_tree or {})
        src_text = _apply_para_markers(src_paras) if src_paras else str(
            out.get('originalTextFull') or '')
        if not _has_meaningful_text(src_text):
            out['AiTextFull'] = ''
            out['Ai'] = {
                'meta': {
                    'skipped': True,
                    'skipped_reason': 'no_text',
                }
            }
        else:
            ai = ai_translate_text(src_text, target_lang, ai_cfg)
            if src_paras and _needs_ai_retry(str(ai.get('aiTextFull') or ''), len(src_paras)):
                _dbg('ai.retry', {
                    'expected_paras': len(src_paras),
                    'found_markers': len(_extract_marker_indices(str(ai.get('aiTextFull') or ''))),
                })
                retry_paras = [_clamp_runaway_repeats(p) for p in src_paras]
                retry_text = _apply_para_markers(retry_paras) or src_text
                ai = ai_translate_text(
                    retry_text, target_lang, ai_cfg, is_retry=True)

            ai_text_full = str(ai.get('aiTextFull') or '')
            meta0 = ai.get('meta') or {}
            if src_paras:
                expected = len(src_paras)
                if not _has_complete_marker_sequence(ai_text_full, expected):
                    fallback_paras = _tree_to_paragraph_texts(translated_tree or {})
                    if len(fallback_paras) < expected:
                        fallback_paras = (fallback_paras + src_paras)[:expected]
                    else:
                        fallback_paras = fallback_paras[:expected]

                    found = sorted(_extract_marker_indices(ai_text_full))
                    seg_map: Dict[int, str] = {}
                    for idx in found:
                        if idx < 0 or idx >= expected:
                            continue
                        marker = f"<<TP_P{idx}>>"
                        m = re.search(rf"{re.escape(marker)}\s*([\s\S]*?)(?=<<TP_P\d+>>|\Z)", ai_text_full)
                        seg = _collapse_ws(m.group(1) if m else '')
                        if seg and idx not in seg_map:
                            seg_map[idx] = seg

                    missing = 0
                    out_lines: List[str] = []
                    for i in range(expected):
                        seg = seg_map.get(i) or _collapse_ws(fallback_paras[i] if i < len(fallback_paras) else '')
                        if not seg_map.get(i):
                            missing += 1
                        out_lines.append(f"<<TP_P{i}>>")
                        out_lines.append(seg)
                        out_lines.append('')
                    ai_text_full = "\n".join(out_lines).strip("\n")
                    _dbg('ai.marker.repaired', {
                        'expected_paras': expected,
                        'found_markers': len(seg_map),
                        'missing': missing,
                    })

                    meta0 = {
                        **meta0,
                        'marker_repaired': True,
                        'marker_expected': expected,
                        'marker_found': len(seg_map),
                        'marker_missing': missing,
                    }

            template_tree = _pick_ai_template_tree()
            _dbg('ai.template.pick', {
                'score_original': _tree_score(original_tree),
                'score_translated': _tree_score(translated_tree),
                'picked': 'original' if template_tree is original_tree else ('translated' if template_tree is translated_tree else 'none'),
            })
            if not isinstance(template_tree, dict):
                template_tree = original_tree if isinstance(original_tree, dict) else (
                    translated_tree if isinstance(translated_tree, dict) else {})
            patched = core.patch(
                {'Ai': {'aiTextFull': str(
                    ai_text_full or ''), 'aiTree': template_tree}},
                W,
                H,
                thai_font or '',
                latin_font or '',
                lang=target_lang,
            )
            ai_tree = (patched.get('Ai') or {}).get('aiTree') or {}
            _dbg('ai.patched', {
                'ai_text_len': len(ai_text_full),
                'stats_ai': _tree_stats(ai_tree),
                'stats_original': _tree_stats(original_tree or {}),
                'stats_translated': _tree_stats(translated_tree or {}),
                'mode': mode_id,
                'lang': target_lang,
            })

            shared_para_sizes = core._compute_shared_para_sizes(
                [original_tree or {}, translated_tree or {}, ai_tree or {}],
                thai_font or '',
                latin_font or '',
                W,
                H,
            )
            core._apply_para_font_size(original_tree or {}, shared_para_sizes)
            core._apply_para_font_size(
                translated_tree or {}, shared_para_sizes)
            core._apply_para_font_size(ai_tree or {}, shared_para_sizes)
            core._rebuild_ai_spans_after_font_resize(
                ai_tree or {}, W, H, thai_font or '', latin_font or '', lang=target_lang)

            out['AiTextFull'] = ai_text_full
            out['Ai'] = {
                'aiTextFull': ai_text_full,
                'aiTree': ai_tree,
                'meta': meta0,
            }
            if getattr(core, 'DO_AI_HTML', True):
                core.fit_tree_font_sizes_for_tp_html(
                    ai_tree, thai_font or '', latin_font or '', W, H)
                out['Ai']['aihtml'] = core.ai_tree_to_tp_html(ai_tree, W, H)
                out['Ai']['aihtmlMeta'] = {
                    'baseW': int(W),
                    'baseH': int(H),
                    'format': 'tp',
                }

    if getattr(core, 'DO_ORIGINAL', True) and getattr(core, 'DO_ORIGINAL_HTML', True) and isinstance(original_tree, dict):
        core.fit_tree_font_sizes_for_tp_html(
            original_tree, thai_font or '', latin_font or '', W, H)
        if isinstance(out.get('original'), dict):
            out['original']['originalhtml'] = core.ai_tree_to_tp_html(
                original_tree or {}, W, H)

    if getattr(core, 'DO_TRANSLATED', True) and getattr(core, 'DO_TRANSLATED_HTML', True) and isinstance(translated_tree, dict):
        core.fit_tree_font_sizes_for_tp_html(
            translated_tree, thai_font or '', latin_font or '', W, H)
        if isinstance(out.get('translated'), dict):
            out['translated']['translatedhtml'] = core.ai_tree_to_tp_html(
                translated_tree or {}, W, H)

    if getattr(core, 'HTML_INCLUDE_CSS', True) and (getattr(core, 'DO_ORIGINAL_HTML', True) or getattr(core, 'DO_TRANSLATED_HTML', True) or getattr(core, 'DO_AI_HTML', True)):
        out['htmlCss'] = core.tp_overlay_css()
        out['htmlMeta'] = {
            'baseW': int(W),
            'baseH': int(H),
            'format': 'tp',
        }
    base_img = _base_img_for_overlay()
    buf = io.BytesIO()
    base_img.save(buf, format='PNG')
    out['imageDataUri'] = _bytes_to_datauri(buf.getvalue(), 'image/png')

    return out


app = FastAPI(title='TextPhantom OCR API', version='1.0')
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)


@app.middleware("http")
async def _tp_access_log(request: Request, call_next):
    resp = await call_next(request)
    if TP_ACCESS_LOG_MODE in ('uvicorn', 'off', 'none'):
        return resp
    try:
        path = request.url.path
        if request.method == 'GET' and path.startswith("/translate/"):
            client = request.client
            host = client.host if client else "-"
            port = client.port if client else 0
            ver = request.scope.get("http_version") or "1.1"
            phrase = HTTPStatus(resp.status_code).phrase
            print(f'{host}:{port} - "{request.method} {path} HTTP/{ver}" {resp.status_code} {phrase}', flush=True)
    except Exception:
        pass
    return resp


async def _cleanup_jobs_loop():
    while True:
        await asyncio.sleep(60)
        cutoff = _now() - JOB_TTL_SEC
        dead = [jid for jid, j in _jobs.items() if float(
            j.get('ts', 0)) < cutoff]
        for jid in dead:
            _jobs.pop(jid, None)


async def _worker_loop(worker_id: int):
    while True:
        jid, payload = await _job_queue.get()
        try:
            _jobs[jid] = {'status': 'running', 'ts': _now()}
            result = await asyncio.to_thread(_process_payload, payload)
            _jobs[jid] = {'status': 'done', 'result': result, 'ts': _now()}
        except Exception as e:
            _jobs[jid] = {'status': 'error', 'result': str(e), 'ts': _now()}
        finally:
            _job_queue.task_done()


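# Runs one job synchronously (called via asyncio.to_thread): fetch/decode the
# image bytes, consult the per-source LRU cache, run the pipeline on a temp file,
# and attach simple timing information under 'perf'.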
def _process_payload(payload: dict) -> dict:
    t_all = time.perf_counter()
    mode = (payload.get('mode') or 'lens_images')
    lang = (payload.get('lang') or 'en')

    context = payload.get('context') if isinstance(
        payload.get('context'), dict) else {}
    page_url = str((context or {}).get('page_url') or '').strip()

    src = (payload.get('src') or '').strip()
    img_bytes = b''
    mime = ''

    if payload.get('imageDataUri'):
        img_bytes, mime = _datauri_to_bytes(payload.get('imageDataUri'))
    elif src.startswith('data:'):
        img_bytes, mime = _datauri_to_bytes(src)
    else:
        img_bytes, mime = _download_bytes(src, page_url)

    t_img = time.perf_counter()

    if not img_bytes:
        raise Exception('No image data')

    ai_cfg = None
    ai = payload.get('ai') or None
    source = str(payload.get('source') or '').strip().lower() or 'translated'
    if mode == 'lens_text' and source == 'ai' and isinstance(ai, dict):
        api_key = str(ai.get('api_key') or '').strip() or (
            os.getenv('AI_API_KEY') or '').strip()
        ai_cfg = AiConfig(
            api_key=api_key,
            model=str(ai.get('model') or 'auto').strip() or 'auto',
            provider=str(ai.get('provider') or 'auto').strip() or 'auto',
            base_url=str(ai.get('base_url') or 'auto').strip() or 'auto',
            prompt_editable=str(ai.get('prompt') or '').strip(),
        )

    core.DO_AI_JSON = False

    img_hash = _sha256_hex(img_bytes)
    cache_key = ''
    if mode == 'lens_text' and img_hash:
        cache_source = 'ai' if source == 'ai' else 'text'
        cache_key = _build_cache_key(
            img_hash, lang, mode, cache_source, ai_cfg)
        cached = None
        if source == 'ai':
            cached = _lru_get(_ai_result_cache, _ai_cache_lock, cache_key)
        else:
            cached = _lru_get(_result_cache, _result_cache_lock, cache_key)
        if cached:
            cached['perf'] = {
                'cache': 'hit',
                'total_ms': round((time.perf_counter() - t_all) * 1000, 1),
                'img_ms': round((t_img - t_all) * 1000, 1),
            }
            return cached

    suffix = '.png' if (mime or '').endswith('png') else '.jpg'
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
        f.write(img_bytes)
        tmp_path = f.name
    t_tmp = time.perf_counter()
    try:
        out = process_image_path(tmp_path, lang, mode, ai_cfg)
        out['perf'] = {
            'cache': 'miss' if cache_key else 'off',
            'total_ms': round((time.perf_counter() - t_all) * 1000, 1),
            'img_ms': round((t_img - t_all) * 1000, 1),
            'tmp_ms': round((t_tmp - t_img) * 1000, 1),
        }
        if cache_key and isinstance(out, dict):
            if source == 'ai':
                _lru_set(_ai_result_cache, _ai_cache_lock,
                         cache_key, out, TP_AI_RESULT_CACHE_MAX)
            else:
                _lru_set(_result_cache, _result_cache_lock,
                         cache_key, out, TP_RESULT_CACHE_MAX)
        return out
    finally:
        try:
            os.unlink(tmp_path)
        except Exception:
            pass


@app.on_event('startup')
async def _startup():
    print(
        f'[TextPhantom][api] starting build={BUILD_ID} workers={SERVER_MAX_WORKERS}')
    for i in range(max(1, SERVER_MAX_WORKERS)):
        asyncio.create_task(_worker_loop(i))
    asyncio.create_task(_cleanup_jobs_loop())


@app.get('/health')
async def health():
    return {'ok': True, 'build': BUILD_ID}


@app.get('/version')
async def version():
    return {'ok': True, 'build': BUILD_ID, 'core': 'lens_core'}


@app.get('/warmup')
async def warmup(lang: str = TP_WARMUP_LANG):
    t0 = time.perf_counter()
    r = core.warmup(lang)
    return {'ok': True, 'build': BUILD_ID, 'dt_ms': round((time.perf_counter() - t0) * 1000, 1), 'result': r}


@app.get('/meta')
async def meta():
    langs = getattr(core, 'UI_LANGUAGES', None) or []
    sources = [
        {'id': 'original', 'name': 'Original'},
        {'id': 'translated', 'name': 'Translated'},
        {'id': 'ai', 'name': 'Ai'},
    ]
    env_key = (os.getenv('AI_API_KEY') or '').strip()
    return {'ok': True, 'languages': langs, 'sources': sources, 'has_env_ai_key': bool(env_key)}


@app.post('/translate')
async def translate(payload: Dict[str, Any]):
    jid = str(uuid.uuid4())
    _dbg('rest.enqueue', {
        'id': jid,
        'mode': str(payload.get('mode') or ''),
        'lang': str(payload.get('lang') or ''),
        'source': str(payload.get('source') or ''),
        'has_datauri': bool(payload.get('imageDataUri')),
        'has_src': bool(payload.get('src')),
    })
    _jobs[jid] = {'status': 'queued', 'ts': _now()}
    await _job_queue.put((jid, payload))
    return {'id': jid}


@app.get('/translate/{job_id}')
async def translate_status(job_id: str):
    j = _jobs.get(job_id)
    if not j:
        return {'status': 'error', 'result': 'job_not_found'}
    return j


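# REST flow sketch (illustrative payload values, not exhaustive): POST /translate
# with {'mode': 'lens_text', 'lang': 'en', 'source': 'translated', 'src': <image url>}
# returns {'id': <job_id>}; poll GET /translate/<job_id> until 'status' becomes
# 'done' (payload in 'result') or 'error'.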
@app.post('/ai/resolve')
async def ai_resolve(payload: Dict[str, Any]):
    api_key = str(payload.get('api_key') or '').strip() or (
        os.getenv('AI_API_KEY') or '').strip()
    lang = _normalize_lang(str(payload.get('lang') or 'en'))
    style_default = (
        (getattr(core, 'AI_LANG_STYLE', {}) or {}).get(lang)
        or (getattr(core, 'AI_LANG_STYLE', {}) or {}).get('default')
        or ''
    ).strip()
    if not api_key:
        return {
            'ok': False,
            'error': 'missing_api_key',
            'provider': '',
            'default_model': '',
            'models': [],
            'lang': lang,
            'prompt_editable_default': style_default,
        }

    provider = core._canonical_provider(str(payload.get('provider') or 'auto'))
    if provider in ('', 'auto'):
        provider = _detect_provider_from_key(api_key)

    preset = _resolve_provider_defaults(provider) or {}
    requested_model = str(payload.get('model') or 'auto').strip() or 'auto'
    resolved_model = _resolve_model(provider, requested_model)

    models: List[str] = []
    base_url = (str(payload.get('base_url') or 'auto')).strip()
    if base_url in ('', 'auto'):
        base_url = (preset.get('base_url') or '').strip()

    if provider == 'huggingface':
        if base_url:
            models = core._hf_router_available_models(api_key, base_url)
        if requested_model.lower() in ('', 'auto'):
            fallback = core._pick_hf_fallback_model(models)
            if fallback:
                resolved_model = fallback

    elif provider == 'gemini':
        models = getattr(core, '_gemini_available_models',
                         lambda _k: [])(api_key)
        if not models:
            models = ['gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.5-pro',
                      'gemini-2.0-flash', 'gemini-3-flash-preview', 'gemini-3-pro-preview']

    elif provider == 'anthropic':
        models = getattr(core, '_anthropic_available_models',
                         lambda _k, _b=None: [])(api_key, base_url)

    else:
        if not base_url:
            base_url = (core.AI_PROVIDER_DEFAULTS.get('openai') or {}).get(
                'base_url') or 'https://api.openai.com/v1'
        models = getattr(core, '_openai_compat_available_models',
                         lambda _k, _b: [])(api_key, base_url)

    if provider == 'huggingface' and not models:
        models = [
            'google/gemma-3-27b-it:featherless-a',
            'google/gemma-3-27b-it',
            'google/gemma-2-2b-it',
            'google/gemma-2-9b-it',
        ]

    if provider != 'huggingface' and not models:
        fallback_models: List[str] = []
        preset_model = str(preset.get('model') or '').strip()
        if preset_model:
            fallback_models.append(preset_model)

        provider_defaults = (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get(
            provider, {}) or {}
        provider_model = str(provider_defaults.get('model') or '').strip()
        if provider_model:
            fallback_models.append(provider_model)

        if provider == 'gemini':
            fallback_models.extend([
                'gemini-2.5-flash',
                'gemini-2.5-flash-lite',
                'gemini-2.5-pro',
                'gemini-2.0-flash',
                'gemini-3-flash-preview',
                'gemini-3-pro-preview',
            ])

        models = sorted(set([m for m in fallback_models if m]), key=str.lower)

        if not models:
            all_models: List[str] = []
            for _, v in (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).items():
                m2 = str((v or {}).get('model') or '').strip()
                if m2:
                    all_models.append(m2)
            models = sorted(set(all_models), key=str.lower)

    if models:
        models = sorted(
            {m.strip() for m in models if isinstance(m, str) and m.strip()},
            key=str.lower,
        )

    if models and resolved_model not in models:
        resolved_model = models[0]

    prompt_default = style_default

    return {
        'ok': True,
        'provider': provider,
        'base_url': base_url,
        'default_model': (preset.get('model') or ''),
        'model': resolved_model,
        'models': models,
        'prompt_editable_default': prompt_default,
    }


@app.get('/ai/prompt/default')
async def ai_prompt_default(lang: str = 'en'):
    l = _normalize_lang(lang)
    base = (getattr(core, 'AI_PROMPT_SYSTEM_BASE', '') or '').strip()
    style = (getattr(core, 'AI_LANG_STYLE', {}) or {}).get(l) or (
        getattr(core, 'AI_LANG_STYLE', {}) or {}).get('default') or ''
    style = (style or '').strip()
    contract = "\n".join([
        'Return ONLY valid JSON (no markdown, no extra text).',
        'Output JSON MUST have exactly one key: "aiTextFull".',
        'Schema example: {"aiTextFull":"..."}',
        'Markers: Keep every paragraph marker like <<TP_P0>> unchanged and in order. Do not remove or add markers.',
        "aiTextFull must include all markers, each followed by that paragraph's translated text.",
    ])
    system_text = "\n\n".join([p for p in [base, style, contract] if p])
    return {
        'ok': True,
        'lang': l,
        'prompt_editable_default': style,
        'lang_style': style,
        'system_base': base,
        'contract': contract,
        'system_text': system_text,
    }


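# WebSocket protocol: the server sends {'type': 'ack'} on connect; each client
# message of the form {'type': 'job', 'id': ..., 'payload': ...} is processed
# inline and answered with {'type': 'result', ...} or {'type': 'error', ...}.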
@app.websocket('/ws')
async def ws_endpoint(ws: WebSocket):
    await ws.accept()
    await ws.send_text(json.dumps({'type': 'ack'}))
    try:
        while True:
            msg = await ws.receive_text()
            data = json.loads(msg)
            if data.get('type') != 'job':
                continue
            jid = str(data.get('id') or '')
            payload = data.get('payload') or {}
            _dbg('ws.job', {
                'id': jid,
                'mode': str(payload.get('mode') or ''),
                'lang': str(payload.get('lang') or ''),
                'source': str(payload.get('source') or ''),
                'has_datauri': bool(payload.get('imageDataUri')),
                'has_src': bool(payload.get('src')),
            })
            try:
                result = await asyncio.to_thread(_process_payload, payload)
                try:
                    await ws.send_text(json.dumps({'type': 'result', 'id': jid, 'result': result}))
                except WebSocketDisconnect:
                    return
            except Exception as e:
                try:
                    await ws.send_text(json.dumps({'type': 'error', 'id': jid, 'error': str(e)}))
                except (WebSocketDisconnect, RuntimeError):
                    return
    except WebSocketDisconnect:
        return


def main():
    image_path = getattr(core, 'IMAGE_PATH', '')
    lang = getattr(core, 'LANG', 'en')
    mode = os.environ.get('MODE', 'lens_text')
    ai_key = os.environ.get('AI_API_KEY', getattr(core, 'AI_API_KEY', ''))
    ai_model = os.environ.get('AI_MODEL', getattr(core, 'AI_MODEL', 'auto'))
    ai_prompt = os.environ.get('AI_PROMPT', '')

    ai_cfg = AiConfig(api_key=ai_key, model=ai_model,
                      prompt_editable=ai_prompt) if ai_key and mode == 'lens_text' else None
    out = process_image_path(image_path, lang, mode, ai_cfg)
    print(json.dumps(out, ensure_ascii=False, indent=2))


if __name__ == '__main__':
    main()