import asyncio, base64, copy, hashlib, io, json, os, re, tempfile, time, uuid, httpx from backend import lens_core as core from collections import OrderedDict from threading import Lock from dataclasses import dataclass from typing import Any, Dict, List, Optional from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware SERVER_MAX_WORKERS = int(os.environ.get('SERVER_MAX_WORKERS', '15')) JOB_TTL_SEC = int(os.environ.get('JOB_TTL_SEC', '3600')) HTTP_TIMEOUT_SEC = float(os.environ.get('HTTP_TIMEOUT_SEC', str(getattr(core, 'AI_TIMEOUT_SEC', 120)))) SUPPORTED_MODES = {"lens_images", "lens_text"} BUILD_ID = os.environ.get('TP_BUILD_ID', 'v9-backendfix-20260129') TP_DEBUG = str(os.environ.get('TP_DEBUG', '')).strip().lower() in ('1', 'true', 'yes', 'on') TP_PARA_MARKER_PREFIX = '< None: if not TP_DEBUG: return try: if data is None: print(f'[TextPhantom][dbg] {tag}') else: s = json.dumps(data, ensure_ascii=False) if len(s) > 2000: s = s[:2000] + '…' print(f'[TextPhantom][dbg] {tag} {s}') except Exception: try: print(f'[TextPhantom][dbg] {tag} {data}') except Exception: pass def _tree_stats(tree) -> dict: if not isinstance(tree, dict): return {'paras': 0, 'items': 0, 'spans': 0} paras = tree.get('paragraphs') or [] if not isinstance(paras, list): return {'paras': 0, 'items': 0, 'spans': 0} items = 0 spans = 0 for p in paras: if not isinstance(p, dict): continue its = p.get('items') or [] if not isinstance(its, list): continue items += len(its) for it in its: if not isinstance(it, dict): continue sp = it.get('spans') or [] if isinstance(sp, list): spans += len(sp) return {'paras': len(paras), 'items': items, 'spans': spans} def _tree_to_paragraph_texts(tree: Any) -> List[str]: if not isinstance(tree, dict): return [] paras = tree.get('paragraphs') or [] if not isinstance(paras, list) or not paras: return [] out: List[str] = [] for p in paras: if not isinstance(p, dict): out.append('') continue t = str(p.get('text') or '').strip() if not t: items = p.get('items') or [] if isinstance(items, list) and items: t = ' '.join(str(it.get('text') or '').strip() for it in items if isinstance( it, dict) and str(it.get('text') or '').strip()) out.append(t) return out def _apply_para_markers(paras: List[str]) -> str: if not paras: return '' parts: List[str] = [] for i, t in enumerate(paras): parts.append( f"{TP_PARA_MARKER_PREFIX}{i}{TP_PARA_MARKER_SUFFIX}\n{(t or '').strip()}") return '\n\n'.join(parts) def _clamp_runaway_repeats(s: str, max_repeat: int = 12) -> str: if not s: return '' pat = re.compile(r"(.)\1{" + str(max_repeat) + r",}") return pat.sub(lambda m: m.group(1) * max_repeat, s) def _extract_marker_indices(s: str) -> set[int]: if not s: return set() out: set[int] = set() for m in re.finditer(r"<>", s): try: out.add(int(m.group(1))) except Exception: continue return out def _needs_ai_retry(ai_text_full: str, expected_paras: int) -> bool: if expected_paras <= 0: return False idx = _extract_marker_indices(ai_text_full) if len(idx) >= expected_paras: return False if (TP_PARA_MARKER_PREFIX in (ai_text_full or '')) and (TP_PARA_MARKER_SUFFIX not in (ai_text_full or '')): return True return True def _now() -> float: return time.time() def _lru_get(cache: OrderedDict, lock: Lock, key: str) -> Optional[Dict[str, Any]]: if not key: return None with lock: v = cache.get(key) if v is None: return None cache.move_to_end(key) return copy.deepcopy(v) def _lru_set(cache: OrderedDict, lock: Lock, key: str, value: Dict[str, Any], max_items: int) -> None: if not key or not isinstance(value, dict) or max_items <= 0: return with lock: cache[key] = copy.deepcopy(value) cache.move_to_end(key) while len(cache) > max_items: cache.popitem(last=False) def _sha256_hex(blob: bytes) -> str: return hashlib.sha256(blob).hexdigest() if blob else '' def _ai_prompt_sig(s: str) -> str: t = (s or '').strip() if not t: return '' return hashlib.sha256(t.encode('utf-8')).hexdigest()[:12] def _build_cache_key(img_hash: str, lang: str, mode: str, source: str, ai_cfg: Optional["AiConfig"]) -> str: parts = [img_hash, _normalize_lang(lang), (mode or '').strip(), (source or '').strip()] if ai_cfg and (source or '').strip().lower() == 'ai': parts.extend([ (ai_cfg.provider or '').strip(), (ai_cfg.model or '').strip(), (ai_cfg.base_url or '').strip(), _ai_prompt_sig(ai_cfg.prompt_editable), ]) return '|'.join([p for p in parts if p is not None]) def _b64_to_bytes(b64: str) -> bytes: pad = '=' * ((4 - (len(b64) % 4)) % 4) return base64.b64decode(b64 + pad) def _datauri_to_bytes(data_uri: str) -> tuple[bytes, str]: s = (data_uri or '').strip() if not s.startswith('data:'): return b'', '' head, _, b64 = s.partition(',') mime = '' if ';' in head: mime = head[5:head.index(';')] return _b64_to_bytes(b64), mime or 'application/octet-stream' def _bytes_to_datauri(blob: bytes, mime: str) -> str: b64 = base64.b64encode(blob).decode('ascii') return f"data:{mime};base64,{b64}" def _download_bytes(url: str) -> tuple[bytes, str]: u = (url or '').strip() if not u: return b'', '' with httpx.Client(timeout=HTTP_TIMEOUT_SEC, follow_redirects=True) as client: r = client.get(u) r.raise_for_status() ct = (r.headers.get('content-type') or '').split(';')[0].strip() return r.content, ct def _detect_provider_from_key(api_key: str) -> str: return core._canonical_provider(core._detect_ai_provider_from_key(api_key)) def _resolve_provider_defaults(provider: str) -> dict: return (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get(provider, {}) def _resolve_model(provider: str, model: str) -> str: return core._resolve_model(provider, model) def _normalize_lang(lang: str) -> str: return core._normalize_lang(lang) @dataclass class AiConfig: api_key: str model: str = 'auto' provider: str = 'auto' base_url: str = 'auto' prompt_editable: str = '' def _collapse_ws(text: str) -> str: return re.sub(r"\s+", " ", str(text or "")).strip() def _sanitize_marked_text(marked_text: str) -> str: t = str(marked_text or "") if not t: return "" indices = _extract_marker_indices(t) if not indices: return _collapse_ws(t) out_lines: List[str] = [] for idx in indices: marker = f"<>" m = re.search( rf"{re.escape(marker)}\s*([\s\S]*?)(?=<>|\Z)", t) seg = m.group(1) if m else "" seg = _collapse_ws(seg) out_lines.append(marker) out_lines.append(seg) out_lines.append("") return "\n".join(out_lines).strip("\n") def _build_ai_prompt_packet_custom(target_lang: str, original_text_full: str, prompt_editable: str, is_retry: bool = False) -> tuple[str, List[str]]: lang = _normalize_lang(target_lang) style_prompt = (prompt_editable or "").strip() if not style_prompt: style_prompt = (getattr(core, "ai_prompt_user_default", lambda _l: "")(lang) or "").strip() input_json = json.dumps( {"target_lang": lang, "stylePrompt": style_prompt, "originalTextFull": str(original_text_full or "")}, ensure_ascii=False, ) system_parts: List[str] = [ "SYSTEM: You translate manga dialogue.", "Task: Translate originalTextFull into target_lang. Apply stylePrompt.", "Markers: Keep every paragraph marker like <> unchanged and in order. Do not remove or add markers.", "Output: Return ONLY JSON (no markdown, no extra text).", "OUTPUT_JSON schema: {\"aiTextFull\":\"...\"}", "aiTextFull must include all the same markers, each followed by that paragraph's translated text.", "Keep text concise for speech bubbles. Avoid long repeated characters (max 12).", ] if is_retry: system_parts.append( "Retry: Your previous output may have been truncated. You MUST output ALL markers from the first to the last marker in the input." ) system_text = "\n".join([p for p in system_parts if p]) user_text = ( "INPUT_JSON (json):\n```json\n" + input_json + "\n```\n\nOUTPUT_JSON (json):\n```json\n{\"aiTextFull\":\"...\"}\n```" ) return system_text, [user_text] def ai_translate_text(original_text_full: str, target_lang: str, ai: AiConfig, is_retry: bool = False) -> dict: api_key = (ai.api_key or '').strip() if not api_key: raise Exception('AI api_key is required') provider = core._canonical_provider((ai.provider or 'auto')) if provider in ('', 'auto'): provider = _detect_provider_from_key(api_key) preset = _resolve_provider_defaults(provider) or {} model = _resolve_model(provider, (ai.model or 'auto')) base_url = (ai.base_url or 'auto').strip() if base_url in ('', 'auto'): base_url = (preset.get('base_url') or '').strip() if provider not in ('gemini', 'anthropic'): if not base_url: base_url = (_resolve_provider_defaults('openai') or {}).get( 'base_url') or 'https://api.openai.com/v1' system_text, user_parts = _build_ai_prompt_packet_custom( target_lang, original_text_full, ai.prompt_editable, is_retry=is_retry) started = _now() used_model = model if provider == 'gemini': raw = core._gemini_generate_json( api_key, model, system_text, user_parts) elif provider == 'anthropic': raw = core._anthropic_generate_json( api_key, model, system_text, user_parts) else: raw, used_model = core._openai_compat_generate_json( api_key, base_url, model, system_text, user_parts) ai_text_full = core._parse_ai_textfull_only( raw) if core.DO_AI_JSON else core._parse_ai_textfull_text_only(raw) ai_text_full = _sanitize_marked_text(ai_text_full) return { 'aiTextFull': ai_text_full, 'meta': { 'model': used_model, 'provider': provider, 'base_url': base_url, 'latency_sec': round(_now() - started, 3), }, } def process_image_path(image_path: str, lang: str, mode: str, ai_cfg: Optional[AiConfig]) -> dict: mode_id = (mode or '').strip() if mode_id not in SUPPORTED_MODES: mode_id = 'lens_images' target_lang = _normalize_lang(lang) data = core.get_lens_data_from_image( image_path, getattr(core, 'FIREBASE_URL', ''), target_lang) img = core.Image.open(image_path).convert('RGB') W, H = img.size thai_font = getattr(core, 'FONT_THAI_PATH', 'NotoSansThai-Regular.ttf') latin_font = getattr(core, 'FONT_LATIN_PATH', 'NotoSans-Regular.ttf') if target_lang == 'ja': latin_font = getattr(core, 'FONT_JA_PATH', latin_font) elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'): latin_font = getattr(core, 'FONT_ZH_SC_PATH', latin_font) elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'): latin_font = getattr(core, 'FONT_ZH_TC_PATH', latin_font) if getattr(core, 'FONT_DOWNLOD', True): thai_font = core.ensure_font( thai_font, getattr(core, 'FONT_THAI_URLS', [])) if target_lang == 'ja': latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_JA_URLS', [])) elif target_lang in ('zh', 'zh-hans', 'zh_cn', 'zh-cn', 'zh_hans'): latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_ZH_SC_URLS', [])) elif target_lang in ('zh-hant', 'zh_tw', 'zh-tw', 'zh_hant'): latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_ZH_TC_URLS', [])) else: latin_font = core.ensure_font( latin_font, getattr(core, 'FONT_LATIN_URLS', [])) image_url = data.get('imageUrl') if isinstance(data, dict) else None out: Dict[str, Any] = { 'mode': mode_id, 'imageUrl': image_url, 'imageDataUri': '', 'originalContentLanguage': data.get('originalContentLanguage') if isinstance(data, dict) else None, 'originalTextFull': data.get('originalTextFull') if isinstance(data, dict) else None, 'translatedTextFull': data.get('translatedTextFull') if isinstance(data, dict) else None, 'AiTextFull': '', 'originalParagraphs': (data.get('originalParagraphs') or []) if isinstance(data, dict) else [], 'translatedParagraphs': (data.get('translatedParagraphs') or []) if isinstance(data, dict) else [], 'original': {}, 'translated': {}, 'Ai': {}, } if mode_id == 'lens_images': if image_url: decoded = core.decode_imageurl_to_datauri(str(image_url)) if decoded: out['imageDataUri'] = decoded elif isinstance(image_url, str) and image_url.startswith(('http://', 'https://')): blob, mime2 = _download_bytes(image_url) out['imageDataUri'] = _bytes_to_datauri( blob, mime2 or 'image/jpeg') if not out.get('imageDataUri'): with open(image_path, 'rb') as f: blob = f.read() out['imageDataUri'] = _bytes_to_datauri(blob, 'image/jpeg') return out original_span_tokens = None original_tree = None translated_tree = None def _base_img_for_overlay() -> core.Image.Image: if not (getattr(core, 'ERASE_OLD_TEXT_WITH_ORIGINAL_BOXES', True) and original_span_tokens): return img return core.erase_text_with_boxes( img, original_span_tokens, pad_px=getattr(core, 'ERASE_PADDING_PX', 2), sample_margin_px=getattr(core, 'ERASE_SAMPLE_MARGIN_PX', 6), ) if getattr(core, 'DO_ORIGINAL', True): tree, _ = core.decode_tree( out.get('originalParagraphs') or [], out.get('originalTextFull') or '', 'original', W, H, want_raw=False, ) original_tree = tree original_span_tokens = core.flatten_tree_spans(tree) _dbg('tree.original', _tree_stats(original_tree)) out['original'] = { 'originalTree': tree, 'originalTextFull': out.get('originalTextFull') or '', } if getattr(core, 'DO_TRANSLATED', True): tree, _ = core.decode_tree( out.get('translatedParagraphs') or [], out.get('translatedTextFull') or '', 'translated', W, H, want_raw=False, ) translated_tree = tree translated_span_tokens = core.flatten_tree_spans(tree) _dbg('tree.translated', _tree_stats(translated_tree)) out['translated'] = { 'translatedTree': tree, 'translatedTextFull': out.get('translatedTextFull') or '', } def _tree_score(tree: Any) -> int: if not isinstance(tree, dict): return -1 paragraphs = tree.get('paragraphs') or [] if not isinstance(paragraphs, list) or not paragraphs: return -1 para_count = len(paragraphs) item_count = 0 span_count = 0 for p in paragraphs: if not isinstance(p, dict): continue items = p.get('items') or [] if not isinstance(items, list): continue item_count += len(items) for it in items: if not isinstance(it, dict): continue spans = it.get('spans') or [] if isinstance(spans, list): span_count += len(spans) return item_count * 10000 + para_count * 100 + span_count def _pick_ai_template_tree() -> Optional[Dict[str, Any]]: tr_score = _tree_score(translated_tree) og_score = _tree_score(original_tree) if tr_score < 0 and og_score < 0: return None if og_score > tr_score: return original_tree return translated_tree or original_tree ai_tree = None if ai_cfg and (ai_cfg.api_key or '').strip() and getattr(core, 'DO_AI', True): src_paras = _tree_to_paragraph_texts(original_tree or {}) src_text = _apply_para_markers(src_paras) if src_paras else str( out.get('originalTextFull') or '') ai = ai_translate_text(src_text, target_lang, ai_cfg) if src_paras and _needs_ai_retry(str(ai.get('aiTextFull') or ''), len(src_paras)): _dbg('ai.retry', { 'expected_paras': len(src_paras), 'found_markers': len(_extract_marker_indices(str(ai.get('aiTextFull') or ''))), }) retry_paras = [_clamp_runaway_repeats(p) for p in src_paras] retry_text = _apply_para_markers(retry_paras) or src_text ai = ai_translate_text( retry_text, target_lang, ai_cfg, is_retry=True) template_tree = _pick_ai_template_tree() _dbg('ai.template.pick', { 'score_original': _tree_score(original_tree), 'score_translated': _tree_score(translated_tree), 'picked': 'original' if template_tree is original_tree else ('translated' if template_tree is translated_tree else 'none'), }) if not isinstance(template_tree, dict): template_tree = original_tree if isinstance(original_tree, dict) else ( translated_tree if isinstance(translated_tree, dict) else {}) patched = core.patch( {'Ai': {'aiTextFull': str( ai.get('aiTextFull') or ''), 'aiTree': template_tree}}, W, H, thai_font or '', latin_font or '', lang=target_lang, ) ai_tree = (patched.get('Ai') or {}).get('aiTree') or {} _dbg('ai.patched', { 'ai_text_len': len(str(ai.get('aiTextFull') or '')), 'stats_ai': _tree_stats(ai_tree), 'stats_original': _tree_stats(original_tree or {}), 'stats_translated': _tree_stats(translated_tree or {}), 'mode': mode_id, 'lang': target_lang, }) shared_para_sizes = core._compute_shared_para_sizes( [original_tree or {}, translated_tree or {}, ai_tree or {}], thai_font or '', latin_font or '', W, H, ) core._apply_para_font_size(original_tree or {}, shared_para_sizes) core._apply_para_font_size(translated_tree or {}, shared_para_sizes) core._apply_para_font_size(ai_tree or {}, shared_para_sizes) core._rebuild_ai_spans_after_font_resize( ai_tree or {}, W, H, thai_font or '', latin_font or '', lang=target_lang) out['AiTextFull'] = str(ai.get('aiTextFull') or '') out['Ai'] = { 'aiTextFull': str(ai.get('aiTextFull') or ''), 'aiTree': ai_tree, 'meta': ai.get('meta') or {}, } if getattr(core, 'DO_AI_HTML', True): core.fit_tree_font_sizes_for_tp_html( ai_tree, thai_font or '', latin_font or '', W, H) out['Ai']['aihtml'] = core.ai_tree_to_tp_html(ai_tree, W, H) out['Ai']['aihtmlMeta'] = { 'baseW': int(W), 'baseH': int(H), 'format': 'tp', } if getattr(core, 'DO_ORIGINAL', True) and getattr(core, 'DO_ORIGINAL_HTML', True) and isinstance(original_tree, dict): core.fit_tree_font_sizes_for_tp_html( original_tree, thai_font or '', latin_font or '', W, H) if isinstance(out.get('original'), dict): out['original']['originalhtml'] = core.ai_tree_to_tp_html( original_tree or {}, W, H) if getattr(core, 'DO_TRANSLATED', True) and getattr(core, 'DO_TRANSLATED_HTML', True) and isinstance(translated_tree, dict): core.fit_tree_font_sizes_for_tp_html( translated_tree, thai_font or '', latin_font or '', W, H) if isinstance(out.get('translated'), dict): out['translated']['translatedhtml'] = core.ai_tree_to_tp_html( translated_tree or {}, W, H) if getattr(core, 'HTML_INCLUDE_CSS', True) and (getattr(core, 'DO_ORIGINAL_HTML', True) or getattr(core, 'DO_TRANSLATED_HTML', True) or getattr(core, 'DO_AI_HTML', True)): out['htmlCss'] = core.tp_overlay_css() out['htmlMeta'] = { 'baseW': int(W), 'baseH': int(H), 'format': 'tp', } base_img = _base_img_for_overlay() buf = io.BytesIO() base_img.save(buf, format='PNG') out['imageDataUri'] = _bytes_to_datauri(buf.getvalue(), 'image/png') return out app = FastAPI(title='TextPhantom OCR API', version='1.0') app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=['*'], allow_headers=['*'], ) async def _cleanup_jobs_loop(): while True: await asyncio.sleep(60) cutoff = _now() - JOB_TTL_SEC dead = [jid for jid, j in _jobs.items() if float( j.get('ts', 0)) < cutoff] for jid in dead: _jobs.pop(jid, None) async def _worker_loop(worker_id: int): while True: jid, payload = await _job_queue.get() try: _jobs[jid] = {'status': 'running', 'ts': _now()} result = await asyncio.to_thread(_process_payload, payload) _jobs[jid] = {'status': 'done', 'result': result, 'ts': _now()} except Exception as e: _jobs[jid] = {'status': 'error', 'result': str(e), 'ts': _now()} finally: _job_queue.task_done() def _process_payload(payload: dict) -> dict: t_all = time.perf_counter() mode = (payload.get('mode') or 'lens_images') lang = (payload.get('lang') or 'en') src = (payload.get('src') or '').strip() img_bytes = b'' mime = '' if payload.get('imageDataUri'): img_bytes, mime = _datauri_to_bytes(payload.get('imageDataUri')) elif src.startswith('data:'): img_bytes, mime = _datauri_to_bytes(src) else: img_bytes, mime = _download_bytes(src) t_img = time.perf_counter() if not img_bytes: raise Exception('No image data') ai_cfg = None ai = payload.get('ai') or None source = str(payload.get('source') or '').strip().lower() or 'translated' if mode == 'lens_text' and source == 'ai' and isinstance(ai, dict): api_key = str(ai.get('api_key') or '').strip() or ( os.getenv('AI_API_KEY') or '').strip() ai_cfg = AiConfig( api_key=api_key, model=str(ai.get('model') or 'auto').strip() or 'auto', provider=str(ai.get('provider') or 'auto').strip() or 'auto', base_url=str(ai.get('base_url') or 'auto').strip() or 'auto', prompt_editable=str(ai.get('prompt') or '').strip(), ) core.DO_AI_JSON = False img_hash = _sha256_hex(img_bytes) cache_key = '' if mode == 'lens_text' and img_hash: cache_key = _build_cache_key(img_hash, lang, mode, source, ai_cfg) cached = None if source == 'ai': cached = _lru_get(_ai_result_cache, _ai_cache_lock, cache_key) else: cached = _lru_get(_result_cache, _result_cache_lock, cache_key) if cached: cached['perf'] = { 'cache': 'hit', 'total_ms': round((time.perf_counter() - t_all) * 1000, 1), 'img_ms': round((t_img - t_all) * 1000, 1), } return cached suffix = '.png' if (mime or '').endswith('png') else '.jpg' with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f: f.write(img_bytes) tmp_path = f.name t_tmp = time.perf_counter() try: out = process_image_path(tmp_path, lang, mode, ai_cfg) out['perf'] = { 'cache': 'miss' if cache_key else 'off', 'total_ms': round((time.perf_counter() - t_all) * 1000, 1), 'img_ms': round((t_img - t_all) * 1000, 1), 'tmp_ms': round((t_tmp - t_img) * 1000, 1), } if cache_key and isinstance(out, dict): if source == 'ai': _lru_set(_ai_result_cache, _ai_cache_lock, cache_key, out, TP_AI_RESULT_CACHE_MAX) else: _lru_set(_result_cache, _result_cache_lock, cache_key, out, TP_RESULT_CACHE_MAX) return out finally: try: os.unlink(tmp_path) except Exception: pass @app.on_event('startup') async def _startup(): print( f'[TextPhantom][api] starting build={BUILD_ID} workers={SERVER_MAX_WORKERS}') for i in range(max(1, SERVER_MAX_WORKERS)): asyncio.create_task(_worker_loop(i)) asyncio.create_task(_cleanup_jobs_loop()) @app.get('/health') async def health(): return {'ok': True, 'build': BUILD_ID} @app.get('/version') async def version(): return {'ok': True, 'build': BUILD_ID, 'core': 'lens_core'} @app.get('/warmup') async def warmup(lang: str = TP_WARMUP_LANG): t0 = time.perf_counter() r = core.warmup(lang) return {'ok': True, 'build': BUILD_ID, 'dt_ms': round((time.perf_counter() - t0) * 1000, 1), 'result': r} @app.get('/meta') async def meta(): langs = getattr(core, 'UI_LANGUAGES', None) or [] sources = [ {'id': 'original', 'name': 'Original'}, {'id': 'translated', 'name': 'Translated'}, {'id': 'ai', 'name': 'Ai'}, ] env_key = (os.getenv('AI_API_KEY') or '').strip() return {'ok': True, 'languages': langs, 'sources': sources, 'has_env_ai_key': bool(env_key)} @app.post('/translate') async def translate(payload: Dict[str, Any]): jid = str(uuid.uuid4()) _jobs[jid] = {'status': 'queued', 'ts': _now()} await _job_queue.put((jid, payload)) return {'id': jid} @app.get('/translate/{job_id}') async def translate_status(job_id: str): j = _jobs.get(job_id) if not j: return {'status': 'error', 'result': 'job_not_found'} return j @app.post('/ai/resolve') async def ai_resolve(payload: Dict[str, Any]): api_key = str(payload.get('api_key') or '').strip() or ( os.getenv('AI_API_KEY') or '').strip() lang = _normalize_lang(str(payload.get('lang') or 'en')) if not api_key: return { 'ok': False, 'error': 'missing_api_key', 'provider': '', 'default_model': '', 'models': [], 'lang': lang, 'prompt_editable_default': (getattr(core, 'ai_prompt_user_default', lambda _l: '')(lang) or '').strip(), } provider = core._canonical_provider(str(payload.get('provider') or 'auto')) if provider in ('', 'auto'): provider = _detect_provider_from_key(api_key) preset = _resolve_provider_defaults(provider) or {} requested_model = str(payload.get('model') or 'auto').strip() or 'auto' resolved_model = _resolve_model(provider, requested_model) models: List[str] = [] base_url = (str(payload.get('base_url') or 'auto')).strip() if base_url in ('', 'auto'): base_url = (preset.get('base_url') or '').strip() if provider == 'huggingface': if base_url: models = core._hf_router_available_models(api_key, base_url) if requested_model.lower() in ('', 'auto'): fallback = core._pick_hf_fallback_model(models) if fallback: resolved_model = fallback elif provider == 'gemini': models = getattr(core, '_gemini_available_models', lambda _k: [])(api_key) if not models: models = ['gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.5-pro', 'gemini-2.0-flash', 'gemini-3-flash-preview', 'gemini-3-pro-preview'] elif provider == 'anthropic': models = getattr(core, '_anthropic_available_models', lambda _k, _b=None: [])(api_key, base_url) else: if not base_url: base_url = (core.AI_PROVIDER_DEFAULTS.get('openai') or {}).get( 'base_url') or 'https://api.openai.com/v1' models = getattr(core, '_openai_compat_available_models', lambda _k, _b: [])(api_key, base_url) if provider == 'huggingface' and not models: models = [ 'google/gemma-3-27b-it:featherless-a', ] if not models: fallback_models: List[str] = [] preset_model = str(preset.get('model') or '').strip() if preset_model: fallback_models.append(preset_model) provider_defaults = (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).get( provider, {}) or {} provider_model = str(provider_defaults.get('model') or '').strip() if provider_model: fallback_models.append(provider_model) if provider == 'gemini': fallback_models.extend([ 'gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.5-pro', 'gemini-2.0-flash', 'gemini-3-flash-preview', 'gemini-3-pro-preview', ]) models = sorted(set([m for m in fallback_models if m]), key=str.lower) if not models: all_models: List[str] = [] for _, v in (getattr(core, 'AI_PROVIDER_DEFAULTS', {}) or {}).items(): m2 = str((v or {}).get('model') or '').strip() if m2: all_models.append(m2) models = sorted(set(all_models), key=str.lower) if models: models = sorted( {m.strip() for m in models if isinstance(m, str) and m.strip()}, key=str.lower, ) if models and requested_model.lower() in ('', 'auto') and resolved_model not in models: resolved_model = models[0] defaults = core._remote_defaults() prompt_default = (getattr(core, 'ai_prompt_user_default', lambda _l, _d=None: '')(lang, defaults) or '').strip() return { 'ok': True, 'provider': provider, 'base_url': base_url, 'default_model': (preset.get('model') or ''), 'model': resolved_model, 'models': models, 'prompt_editable_default': prompt_default, } @app.get('/ai/prompt/default') async def ai_prompt_default(lang: str = 'en'): l = _normalize_lang(lang) defaults = core._remote_defaults() styles = core.ai_lang_style_map(defaults) return { 'ok': True, 'lang': l, 'prompt_editable_default': (getattr(core, 'ai_prompt_user_default', lambda _l, _d=None: '')(l, defaults) or '').strip(), 'lang_style': styles.get(l) or styles.get('default') or '', 'system_base': core.ai_prompt_system_base(defaults).strip(), 'contract': core._active_ai_contract(), 'data_template': core._active_ai_data_template(), } @app.websocket('/ws') async def ws_endpoint(ws: WebSocket): await ws.accept() await ws.send_text(json.dumps({'type': 'ack'})) try: while True: msg = await ws.receive_text() data = json.loads(msg) if data.get('type') != 'job': continue jid = str(data.get('id') or '') payload = data.get('payload') or {} try: result = await asyncio.to_thread(_process_payload, payload) try: await ws.send_text(json.dumps({'type': 'result', 'id': jid, 'result': result})) except WebSocketDisconnect: return except Exception as e: try: await ws.send_text(json.dumps({'type': 'error', 'id': jid, 'error': str(e)})) except (WebSocketDisconnect, RuntimeError): return except WebSocketDisconnect: return def main(): image_path = getattr(core, 'IMAGE_PATH', '') lang = getattr(core, 'LANG', 'en') mode = os.environ.get('MODE', 'lens_text') ai_key = os.environ.get('AI_API_KEY', getattr(core, 'AI_API_KEY', '')) ai_model = os.environ.get('AI_MODEL', getattr(core, 'AI_MODEL', 'auto')) ai_prompt = os.environ.get('AI_PROMPT', '') ai_cfg = AiConfig(api_key=ai_key, model=ai_model, prompt_editable=ai_prompt) if ai_key and mode == 'lens_text' else None out = process_image_path(image_path, lang, mode, ai_cfg) print(json.dumps(out, ensure_ascii=False, indent=2)) if __name__ == '__main__': main()