| import os, shutil, tempfile, re |
| from pathlib import Path |
| import gradio as gr |
| from git import Repo |
| import requests |
|
|
| |
| OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") |
| OPENROUTER_MODEL = "nvidia/nemotron-nano-12b-v2-vl:free" |
| OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" |
| HEADERS = { |
| "Authorization": f"Bearer {OPENROUTER_API_KEY}", |
| "Content-Type": "application/json", |
| } |
|
|
| ALLOWED_EXT = { |
| ".py", ".ipynb", ".md", ".txt", ".js", ".ts", ".tsx", ".jsx", ".java", |
| ".kt", ".c", ".cpp", ".cs", ".go", ".rs", ".rb", ".php", ".sql", ".html", |
| ".css", ".yml", ".yaml", ".toml", ".ini", ".json" |
| } |
| SKIP_DIRS = { |
| "node_modules", ".git", "dist", "build", "out", "venv", ".venv", |
| "__pycache__", ".next", ".cache", "target", "bin", "obj", ".idea", ".vscode" |
| } |
| MAX_FILE_BYTES = 800_000 |
|
|
| |
| def clone_repo(url: str) -> Path: |
| d = Path(tempfile.mkdtemp(prefix=".tmp_repo_")).resolve() |
| Repo.clone_from(url, d, depth=1) |
| return d |
|
|
| def read_repo_text(repo_dir: Path) -> str: |
| buf = [] |
| for root, dirs, files in os.walk(repo_dir): |
| dirs[:] = [x for x in dirs if x not in SKIP_DIRS] |
| for f in files: |
| p = Path(root) / f |
| if p.suffix.lower() in ALLOWED_EXT and p.stat().st_size <= MAX_FILE_BYTES: |
| try: |
| txt = p.read_text(encoding="utf-8", errors="ignore") |
| if txt.strip(): |
| rel = str(p.relative_to(repo_dir)) |
| buf.append(f"\n=== FILE: {rel} ===\n{txt}") |
| except Exception: |
| pass |
| return "\n".join(buf) |
|
|
| def analyze_repo(url: str): |
| if not url or not re.match(r"^https?://", url.strip()): |
| return None, "β Invalid URL" |
| repo_dir = None |
| try: |
| repo_dir = clone_repo(url.strip()) |
| text = read_repo_text(repo_dir) |
| if not text.strip(): |
| return None, "β οΈ No readable text files found" |
| kb_size = len(text) // 1000 |
| return text, f"β
Repo loaded successfully ({kb_size} KB of text)" |
| except Exception as e: |
| return None, f"β Error: {e}" |
| finally: |
| if repo_dir and Path(repo_dir).exists(): |
| shutil.rmtree(repo_dir, ignore_errors=True) |
|
|
| |
| def openrouter_chat(system_prompt, user_prompt, context=""): |
| messages = [{"role": "system", "content": system_prompt}] |
| if context: |
| messages.append({"role": "system", "content": f"Repository context:\n{context}"}) |
| messages.append({"role": "user", "content": user_prompt}) |
|
|
| payload = {"model": OPENROUTER_MODEL, "messages": messages} |
| try: |
| r = requests.post(OPENROUTER_URL, headers=HEADERS, json=payload, timeout=120) |
| r.raise_for_status() |
| obj = r.json() |
| if "choices" in obj and obj["choices"]: |
| msg = obj["choices"][0]["message"]["content"] |
| return msg.strip() |
| return "[OpenRouter] Unexpected response format." |
| except Exception as e: |
| return f"[OpenRouter error] {e}" |
|
|
| |
| SYSTEM_PROMPT = ( |
| "You are an expert developer assistant. You help users explore and understand " |
| "a GitHub repository. Base every response strictly on the repo's content and structure. " |
| "If unsure, say so. Explain clearly and concisely. Avoid hallucinating." |
| ) |
|
|
| def chat_repo(user_msg, chat_history, repo_text): |
| if not repo_text: |
| chat_history.append({"role": "assistant", "content": "β Please analyze a repository first."}) |
| return chat_history, "" |
| |
| context = repo_text[:120000] |
| response = openrouter_chat(SYSTEM_PROMPT, user_msg, context) |
| chat_history.append({"role": "user", "content": user_msg}) |
| chat_history.append({"role": "assistant", "content": response}) |
| return chat_history, "" |
|
|
| |
| with gr.Blocks(title="Repo Chatbot Β· OpenRouter") as demo: |
| gr.Markdown( |
| """ |
| # π€ Repo Chatbot β powered by OpenRouter |
| Chat with your GitHub repository! |
| Upload a repo URL and ask anything about its **code, structure, or design**. |
| _(No embeddings, just pure context reasoning.)_ |
| """ |
| ) |
|
|
| repo_state = gr.State() |
| chat_history = gr.State([]) |
|
|
| with gr.Row(): |
| repo_url = gr.Textbox( |
| label="GitHub repo URL", |
| placeholder="https://github.com/owner/repo", |
| scale=4 |
| ) |
| analyze_btn = gr.Button("π Analyze Repo", scale=1) |
|
|
| status_box = gr.Markdown() |
|
|
| |
| chatbot = gr.Chatbot(label="Repo Chatbot", height=500, type="messages") |
| user_box = gr.Textbox(label="Ask something about the repo...") |
|
|
| clear_btn = gr.Button("π§Ή Clear Chat") |
|
|
| |
| def analyze_repo_cb(url): |
| text, status = analyze_repo(url) |
| return text, status |
|
|
| analyze_btn.click(analyze_repo_cb, inputs=[repo_url], outputs=[repo_state, status_box]) |
| user_box.submit(chat_repo, inputs=[user_box, chat_history, repo_state], |
| outputs=[chatbot, user_box]) |
| clear_btn.click(lambda: ([], ""), None, [chatbot, user_box]) |
|
|
| demo.queue() |
| demo.launch(server_name="0.0.0.0", server_port=7860,mcp_server=True) |
|
|