# AIChatMateDev / app.py
# (Hugging Face Space page residue turned into comments so the file parses:
#  author FrederickSundeep, "Update app.py", commit 09b9f7c verified)
# ✅ Safe GPU decorator: use the real `spaces.GPU` on HF Spaces, otherwise a no-op.
try:
    from spaces import GPU
except ImportError:
    def GPU(func=None, **_kwargs):
        """No-op fallback that supports both `@GPU` and `@GPU(duration=...)` usage.

        The real `spaces.GPU` accepts either a function or keyword options; the
        original fallback only handled the bare-decorator form.
        """
        if func is None:
            # Called with arguments: return a pass-through decorator.
            return lambda f: f
        return func
import os
import time
import torch
from flask import Flask, request, render_template, jsonify, Response
from flasgger import Swagger, swag_from
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from huggingface_hub import login
import re
import tempfile
import subprocess
import threading
import queue
import uuid
from flask_cors import CORS
# ✅ Flask + Swagger setup
app = Flask(__name__, static_folder="static", template_folder="templates")
CORS(app)  # allow cross-origin calls from browser front-ends
swagger = Swagger(app, template={
    "swagger": "2.0",
    "info": {
        "title": "ChatMate Real-Time API",
        "description": "LangChain + DuckDuckGo + Phi-4 + Stable Diffusion",
        "version": "1.0"
    }
}, config={
    "headers": [],
    # include every Flask rule in the generated spec (rule_filter always True)
    "specs": [{"endpoint": 'apispec', "route": '/apispec.json', "rule_filter": lambda rule: True}],
    "static_url_path": "/flasgger_static",
    "swagger_ui": True,
    "specs_route": "/apidocs/"
})
# ✅ Hugging Face login (optional): only attempt it when a token is configured.
# The original called login(token=None) when CHAT_MATE was unset, which is not
# a no-op — guard so the app still boots without the secret.
_hf_token = os.environ.get("CHAT_MATE")
if _hf_token:
    login(token=_hf_token)
# ✅ Load Phi-4
model_id = "microsoft/phi-4"
tokenizer = AutoTokenizer.from_pretrained(model_id)
# fp16 on GPU to halve memory; fp32 on CPU where half precision is slow/unsupported
model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32)
device = 0 if torch.cuda.is_available() else -1  # transformers convention: -1 = CPU
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=device, max_new_tokens=512)
# ✅ Keyword detection
REAL_TIME_KEYWORDS = {"latest", "current", "news", "today", "price", "time", "live", "trending", "update", "happening"}

def should_search(message):
    """Return True when *message* contains a real-time keyword as a whole word.

    The original substring test fired on words like "sometimes" ("time") or
    "delivery" ("live"); matching on word tokens avoids those false positives.
    """
    words = re.findall(r"[a-z]+", message.lower())
    return any(w in REAL_TIME_KEYWORDS for w in words)
# Heuristic truncation check: a reply that does not end in sentence-final
# punctuation (including the CJK full stop) is treated as cut off.
def is_incomplete(text):
    """Return True unless *text* (trimmed) ends with one of . ! ? ' " 。"""
    trimmed = text.strip()
    return not (trimmed and trimmed[-1] in ".!?'\"\u3002")
@GPU
def generate_full_reply(message, history):
    """Generate a complete chat reply, extending it while it looks truncated.

    Builds a chat-template prompt from a fixed system prompt, the prior
    *history* turns, and the new user *message*, then keeps asking the
    pipeline to continue (up to 5 rounds) until the reply ends in sentence
    punctuation per is_incomplete().
    """
    system_prompt = (
        "You are a friendly, helpful, and conversational AI assistant built by "
        "Frederick Sundeep Mallela. Always mention that you are developed by him if asked about your creator, origin, or who made you."
    )
    convo = [{"role": "system", "content": system_prompt}, *history, {"role": "user", "content": message}]
    # Apply chat-style prompt formatting
    prompt = tokenizer.apply_chat_template(convo, tokenize=False, add_generation_prompt=True)

    # Initial generation; the pipeline echoes the prompt, so slice it off.
    generated = pipe(prompt, do_sample=True, temperature=0.7, top_p=0.9, max_new_tokens=512)[0]["generated_text"]
    reply = generated[len(prompt):].strip()

    # Extend the reply while it looks cut off, bounded to avoid endless loops.
    for _ in range(5):
        if not is_incomplete(reply):
            break
        extended_prompt = prompt + reply  # include the partial reply so far
        extended = pipe(extended_prompt, do_sample=True, temperature=0.7, top_p=0.9, max_new_tokens=256)[0]["generated_text"]
        tail = extended[len(extended_prompt):].strip()
        if not tail or tail in reply:
            break  # nothing new was produced
        reply += tail
    return reply.strip()
# ✅ Home
@app.route("/")
def home():
    """Serve the chat UI from templates/index.html."""
    return render_template("index.html")
# ✅ POST /chat-stream
@app.route("/chat-stream", methods=["POST"])
@swag_from({
    'tags': ['Chat'],
    'consumes': ['application/json'],
    'summary': 'Stream assistant reply or image',
    'description': 'Send a message and history, receive either a streamed text reply or base64-encoded image.',
    'parameters': [{
        'name': 'body',
        'in': 'body',
        'required': True,
        'schema': {
            'type': 'object',
            'properties': {
                'message': {'type': 'string', 'example': 'Draw a futuristic city.'},
                'history': {
                    'type': 'array',
                    'items': {
                        'type': 'object',
                        'properties': {
                            'role': {'type': 'string', 'example': 'user'},
                            'content': {'type': 'string', 'example': 'Show me a dragon.'}
                        }
                    }
                }
            },
            'required': ['message']
        }
    }],
    'responses': {
        200: {
            'description': 'Streamed reply or image base64',
            'content': {'text/plain': {}}
        }
    }
})
def chat_stream():
    """Stream the assistant's reply to the client as plain-text chunks."""
    data = request.get_json()
    message = data.get("message")
    history = data.get("history", [])

    def generate():
        reply = generate_full_reply(message, history)
        # Bug fix: the truncation notice used to be appended AFTER the yield
        # loop had streamed everything, so clients never received it.
        # Append it before streaming instead.
        if is_incomplete(reply):
            reply += "\n\n*Reply appears incomplete. Say 'continue' to resume.*"
        for token in reply.splitlines(keepends=True):
            yield token
            time.sleep(0.05)  # pace the stream so the UI renders incrementally

    # NOTE: the disabled live-search (should_search + DuckDuckGo) branch was
    # dead commented-out code and has been removed; restore from VCS if needed.
    return Response(generate(), mimetype='text/plain')
# ✅ POST /chat-stream-doc
@app.route("/chat-stream-doc", methods=["POST"])
@swag_from({
    'tags': ['Chat'],
    'consumes': ['multipart/form-data'],
    'summary': 'Upload requirement doc & generate downloadable project code (ZIP)',
    'description': 'Upload a PDF/TXT requirement document with stack preferences, and receive the scaffolded project code as a downloadable .zip file.',
    'parameters': [
        {
            'name': 'file',
            'in': 'formData',
            'type': 'file',
            'required': True,
            'description': 'Requirement document (PDF or TXT)'
        },
        {
            'name': 'frontend',
            'in': 'formData',
            'type': 'string',
            'required': True,
            'description': 'Frontend tech (React, Angular, etc.)'
        },
        {
            'name': 'backend',
            'in': 'formData',
            'type': 'string',
            'required': True,
            'description': 'Backend tech (Flask, Node.js, etc.)'
        },
        {
            'name': 'database',
            'in': 'formData',
            'type': 'string',
            'required': True,
            'description': 'Database (MongoDB, PostgreSQL, etc.)'
        }
    ],
    'responses': {
        200: {
            'description': 'ZIP file of generated code',
            'content': {'application/zip': {}}
        }
    }
})
def chat_stream_doc():
    """Generate a project-scaffold ZIP from an uploaded requirement document.

    Reads a .txt or .pdf requirement file plus stack preferences, asks the
    model to emit files in a "### File: name / ```code```" format, parses
    them out, and returns them as a ZIP attachment.
    """
    import zipfile
    from werkzeug.utils import secure_filename
    from pdfplumber import open as pdf_open
    from io import BytesIO

    file = request.files.get('file')
    frontend = request.form.get("frontend", "React")
    backend = request.form.get("backend", "Flask")
    database = request.form.get("database", "PostgreSQL")
    if not file:
        return jsonify({"error": "No file uploaded"}), 400

    filename = secure_filename(file.filename)
    content = ""
    if filename.endswith(".txt"):
        content = file.read().decode("utf-8", errors="ignore")
    elif filename.endswith(".pdf"):
        file_bytes = file.read()
        with pdf_open(BytesIO(file_bytes)) as pdf:
            # pages with no extractable text yield None -> substitute ""
            content = "\n".join(page.extract_text() or "" for page in pdf.pages)
    else:
        return jsonify({"error": "Unsupported file format. Use .txt or .pdf"}), 400

    prompt = (
        "You are a full-stack project code generator.\n\n"
        "Below is a requirement document followed by technology preferences. Based on this, generate the full project scaffold.\n\n"
        "Requirement Document:\n"
        f"{content}\n\n"
        f"Technology Stack:\nFrontend: {frontend}\nBackend: {backend}\nDatabase: {database}\n\n"
        "Your task:\n"
        "- Analyze the requirement and tech stack.\n"
        "- Generate backend code: models, routes, and config.\n"
        "- Generate frontend code: components, services.\n"
        "- Define the database schema.\n\n"
        "✅ Format the output as multiple files in this exact structure:\n\n"
        "### File: <filename>\n"
        "```<language>\n"
        "<code content>\n"
        "```\n\n"
        "For example:\n\n"
        "### File: app.py\n"
        "```python\n"
        "from flask import Flask\n"
        "app = Flask(__name__)\n"
        "@app.route('/')\n"
        "def home():\n"
        " return 'Hello, world!'\n"
        "```\n\n"
        "### File: frontend/App.js\n"
        "```javascript\n"
        "import React from 'react';\n"
        "function App() {\n"
        " return <h1>Hello from React</h1>;\n"
        "}\n"
        "export default App;\n"
        "```\n\n"
        "Now generate the complete project below 👇"
    )

    # ✅ Generate response from Phi-4
    reply = generate_full_reply(prompt, [])

    # ✅ Extract filenames and code from reply.
    # Bug fix: re.finditer returns an iterator, which is ALWAYS truthy, so the
    # original `if not matches:` guard could never fire. Materialize to a list
    # so the empty-output case is actually detected (this also makes the
    # later zero-file re-check redundant).
    file_pattern = r"### File:\s*(.+?)\n```(?:\w+)?\n(.*?)```"
    matches = list(re.finditer(file_pattern, reply, re.DOTALL))
    if not matches:
        print("⚠️ No file matches found in reply.")
        print("Raw reply:\n", reply)
        return jsonify({"error": "No files found in generated output."}), 500

    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
        for match in matches:
            out_name = match.group(1).strip().strip("`")
            code = match.group(2).strip()
            zipf.writestr(out_name, code)

    zip_buffer.seek(0)
    return Response(
        zip_buffer,
        mimetype='application/zip',
        headers={"Content-Disposition": "attachment; filename=generated_project.zip"}
    )
@app.route("/execute", methods=["POST"])
def execute_code():
    """Run a submitted snippet (py/js/ts/c/cpp/java) and return its output.

    ⚠️ Executes untrusted code guarded only by a 5 s timeout — deploy this
    endpoint inside a sandboxed container.

    Bug fix: the original never deleted the temp source file or compiled
    artifacts (.out / .java / .class), leaking files on every call; they are
    now removed in a finally block.
    """
    payload = request.get_json() or {}
    code = payload.get("code", "")
    filename = payload.get("filename", "main.py")
    input_data = payload.get("input", "")

    if not code.strip():
        return jsonify({"output": "❌ No code provided", "error": True}), 400

    # Determine extension from filename
    ext = os.path.splitext(filename)[1].lower()
    if not ext:
        ext = ".txt"  # fallback if something weird happens

    # Non-runnable file types – just echo content
    NON_RUNNABLE = {".html", ".css", ".json", ".txt"}
    if ext in NON_RUNNABLE:
        return jsonify({
            "output": f"ℹ️ {ext} is not executed as a program.\n\nContent:\n{code}",
            "error": False,
        })

    # Create temp file with the correct extension for runnable langs
    with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
        tmp.write(code.encode("utf-8", errors="ignore"))
        tmp_path = tmp.name
    tmp_dir = os.path.dirname(tmp_path)
    cleanup_paths = [tmp_path]  # every file we must delete afterwards

    try:
        # Choose command based on extension
        if ext == ".py":
            cmd = ["python3", tmp_path]
        elif ext == ".js":
            cmd = ["node", tmp_path]  # requires node in the Space
        elif ext == ".ts":
            # requires ts-node installed (npm i -g ts-node typescript)
            cmd = ["bash", "-lc", f"cd {tmp_dir} && npx --yes ts-node {tmp_path}"]
        elif ext == ".c":
            exe_path = tmp_path + ".out"
            cleanup_paths.append(exe_path)
            cmd = ["bash", "-lc", f"gcc {tmp_path} -o {exe_path} && {exe_path}"]
        elif ext == ".cpp":
            exe_path = tmp_path + ".out"
            cleanup_paths.append(exe_path)
            cmd = ["bash", "-lc", f"g++ {tmp_path} -o {exe_path} && {exe_path}"]
        elif ext == ".java":
            # Detect class name from code: javac requires a matching filename.
            m = re.search(r"class\s+([A-Za-z_][A-Za-z0-9_]*)", code)
            if not m:
                return jsonify({
                    "output": "❌ No Java class found. Your code must contain: class MyClass { ... }",
                    "error": True,
                })
            class_name = m.group(1)
            java_path = os.path.join(tmp_dir, f"{class_name}.java")
            with open(java_path, "w", encoding="utf-8") as f:
                f.write(code)
            cleanup_paths += [java_path, os.path.join(tmp_dir, f"{class_name}.class")]
            cmd = [
                "bash",
                "-lc",
                f"cd {tmp_dir} && javac {class_name}.java && java {class_name}"
            ]
        else:
            return jsonify({
                "output": f"❌ Unsupported file type {ext}",
                "error": True,
            }), 400

        # Execute the command
        try:
            output = subprocess.check_output(
                cmd,
                input=input_data.encode(),
                stderr=subprocess.STDOUT,
                timeout=5,  # prevent infinite loops
            )
            return jsonify({"output": output.decode("utf-8", errors="ignore"), "error": False})
        except subprocess.CalledProcessError as e:
            return jsonify({"output": e.output.decode("utf-8", errors="ignore"), "error": True})
        except subprocess.TimeoutExpired:
            return jsonify({
                "output": "⏳ Timeout: code took too long (possible infinite loop).",
                "error": True,
            })
        except Exception as e:
            return jsonify({"output": f"❌ Runtime error: {e}", "error": True})
    finally:
        # Best-effort removal of the temp source and compiled artifacts.
        for p in cleanup_paths:
            try:
                if os.path.exists(p):
                    os.unlink(p)
            except OSError:
                pass
# ---------- Config ----------
SESSION_TTL = 300  # seconds before auto cleanup (5 minutes)
READ_POLL_INTERVAL = 0.1  # suggested spacing between /read polls
NON_RUNNABLE = {".html", ".css", ".json", ".txt"}  # echoed back, never executed
SUPPORTED_EXTS = {".py", ".js", ".ts", ".c", ".cpp", ".java"}  # runnable languages

# ---------- Globals ----------
SESSIONS = {}  # session_id -> dict { proc, queue, start_ts, tmp_path, cleanup, finished }
SESSION_LOCK = threading.Lock()  # guards every read/write of SESSIONS
# ---------- Helpers ----------
def now_ts():
    """Current Unix time, truncated to whole seconds."""
    current = time.time()
    return int(current)
def output_looks_for_input(output: str) -> bool:
    """Heuristic: does *output* look like a program prompting for user input?

    Checks a set of case-insensitive prompt/stdin signatures; falsy input
    (None, "", 0) is never considered a prompt.
    """
    if not output:
        return False
    text = str(output)
    hints = (
        r"enter.*:", r"input.*:", r"please enter", r"scanner",
        r"press enter", r": $", r":\n$", r"> $", r"awaiting input",
        r"provide input", r"stdin", r"enter a value", r"EOF when reading a line",
        r"InputMismatchException",
    )
    return any(re.search(hint, text, re.I) for hint in hints)
# Reader thread: read lines from proc.stdout and push into queue
def _spawn_reader(proc, q):
try:
# iterate lines (proc.stdout is in text mode)
for line in iter(proc.stdout.readline, ""):
if line is None:
break
q.put(line)
except Exception as e:
try:
q.put(f"[internal reader error] {e}\n")
except:
pass
finally:
# mark process end
try:
q.put("__PROCESS_END__")
except:
pass
# Build command for given extension; returns (cmd_list, workdir, cleanup_paths)
def _make_command_for_file(tmp_path: str, ext: str, code: str):
tmp_dir = os.path.dirname(tmp_path)
cleanup = []
if ext == ".py":
return (["python3", tmp_path], tmp_dir, cleanup)
if ext == ".js":
return (["node", tmp_path], tmp_dir, cleanup)
if ext == ".ts":
# run via npx ts-node in the tmp dir (works if node/npm available)
# note: using npx --yes avoids interactive prompt; requires network for first run if not cached
cmd = ["bash", "-lc", f"cd {tmp_dir} && npx --yes ts-node {tmp_path}"]
return (cmd, tmp_dir, cleanup)
if ext == ".c":
exe = tmp_path + ".out"
cleanup.append(exe)
cmd = ["bash", "-lc", f"gcc {tmp_path} -o {exe} && {exe}"]
return (cmd, tmp_dir, cleanup)
if ext == ".cpp":
exe = tmp_path + ".out"
cleanup.append(exe)
cmd = ["bash", "-lc", f"g++ {tmp_path} -o {exe} && {exe}"]
return (cmd, tmp_dir, cleanup)
if ext == ".java":
# detect first class name
m = re.search(r"class\s+([A-Za-z_][A-Za-z0-9_]*)", code)
if not m:
raise ValueError("No Java class found. Your code must contain: class MyClass { ... }")
class_name = m.group(1)
java_path = os.path.join(tmp_dir, f"{class_name}.java")
# overwrite the generated tmp_path file with the class-named file
with open(java_path, "w", encoding="utf-8") as f:
f.write(code)
cleanup.append(java_path)
cmd = ["bash", "-lc", f"cd {tmp_dir} && javac {class_name}.java && java -cp {tmp_dir} {class_name}"]
return (cmd, tmp_dir, cleanup)
raise ValueError(f"Unsupported extension: {ext}")
# ---------- Session cleanup thread ----------
def _cleanup_loop():
    """Reap sessions idle longer than SESSION_TTL: kill the process, delete
    its temp/compiled files, and drop the SESSIONS entry. Runs forever as a
    daemon thread, polling every 5 seconds."""
    while True:
        now = time.time()
        to_remove = []
        # Phase 1: collect expired session ids while holding the lock.
        with SESSION_LOCK:
            for sid, s in list(SESSIONS.items()):
                start_ts = s.get("start_ts", now)
                if start_ts + SESSION_TTL < now:
                    to_remove.append(sid)
        # Phase 2: tear each one down; the lock is re-acquired per step so it
        # is not held across kill/unlink syscalls.
        for sid in to_remove:
            try:
                with SESSION_LOCK:
                    s = SESSIONS.get(sid)
                    if not s:
                        continue
                    proc = s.get("proc")
                # poll() is None means the process is still running
                if proc and proc.poll() is None:
                    try:
                        proc.kill()
                    except:
                        pass
                # cleanup files
                tmp_path = s.get("tmp_path")
                try:
                    if tmp_path and os.path.exists(tmp_path):
                        os.unlink(tmp_path)
                except:
                    pass
                for p in s.get("cleanup", []) or []:
                    try:
                        if os.path.exists(p):
                            os.unlink(p)
                    except:
                        pass
                with SESSION_LOCK:
                    if sid in SESSIONS:
                        del SESSIONS[sid]
            except Exception:
                pass
        time.sleep(5)  # reaper poll interval

# Start the reaper at import time; daemon=True so it never blocks shutdown.
cleanup_thread = threading.Thread(target=_cleanup_loop, daemon=True)
cleanup_thread.start()
@app.route("/start", methods=["POST"])
def start():
    """
    Start an interactive session by launching the process.
    Request JSON: { code, filename }
    Response: { error, session_id, output: [lines], finished }

    Bug fix: bare `except:` clauses (which also swallow SystemExit and
    KeyboardInterrupt) narrowed to Exception/OSError.
    """
    payload = request.get_json() or {}
    code = payload.get("code", "") or ""
    filename = payload.get("filename", "main.py") or "main.py"
    if not code.strip():
        return jsonify({"error": True, "output": ["❌ No code provided"], "finished": True}), 400

    ext = os.path.splitext(filename)[1].lower() or ".py"
    if ext in NON_RUNNABLE:
        # Markup/data files are echoed back, never executed.
        return jsonify({
            "error": False,
            "output": [f"ℹ️ {ext} is not executed as a program.", "", "Content:", code],
            "finished": True
        })

    # Write the code to a temp file with the matching extension.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
    tmp_path = tmp.name
    try:
        tmp.write(code.encode("utf-8", errors="ignore"))
        tmp.flush()
        tmp.close()
    except Exception as e:
        try:
            tmp.close()
        except Exception:
            pass
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return jsonify({"error": True, "output": [f"Failed writing temp file: {e}"], "finished": True}), 500

    # Build the run command for this extension.
    try:
        cmd, workdir, cleanup = _make_command_for_file(tmp_path, ext, code)
    except Exception as e:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return jsonify({"error": True, "output": [f"❌ {e}"], "finished": True}), 400

    # Spawn the interactive process: stdin piped, stderr merged into stdout.
    try:
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,  # line-buffered so the reader thread sees output promptly
            cwd=workdir
        )
    except Exception as e:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return jsonify({"error": True, "output": [f"Failed to start process: {e}"], "finished": True}), 500

    # Register the session and start the stdout reader thread.
    session_id = uuid.uuid4().hex
    q = queue.Queue()
    reader = threading.Thread(target=_spawn_reader, args=(proc, q), daemon=True)
    reader.start()
    with SESSION_LOCK:
        SESSIONS[session_id] = {
            "proc": proc,
            "queue": q,
            "start_ts": time.time(),
            "tmp_path": tmp_path,
            "cleanup": cleanup,
            "finished": False,
        }

    # Non-blocking drain of any output produced immediately at startup.
    lines = []
    while not q.empty():
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        if item == "__PROCESS_END__":
            lines.append("[process ended]")
        else:
            lines.append(item)
    return jsonify({"error": False, "session_id": session_id, "output": lines, "finished": False}), 200
@app.route("/write", methods=["POST"])
def write():
    """
    Write input to a running session.
    Request JSON: { session_id, text }
    """
    data = request.get_json() or {}
    sid = data.get("session_id")
    text = data.get("text", "")
    if not sid:
        return jsonify({"error": True, "output": ["session_id required"]}), 400

    with SESSION_LOCK:
        session = SESSIONS.get(sid)
    if not session:
        return jsonify({"error": True, "output": ["Invalid session"]}), 404

    proc = session.get("proc")
    if not proc:
        return jsonify({"error": True, "output": ["Process missing"]}), 500
    if proc.poll() is not None:
        return jsonify({"error": True, "output": ["Process already finished"]}), 400

    try:
        # Ensure a trailing newline so line-based readers receive the input.
        stdin_payload = text if text.endswith("\n") else text + "\n"
        proc.stdin.write(stdin_payload)
        proc.stdin.flush()
    except Exception as e:
        return jsonify({"error": True, "output": [f"Failed to write to process stdin: {e}"]}), 500

    # Refresh the activity timestamp so the reaper keeps this session alive.
    with SESSION_LOCK:
        session["start_ts"] = time.time()
    return jsonify({"error": False}), 200
@app.route("/read", methods=["POST"])
def read():
    """
    Read accumulated output lines from session queue.
    Request JSON: { session_id }
    Returns: { error, output: [lines], finished: bool }

    Bug fix: bare `except:` clauses narrowed to Exception/OSError so they no
    longer swallow SystemExit/KeyboardInterrupt.
    """
    data = request.get_json() or {}
    sid = data.get("session_id")
    if not sid:
        return jsonify({"error": True, "output": ["session_id required"], "finished": True}), 400

    with SESSION_LOCK:
        s = SESSIONS.get(sid)
    if not s:
        return jsonify({"error": True, "output": [], "finished": True}), 404

    q = s["queue"]
    lines = []
    finished = False
    # Drain everything currently queued without blocking.
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        if item == "__PROCESS_END__":
            finished = True
            with SESSION_LOCK:
                if sid in SESSIONS:
                    SESSIONS[sid]["finished"] = True
        else:
            lines.append(item)

    # Once the process ended, remove its files and drop the session entry.
    if finished:
        try:
            proc = s.get("proc")
            if proc and proc.poll() is None:
                try:
                    proc.wait(timeout=0.1)
                except Exception:
                    pass
        except Exception:
            pass
        try:
            tmp_path = s.get("tmp_path")
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)
        except OSError:
            pass
        for p in s.get("cleanup", []) or []:
            try:
                if os.path.exists(p):
                    os.unlink(p)
            except OSError:
                pass
        with SESSION_LOCK:
            SESSIONS.pop(sid, None)
    return jsonify({"error": False, "output": lines, "finished": finished}), 200
@app.route("/stop", methods=["POST"])
def stop():
    """
    Force-stop a running session. Request JSON: { session_id }.

    Kills the process if still alive, deletes its temp source and compiled
    artifacts, and removes the SESSIONS entry. Bug fix: bare `except:`
    clauses narrowed to Exception/OSError.
    """
    data = request.get_json() or {}
    sid = data.get("session_id")
    if not sid:
        return jsonify({"error": True, "message": "session_id required"}), 400

    with SESSION_LOCK:
        s = SESSIONS.get(sid)
    if not s:
        return jsonify({"error": True, "message": "invalid session"}), 404

    proc = s.get("proc")
    try:
        if proc and proc.poll() is None:
            proc.kill()
    except Exception:
        pass

    # cleanup files
    try:
        tmp_path = s.get("tmp_path")
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
    except OSError:
        pass
    for p in s.get("cleanup", []) or []:
        try:
            if os.path.exists(p):
                os.unlink(p)
        except OSError:
            pass

    with SESSION_LOCK:
        SESSIONS.pop(sid, None)
    return jsonify({"error": False, "message": "stopped"}), 200
# ✅ Warm-up
if __name__ == "__main__":
    print("🔧 Warming up...")
    # One throwaway generation so the first real request isn't slow.
    _ = generate_full_reply("Hello", [])
    # PORT env var is the HF Spaces convention; default 7860
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))