| | import gradio as gr |
| | from faster_whisper import WhisperModel |
| | from transformers import AutoTokenizer, AutoModelForCausalLM |
| | import torch |
| | import requests |
| | import base64 |
| | import tempfile |
| | import os |
| | import logging |
| | import time |
| | import json |
| | from datetime import datetime |
| | from html.parser import HTMLParser |
| | from fastapi import FastAPI, Request, Query |
| | from fastapi.responses import JSONResponse |
| | from fastapi.middleware.cors import CORSMiddleware |
| | import uvicorn |
| |
|
# Module-wide logging: timestamped INFO messages (the Space "Logs" tab shows these).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)


# Load all models once at import time so every request reuses the same instances.
logger.info("Loading models...")
# Speech-to-text model; tiny + int8 on CPU keeps the memory footprint small.
# NOTE(review): whisper_model is never referenced anywhere else in this file —
# confirm other code uses it before removing, otherwise it is dead load time.
whisper_model = WhisperModel("tiny", device="cpu", compute_type="int8")
model_name = "HuggingFaceTB/SmolLM2-360M-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Small instruct LLM pinned to CPU in float32; low_cpu_mem_usage reduces the
# peak RAM needed while the checkpoint is being loaded.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float32,
    device_map="cpu",
    low_cpu_mem_usage=True
)
logger.info("Models loaded!")
| |
|
def search_parallel(query):
    """Scrape the DuckDuckGo HTML endpoint for *query*.

    Returns a ``(results, engine)`` tuple: a bullet list of up to two result
    titles and the engine name on success, or ``("No search results", "None")``
    on any failure (network error, non-200 status, parse error).
    """
    logger.info(f"[SEARCH] Query: {query}")
    try:
        response = requests.get(
            'https://html.duckduckgo.com/html/',
            params={'q': query},
            headers={'User-Agent': 'Mozilla/5.0'},
            timeout=1.5  # keep overall request latency low; search is best-effort
        )
        if response.status_code == 200:
            class DDGParser(HTMLParser):
                """Collect the text of <a class="result__a"> result links."""

                def __init__(self):
                    super().__init__()
                    self.results = []
                    self.in_result = False
                    self.current_text = ""

                def handle_starttag(self, tag, attrs):
                    # Attribute values can be None (bare attributes), so guard
                    # before the substring test to avoid a TypeError.
                    if tag == 'a' and any(
                        k == 'class' and v and 'result__a' in v
                        for k, v in attrs
                    ):
                        self.in_result = True

                def handle_data(self, data):
                    if self.in_result and data.strip():
                        self.current_text += data.strip() + " "

                def handle_endtag(self, tag):
                    if tag == 'a' and self.in_result:
                        if self.current_text:
                            # Cap each title at 120 chars to keep the LLM prompt short.
                            self.results.append(self.current_text.strip()[:120])
                        self.current_text = ""
                        self.in_result = False

            parser = DDGParser()
            parser.feed(response.text)
            result = "\n".join([f"• {r}" for r in parser.results[:2]]) if parser.results else "No results"
            logger.info(f"[SEARCH] ✓ Found {len(parser.results)} results")
            return result, "DuckDuckGo"
        # BUG FIX: a non-200 response previously fell through and returned
        # None, crashing the caller's two-value tuple unpacking.
        logger.error(f"[SEARCH] HTTP {response.status_code}")
        return "No search results", "None"
    except Exception as e:
        logger.error(f"[SEARCH] Error: {str(e)}")
        return "No search results", "None"
| |
|
def generate_answer(text_input):
    """Answer *text_input* with the local LLM, grounded in a quick web search.

    Returns the model's short answer plus a source attribution line, or an
    ``"Error: ..."`` / ``"No input provided"`` string on failure.
    """
    logger.info(f"[AI] Question: {text_input}")

    try:
        if not text_input or not text_input.strip():
            return "No input provided"

        current_date = datetime.now().strftime("%B %d, %Y")

        # Pull a couple of search snippets to ground the model's answer.
        t_search = time.time()
        search_results, search_engine = search_parallel(text_input)
        logger.info(f"[AI] Search: {time.time() - t_search:.2f}s")

        system_msg = f"Today is {current_date}. Answer briefly (60-80 words)."
        user_msg = f"Search:\n{search_results}\n\nQ: {text_input}\nA:"

        # ChatML-style prompt layout used by SmolLM2-Instruct.
        prompt = (
            f"<|im_start|>system\n{system_msg}<|im_end|>\n"
            f"<|im_start|>user\n{user_msg}<|im_end|>\n"
            f"<|im_start|>assistant\n"
        )

        t_gen = time.time()
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=800)

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=80,
                temperature=0.7,
                do_sample=True,
                top_p=0.9,
                top_k=40,
                repetition_penalty=1.15,
                pad_token_id=tokenizer.eos_token_id,
            )

        # Decode only the newly generated tokens, skipping the prompt prefix.
        prompt_len = inputs['input_ids'].shape[1]
        answer = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True).strip()
        logger.info(f"[AI] Gen: {time.time() - t_gen:.2f}s")
        logger.info(f"[AI] Answer: {answer[:100]}...")

        return f"{answer}\n\n**Source:** {search_engine}"

    except Exception as e:
        logger.error(f"[AI] Error: {str(e)}")
        return f"Error: {str(e)}"
| |
|
| | |
# FastAPI app hosting the JSON API; the Gradio UI is mounted onto it below.
app = FastAPI()


# Allow browser clients from any origin (needed for external tools like Pluely).
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# rejected by browsers for credentialed requests (the CORS spec forbids a
# wildcard origin with credentials) — confirm whether credentials are actually
# required here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every request — method, URL, headers, query params, and (for POST)
    the raw/decoded/parsed body — to aid debugging client integrations."""
    logger.info("="*80)
    logger.info(f"[REQUEST] Method: {request.method}")
    logger.info(f"[REQUEST] URL: {request.url}")
    logger.info(f"[REQUEST] Headers: {dict(request.headers)}")
    logger.info(f"[REQUEST] Query params: {dict(request.query_params)}")

    # NOTE(review): reading the body inside BaseHTTPMiddleware consumes the
    # receive stream on older Starlette versions, which can leave downstream
    # handlers with an empty body or hang — confirm the installed Starlette
    # replays the cached body to endpoints before relying on this in prod.
    if request.method == "POST":
        body = await request.body()
        logger.info(f"[REQUEST] Raw body ({len(body)} bytes): {body}")
        try:
            # Progressive decode (bytes -> utf-8 -> JSON), logging each stage
            # so malformed payloads are still visible in the logs.
            body_str = body.decode('utf-8')
            logger.info(f"[REQUEST] Body as string: {body_str}")
            body_json = json.loads(body_str)
            logger.info(f"[REQUEST] Body as JSON: {body_json}")
        except Exception as e:
            logger.error(f"[REQUEST] Body parse error: {str(e)}")

    response = await call_next(request)
    logger.info(f"[RESPONSE] Status: {response.status_code}")
    logger.info("="*80)
    return response
| |
|
@app.post("/api/ai")
async def api_ai_post(request: Request):
    """AI endpoint - POST. Expects a JSON body with a ``text`` field and
    returns ``{"answer": ...}``."""
    try:
        raw = await request.body()
        if not raw:
            return JSONResponse({"error": "Empty body"}, status_code=400)

        payload = json.loads(raw.decode('utf-8'))
        logger.info(f"[API POST] Parsed: {payload}")

        question = payload.get("text", "")
        if not question:
            return JSONResponse({"error": "No 'text' field"}, status_code=400)

        return JSONResponse({"answer": generate_answer(question)})

    except Exception as e:
        # Any failure (bad JSON, model error) becomes a 500 with the message.
        logger.error(f"[API POST] Error: {str(e)}")
        return JSONResponse({"error": str(e)}, status_code=500)
| |
|
@app.get("/api/ai")
async def api_ai_get(text: str = Query(default="", description="Question")):
    """AI endpoint - GET. Reads the question from the ``text`` query parameter
    and returns ``{"answer": ...}``."""
    try:
        logger.info(f"[API GET] text param: '{text}'")

        if not text:
            return JSONResponse({"error": "No text parameter"}, status_code=400)

        return JSONResponse({"answer": generate_answer(text)})

    except Exception as e:
        logger.error(f"[API GET] Error: {str(e)}")
        return JSONResponse({"error": str(e)}, status_code=500)
| |
|
@app.get("/health")
async def health():
    """Liveness probe: report status, model name, and available endpoints."""
    return {
        "status": "ok",
        "model": "SmolLM2-360M",
        "endpoints": ["/api/ai (GET/POST)"],
    }
| |
|
| | |
# Gradio UI: usage instructions plus a manual test tab; mounted at "/" below.
with gr.Blocks(title="Fast Q&A") as demo:
    # Static help text documenting how to point Pluely at this Space's API.
    gr.Markdown("""
    # ⚡ Fast Q&A - SmolLM2-360M

    ## 🎯 Pluely Configuration

    ### Method 1: GET Request (RECOMMENDED - Works with Pluely)

    **Curl Command for Pluely:**
    ```
    curl https://archcoder-basic-app.hf.space/api/ai?text={{TEXT}}
    ```

    **Response Path:** `answer`

    **Streaming:** OFF

    ---

    ### Method 2: POST Request (Alternative)

    **Curl Command for Pluely:**
    ```
    curl -X POST https://archcoder-basic-app.hf.space/api/ai -H "Content-Type: application/json" -d {\"text\":\"{{TEXT}}\"}
    ```

    **Response Path:** `answer`

    **Streaming:** OFF

    ---

    ## 🧪 Test Manually

    **Windows CMD:**
    ```
    curl "https://archcoder-basic-app.hf.space/api/ai?text=Who+is+the+president"
    ```

    **PowerShell:**
    ```
    Invoke-RestMethod -Uri "https://archcoder-basic-app.hf.space/api/ai?text=Who is the president"
    ```

    **Browser:**
    ```
    https://archcoder-basic-app.hf.space/api/ai?text=Who is the president
    ```
    """)

    # Manual playground tab: runs generate_answer() directly, bypassing HTTP.
    with gr.Tab("Test"):
        test_input = gr.Textbox(label="Question", placeholder="Ask anything...")
        test_btn = gr.Button("🚀 Test")
        test_output = gr.Textbox(label="Answer", lines=8)
        test_btn.click(fn=generate_answer, inputs=[test_input], outputs=[test_output])

    # Instructions for reading the request logs emitted by the middleware above.
    with gr.Tab("Logs"):
        gr.Markdown("""
        ## How to Check Logs

        1. Go to your Hugging Face Space
        2. Click on **"Logs"** tab at the top
        3. You'll see all requests with:
           - Request method and URL
           - Headers
           - Body content
           - Response

        This helps debug what Pluely is actually sending!
        """)
| |
|
# Mount the Gradio UI at the root path on top of the FastAPI app so the UI
# ("/") and the JSON API ("/api/ai", "/health") are served by one server.
app = gr.mount_gradio_app(app, demo, path="/")


if __name__ == "__main__":
    # Hugging Face Spaces expects the server on port 7860; bind all interfaces.
    uvicorn.run(app, host="0.0.0.0", port=7860)
| |
|