Spaces:
Running
Running
| from flask import Flask, request, jsonify, redirect, Response, abort, send_from_directory | |
| import os | |
| import requests | |
| import subprocess | |
| import sys | |
| import random | |
| import tempfile | |
| import json | |
| app = Flask(__name__) | |
| def favicon(): | |
| return '', 204 | |
| def index(): | |
| return "Hello world" | |
| processes = { | |
| "yt": { | |
| "file": "yt.py", | |
| "proc": None, | |
| }, | |
| "news": { | |
| "file": "news.py", | |
| "proc": None, | |
| }, | |
| "chatgpt": { | |
| "file": "chatgpt.py", | |
| "proc": None, | |
| }, | |
| "jihou": { | |
| "file": "jihou.py", | |
| "proc": None, | |
| }, | |
| "join": { | |
| "file": "join.py", | |
| "proc": None, | |
| }, | |
| "ngwords": { | |
| "file": "ngwords.py", | |
| "proc": None, | |
| }, | |
| "orion-join": { | |
| "file": "orion-server/join.py", | |
| "proc": None, | |
| }, | |
| "turbowarp-server-gpt": { | |
| "file": "turbowarp-server/gpt.py", | |
| "proc": None, | |
| }, | |
| "turbowarp-server-qr": { | |
| "file": "turbowarp-server/qr-converter.py", | |
| "proc": None, | |
| }, | |
| "mmnga": { | |
| "file": "mmng.py", | |
| "proc": None, | |
| }, | |
| } | |
| def start_processes(): | |
| for name, info in processes.items(): | |
| proc = info["proc"] | |
| if proc is None or proc.poll() is not None: | |
| info["proc"] = subprocess.Popen( | |
| [sys.executable, info["file"]], | |
| stdout=sys.stdout, | |
| stderr=sys.stderr, | |
| env=os.environ, | |
| ) | |
| print(f"{info['file']} を起動しました") | |
| #---------- | |
| def xe_js(): | |
| return send_from_directory( | |
| directory="xv", | |
| path="xe.js", | |
| mimetype="application/javascript" | |
| ) | |
| def drive(): | |
| ip = request.remote_addr | |
| print(f"アクセスIP: {ip}") | |
| return redirect("https://drive.google.com/") | |
| def scratch_login(): | |
| data = request.json | |
| username = data.get("username") | |
| password = data.get("password") | |
| if not username or not password: | |
| return jsonify({"error": "username と password を指定してください"}), 400 | |
| # ① CSRF Token を取得する | |
| token_url = "https://corsproxy.io?url=https://scratch.mit.edu/csrf_token/" | |
| session = requests.Session() | |
| token_resp = session.get(token_url) | |
| if token_resp.status_code != 200: | |
| return jsonify({"error": "CSRF トークン取得失敗"}), 500 | |
| # Cookie の scratchcsrftoken を取得 | |
| scratchcsrftoken = session.cookies.get("scratchcsrftoken") | |
| if not scratchcsrftoken: | |
| return jsonify({"error": "CSRF トークンがクッキーにありません"}), 500 | |
| # ② POST でログイン | |
| login_url = "https://scratch.mit.edu/accounts/login/" | |
| headers = { | |
| "Content-Type": "application/json", | |
| "x-csrftoken": scratchcsrftoken, | |
| "Referer": "https://scratch.mit.edu/", | |
| "User-Agent": "Mozilla/5.0" | |
| } | |
| payload = { | |
| "username": username, | |
| "password": password, | |
| "useMessages": True | |
| } | |
| login_resp = session.post(login_url, json=payload, headers=headers) | |
| try: | |
| result_json = login_resp.json() | |
| except Exception: | |
| result_json = {"error": "JSON パース失敗", "text": login_resp.text} | |
| return jsonify({ | |
| "scratchcsrftoken": scratchcsrftoken, | |
| "login_response": result_json | |
| }) | |
| def get_managers(): | |
| limit = request.args.get("limit") | |
| since = request.args.get("since") | |
| params = {} | |
| if limit: | |
| params["limit"] = limit | |
| if since: | |
| params["since"] = since | |
| channel_id = request.args.get("channelid", "200605") | |
| url = f"https://desk-api.channel.io/desk/channels/{channel_id}/managers" | |
| headers = { | |
| "accept": "application/json", | |
| "x-account": os.getenv("channeliotokenmain"), | |
| } | |
| res = requests.get(url, headers=headers, params=params) | |
| if res.status_code != 200: | |
| return jsonify({"error": res.text}), res.status_code | |
| return jsonify(res.json().get("managers", [])) | |
| FORWARD_HEADERS = { | |
| "User-Agent", | |
| "Accept", | |
| "Content-Type", | |
| "Authorization", | |
| } | |
| def cors_proxy(): | |
| # Preflight 対応 | |
| if request.method == "OPTIONS": | |
| return Response( | |
| "", | |
| 204, | |
| headers={ | |
| "Access-Control-Allow-Origin": "*", | |
| "Access-Control-Allow-Headers": "*", | |
| "Access-Control-Allow-Methods": "GET, POST, PATCH, PUT, DELETE, OPTIONS", | |
| }, | |
| ) | |
| # cookie パラメータ取得 | |
| cookie_param = request.args.get("cookie") | |
| cookies = None | |
| if cookie_param: | |
| cookies = {} | |
| for item in cookie_param.split(";"): | |
| if "=" in item: | |
| k, v = item.split("=", 1) | |
| cookies[k.strip()] = v.strip() | |
| url = request.args.get("url") | |
| if not url: | |
| return "url パラメータが必要です", 400 | |
| if not url.startswith(("http://", "https://")): | |
| return "http または https のURLのみ使用できます", 400 | |
| # 追加転送ヘッダ取得(例: send=x-games-flags,x-games-sdk-auth) | |
| extra_headers = set() | |
| send_param = request.args.get("send") | |
| if send_param: | |
| extra_headers = {h.strip() for h in send_param.split(",") if h.strip()} | |
| # 転送対象ヘッダを動的に構築(小文字比較で安全に) | |
| allowed_headers = {h.lower() for h in FORWARD_HEADERS} | |
| allowed_headers.update(h.lower() for h in extra_headers) | |
| headers = { | |
| k: v for k, v in request.headers.items() | |
| if k.lower() in allowed_headers | |
| } | |
| # gzip 問題回避 | |
| headers.pop("Accept-Encoding", None) | |
| # url パラメータと send パラメータは転送しない | |
| forward_params = request.args.to_dict(flat=True) | |
| forward_params.pop("url", None) | |
| forward_params.pop("send", None) | |
| forward_params.pop("cookie", None) | |
| resp = requests.request( | |
| method=request.method, | |
| url=url, | |
| headers=headers, | |
| data=request.get_data(), | |
| params=forward_params, | |
| cookies=cookies, # ←追加 | |
| timeout=300, | |
| ) | |
| response = Response(resp.content, resp.status_code) | |
| # CORS ヘッダ付与 | |
| response.headers["Access-Control-Allow-Origin"] = "*" | |
| response.headers["Access-Control-Allow-Headers"] = "*" | |
| response.headers["Access-Control-Allow-Methods"] = "GET, POST, PATCH, PUT, DELETE, OPTIONS" | |
| # Content-Type は元のまま | |
| if "Content-Type" in resp.headers: | |
| response.headers["Content-Type"] = resp.headers["Content-Type"] | |
| return response | |
| def cors_proxy_404(e): | |
| baseurl = request.args.get("baseurl23896") | |
| if not baseurl: | |
| return abort(404) | |
| # baseurl23896 以外のパラメータをそのまま渡す | |
| forward_params = { | |
| k: v for k, v in request.args.items() | |
| if k != "baseurl23896" | |
| } | |
| cookie_param = request.args.get("cookie") | |
| cookies = None | |
| if cookie_param: | |
| cookies = {} | |
| for item in cookie_param.split(";"): | |
| if "=" in item: | |
| k, v = item.split("=", 1) | |
| cookies[k.strip()] = v.strip() | |
| # 転送先URLを組み立て | |
| target_url = baseurl.rstrip("/") + request.path | |
| if forward_params: | |
| target_url += "?" + urlencode(forward_params, doseq=True) | |
| try: | |
| resp = requests.request( | |
| method=request.method, | |
| url=target_url, | |
| headers={ | |
| k: v for k, v in request.headers | |
| if k.lower() not in ["host", "content-length"] | |
| }, | |
| data=request.get_data(), | |
| cookies=cookies, # ←追加 | |
| allow_redirects=False, | |
| timeout=10, | |
| ) | |
| except requests.RequestException: | |
| return abort(502) | |
| excluded_headers = [ | |
| "content-encoding", | |
| "content-length", | |
| "transfer-encoding", | |
| "connection", | |
| ] | |
| headers = [ | |
| (k, v) for k, v in resp.headers.items() | |
| if k.lower() not in excluded_headers | |
| ] | |
| return Response(resp.content, resp.status_code, headers) | |
| #------------------ | |
| def generate_random_user(): | |
| return f"user_{random.randint(1000,9999)}" | |
| def scratch_embed(project_id): | |
| # Build options dictionary matching the Node.js packager format | |
| options = {} | |
| # Basic options | |
| options["turbo"] = request.args.get("turbo", "false") == "true" | |
| options["interpolation"] = request.args.get("completion", "false") == "true" | |
| options["highQualityPen"] = request.args.get("pen_smooth", "true") == "true" | |
| options["maxClones"] = int(request.args.get("max_clones", "300")) | |
| options["stageWidth"] = int(request.args.get("stage_width", "480")) | |
| options["stageHeight"] = int(request.args.get("stage_height", "360")) | |
| options["resizeMode"] = request.args.get("stretch", "preserve-ratio") | |
| options["autoplay"] = request.args.get("autostart", "true") == "true" | |
| options["username"] = request.args.get("username", generate_random_user()) | |
| options["closeWhenStopped"] = request.args.get("close_on_stop", "false") == "true" | |
| # Custom options (css/js) | |
| custom_css = request.args.get("custom_css", "") | |
| custom_js = request.args.get("custom_js", "") | |
| if custom_css or custom_js: | |
| options["custom"] = {} | |
| if custom_css: | |
| options["custom"]["css"] = custom_css | |
| if custom_js: | |
| options["custom"]["js"] = custom_js | |
| # Appearance options | |
| background = request.args.get("background_color") | |
| foreground = request.args.get("primary_color") | |
| accent = request.args.get("accent_color") | |
| if background or foreground or accent: | |
| options["appearance"] = {} | |
| if background: | |
| options["appearance"]["background"] = background | |
| if foreground: | |
| options["appearance"]["foreground"] = foreground | |
| if accent: | |
| options["appearance"]["accent"] = accent | |
| # Loading screen options | |
| progress_bar = request.args.get("show_progress_bar") | |
| loading_text = request.args.get("loading_text") | |
| loading_image = request.args.get("loading_image") | |
| if progress_bar is not None or loading_text or loading_image: | |
| options["loadingScreen"] = {} | |
| if progress_bar is not None: | |
| options["loadingScreen"]["progressBar"] = progress_bar == "true" | |
| if loading_text: | |
| options["loadingScreen"]["text"] = loading_text | |
| if loading_image: | |
| options["loadingScreen"]["image"] = loading_image | |
| # Controls options | |
| show_green_flag = request.args.get("show_green_flag") | |
| show_stop_button = request.args.get("show_stop_button") | |
| show_pause_button = request.args.get("show_pause_button") | |
| show_fullscreen_button = request.args.get("show_fullscreen_button") | |
| if any([show_green_flag is not None, show_stop_button is not None, | |
| show_pause_button is not None, show_fullscreen_button is not None]): | |
| options["controls"] = {} | |
| if show_green_flag is not None: | |
| options["controls"]["greenFlag"] = {"enabled": show_green_flag == "true"} | |
| if show_stop_button is not None: | |
| options["controls"]["stopAll"] = {"enabled": show_stop_button == "true"} | |
| if show_pause_button is not None: | |
| options["controls"]["pause"] = {"enabled": show_pause_button == "true"} | |
| if show_fullscreen_button is not None: | |
| options["controls"]["fullscreen"] = {"enabled": show_fullscreen_button == "true"} | |
| # Monitors options | |
| editable_lists = request.args.get("editable_lists") | |
| variable_color = request.args.get("variable_color") | |
| list_color = request.args.get("list_color") | |
| if editable_lists is not None or variable_color or list_color: | |
| options["monitors"] = {} | |
| if editable_lists is not None: | |
| options["monitors"]["editableLists"] = editable_lists == "true" | |
| if variable_color: | |
| options["monitors"]["variableColor"] = variable_color | |
| if list_color: | |
| options["monitors"]["listColor"] = list_color | |
| # Compiler options | |
| enable_compiler = request.args.get("enable_compiler") | |
| warp_timer = request.args.get("warp_timer") | |
| if enable_compiler is not None or warp_timer is not None: | |
| options["compiler"] = {} | |
| if enable_compiler is not None: | |
| options["compiler"]["enabled"] = enable_compiler == "true" | |
| if warp_timer is not None: | |
| options["compiler"]["warpTimer"] = warp_timer == "true" | |
| # App options | |
| icon = request.args.get("icon") | |
| window_title = request.args.get("title") | |
| if icon or window_title: | |
| options["app"] = {} | |
| if icon: | |
| options["app"]["icon"] = icon | |
| if window_title: | |
| options["app"]["windowTitle"] = window_title | |
| # Chunks options | |
| enable_gamepad = request.args.get("enable_gamepad") | |
| lock_mouse = request.args.get("lock_mouse") | |
| if enable_gamepad is not None or lock_mouse is not None: | |
| options["chunks"] = {} | |
| if enable_gamepad is not None: | |
| options["chunks"]["gamepad"] = enable_gamepad == "true" | |
| if lock_mouse is not None: | |
| options["chunks"]["pointerlock"] = lock_mouse == "true" | |
| # Cloud variables options | |
| cloud_mode = request.args.get("cloud_mode") | |
| cloud_host = request.args.get("cloud_host") | |
| htmlifier_cloud = request.args.get("htmlifier_cloud") | |
| unsafe_cloud = request.args.get("unsafe_cloud") | |
| if any([cloud_mode, cloud_host, htmlifier_cloud is not None, unsafe_cloud is not None]): | |
| options["cloudVariables"] = {} | |
| if cloud_mode: | |
| options["cloudVariables"]["mode"] = cloud_mode | |
| if cloud_host: | |
| options["cloudVariables"]["cloudHost"] = cloud_host | |
| if htmlifier_cloud is not None: | |
| options["cloudVariables"]["specialCloudBehaviors"] = htmlifier_cloud == "true" | |
| if unsafe_cloud is not None: | |
| options["cloudVariables"]["unsafeCloudBehaviors"] = unsafe_cloud == "true" | |
| # Extensions and bakeExtensions | |
| options["extensions"] = [] # You can modify this to accept extensions from query params if needed | |
| options["bakeExtensions"] = request.args.get("cache_extensions", "false") == "true" | |
| # Remove any None values to keep the JSON clean | |
| def clean_dict(d): | |
| if isinstance(d, dict): | |
| return {k: clean_dict(v) for k, v in d.items() if v is not None} | |
| elif isinstance(d, list): | |
| return [clean_dict(item) for item in d if item is not None] | |
| else: | |
| return d | |
| options = clean_dict(options) | |
| # Write options to temporary file | |
| with tempfile.NamedTemporaryFile("w+", delete=False, suffix=".json") as tmp: | |
| json.dump(options, tmp, indent=2) | |
| tmp.flush() | |
| tmp_path = tmp.name | |
| # Call Node.js script | |
| try: | |
| result = subprocess.run( | |
| ["node", "generate_html.cjs", project_id, tmp_path], | |
| capture_output=True, | |
| text=True, | |
| check=True | |
| ) | |
| html_content = result.stdout | |
| return Response(html_content, mimetype="text/html") | |
| except subprocess.CalledProcessError as e: | |
| return jsonify({"error": e.stderr}), 500 | |
| finally: | |
| # Clean up temporary file | |
| import os | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| if __name__ == "__main__": | |
| if os.environ.get("WERKZEUG_RUN_MAIN") != "true": | |
| start_processes() | |
| app.run(host="0.0.0.0", port=7860) |