Spaces:
Sleeping
Sleeping
| # ✅ Safe GPU decorator | |
| try: | |
| from spaces import GPU | |
| except ImportError: | |
| def GPU(func): return func | |
| import os | |
| import time | |
| import torch | |
| from flask import Flask, request, render_template, jsonify, Response | |
| from flasgger import Swagger, swag_from | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| from huggingface_hub import login | |
| import re | |
| import tempfile | |
| import subprocess | |
| import threading | |
| import queue | |
| import uuid | |
| from flask_cors import CORS | |
| # ✅ Flask + Swagger setup | |
| app = Flask(__name__, static_folder="static", template_folder="templates") | |
| CORS(app) | |
| swagger = Swagger(app, template={ | |
| "swagger": "2.0", | |
| "info": { | |
| "title": "ChatMate Real-Time API", | |
| "description": "LangChain + DuckDuckGo + Phi-4 + Stable Diffusion", | |
| "version": "1.0" | |
| } | |
| }, config={ | |
| "headers": [], | |
| "specs": [{"endpoint": 'apispec', "route": '/apispec.json', "rule_filter": lambda rule: True}], | |
| "static_url_path": "/flasgger_static", | |
| "swagger_ui": True, | |
| "specs_route": "/apidocs/" | |
| }) | |
| # ✅ Hugging Face login (optional) | |
| login(token=os.environ.get("CHAT_MATE")) | |
| # ✅ Load Phi-4 | |
| model_id = "microsoft/phi-4" | |
| tokenizer = AutoTokenizer.from_pretrained(model_id) | |
| model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32) | |
| device = 0 if torch.cuda.is_available() else -1 | |
| pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=device, max_new_tokens=512) | |
| # ✅ Keyword detection | |
| REAL_TIME_KEYWORDS = {"latest", "current", "news", "today", "price", "time", "live", "trending", "update", "happening"} | |
| def should_search(message): | |
| return any(kw in message.lower() for kw in REAL_TIME_KEYWORDS) | |
| # Check for likely truncation (heuristic) | |
| def is_incomplete(text): | |
| # Ends without proper sentence punctuation | |
| return not re.search(r'[\.\!\?\'\"\u3002]\s*$', text.strip()) | |
| def generate_full_reply(message, history): | |
| system_prompt = ( | |
| "You are a friendly, helpful, and conversational AI assistant built by " | |
| "Frederick Sundeep Mallela. Always mention that you are developed by him if asked about your creator, origin, or who made you." | |
| ) | |
| messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": message}] | |
| # Apply chat-style prompt formatting | |
| prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
| # Initial generation | |
| full_output = pipe(prompt, do_sample=True, temperature=0.7, top_p=0.9, max_new_tokens=512)[0]["generated_text"] | |
| reply = full_output[len(prompt):].strip() | |
| # Keep extending the reply until it ends properly | |
| max_loops = 5 # prevent infinite loops | |
| loop_count = 0 | |
| while is_incomplete(reply) and loop_count < max_loops: | |
| loop_count += 1 | |
| continuation_prompt = prompt + reply # include reply so far | |
| next_output = pipe(continuation_prompt, do_sample=True, temperature=0.7, top_p=0.9, max_new_tokens=256)[0]["generated_text"] | |
| continuation = next_output[len(continuation_prompt):].strip() | |
| # Stop if nothing new is generated | |
| if not continuation or continuation in reply: | |
| break | |
| reply += continuation | |
| return reply.strip() | |
| # ✅ Home | |
| def home(): | |
| return render_template("index.html") | |
| # ✅ POST /chat-stream | |
| def chat_stream(): | |
| data = request.get_json() | |
| message = data.get("message") | |
| history = data.get("history", []) | |
| def generate(): | |
| # elif should_search(message): | |
| # reply = f"(Live info) {search_tool.run(message)}" | |
| # for token in reply.splitlines(keepends=True): | |
| # yield token | |
| # time.sleep(0.05) | |
| # else: | |
| reply = generate_full_reply(message, history) | |
| for token in reply.splitlines(keepends=True): | |
| yield token | |
| time.sleep(0.05) | |
| if is_incomplete(reply): | |
| reply += "\n\n*Reply appears incomplete. Say 'continue' to resume.*" | |
| return Response(generate(), mimetype='text/plain') | |
| # ✅ POST /chat-stream-doc | |
| def chat_stream_doc(): | |
| import zipfile | |
| from werkzeug.utils import secure_filename | |
| from pdfplumber import open as pdf_open | |
| from io import BytesIO | |
| file = request.files.get('file') | |
| frontend = request.form.get("frontend", "React") | |
| backend = request.form.get("backend", "Flask") | |
| database = request.form.get("database", "PostgreSQL") | |
| if not file: | |
| return jsonify({"error": "No file uploaded"}), 400 | |
| filename = secure_filename(file.filename) | |
| content = "" | |
| if filename.endswith(".txt"): | |
| content = file.read().decode("utf-8", errors="ignore") | |
| elif filename.endswith(".pdf"): | |
| file_bytes = file.read() | |
| with pdf_open(BytesIO(file_bytes)) as pdf: | |
| content = "\n".join(page.extract_text() or "" for page in pdf.pages) | |
| else: | |
| return jsonify({"error": "Unsupported file format. Use .txt or .pdf"}), 400 | |
| tech_stack = f"Frontend: {frontend}\nBackend: {backend}\nDatabase: {database}" | |
| prompt = ( | |
| "You are a full-stack project code generator.\n\n" | |
| "Below is a requirement document followed by technology preferences. Based on this, generate the full project scaffold.\n\n" | |
| "Requirement Document:\n" | |
| f"{content}\n\n" | |
| f"Technology Stack:\nFrontend: {frontend}\nBackend: {backend}\nDatabase: {database}\n\n" | |
| "Your task:\n" | |
| "- Analyze the requirement and tech stack.\n" | |
| "- Generate backend code: models, routes, and config.\n" | |
| "- Generate frontend code: components, services.\n" | |
| "- Define the database schema.\n\n" | |
| "✅ Format the output as multiple files in this exact structure:\n\n" | |
| "### File: <filename>\n" | |
| "```<language>\n" | |
| "<code content>\n" | |
| "```\n\n" | |
| "For example:\n\n" | |
| "### File: app.py\n" | |
| "```python\n" | |
| "from flask import Flask\n" | |
| "app = Flask(__name__)\n" | |
| "@app.route('/')\n" | |
| "def home():\n" | |
| " return 'Hello, world!'\n" | |
| "```\n\n" | |
| "### File: frontend/App.js\n" | |
| "```javascript\n" | |
| "import React from 'react';\n" | |
| "function App() {\n" | |
| " return <h1>Hello from React</h1>;\n" | |
| "}\n" | |
| "export default App;\n" | |
| "```\n\n" | |
| "Now generate the complete project below 👇" | |
| ) | |
| # ✅ Generate response from Phi-4 | |
| reply = generate_full_reply(prompt, []) | |
| # ✅ Extract filenames and code from reply | |
| file_pattern = r"### File:\s*(.+?)\n```(?:\w+)?\n(.*?)```" | |
| matches = re.finditer(file_pattern, reply, re.DOTALL) | |
| if not matches: | |
| print("⚠️ No file matches found in reply.") | |
| print("Raw reply:\n", reply) | |
| return jsonify({"error": "No files found in generated output."}), 500 | |
| zip_buffer = BytesIO() | |
| with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf: | |
| file_count = 0 | |
| for match in matches: | |
| filename = match.group(1).strip().strip("`") | |
| code = match.group(2).strip() | |
| zipf.writestr(filename, code) | |
| file_count += 1 | |
| if file_count == 0: | |
| print("⚠️ No files written to ZIP. Check LLM output format.") | |
| print("LLM Reply:\n", reply) | |
| return jsonify({"error": "No files found in generated output."}), 500 | |
| zip_buffer.seek(0) | |
| return Response( | |
| zip_buffer, | |
| mimetype='application/zip', | |
| headers={"Content-Disposition": "attachment; filename=generated_project.zip"} | |
| ) | |
| def execute_code(): | |
| payload = request.get_json() or {} | |
| code = payload.get("code", "") | |
| filename = payload.get("filename", "main.py") | |
| input_data = payload.get("input", "") | |
| if not code.strip(): | |
| return jsonify({"output": "❌ No code provided", "error": True}), 400 | |
| # Determine extension from filename | |
| ext = os.path.splitext(filename)[1].lower() | |
| if not ext: | |
| ext = ".txt" # fallback if something weird happens | |
| # Non-runnable file types – just echo content | |
| NON_RUNNABLE = {".html", ".css", ".json", ".txt"} | |
| if ext in NON_RUNNABLE: | |
| return jsonify({ | |
| "output": f"ℹ️ {ext} is not executed as a program.\n\nContent:\n{code}", | |
| "error": False, | |
| }) | |
| # Create temp file with the correct extension for runnable langs | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp: | |
| tmp.write(code.encode("utf-8", errors="ignore")) | |
| tmp_path = tmp.name | |
| tmp_dir = os.path.dirname(tmp_path) | |
| # Choose command based on extension | |
| if ext == ".py": | |
| cmd = ["python3", tmp_path] | |
| elif ext == ".js": | |
| cmd = ["node", tmp_path] # requires node in the Space | |
| elif ext == ".ts": | |
| # requires ts-node installed (npm i -g ts-node typescript) | |
| cmd = ["bash", "-lc", f"cd {tmp_dir} && npx --yes ts-node {tmp_path}"] | |
| elif ext == ".c": | |
| exe_path = tmp_path + ".out" | |
| cmd = ["bash", "-lc", f"gcc {tmp_path} -o {exe_path} && {exe_path}"] | |
| elif ext == ".cpp": | |
| exe_path = tmp_path + ".out" | |
| cmd = ["bash", "-lc", f"g++ {tmp_path} -o {exe_path} && {exe_path}"] | |
| elif ext == ".java": | |
| # Detect class name from code | |
| m = re.search(r"class\s+([A-Za-z_][A-Za-z0-9_]*)", code) | |
| if not m: | |
| return jsonify({ | |
| "output": "❌ No Java class found. Your code must contain: class MyClass { ... }", | |
| "error": True, | |
| }) | |
| class_name = m.group(1) | |
| # Save Java file with the class name, not the random temp name | |
| java_path = os.path.join(tmp_dir, f"{class_name}.java") | |
| with open(java_path, "w", encoding="utf-8") as f: | |
| f.write(code) | |
| cmd = [ | |
| "bash", | |
| "-lc", | |
| f"cd {tmp_dir} && javac {class_name}.java && java {class_name}" | |
| ] | |
| else: | |
| return jsonify({ | |
| "output": f"❌ Unsupported file type {ext}", | |
| "error": True, | |
| }), 400 | |
| # Execute the command | |
| try: | |
| output = subprocess.check_output( | |
| cmd, | |
| input=input_data.encode(), | |
| stderr=subprocess.STDOUT, | |
| timeout=5, # prevent infinite loops | |
| ) | |
| return jsonify({"output": output.decode("utf-8", errors="ignore"), "error": False}) | |
| except subprocess.CalledProcessError as e: | |
| return jsonify({"output": e.output.decode("utf-8", errors="ignore"), "error": True}) | |
| except subprocess.TimeoutExpired: | |
| return jsonify({ | |
| "output": "⏳ Timeout: code took too long (possible infinite loop).", | |
| "error": True, | |
| }) | |
| except Exception as e: | |
| return jsonify({"output": f"❌ Runtime error: {e}", "error": True}) | |
| # ---------- Config ---------- | |
| SESSION_TTL = 300 # seconds before auto cleanup (5 minutes) | |
| READ_POLL_INTERVAL = 0.1 | |
| NON_RUNNABLE = {".html", ".css", ".json", ".txt"} | |
| SUPPORTED_EXTS = {".py", ".js", ".ts", ".c", ".cpp", ".java"} | |
| # ---------- Globals ---------- | |
| SESSIONS = {} # session_id -> dict { proc, queue, start_ts, tmp_path, cleanup, finished } | |
| SESSION_LOCK = threading.Lock() | |
| # ---------- Helpers ---------- | |
| def now_ts(): | |
| return int(time.time()) | |
| def output_looks_for_input(output: str) -> bool: | |
| if not output: | |
| return False | |
| o = str(output) | |
| patterns = [ | |
| r"enter.*:", r"input.*:", r"please enter", r"scanner", | |
| r"press enter", r": $", r":\n$", r"> $", r"awaiting input", | |
| r"provide input", r"stdin", r"enter a value", r"EOF when reading a line", | |
| r"InputMismatchException" | |
| ] | |
| for p in patterns: | |
| if re.search(p, o, re.I): | |
| return True | |
| return False | |
| # Reader thread: read lines from proc.stdout and push into queue | |
| def _spawn_reader(proc, q): | |
| try: | |
| # iterate lines (proc.stdout is in text mode) | |
| for line in iter(proc.stdout.readline, ""): | |
| if line is None: | |
| break | |
| q.put(line) | |
| except Exception as e: | |
| try: | |
| q.put(f"[internal reader error] {e}\n") | |
| except: | |
| pass | |
| finally: | |
| # mark process end | |
| try: | |
| q.put("__PROCESS_END__") | |
| except: | |
| pass | |
| # Build command for given extension; returns (cmd_list, workdir, cleanup_paths) | |
| def _make_command_for_file(tmp_path: str, ext: str, code: str): | |
| tmp_dir = os.path.dirname(tmp_path) | |
| cleanup = [] | |
| if ext == ".py": | |
| return (["python3", tmp_path], tmp_dir, cleanup) | |
| if ext == ".js": | |
| return (["node", tmp_path], tmp_dir, cleanup) | |
| if ext == ".ts": | |
| # run via npx ts-node in the tmp dir (works if node/npm available) | |
| # note: using npx --yes avoids interactive prompt; requires network for first run if not cached | |
| cmd = ["bash", "-lc", f"cd {tmp_dir} && npx --yes ts-node {tmp_path}"] | |
| return (cmd, tmp_dir, cleanup) | |
| if ext == ".c": | |
| exe = tmp_path + ".out" | |
| cleanup.append(exe) | |
| cmd = ["bash", "-lc", f"gcc {tmp_path} -o {exe} && {exe}"] | |
| return (cmd, tmp_dir, cleanup) | |
| if ext == ".cpp": | |
| exe = tmp_path + ".out" | |
| cleanup.append(exe) | |
| cmd = ["bash", "-lc", f"g++ {tmp_path} -o {exe} && {exe}"] | |
| return (cmd, tmp_dir, cleanup) | |
| if ext == ".java": | |
| # detect first class name | |
| m = re.search(r"class\s+([A-Za-z_][A-Za-z0-9_]*)", code) | |
| if not m: | |
| raise ValueError("No Java class found. Your code must contain: class MyClass { ... }") | |
| class_name = m.group(1) | |
| java_path = os.path.join(tmp_dir, f"{class_name}.java") | |
| # overwrite the generated tmp_path file with the class-named file | |
| with open(java_path, "w", encoding="utf-8") as f: | |
| f.write(code) | |
| cleanup.append(java_path) | |
| cmd = ["bash", "-lc", f"cd {tmp_dir} && javac {class_name}.java && java -cp {tmp_dir} {class_name}"] | |
| return (cmd, tmp_dir, cleanup) | |
| raise ValueError(f"Unsupported extension: {ext}") | |
| # ---------- Session cleanup thread ---------- | |
| def _cleanup_loop(): | |
| while True: | |
| now = time.time() | |
| to_remove = [] | |
| with SESSION_LOCK: | |
| for sid, s in list(SESSIONS.items()): | |
| start_ts = s.get("start_ts", now) | |
| if start_ts + SESSION_TTL < now: | |
| to_remove.append(sid) | |
| for sid in to_remove: | |
| try: | |
| with SESSION_LOCK: | |
| s = SESSIONS.get(sid) | |
| if not s: | |
| continue | |
| proc = s.get("proc") | |
| if proc and proc.poll() is None: | |
| try: | |
| proc.kill() | |
| except: | |
| pass | |
| # cleanup files | |
| tmp_path = s.get("tmp_path") | |
| try: | |
| if tmp_path and os.path.exists(tmp_path): | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| for p in s.get("cleanup", []) or []: | |
| try: | |
| if os.path.exists(p): | |
| os.unlink(p) | |
| except: | |
| pass | |
| with SESSION_LOCK: | |
| if sid in SESSIONS: | |
| del SESSIONS[sid] | |
| except Exception: | |
| pass | |
| time.sleep(5) | |
| cleanup_thread = threading.Thread(target=_cleanup_loop, daemon=True) | |
| cleanup_thread.start() | |
| def start(): | |
| """ | |
| Start an interactive session by launching the process. | |
| Request JSON: { code, filename } | |
| Response: { error, session_id, output: [lines], finished } | |
| """ | |
| payload = request.get_json() or {} | |
| code = payload.get("code", "") or "" | |
| filename = payload.get("filename", "main.py") or "main.py" | |
| if not code.strip(): | |
| return jsonify({"error": True, "output": ["❌ No code provided"], "finished": True}), 400 | |
| ext = os.path.splitext(filename)[1].lower() or ".py" | |
| if ext in NON_RUNNABLE: | |
| return jsonify({ | |
| "error": False, | |
| "output": [f"ℹ️ {ext} is not executed as a program.", "", "Content:", code], | |
| "finished": True | |
| }) | |
| # write a temp file | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=ext) | |
| tmp_path = tmp.name | |
| try: | |
| tmp.write(code.encode("utf-8", errors="ignore")) | |
| tmp.flush() | |
| tmp.close() | |
| except Exception as e: | |
| try: | |
| tmp.close() | |
| except: | |
| pass | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| return jsonify({"error": True, "output": [f"Failed writing temp file: {e}"], "finished": True}), 500 | |
| # prepare command | |
| try: | |
| cmd, workdir, cleanup = _make_command_for_file(tmp_path, ext, code) | |
| except Exception as e: | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| return jsonify({"error": True, "output": [f"❌ {e}"], "finished": True}), 400 | |
| # spawn process | |
| try: | |
| proc = subprocess.Popen( | |
| cmd, | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| bufsize=1, | |
| cwd=workdir | |
| ) | |
| except Exception as e: | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| return jsonify({"error": True, "output": [f"Failed to start process: {e}"], "finished": True}), 500 | |
| # create session | |
| session_id = uuid.uuid4().hex | |
| q = queue.Queue() | |
| reader = threading.Thread(target=_spawn_reader, args=(proc, q), daemon=True) | |
| reader.start() | |
| with SESSION_LOCK: | |
| SESSIONS[session_id] = { | |
| "proc": proc, | |
| "queue": q, | |
| "start_ts": time.time(), | |
| "tmp_path": tmp_path, | |
| "cleanup": cleanup, | |
| "finished": False, | |
| } | |
| # non-blocking drain of initial output | |
| lines = [] | |
| while not q.empty(): | |
| try: | |
| item = q.get_nowait() | |
| if item == "__PROCESS_END__": | |
| lines.append("[process ended]") | |
| else: | |
| lines.append(item) | |
| except queue.Empty: | |
| break | |
| return jsonify({"error": False, "session_id": session_id, "output": lines, "finished": False}), 200 | |
| def write(): | |
| """ | |
| Write input to a running session. | |
| Request JSON: { session_id, text } | |
| """ | |
| data = request.get_json() or {} | |
| sid = data.get("session_id") | |
| text = data.get("text", "") | |
| if not sid: | |
| return jsonify({"error": True, "output": ["session_id required"]}), 400 | |
| with SESSION_LOCK: | |
| s = SESSIONS.get(sid) | |
| if not s: | |
| return jsonify({"error": True, "output": ["Invalid session"]}), 404 | |
| proc = s.get("proc") | |
| if not proc: | |
| return jsonify({"error": True, "output": ["Process missing"]}), 500 | |
| if proc.poll() is not None: | |
| return jsonify({"error": True, "output": ["Process already finished"]}), 400 | |
| try: | |
| write_text = text | |
| if not write_text.endswith("\n"): | |
| write_text = write_text + "\n" | |
| proc.stdin.write(write_text) | |
| proc.stdin.flush() | |
| except Exception as e: | |
| return jsonify({"error": True, "output": [f"Failed to write to process stdin: {e}"]}), 500 | |
| # update timestamp | |
| with SESSION_LOCK: | |
| s["start_ts"] = time.time() | |
| return jsonify({"error": False}), 200 | |
| def read(): | |
| """ | |
| Read accumulated output lines from session queue. | |
| Request JSON: { session_id } | |
| Returns: { error, output: [lines], finished: bool } | |
| """ | |
| data = request.get_json() or {} | |
| sid = data.get("session_id") | |
| if not sid: | |
| return jsonify({"error": True, "output": ["session_id required"], "finished": True}), 400 | |
| with SESSION_LOCK: | |
| s = SESSIONS.get(sid) | |
| if not s: | |
| return jsonify({"error": True, "output": [], "finished": True}), 404 | |
| q = s["queue"] | |
| lines = [] | |
| finished = False | |
| # drain queue | |
| while True: | |
| try: | |
| item = q.get_nowait() | |
| except queue.Empty: | |
| break | |
| if item == "__PROCESS_END__": | |
| finished = True | |
| with SESSION_LOCK: | |
| if sid in SESSIONS: | |
| SESSIONS[sid]["finished"] = True | |
| else: | |
| lines.append(item) | |
| # if finished, cleanup files and remove session | |
| if finished: | |
| try: | |
| proc = s.get("proc") | |
| if proc and proc.poll() is None: | |
| try: | |
| proc.wait(timeout=0.1) | |
| except: | |
| pass | |
| except: | |
| pass | |
| try: | |
| tmp_path = s.get("tmp_path") | |
| if tmp_path and os.path.exists(tmp_path): | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| try: | |
| for p in s.get("cleanup", []) or []: | |
| try: | |
| if os.path.exists(p): | |
| os.unlink(p) | |
| except: | |
| pass | |
| except: | |
| pass | |
| with SESSION_LOCK: | |
| try: | |
| del SESSIONS[sid] | |
| except KeyError: | |
| pass | |
| return jsonify({"error": False, "output": lines, "finished": finished}), 200 | |
| def stop(): | |
| """ | |
| Force-stop a running session. Request JSON: { session_id }. | |
| """ | |
| data = request.get_json() or {} | |
| sid = data.get("session_id") | |
| if not sid: | |
| return jsonify({"error": True, "message": "session_id required"}), 400 | |
| with SESSION_LOCK: | |
| s = SESSIONS.get(sid) | |
| if not s: | |
| return jsonify({"error": True, "message": "invalid session"}), 404 | |
| proc = s.get("proc") | |
| try: | |
| if proc and proc.poll() is None: | |
| proc.kill() | |
| except Exception: | |
| pass | |
| # cleanup files | |
| try: | |
| tmp_path = s.get("tmp_path") | |
| if tmp_path and os.path.exists(tmp_path): | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| try: | |
| for p in s.get("cleanup", []) or []: | |
| try: | |
| if os.path.exists(p): | |
| os.unlink(p) | |
| except: | |
| pass | |
| except: | |
| pass | |
| with SESSION_LOCK: | |
| try: | |
| del SESSIONS[sid] | |
| except KeyError: | |
| pass | |
| return jsonify({"error": False, "message": "stopped"}), 200 | |
| # ✅ Warm-up | |
| if __name__ == "__main__": | |
| print("🔧 Warming up...") | |
| _ = generate_full_reply("Hello", []) | |
| app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860))) | |