"""Flask application exposing a handful of utility endpoints.

The module launches the worker scripts listed in ``processes`` and serves:
a Scratch login relay, a Channel.io manager lookup, a generic CORS proxy
(plus a 404-fallback proxy) and a Scratch project embed generator that
shells out to a Node.js script.
"""

import json
import os
import random
import subprocess
import sys
import tempfile
from urllib.parse import urlencode

import requests
from flask import Flask, request, jsonify, redirect, Response, abort, send_from_directory

app = Flask(__name__)


@app.route('/favicon.ico')
def favicon():
    """Answer favicon requests with an empty 204 response."""
    return '', 204


@app.route("/")
def index():
    """Return a static greeting for the root URL."""
    return "Hello world"


# Worker scripts started by start_processes(); "proc" holds the Popen handle
# once the script has been launched.
processes = {
    "yt": {
        "file": "yt.py",
        "proc": None,
    },
    "news": {
        "file": "news.py",
        "proc": None,
    },
    "chatgpt": {
        "file": "chatgpt.py",
        "proc": None,
    },
    "jihou": {
        "file": "jihou.py",
        "proc": None,
    },
    "join": {
        "file": "join.py",
        "proc": None,
    },
    "ngwords": {
        "file": "ngwords.py",
        "proc": None,
    },
    "orion-join": {
        "file": "orion-server/join.py",
        "proc": None,
    },
    "turbowarp-server-gpt": {
        "file": "turbowarp-server/gpt.py",
        "proc": None,
    },
    "turbowarp-server-qr": {
        "file": "turbowarp-server/qr-converter.py",
        "proc": None,
    },
    "mmnga": {
        "file": "mmng.py",
        "proc": None,
    },
}


def start_processes():
    """Start every script in ``processes`` that is not currently running.

    A script is (re)started when it has never been launched or when its
    previous process has already exited. Child output is wired to this
    process's stdout/stderr.
    """
    for name, info in processes.items():
        proc = info["proc"]
        if proc is None or proc.poll() is not None:
            info["proc"] = subprocess.Popen(
                [sys.executable, info["file"]],
                stdout=sys.stdout,
                stderr=sys.stderr,
                env=os.environ,
            )
            print(f"{info['file']} を起動しました")


# ----------


def _parse_cookie_param(cookie_param):
    """Parse a ``"k1=v1; k2=v2"`` string into a dict for ``requests``.

    Returns ``None`` when *cookie_param* is empty or missing, otherwise a
    dict (possibly empty) of stripped names to stripped values. Items
    without an ``=`` are ignored.
    """
    if not cookie_param:
        return None
    cookies = {}
    for item in cookie_param.split(";"):
        if "=" in item:
            k, v = item.split("=", 1)
            cookies[k.strip()] = v.strip()
    return cookies


def _clean_dict(d):
    """Recursively drop ``None`` values from nested dicts and lists."""
    if isinstance(d, dict):
        return {k: _clean_dict(v) for k, v in d.items() if v is not None}
    elif isinstance(d, list):
        return [_clean_dict(item) for item in d if item is not None]
    else:
        return d


@app.route("/xe.js")
def xe_js():
    """Serve ``xv/xe.js`` as JavaScript."""
    return send_from_directory(
        directory="xv",
        path="xe.js",
        mimetype="application/javascript"
    )


@app.route("/drive.com/files", strict_slashes=False)
def drive():
    """Print the caller's IP address and redirect to Google Drive."""
    ip = request.remote_addr
    print(f"アクセスIP: {ip}")
    return redirect("https://drive.google.com/")


@app.route("/scratch_login", methods=["POST"], strict_slashes=False)
def scratch_login():
    """Relay a username/password login to scratch.mit.edu.

    Expects a JSON object with ``username`` and ``password``. Responds with
    400 when either is missing, 500 when the CSRF token cannot be obtained,
    and otherwise a JSON object holding the CSRF token and the upstream
    login response.
    """
    # silent=True: a missing or malformed JSON body yields None instead of
    # raising, so the caller gets the explicit 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get("username")
    password = data.get("password")

    if not username or not password:
        return jsonify({"error": "username と password を指定してください"}), 400

    # Step 1: obtain the CSRF token.
    # NOTE(review): the outbound calls below have no timeout and may block
    # indefinitely if the upstream stalls.
    token_url = "https://corsproxy.io?url=https://scratch.mit.edu/csrf_token/"
    session = requests.Session()
    token_resp = session.get(token_url)

    if token_resp.status_code != 200:
        return jsonify({"error": "CSRF トークン取得失敗"}), 500

    # Read scratchcsrftoken from the session cookies.
    scratchcsrftoken = session.cookies.get("scratchcsrftoken")
    if not scratchcsrftoken:
        return jsonify({"error": "CSRF トークンがクッキーにありません"}), 500

    # Step 2: log in with a POST.
    login_url = "https://scratch.mit.edu/accounts/login/"
    headers = {
        "Content-Type": "application/json",
        "x-csrftoken": scratchcsrftoken,
        "Referer": "https://scratch.mit.edu/",
        "User-Agent": "Mozilla/5.0"
    }
    payload = {
        "username": username,
        "password": password,
        "useMessages": True
    }

    login_resp = session.post(login_url, json=payload, headers=headers)

    try:
        result_json = login_resp.json()
    except Exception:
        # Upstream did not return JSON; pass the raw text back instead.
        result_json = {"error": "JSON パース失敗", "text": login_resp.text}

    return jsonify({
        "scratchcsrftoken": scratchcsrftoken,
        "login_response": result_json
    })


@app.route("/channel-io-managers")
def get_managers():
    """Return the ``managers`` list of a Channel.io channel.

    Query parameters: ``limit`` and ``since`` are forwarded when present;
    ``channelid`` defaults to ``"200605"``. Non-200 upstream responses are
    relayed as ``{"error": <body>}`` with the upstream status code.
    """
    limit = request.args.get("limit")
    since = request.args.get("since")

    params = {}
    if limit:
        params["limit"] = limit
    if since:
        params["since"] = since

    channel_id = request.args.get("channelid", "200605")
    url = f"https://desk-api.channel.io/desk/channels/{channel_id}/managers"

    headers = {
        "accept": "application/json",
        "x-account": os.getenv("channeliotokenmain"),
    }

    res = requests.get(url, headers=headers, params=params)

    if res.status_code != 200:
        return jsonify({"error": res.text}), res.status_code

    return jsonify(res.json().get("managers", []))


# Request headers that cors_proxy always forwards to the target.
FORWARD_HEADERS = {
    "User-Agent",
    "Accept",
    "Content-Type",
    "Authorization",
}


@app.route("/cors-proxy", methods=["GET", "POST", "PATCH", "PUT", "DELETE", "OPTIONS"], strict_slashes=False)
def cors_proxy():
    """Forward the request to ``?url=...`` and add permissive CORS headers.

    Query parameters:
        url:    target URL (required, http/https only).
        send:   comma-separated extra request header names to forward.
        cookie: ``"k=v; k2=v2"`` string sent to the target as cookies.
    Remaining query parameters are forwarded to the target.

    NOTE(review): this forwards to any caller-supplied URL, so it can reach
    hosts visible only to this server (SSRF). Consider an allow-list.
    """
    # Preflight handling.
    if request.method == "OPTIONS":
        return Response(
            "",
            204,
            headers={
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "*",
                "Access-Control-Allow-Methods": "GET, POST, PATCH, PUT, DELETE, OPTIONS",
            },
        )

    # Cookies supplied through the "cookie" query parameter.
    cookies = _parse_cookie_param(request.args.get("cookie"))

    url = request.args.get("url")
    if not url:
        return "url パラメータが必要です", 400

    if not url.startswith(("http://", "https://")):
        return "http または https のURLのみ使用できます", 400

    # Extra headers to forward (e.g. send=x-games-flags,x-games-sdk-auth).
    extra_headers = set()
    send_param = request.args.get("send")
    if send_param:
        extra_headers = {h.strip() for h in send_param.split(",") if h.strip()}

    # Build the forwardable header set; compare case-insensitively.
    allowed_headers = {h.lower() for h in FORWARD_HEADERS}
    allowed_headers.update(h.lower() for h in extra_headers)

    headers = {
        k: v
        for k, v in request.headers.items()
        if k.lower() in allowed_headers
    }

    # Avoid gzip handling problems.
    headers.pop("Accept-Encoding", None)

    # Proxy-control parameters are not forwarded to the target.
    forward_params = request.args.to_dict(flat=True)
    forward_params.pop("url", None)
    forward_params.pop("send", None)
    forward_params.pop("cookie", None)

    resp = requests.request(
        method=request.method,
        url=url,
        headers=headers,
        data=request.get_data(),
        params=forward_params,
        cookies=cookies,
        timeout=300,
    )

    response = Response(resp.content, resp.status_code)

    # Attach CORS headers.
    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Headers"] = "*"
    response.headers["Access-Control-Allow-Methods"] = "GET, POST, PATCH, PUT, DELETE, OPTIONS"

    # Keep the upstream Content-Type.
    if "Content-Type" in resp.headers:
        response.headers["Content-Type"] = resp.headers["Content-Type"]

    return response


@app.errorhandler(404)
def cors_proxy_404(e):
    """Proxy unmatched paths to ``?baseurl23896=...`` when it is supplied.

    Without ``baseurl23896`` the original 404 is returned. Otherwise the
    request path and remaining query parameters are appended to the base
    URL and the upstream response is relayed; a failed upstream request
    yields 502.
    """
    baseurl = request.args.get("baseurl23896")
    if not baseurl:
        # Return the exception itself: calling abort() inside an error
        # handler raises out of the handler and ends up as a 500.
        return e

    # Pass through every parameter except baseurl23896.
    forward_params = {
        k: v
        for k, v in request.args.items()
        if k != "baseurl23896"
    }

    cookies = _parse_cookie_param(request.args.get("cookie"))

    # Assemble the target URL.
    target_url = baseurl.rstrip("/") + request.path
    if forward_params:
        target_url += "?" + urlencode(forward_params, doseq=True)

    try:
        resp = requests.request(
            method=request.method,
            url=target_url,
            headers={
                k: v
                for k, v in request.headers.items()
                if k.lower() not in ["host", "content-length"]
            },
            data=request.get_data(),
            cookies=cookies,
            allow_redirects=False,
            timeout=10,
        )
    except requests.RequestException:
        # Explicit response instead of abort(502), for the same reason as
        # the 404 case above.
        return "Bad Gateway", 502

    # Hop-by-hop / length headers no longer match the relayed body.
    excluded_headers = [
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
    ]

    headers = [
        (k, v)
        for k, v in resp.headers.items()
        if k.lower() not in excluded_headers
    ]

    return Response(resp.content, resp.status_code, headers)


# ------------------


def generate_random_user():
    """Return a name of the form ``user_NNNN`` with NNNN in 1000..9999."""
    return f"user_{random.randint(1000,9999)}"


@app.route("/scratch/<project_id>", methods=["GET"], strict_slashes=False)
def scratch_embed(project_id):
    """Generate HTML for a Scratch project via ``generate_html.cjs``.

    Query parameters are mapped onto an options dictionary, written to a
    temporary JSON file and passed, together with *project_id*, to the
    Node.js script. Its stdout is returned as ``text/html``; a non-zero
    exit yields a JSON error with status 500.
    """
    args = request.args

    # Build options dictionary matching the Node.js packager format.
    options = {}

    # Basic options. type=int falls back to the default on non-numeric
    # input instead of raising.
    options["turbo"] = args.get("turbo", "false") == "true"
    options["interpolation"] = args.get("completion", "false") == "true"
    options["highQualityPen"] = args.get("pen_smooth", "true") == "true"
    options["maxClones"] = args.get("max_clones", 300, type=int)
    options["stageWidth"] = args.get("stage_width", 480, type=int)
    options["stageHeight"] = args.get("stage_height", 360, type=int)
    options["resizeMode"] = args.get("stretch", "preserve-ratio")
    options["autoplay"] = args.get("autostart", "true") == "true"
    options["username"] = args.get("username", generate_random_user())
    options["closeWhenStopped"] = args.get("close_on_stop", "false") == "true"

    # Custom options (css/js)
    custom_css = args.get("custom_css", "")
    custom_js = args.get("custom_js", "")
    if custom_css or custom_js:
        options["custom"] = {}
        if custom_css:
            options["custom"]["css"] = custom_css
        if custom_js:
            options["custom"]["js"] = custom_js

    # Appearance options
    background = args.get("background_color")
    foreground = args.get("primary_color")
    accent = args.get("accent_color")
    if background or foreground or accent:
        options["appearance"] = {}
        if background:
            options["appearance"]["background"] = background
        if foreground:
            options["appearance"]["foreground"] = foreground
        if accent:
            options["appearance"]["accent"] = accent

    # Loading screen options
    progress_bar = args.get("show_progress_bar")
    loading_text = args.get("loading_text")
    loading_image = args.get("loading_image")
    if progress_bar is not None or loading_text or loading_image:
        options["loadingScreen"] = {}
        if progress_bar is not None:
            options["loadingScreen"]["progressBar"] = progress_bar == "true"
        if loading_text:
            options["loadingScreen"]["text"] = loading_text
        if loading_image:
            options["loadingScreen"]["image"] = loading_image

    # Controls options
    show_green_flag = args.get("show_green_flag")
    show_stop_button = args.get("show_stop_button")
    show_pause_button = args.get("show_pause_button")
    show_fullscreen_button = args.get("show_fullscreen_button")
    if any([show_green_flag is not None, show_stop_button is not None,
            show_pause_button is not None, show_fullscreen_button is not None]):
        options["controls"] = {}
        if show_green_flag is not None:
            options["controls"]["greenFlag"] = {"enabled": show_green_flag == "true"}
        if show_stop_button is not None:
            options["controls"]["stopAll"] = {"enabled": show_stop_button == "true"}
        if show_pause_button is not None:
            options["controls"]["pause"] = {"enabled": show_pause_button == "true"}
        if show_fullscreen_button is not None:
            options["controls"]["fullscreen"] = {"enabled": show_fullscreen_button == "true"}

    # Monitors options
    editable_lists = args.get("editable_lists")
    variable_color = args.get("variable_color")
    list_color = args.get("list_color")
    if editable_lists is not None or variable_color or list_color:
        options["monitors"] = {}
        if editable_lists is not None:
            options["monitors"]["editableLists"] = editable_lists == "true"
        if variable_color:
            options["monitors"]["variableColor"] = variable_color
        if list_color:
            options["monitors"]["listColor"] = list_color

    # Compiler options
    enable_compiler = args.get("enable_compiler")
    warp_timer = args.get("warp_timer")
    if enable_compiler is not None or warp_timer is not None:
        options["compiler"] = {}
        if enable_compiler is not None:
            options["compiler"]["enabled"] = enable_compiler == "true"
        if warp_timer is not None:
            options["compiler"]["warpTimer"] = warp_timer == "true"

    # App options
    icon = args.get("icon")
    window_title = args.get("title")
    if icon or window_title:
        options["app"] = {}
        if icon:
            options["app"]["icon"] = icon
        if window_title:
            options["app"]["windowTitle"] = window_title

    # Chunks options
    enable_gamepad = args.get("enable_gamepad")
    lock_mouse = args.get("lock_mouse")
    if enable_gamepad is not None or lock_mouse is not None:
        options["chunks"] = {}
        if enable_gamepad is not None:
            options["chunks"]["gamepad"] = enable_gamepad == "true"
        if lock_mouse is not None:
            options["chunks"]["pointerlock"] = lock_mouse == "true"

    # Cloud variables options
    cloud_mode = args.get("cloud_mode")
    cloud_host = args.get("cloud_host")
    htmlifier_cloud = args.get("htmlifier_cloud")
    unsafe_cloud = args.get("unsafe_cloud")
    if any([cloud_mode, cloud_host, htmlifier_cloud is not None, unsafe_cloud is not None]):
        options["cloudVariables"] = {}
        if cloud_mode:
            options["cloudVariables"]["mode"] = cloud_mode
        if cloud_host:
            options["cloudVariables"]["cloudHost"] = cloud_host
        if htmlifier_cloud is not None:
            options["cloudVariables"]["specialCloudBehaviors"] = htmlifier_cloud == "true"
        if unsafe_cloud is not None:
            options["cloudVariables"]["unsafeCloudBehaviors"] = unsafe_cloud == "true"

    # Extensions and bakeExtensions
    # You can modify this to accept extensions from query params if needed.
    options["extensions"] = []
    options["bakeExtensions"] = args.get("cache_extensions", "false") == "true"

    # Remove any None values to keep the JSON clean.
    options = _clean_dict(options)

    # Write options to a temporary file; delete=False so the file survives
    # the with-block and can be read by the Node.js process.
    with tempfile.NamedTemporaryFile("w+", delete=False, suffix=".json") as tmp:
        json.dump(options, tmp, indent=2)
        tmp.flush()
        tmp_path = tmp.name

    # Call Node.js script
    try:
        result = subprocess.run(
            ["node", "generate_html.cjs", project_id, tmp_path],
            capture_output=True,
            text=True,
            check=True
        )
        html_content = result.stdout
        return Response(html_content, mimetype="text/html")
    except subprocess.CalledProcessError as e:
        return jsonify({"error": e.stderr}), 500
    finally:
        # Best-effort cleanup of the temporary file.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


if __name__ == "__main__":
    # Skip the launch when WERKZEUG_RUN_MAIN is "true" (set in the Werkzeug
    # reloader's child process) so the workers are not started twice.
    if os.environ.get("WERKZEUG_RUN_MAIN") != "true":
        start_processes()
    app.run(host="0.0.0.0", port=7860)