any-env-code / app.py
izuemon's picture
Update app.py
976558a verified
from flask import Flask, request, jsonify, redirect, Response, abort, send_from_directory
import os
import requests
import subprocess
import sys
import random
import tempfile
import json
app = Flask(__name__)
@app.route('/favicon.ico')
def favicon():
return '', 204
@app.route("/")
def index():
return "Hello world"
processes = {
"yt": {
"file": "yt.py",
"proc": None,
},
"news": {
"file": "news.py",
"proc": None,
},
"chatgpt": {
"file": "chatgpt.py",
"proc": None,
},
"jihou": {
"file": "jihou.py",
"proc": None,
},
"join": {
"file": "join.py",
"proc": None,
},
"ngwords": {
"file": "ngwords.py",
"proc": None,
},
"orion-join": {
"file": "orion-server/join.py",
"proc": None,
},
"turbowarp-server-gpt": {
"file": "turbowarp-server/gpt.py",
"proc": None,
},
"turbowarp-server-qr": {
"file": "turbowarp-server/qr-converter.py",
"proc": None,
},
"mmnga": {
"file": "mmng.py",
"proc": None,
},
}
def start_processes():
for name, info in processes.items():
proc = info["proc"]
if proc is None or proc.poll() is not None:
info["proc"] = subprocess.Popen(
[sys.executable, info["file"]],
stdout=sys.stdout,
stderr=sys.stderr,
env=os.environ,
)
print(f"{info['file']} を起動しました")
#----------
@app.route("/xe.js")
def xe_js():
return send_from_directory(
directory="xv",
path="xe.js",
mimetype="application/javascript"
)
@app.route("/drive.com/files", strict_slashes=False)
def drive():
ip = request.remote_addr
print(f"アクセスIP: {ip}")
return redirect("https://drive.google.com/")
@app.route("/scratch_login", methods=["POST"], strict_slashes=False)
def scratch_login():
data = request.json
username = data.get("username")
password = data.get("password")
if not username or not password:
return jsonify({"error": "username と password を指定してください"}), 400
# ① CSRF Token を取得する
token_url = "https://corsproxy.io?url=https://scratch.mit.edu/csrf_token/"
session = requests.Session()
token_resp = session.get(token_url)
if token_resp.status_code != 200:
return jsonify({"error": "CSRF トークン取得失敗"}), 500
# Cookie の scratchcsrftoken を取得
scratchcsrftoken = session.cookies.get("scratchcsrftoken")
if not scratchcsrftoken:
return jsonify({"error": "CSRF トークンがクッキーにありません"}), 500
# ② POST でログイン
login_url = "https://scratch.mit.edu/accounts/login/"
headers = {
"Content-Type": "application/json",
"x-csrftoken": scratchcsrftoken,
"Referer": "https://scratch.mit.edu/",
"User-Agent": "Mozilla/5.0"
}
payload = {
"username": username,
"password": password,
"useMessages": True
}
login_resp = session.post(login_url, json=payload, headers=headers)
try:
result_json = login_resp.json()
except Exception:
result_json = {"error": "JSON パース失敗", "text": login_resp.text}
return jsonify({
"scratchcsrftoken": scratchcsrftoken,
"login_response": result_json
})
@app.route("/channel-io-managers")
def get_managers():
limit = request.args.get("limit")
since = request.args.get("since")
params = {}
if limit:
params["limit"] = limit
if since:
params["since"] = since
channel_id = request.args.get("channelid", "200605")
url = f"https://desk-api.channel.io/desk/channels/{channel_id}/managers"
headers = {
"accept": "application/json",
"x-account": os.getenv("channeliotokenmain"),
}
res = requests.get(url, headers=headers, params=params)
if res.status_code != 200:
return jsonify({"error": res.text}), res.status_code
return jsonify(res.json().get("managers", []))
FORWARD_HEADERS = {
"User-Agent",
"Accept",
"Content-Type",
"Authorization",
}
@app.route("/cors-proxy", methods=["GET", "POST", "PATCH", "PUT", "DELETE", "OPTIONS"], strict_slashes=False)
def cors_proxy():
# Preflight 対応
if request.method == "OPTIONS":
return Response(
"",
204,
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*",
"Access-Control-Allow-Methods": "GET, POST, PATCH, PUT, DELETE, OPTIONS",
},
)
# cookie パラメータ取得
cookie_param = request.args.get("cookie")
cookies = None
if cookie_param:
cookies = {}
for item in cookie_param.split(";"):
if "=" in item:
k, v = item.split("=", 1)
cookies[k.strip()] = v.strip()
url = request.args.get("url")
if not url:
return "url パラメータが必要です", 400
if not url.startswith(("http://", "https://")):
return "http または https のURLのみ使用できます", 400
# 追加転送ヘッダ取得(例: send=x-games-flags,x-games-sdk-auth)
extra_headers = set()
send_param = request.args.get("send")
if send_param:
extra_headers = {h.strip() for h in send_param.split(",") if h.strip()}
# 転送対象ヘッダを動的に構築(小文字比較で安全に)
allowed_headers = {h.lower() for h in FORWARD_HEADERS}
allowed_headers.update(h.lower() for h in extra_headers)
headers = {
k: v for k, v in request.headers.items()
if k.lower() in allowed_headers
}
# gzip 問題回避
headers.pop("Accept-Encoding", None)
# url パラメータと send パラメータは転送しない
forward_params = request.args.to_dict(flat=True)
forward_params.pop("url", None)
forward_params.pop("send", None)
forward_params.pop("cookie", None)
resp = requests.request(
method=request.method,
url=url,
headers=headers,
data=request.get_data(),
params=forward_params,
cookies=cookies, # ←追加
timeout=300,
)
response = Response(resp.content, resp.status_code)
# CORS ヘッダ付与
response.headers["Access-Control-Allow-Origin"] = "*"
response.headers["Access-Control-Allow-Headers"] = "*"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PATCH, PUT, DELETE, OPTIONS"
# Content-Type は元のまま
if "Content-Type" in resp.headers:
response.headers["Content-Type"] = resp.headers["Content-Type"]
return response
@app.errorhandler(404)
def cors_proxy_404(e):
baseurl = request.args.get("baseurl23896")
if not baseurl:
return abort(404)
# baseurl23896 以外のパラメータをそのまま渡す
forward_params = {
k: v for k, v in request.args.items()
if k != "baseurl23896"
}
cookie_param = request.args.get("cookie")
cookies = None
if cookie_param:
cookies = {}
for item in cookie_param.split(";"):
if "=" in item:
k, v = item.split("=", 1)
cookies[k.strip()] = v.strip()
# 転送先URLを組み立て
target_url = baseurl.rstrip("/") + request.path
if forward_params:
target_url += "?" + urlencode(forward_params, doseq=True)
try:
resp = requests.request(
method=request.method,
url=target_url,
headers={
k: v for k, v in request.headers
if k.lower() not in ["host", "content-length"]
},
data=request.get_data(),
cookies=cookies, # ←追加
allow_redirects=False,
timeout=10,
)
except requests.RequestException:
return abort(502)
excluded_headers = [
"content-encoding",
"content-length",
"transfer-encoding",
"connection",
]
headers = [
(k, v) for k, v in resp.headers.items()
if k.lower() not in excluded_headers
]
return Response(resp.content, resp.status_code, headers)
#------------------
def generate_random_user():
return f"user_{random.randint(1000,9999)}"
@app.route("/scratch/<project_id>", methods=["GET"], strict_slashes=False)
def scratch_embed(project_id):
# Build options dictionary matching the Node.js packager format
options = {}
# Basic options
options["turbo"] = request.args.get("turbo", "false") == "true"
options["interpolation"] = request.args.get("completion", "false") == "true"
options["highQualityPen"] = request.args.get("pen_smooth", "true") == "true"
options["maxClones"] = int(request.args.get("max_clones", "300"))
options["stageWidth"] = int(request.args.get("stage_width", "480"))
options["stageHeight"] = int(request.args.get("stage_height", "360"))
options["resizeMode"] = request.args.get("stretch", "preserve-ratio")
options["autoplay"] = request.args.get("autostart", "true") == "true"
options["username"] = request.args.get("username", generate_random_user())
options["closeWhenStopped"] = request.args.get("close_on_stop", "false") == "true"
# Custom options (css/js)
custom_css = request.args.get("custom_css", "")
custom_js = request.args.get("custom_js", "")
if custom_css or custom_js:
options["custom"] = {}
if custom_css:
options["custom"]["css"] = custom_css
if custom_js:
options["custom"]["js"] = custom_js
# Appearance options
background = request.args.get("background_color")
foreground = request.args.get("primary_color")
accent = request.args.get("accent_color")
if background or foreground or accent:
options["appearance"] = {}
if background:
options["appearance"]["background"] = background
if foreground:
options["appearance"]["foreground"] = foreground
if accent:
options["appearance"]["accent"] = accent
# Loading screen options
progress_bar = request.args.get("show_progress_bar")
loading_text = request.args.get("loading_text")
loading_image = request.args.get("loading_image")
if progress_bar is not None or loading_text or loading_image:
options["loadingScreen"] = {}
if progress_bar is not None:
options["loadingScreen"]["progressBar"] = progress_bar == "true"
if loading_text:
options["loadingScreen"]["text"] = loading_text
if loading_image:
options["loadingScreen"]["image"] = loading_image
# Controls options
show_green_flag = request.args.get("show_green_flag")
show_stop_button = request.args.get("show_stop_button")
show_pause_button = request.args.get("show_pause_button")
show_fullscreen_button = request.args.get("show_fullscreen_button")
if any([show_green_flag is not None, show_stop_button is not None,
show_pause_button is not None, show_fullscreen_button is not None]):
options["controls"] = {}
if show_green_flag is not None:
options["controls"]["greenFlag"] = {"enabled": show_green_flag == "true"}
if show_stop_button is not None:
options["controls"]["stopAll"] = {"enabled": show_stop_button == "true"}
if show_pause_button is not None:
options["controls"]["pause"] = {"enabled": show_pause_button == "true"}
if show_fullscreen_button is not None:
options["controls"]["fullscreen"] = {"enabled": show_fullscreen_button == "true"}
# Monitors options
editable_lists = request.args.get("editable_lists")
variable_color = request.args.get("variable_color")
list_color = request.args.get("list_color")
if editable_lists is not None or variable_color or list_color:
options["monitors"] = {}
if editable_lists is not None:
options["monitors"]["editableLists"] = editable_lists == "true"
if variable_color:
options["monitors"]["variableColor"] = variable_color
if list_color:
options["monitors"]["listColor"] = list_color
# Compiler options
enable_compiler = request.args.get("enable_compiler")
warp_timer = request.args.get("warp_timer")
if enable_compiler is not None or warp_timer is not None:
options["compiler"] = {}
if enable_compiler is not None:
options["compiler"]["enabled"] = enable_compiler == "true"
if warp_timer is not None:
options["compiler"]["warpTimer"] = warp_timer == "true"
# App options
icon = request.args.get("icon")
window_title = request.args.get("title")
if icon or window_title:
options["app"] = {}
if icon:
options["app"]["icon"] = icon
if window_title:
options["app"]["windowTitle"] = window_title
# Chunks options
enable_gamepad = request.args.get("enable_gamepad")
lock_mouse = request.args.get("lock_mouse")
if enable_gamepad is not None or lock_mouse is not None:
options["chunks"] = {}
if enable_gamepad is not None:
options["chunks"]["gamepad"] = enable_gamepad == "true"
if lock_mouse is not None:
options["chunks"]["pointerlock"] = lock_mouse == "true"
# Cloud variables options
cloud_mode = request.args.get("cloud_mode")
cloud_host = request.args.get("cloud_host")
htmlifier_cloud = request.args.get("htmlifier_cloud")
unsafe_cloud = request.args.get("unsafe_cloud")
if any([cloud_mode, cloud_host, htmlifier_cloud is not None, unsafe_cloud is not None]):
options["cloudVariables"] = {}
if cloud_mode:
options["cloudVariables"]["mode"] = cloud_mode
if cloud_host:
options["cloudVariables"]["cloudHost"] = cloud_host
if htmlifier_cloud is not None:
options["cloudVariables"]["specialCloudBehaviors"] = htmlifier_cloud == "true"
if unsafe_cloud is not None:
options["cloudVariables"]["unsafeCloudBehaviors"] = unsafe_cloud == "true"
# Extensions and bakeExtensions
options["extensions"] = [] # You can modify this to accept extensions from query params if needed
options["bakeExtensions"] = request.args.get("cache_extensions", "false") == "true"
# Remove any None values to keep the JSON clean
def clean_dict(d):
if isinstance(d, dict):
return {k: clean_dict(v) for k, v in d.items() if v is not None}
elif isinstance(d, list):
return [clean_dict(item) for item in d if item is not None]
else:
return d
options = clean_dict(options)
# Write options to temporary file
with tempfile.NamedTemporaryFile("w+", delete=False, suffix=".json") as tmp:
json.dump(options, tmp, indent=2)
tmp.flush()
tmp_path = tmp.name
# Call Node.js script
try:
result = subprocess.run(
["node", "generate_html.cjs", project_id, tmp_path],
capture_output=True,
text=True,
check=True
)
html_content = result.stdout
return Response(html_content, mimetype="text/html")
except subprocess.CalledProcessError as e:
return jsonify({"error": e.stderr}), 500
finally:
# Clean up temporary file
import os
try:
os.unlink(tmp_path)
except:
pass
if __name__ == "__main__":
if os.environ.get("WERKZEUG_RUN_MAIN") != "true":
start_processes()
app.run(host="0.0.0.0", port=7860)