# Source: test / main_backup.py — uploaded by vfven ("Upload 6 files", commit f38a870, verified)
import os
import asyncio
import httpx
import json
import re
import uuid
import base64
from io import BytesIO
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from pathlib import Path
# python-docx imports
from docx import Document as DocxDocument
from docx.shared import Inches, Pt, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
# FastAPI application object; every route below is registered on it.
app = FastAPI()
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): presumably intended for the hosted demo UI — confirm before exposing more widely.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Credentials are read once at import time; an unset variable yields "".
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY", "") # free at pexels.com/api
HF_API_KEY = os.getenv("HF_TOKEN", "") # HuggingFace token for image gen
# Directory that receives generated .docx files; created at import if missing.
DOCS_DIR = Path("docs")
DOCS_DIR.mkdir(exist_ok=True)
# LLM provider table, keyed by the "provider" value stored on each agent.
# "type" selects the caller used by call_agent_llm ("gemini" -> call_gemini,
# anything else -> call_openai_compat); "base_url" and "headers" are only
# read for the OpenAI-compatible providers.
PROVIDERS = {
    "gemini": {
        "name": "Google Gemini", "type": "gemini", "key": GOOGLE_API_KEY,
    },
    "openrouter": {
        "name": "OpenRouter", "type": "openai_compat", "key": OPENROUTER_API_KEY,
        "base_url": "https://openrouter.ai/api/v1/chat/completions",
        # Extra headers sent with every OpenRouter request.
        "headers": {
            "HTTP-Referer": "https://huggingface.co/spaces/vfven/mission-control-ui",
            "X-Title": "Mission Control AI",
        },
    },
    "groq": {
        "name": "Groq", "type": "openai_compat", "key": GROQ_API_KEY,
        "base_url": "https://api.groq.com/openai/v1/chat/completions",
        "headers": {},
    },
}
# ── DEFAULT AGENTS (can be extended via UI) ───────────────────────────────
# Each agent has a registry "key", a display "name", a "provider" (a key of
# PROVIDERS), a "role" used to build the system prompt, and an ordered list
# of "models" that call_agent_llm tries in turn.
# The role strings are prompts sent to the model; parse_delegation and
# parse_image_queries rely on the JSON shapes they ask for.
DEFAULT_AGENTS = [
    {
        "key": "manager", "name": "Manager", "provider": "gemini",
        "role": "Gerente de proyecto experto en coordinar equipos y planificar estrategias. Cuando el usuario pide un documento o informe, DEBES indicar en tu respuesta qué agentes necesitas activar usando el formato JSON: {\"delegate\": [\"writer\", \"analyst\"]} al final de tu respuesta.",
        "models": ["gemini-2.5-flash-preview-04-17", "gemini-2.0-flash", "gemini-1.5-flash"],
    },
    {
        "key": "developer", "name": "Developer", "provider": "openrouter",
        "role": "Programador senior especialista en crear aplicaciones y soluciones técnicas.",
        "models": ["qwen/qwen3-4b:free", "meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "google/gemma-3-12b-it:free"],
    },
    {
        "key": "analyst", "name": "Analyst", "provider": "openrouter",
        "role": "Analista de negocios experto en evaluar viabilidad, riesgos y oportunidades. También revisa y critica documentos formales.",
        "models": ["meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "google/gemma-3-27b-it:free", "qwen/qwen3-4b:free"],
    },
    {
        "key": "writer", "name": "Writer", "provider": "openrouter",
        "role": "Especialista en redacción de documentos formales, informes ejecutivos y reportes técnicos. Escribe en formato estructurado con secciones claras usando ### para títulos y ** para subtítulos.",
        "models": ["meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "qwen/qwen3-4b:free", "google/gemma-3-12b-it:free"],
    },
    {
        "key": "image_agent", "name": "ImageAgent", "provider": "gemini",
        "role": "Agente especializado en buscar y proveer imágenes relevantes. Cuando se te pida imágenes sobre un tema, responde con una lista JSON de términos de búsqueda en inglés: {\"image_queries\": [\"term1\", \"term2\", \"term3\"]}",
        "models": ["gemini-2.0-flash", "gemini-1.5-flash"],
    },
]
# Models tried on Groq by call_agent_llm after all of an agent's own models
# have failed (only when GROQ_API_KEY is set).
SUBSTITUTE_MODELS = {
    "groq": ["llama-3.3-70b-versatile", "llama3-70b-8192", "gemma2-9b-it"],
}
# Runtime agent registry (can be extended)
# Keyed by agent key; each value is a shallow copy of its DEFAULT_AGENTS entry.
# Mutated by the /api/agents/add and DELETE /api/agents/{key} routes.
agent_registry = {a["key"]: dict(a) for a in DEFAULT_AGENTS}
# In-memory list of completed missions; run_mission appends to it and
# /api/history serves the last 20 entries.
mission_history = []
# ── LLM CALLERS ───────────────────────────────────────────────────────────
async def call_gemini(model: str, system: str, user: str, key: str) -> str:
    """Send one prompt to Gemini's ``generateContent`` endpoint and return the reply text.

    Args:
        model: Gemini model id inserted into the endpoint path.
        system: System instructions; prepended to ``user`` in a single user turn.
        user: The task or question text.
        key: Google API key.

    Returns:
        The text of the first part of the first candidate.

    Raises:
        httpx.HTTPStatusError: On a non-2xx response.
        ValueError: When the response contains no candidate text.
    """
    # The key is sent as a header, not a query parameter: httpx includes the
    # request URL in HTTPStatusError messages, and callers in this file pass
    # str(exception) back to API clients, which would expose the key.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
    headers = {"x-goog-api-key": key}
    payload = {
        "contents": [{"role": "user", "parts": [{"text": f"{system}\n\n{user}"}]}],
        "generationConfig": {"maxOutputTokens": 2048, "temperature": 0.7},
    }
    async with httpx.AsyncClient(timeout=90) as client:
        r = await client.post(url, json=payload, headers=headers)
        r.raise_for_status()
        data = r.json()
    try:
        return data["candidates"][0]["content"]["parts"][0]["text"]
    except (KeyError, IndexError, TypeError) as err:
        # A 200 response can still lack candidates or parts; report that
        # clearly instead of surfacing a bare KeyError such as "'candidates'".
        raise ValueError(f"Gemini returned no text for model {model}") from err
async def call_openai_compat(base_url: str, model: str, system: str, user: str,
                             key: str, extra_headers: dict) -> str:
    """Call an OpenAI-compatible chat-completions endpoint and return the reply text.

    Args:
        base_url: Full URL of the chat-completions endpoint.
        model: Model id placed in the request payload.
        system: Content of the system message.
        user: Content of the user message.
        key: API key sent as a Bearer token.
        extra_headers: Additional headers merged over the defaults.

    Returns:
        The ``content`` string of the first choice's message.

    Raises:
        httpx.HTTPStatusError: On a non-2xx response.
        ValueError: When the body has no usable completion text.
    """
    headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json", **extra_headers}
    payload = {
        "model": model,
        "messages": [{"role": "system", "content": system}, {"role": "user", "content": user}],
        "max_tokens": 2048, "temperature": 0.7,
    }
    async with httpx.AsyncClient(timeout=90) as client:
        r = await client.post(base_url, json=payload, headers=headers)
        r.raise_for_status()
        data = r.json()
    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as err:
        # Surface the provider's own error payload when the body carries one.
        detail = data.get("error") if isinstance(data, dict) else None
        raise ValueError(
            f"No completion returned by {model}: {detail or 'unexpected response shape'}"
        ) from err
    if not isinstance(content, str):
        # A null content would break the declared str return type; raising
        # lets the caller move on to its next model.
        raise ValueError(f"No completion text returned by {model}")
    return content
async def call_agent_llm(agent: dict, task: str) -> str:
    """Run ``task`` through an agent's models, falling back to Groq if all fail.

    The agent's ``models`` are tried in order on its configured provider. If
    none succeeds and ``GROQ_API_KEY`` is set, the models listed in
    ``SUBSTITUTE_MODELS["groq"]`` are tried as well.

    Args:
        agent: Registry entry with ``name``, ``role``, ``provider`` and ``models``.
        task: The user prompt sent to the model.

    Returns:
        The text of the first successful completion.

    Raises:
        RuntimeError: When every attempt failed; the message carries the last error.
    """
    system = f"Eres {agent['name']}. {agent['role']} Responde en español. Sé conciso y profesional."
    last_err = None
    # Agents added at runtime may name a provider that is not configured.
    # Treat that as a failed primary attempt so the Groq fallback still runs,
    # instead of raising a bare KeyError before any model is tried.
    provider = PROVIDERS.get(agent["provider"])
    if provider is None:
        last_err = f"unknown provider '{agent['provider']}'"
    else:
        for model in agent["models"]:
            try:
                if provider["type"] == "gemini":
                    return await call_gemini(model, system, task, provider["key"])
                return await call_openai_compat(
                    provider["base_url"], model, system, task,
                    provider["key"], provider.get("headers", {}))
            except Exception as e:  # any failure just moves on to the next model
                last_err = str(e)
    # Groq fallback
    if GROQ_API_KEY:
        groq = PROVIDERS["groq"]
        for m in SUBSTITUTE_MODELS["groq"]:
            try:
                return await call_openai_compat(
                    groq["base_url"], m, system, task, groq["key"], {})
            except Exception as e:
                last_err = str(e)
    raise RuntimeError(f"All providers failed: {last_err}")
# ── IMAGE FETCHING ─────────────────────────────────────────────────────────
async def fetch_pexels_image(query: str) -> bytes | None:
    """Search Pexels for ``query`` and return the first photo's bytes.

    Best-effort: returns ``None`` when ``PEXELS_API_KEY`` is unset, when the
    search has no results, when either HTTP call does not return 200, or when
    any error occurs.

    Args:
        query: Search terms sent to the Pexels search endpoint.

    Returns:
        Raw bytes of the photo's ``medium`` rendition, or ``None``.
    """
    if not PEXELS_API_KEY:
        return None
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get(
                "https://api.pexels.com/v1/search",
                params={"query": query, "per_page": 1, "orientation": "landscape"},
                headers={"Authorization": PEXELS_API_KEY},
            )
            if r.status_code != 200:
                return None
            photos = r.json().get("photos")
            if not photos:
                return None
            img_r = await client.get(photos[0]["src"]["medium"])
            # Without this check an error page body would be handed back as
            # if it were image data.
            if img_r.status_code != 200 or not img_r.content:
                return None
            return img_r.content
    except Exception:
        # Images are optional decoration; a failure here must not abort the mission.
        return None
async def generate_hf_image(prompt: str) -> bytes | None:
    """Generate an image for ``prompt`` via the HuggingFace inference endpoint.

    Returns the response body when the call answers 200 with an ``image/*``
    content type; returns ``None`` when ``HF_API_KEY`` is unset, on any other
    response, or on any error.
    """
    if not HF_API_KEY:
        return None
    endpoint = "https://api-inference.huggingface.co/models/stabilityai/stable-diffusion-xl-base-1.0"
    auth_headers = {"Authorization": f"Bearer {HF_API_KEY}"}
    request_body = {"inputs": prompt, "parameters": {"width": 512, "height": 384}}
    try:
        async with httpx.AsyncClient(timeout=60) as client:
            response = await client.post(endpoint, headers=auth_headers, json=request_body)
            if response.status_code == 200:
                content_type = response.headers.get("content-type", "")
                if content_type.startswith("image"):
                    return response.content
    except Exception:
        pass
    return None
async def get_image_for_query(query: str) -> bytes | None:
    """Return image bytes for ``query``: Pexels first, HuggingFace generation otherwise."""
    # ``or`` only evaluates the generator call when the Pexels result is falsy.
    return await fetch_pexels_image(query) or await generate_hf_image(query)
# ── DOCX BUILDER ──────────────────────────────────────────────────────────
def _add_horizontal_rule(doc) -> None:
    """Append an empty paragraph carrying a single-line bottom border (colour 1a56db)."""
    p_pr = doc.add_paragraph()._p.get_or_add_pPr()
    borders = OxmlElement("w:pBdr")
    bottom = OxmlElement("w:bottom")
    bottom.set(qn("w:val"), "single")
    bottom.set(qn("w:sz"), "6")
    bottom.set(qn("w:color"), "1a56db")
    borders.append(bottom)
    p_pr.append(borders)


def _add_centered_picture(doc, image, caption=None) -> bool:
    """Embed ``image`` centred at 5 inches wide, optionally captioned; return True on success."""
    if not image:
        return False
    try:
        doc.add_picture(BytesIO(image), width=Inches(5))
    except Exception:
        # Bytes that cannot be embedded as a picture are skipped, not fatal.
        return False
    doc.paragraphs[-1].alignment = WD_ALIGN_PARAGRAPH.CENTER
    if caption:
        cap = doc.add_paragraph(caption)
        cap.alignment = WD_ALIGN_PARAGRAPH.CENTER
        cap.runs[0].italic = True
        cap.runs[0].font.size = Pt(9)
    return True


def build_docx(title: str, sections: dict, images: list[bytes], analyst_review: str) -> bytes:
    """Assemble the mission report as a .docx file and return its bytes.

    The writer text is read from ``sections["writer"]`` and parsed line by
    line as light markdown: ``## `` becomes a level-1 heading, ``### `` a
    level-2 heading, a line wrapped in ``**`` a bold paragraph, ``- `` / ``* ``
    a bullet, and consecutive plain lines are joined into one paragraph.

    Args:
        title: Document title, rendered as the top heading.
        sections: Mapping holding the writer's text under the ``"writer"`` key.
        images: Raw image bytes. One usable image is placed after each ``###``
            heading; any left over are appended after the text.
        analyst_review: Optional review text, added on its own page when non-empty.

    Returns:
        The serialized .docx document.
    """
    doc = DocxDocument()

    # Page margins
    for section in doc.sections:
        section.top_margin = Inches(1)
        section.bottom_margin = Inches(1)
        section.left_margin = Inches(1.2)
        section.right_margin = Inches(1.2)

    # Title
    title_para = doc.add_heading(title, 0)
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    if title_para.runs:  # an empty title produces a heading with no runs
        title_para.runs[0].font.color.rgb = RGBColor(0x1a, 0x56, 0xdb)

    # Date + subtitle
    sub = doc.add_paragraph()
    sub.alignment = WD_ALIGN_PARAGRAPH.CENTER
    sub.add_run(f"Mission Control AI — {datetime.now().strftime('%B %d, %Y')}").italic = True
    doc.add_paragraph()

    _add_horizontal_rule(doc)

    img_index = 0   # next entry of ``images`` to consider
    figure_no = 0   # number of captioned figures placed so far
    writer_text = sections.get("writer") or ""
    pending: list[str] = []  # plain lines waiting to be joined into one paragraph

    def flush_lines() -> None:
        para_text = " ".join(pending).strip()
        pending.clear()
        if para_text:
            para = doc.add_paragraph(para_text)
            para.paragraph_format.space_after = Pt(6)

    for line in writer_text.split("\n"):
        stripped = line.strip()
        if not stripped:
            flush_lines()
        elif stripped.startswith("### "):
            flush_lines()
            doc.add_heading(stripped[4:], level=2)
            # Place the next usable image under the heading. Unusable entries
            # are consumed as well, so one bad image cannot block the rest.
            while img_index < len(images):
                candidate = images[img_index]
                img_index += 1
                if _add_centered_picture(doc, candidate, f"Figure {figure_no + 1}"):
                    figure_no += 1
                    break
        elif stripped.startswith("## "):
            flush_lines()
            doc.add_heading(stripped[3:], level=1)
        elif stripped.startswith("**") and stripped.endswith("**"):
            flush_lines()
            # str.strip takes a character set, so "*" removes every asterisk
            # at both ends of the line.
            doc.add_paragraph().add_run(stripped.strip("*")).bold = True
        elif stripped.startswith(("- ", "* ")):
            flush_lines()
            doc.add_paragraph(stripped[2:], style="List Bullet")
        else:
            pending.append(stripped)
    flush_lines()

    # Remaining images (no caption)
    for leftover in images[img_index:]:
        _add_centered_picture(doc, leftover)

    # Analyst review section
    if analyst_review:
        doc.add_page_break()
        doc.add_heading("Análisis y Revisión", level=1)
        for line in analyst_review.split("\n"):
            if line.strip():
                doc.add_paragraph(line.strip())

    # Footer
    doc.add_paragraph()
    footer_p = doc.add_paragraph()
    footer_p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    footer_run = footer_p.add_run("— Generado por Mission Control AI —")
    footer_run.italic = True
    footer_run.font.size = Pt(9)
    footer_run.font.color.rgb = RGBColor(0x6b, 0x72, 0x80)

    buf = BytesIO()
    doc.save(buf)
    return buf.getvalue()
# ── PARSE MANAGER DELEGATION ───────────────────────────────────────────────
def parse_delegation(manager_text: str) -> list[str]:
    """Extract the agent keys the manager wants to activate.

    An explicit ``{"delegate": [...]}`` JSON object in the text is used when
    present. Otherwise keywords in the manager's reply are mapped to the
    writer, image_agent and analyst agents.

    Args:
        manager_text: Raw text returned by the manager agent.

    Returns:
        Agent keys in order of first appearance, without duplicates.
    """
    match = re.search(r'\{[^}]*"delegate"\s*:\s*\[([^\]]*)\][^}]*\}', manager_text)
    if match:
        # De-duplicate while keeping order: a repeated key would otherwise
        # make the caller run the same agent twice.
        return list(dict.fromkeys(re.findall(r'"(\w+)"', match.group(1))))

    # Heuristic fallback: detect keywords in manager response
    lower = manager_text.lower()
    keyword_map = (
        ("writer", ("informe", "documento", "redact", "escrib", "report", "word")),
        ("image_agent", ("imagen", "image", "foto", "visual", "ilustr")),
        ("analyst", ("analiz", "revisar", "evalua", "critic")),
    )
    return [agent for agent, words in keyword_map if any(w in lower for w in words)]
def parse_image_queries(image_agent_text: str) -> list[str]:
    """Extract search terms from an ``"image_queries": [...]`` list in the text.

    Args:
        image_agent_text: Raw text returned by the image agent.

    Returns:
        The quoted terms, stripped of surrounding whitespace, with blank and
        repeated entries removed. Empty when no list is found.
    """
    match = re.search(r'"image_queries"\s*:\s*\[([^\]]*)\]', image_agent_text)
    if not match:
        return []
    terms = (term.strip() for term in re.findall(r'"([^"]+)"', match.group(1)))
    # Blank terms would produce meaningless searches and repeated terms would
    # fetch the same image twice, so both are dropped (order preserved).
    return list(dict.fromkeys(term for term in terms if term))
def clean_text(text: str) -> str:
    """Remove the machine-readable JSON directives from text shown to the user.

    Strips ``{... "delegate" ...}`` and ``{... "image_queries" ...}`` objects,
    plus any code fence left empty by that removal, then trims whitespace.
    """
    # ``[^{}]*`` keeps each match inside a single brace pair; with ``[^}]*``
    # a match could begin at an unrelated earlier "{" and delete prose.
    text = re.sub(r'\{[^{}]*"delegate"[^{}]*\}', '', text)
    text = re.sub(r'\{[^{}]*"image_queries"[^{}]*\}', '', text)
    # The JSON may have been wrapped in a fenced block; drop the empty fence
    # that remains once its content is gone.
    text = re.sub(r'```[a-zA-Z]*\s*```', '', text)
    return text.strip()
# ── ROUTES ─────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the UI page read from ``templates/index.html``."""
    # Read explicitly as UTF-8: Path.read_text() otherwise uses the platform's
    # default encoding, which can garble non-ASCII text in the page.
    return HTMLResponse(Path("templates/index.html").read_text(encoding="utf-8"))
@app.get("/api/agents")
async def get_agents():
    """List every registered agent, exposing only its key, name and role."""
    public_fields = ("key", "name", "role")
    summaries = []
    for agent in agent_registry.values():
        summaries.append({field: agent[field] for field in public_fields})
    return {"agents": summaries}
@app.post("/api/agents/add")
async def add_agent(request: Request):
    """Register a new agent, or replace an existing one, from a JSON body.

    Body fields: ``key`` (required), ``name``, ``role``, ``provider`` and
    ``models``. The key is lower-cased, trimmed, and has each run of non-word
    characters replaced by ``_``.

    Returns:
        ``{"success": True, "agent": ...}`` on success, or a 400 JSON error
        when the body, key, provider or models are invalid.
    """
    body = await request.json()
    if not isinstance(body, dict):
        return JSONResponse({"error": "JSON object required"}, status_code=400)

    raw_key = body.get("key", "")
    key = re.sub(r'\W+', '_', raw_key.lower().strip()) if isinstance(raw_key, str) else ""
    if not key:
        return JSONResponse({"error": "key required"}, status_code=400)

    # Reject values that would only fail later, when the agent is called:
    # an unknown provider or an empty / malformed model list.
    provider = body.get("provider", "openrouter")
    if not isinstance(provider, str) or provider not in PROVIDERS:
        return JSONResponse(
            {"error": f"unknown provider; expected one of {sorted(PROVIDERS)}"},
            status_code=400,
        )
    models = body.get("models", ["meta-llama/llama-3.3-70b-instruct:free"])
    if not (isinstance(models, list) and models
            and all(isinstance(m, str) and m for m in models)):
        return JSONResponse(
            {"error": "models must be a non-empty list of strings"}, status_code=400)

    agent_registry[key] = {
        "key": key,
        "name": body.get("name", key.capitalize()),
        "role": body.get("role", "Agente de propósito general."),
        "provider": provider,
        "models": models,
    }
    return {"success": True, "agent": agent_registry[key]}
@app.delete("/api/agents/{key}")
async def delete_agent(key: str):
    """Remove an agent from the registry; the three core agents are protected.

    Deleting a key that is not registered still reports success.
    """
    core_agents = ("manager", "developer", "analyst")
    if key in core_agents:
        return JSONResponse({"error": "Cannot delete core agents"}, status_code=400)
    agent_registry.pop(key, None)
    return {"success": True}
@app.get("/api/history")
async def get_history():
    """Return up to the 20 most recent mission records, oldest first."""
    recent = mission_history[-20:]
    return {"history": recent}
@app.get("/api/docs/{filename}")
async def download_doc(filename: str):
    """Send a generated .docx from ``DOCS_DIR`` as a file download.

    Args:
        filename: Bare file name of the document; must end in ``.docx``.

    Returns:
        The file, or a 404 JSON error when the name is not a plain ``.docx``
        file name or no such file exists.
    """
    # Accept only a bare file name: anything with a directory component
    # (either separator style) could resolve outside DOCS_DIR.
    is_plain_name = Path(filename).name == filename and "\\" not in filename
    path = DOCS_DIR / filename
    # is_file() also rejects a directory that happens to be named *.docx.
    if not (is_plain_name and filename.endswith(".docx") and path.is_file()):
        return JSONResponse({"error": "File not found"}, status_code=404)
    return FileResponse(
        path,
        media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        filename=filename,
    )
@app.post("/api/mission")
async def run_mission(request: Request):
    """Run one mission end to end and return every agent's result.

    Flow:
      1. The manager agent receives the task; its reply is parsed for the
         agents to delegate to.
      2. Writer, image agent and any other delegated agents run concurrently;
         the analyst runs afterwards so it can review the writer's text.
      3. If the writer produced text, a .docx is built and saved to DOCS_DIR.

    The JSON body must hold a non-empty string under ``"task"``; otherwise a
    400 response is returned. The mission is also appended to
    ``mission_history``.
    """
    body = await request.json()
    # Tolerate a non-object body or non-string task instead of failing with a 500.
    raw_task = body.get("task", "") if isinstance(body, dict) else ""
    task = raw_task.strip() if isinstance(raw_task, str) else ""
    if not task:
        return JSONResponse({"error": "No task provided"}, status_code=400)
    started_at = datetime.now().isoformat()
    results = {}
    doc_file = None
    events = []  # orchestration event log

    def log(msg: str):
        events.append({"time": datetime.now().strftime("%H:%M:%S"), "msg": msg})

    # ── STEP 1: Manager plans ──────────────────────────────────────────────
    log("Manager analyzing task...")
    manager_agent = agent_registry["manager"]
    manager_msg = ""
    try:
        manager_raw = await call_agent_llm(manager_agent, task)
        # Run each delegate once, and never the manager itself: a "manager"
        # entry would take the generic branch below and overwrite this result.
        delegates = [k for k in dict.fromkeys(parse_delegation(manager_raw)) if k != "manager"]
        manager_msg = clean_text(manager_raw)
        results["manager"] = {
            "status": "active", "message": manager_msg,
            "model": manager_agent["models"][0],
            "delegates": delegates,
        }
        log(f"Manager delegated to: {delegates or 'none'}")
    except Exception as e:
        results["manager"] = {"status": "resting", "message": str(e), "model": ""}
        delegates = []
        log(f"Manager failed: {e}")

    # ── STEP 2: Run delegated agents ───────────────────────────────────────
    image_bytes = []
    writer_text = ""
    analyst_text = ""

    async def run_delegated(key: str):
        nonlocal writer_text, analyst_text
        if key not in agent_registry:
            log(f"Agent '{key}' not found in registry")
            return
        agent = agent_registry[key]
        log(f"{agent['name']} starting...")
        if key == "image_agent":
            try:
                img_prompt = f"Proporciona consultas de búsqueda de imágenes en inglés para ilustrar: {task}"
                raw = await call_agent_llm(agent, img_prompt)
                queries = parse_image_queries(raw)
                if not queries:
                    # No parsable list: fall back to the start of the task text.
                    queries = [task[:40]]
                log(f"ImageAgent searching: {queries}")
                imgs = await asyncio.gather(*[get_image_for_query(q) for q in queries[:3]])
                for img in imgs:
                    if img:
                        image_bytes.append(img)
                results["image_agent"] = {
                    "status": "active",
                    "message": f"Encontradas {len(image_bytes)} imágenes para: {', '.join(queries[:3])}",
                    "model": agent["models"][0],
                }
                log(f"ImageAgent: {len(image_bytes)} images found")
            except Exception as e:
                results["image_agent"] = {"status": "resting", "message": str(e), "model": ""}
                log(f"ImageAgent failed: {e}")
        elif key == "writer":
            try:
                writer_prompt = (
                    f"Redacta un informe formal y completo sobre: {task}\n\n"
                    "Usa este formato:\n"
                    "## Resumen Ejecutivo\n[contenido]\n\n"
                    "### Introducción\n[contenido]\n\n"
                    "### Desarrollo\n[contenido con subsecciones]\n\n"
                    "### Conclusiones\n[contenido]\n\n"
                    "### Recomendaciones\n[contenido]\n\n"
                    "Sé formal, detallado y profesional."
                )
                writer_text = await call_agent_llm(agent, writer_prompt)
                results["writer"] = {
                    "status": "active",
                    "message": writer_text[:200] + "..." if len(writer_text) > 200 else writer_text,
                    "model": agent["models"][0],
                }
                log("Writer completed document")
            except Exception as e:
                results["writer"] = {"status": "resting", "message": str(e), "model": ""}
                log(f"Writer failed: {e}")
        elif key == "analyst":
            try:
                # Review the writer's text when there is one, else the manager's plan.
                content_to_review = writer_text or manager_msg
                analyst_prompt = (
                    f"Revisa y analiza el siguiente contenido sobre: {task}\n\n"
                    f"Contenido:\n{content_to_review[:1000]}\n\n"
                    "Proporciona: 1) Evaluación de calidad, 2) Puntos fuertes, "
                    "3) Áreas de mejora, 4) Conclusión final."
                )
                analyst_text = await call_agent_llm(agent, analyst_prompt)
                results["analyst"] = {
                    "status": "active",
                    "message": analyst_text[:200] + "..." if len(analyst_text) > 200 else analyst_text,
                    "model": agent["models"][0],
                }
                log("Analyst review completed")
            except Exception as e:
                results["analyst"] = {"status": "resting", "message": str(e), "model": ""}
                log(f"Analyst failed: {e}")
        else:
            # Generic agent
            try:
                raw = await call_agent_llm(agent, task)
                results[key] = {"status": "active", "message": raw, "model": agent["models"][0]}
                log(f"{agent['name']} completed")
            except Exception as e:
                results[key] = {"status": "resting", "message": str(e), "model": ""}
                log(f"{agent['name']} failed: {e}")

    # Run image_agent and writer in parallel, then analyst
    parallel_first = [k for k in delegates if k in ("writer", "image_agent")]
    sequential_after = [k for k in delegates if k == "analyst"]
    other = [k for k in delegates if k not in ("writer", "image_agent", "analyst")]
    if parallel_first or other:
        await asyncio.gather(*[run_delegated(k) for k in parallel_first + other])
    for k in sequential_after:
        await run_delegated(k)

    # ── STEP 3: Build .docx if writer was involved ─────────────────────────
    if writer_text:
        log("Assembling Word document...")
        try:
            doc_bytes = build_docx(task, {"writer": writer_text}, image_bytes, analyst_text)
            safe_name = re.sub(r'[^\w\-]', '_', task[:40])
            doc_filename = f"{safe_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
            (DOCS_DIR / doc_filename).write_bytes(doc_bytes)
            doc_file = doc_filename
            results["manager"]["doc_file"] = doc_filename
            log(f"Document ready: {doc_filename}")
        except Exception as e:
            log(f"Document build failed: {e}")

    # Mark idle agents
    for key in agent_registry:
        if key not in results:
            results[key] = {"status": "idle", "message": "", "model": ""}

    final = results.get("manager", {}).get("message", "")[:300]
    entry = {
        "id": len(mission_history) + 1,
        "task": task,
        "started_at": started_at,
        "ended_at": datetime.now().isoformat(),
        "results": results,
        "final": final,
        "doc_file": doc_file,
        "events": events,
    }
    mission_history.append(entry)
    return JSONResponse({
        "success": True, "task": task,
        "results": results, "final": final,
        "doc_file": doc_file, "events": events,
        "mission_id": entry["id"],
    })
@app.get("/api/health")
async def health():
    """Report service status and whether each provider credential is configured."""

    def state(secret: str, when_absent: str = "missing") -> str:
        # A credential counts as configured when its value is non-empty.
        return "ok" if secret else when_absent

    optional = "missing (optional)"
    providers = {
        "gemini": state(GOOGLE_API_KEY),
        "openrouter": state(OPENROUTER_API_KEY),
        "groq": state(GROQ_API_KEY),
        "pexels": state(PEXELS_API_KEY, optional),
        "hf_images": state(HF_API_KEY, optional),
    }
    return {"status": "ok", "providers": providers}