# app/core/model_loader.py
import os
import json
import logging
import random
import time
from datetime import datetime

import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold
from dotenv import load_dotenv

# Force-load .env immediately upon module import so the API key is
# available before the singleton below is constructed.
load_dotenv()

logger = logging.getLogger(__name__)

STATS_FILE = "usage_stats.json"

# Conservative in-app daily request cap, enforced before each call.
DAILY_REQUEST_LIMIT = 500

class LLMSingleton:
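    """Process-wide singleton around the Gemini client, with persistent
    usage statistics and simple per-minute / per-day quota tracking."""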
    _instance = None

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        if self._instance is not None:
            raise Exception("Singleton instance already exists!")

        self.api_key = os.getenv("GEMINI_API_KEY")
        if not self.api_key:
            logger.warning("⚠️ GEMINI_API_KEY not found in environment variables. AI features will fail.")
        else:
            genai.configure(api_key=self.api_key)

        # "gemini-flash-latest" is a floating alias that tracks the current
        # Flash release; pin an explicit version such as "gemini-1.5-flash"
        # if stable behavior and quotas are required.
        self.model_name = "gemini-flash-latest"

        self.generation_config = {
            "temperature": 0.3,
            "top_p": 0.95,
            "top_k": 64,
            "max_output_tokens": 8192,
            "response_mime_type": "application/json",
        }

        # Disable Google's safety filters for all harm categories.
        self.safety_settings = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
        }

        # Load persistent stats
        self.stats = self._load_stats()
        self._check_daily_reset()

        # RPM tracking
        self.rpm_limit = 15
        self.minute_window_start = time.time()
        self.requests_this_minute = 0

    def _load_stats(self):
        default_stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "rate_limit_hits": 0,
            "input_tokens": 0,
            "output_tokens": 0,
            "errors": 0,
            "local_model_requests": 0,
            "date": datetime.now().strftime("%Y-%m-%d"),
            "daily_requests_count": 0,
        }
        if os.path.exists(STATS_FILE):
            try:
                with open(STATS_FILE, "r") as f:
                    data = json.load(f)
                # Merge defaults for backward compatibility
                return {**default_stats, **data}
            except Exception as e:
                logger.error(f"Failed to load stats: {e}")
        return default_stats

    def _save_stats(self):
        try:
            with open(STATS_FILE, "w") as f:
                json.dump(self.stats, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save stats: {e}")

    def _check_daily_reset(self):
        today = datetime.now().strftime("%Y-%m-%d")
        if self.stats.get("date") != today:
            logger.info("📅 New day detected. Resetting daily AI quotas.")
            self.stats["date"] = today
            self.stats["daily_requests_count"] = 0
            # total_requests is intentionally not reset: it tracks lifetime
            # usage, while daily_requests_count tracks the current day.
            self._save_stats()

    def _check_rpm_window(self):
        """Reset the per-minute request counter once 60 seconds have passed."""
        now = time.time()
        if now - self.minute_window_start >= 60:
            self.minute_window_start = now
            self.requests_this_minute = 0

    def get_usage_stats(self):
        self._check_daily_reset()
        self._check_rpm_window()

        stats = self.stats.copy()
        stats["limits"] = {
            "requests_per_minute": self.rpm_limit,
            "requests_per_day": DAILY_REQUEST_LIMIT,
        }
        stats["remaining_daily_requests"] = max(0, DAILY_REQUEST_LIMIT - stats["daily_requests_count"])
        stats["remaining_rpm"] = max(0, self.rpm_limit - self.requests_this_minute)
        return stats

    def track_local_usage(self, input_chars: int = 0):
        """Track usage of local models (like CodeBERT)."""
        self.stats["local_model_requests"] += 1
        # Rough approximation: ~4 characters per token
        self.stats["input_tokens"] += input_chars // 4
        self._save_stats()

    def generate(self, prompt: str, max_tokens: int = 8192) -> str:
        self._check_daily_reset()
        self._check_rpm_window()

        if not self.api_key:
            logger.error("Cannot generate: Missing GEMINI_API_KEY")
            return ""

        # Check the daily quota before sending the request
        if self.stats["daily_requests_count"] >= DAILY_REQUEST_LIMIT:
            logger.error(f"❌ Daily quota exceeded ({DAILY_REQUEST_LIMIT} requests). Request blocked.")
            return ""

        # Track the attempt
        self.requests_this_minute += 1
        self.stats["total_requests"] += 1
        self.stats["daily_requests_count"] += 1
        self._save_stats()

        # Log the first 50 characters of the prompt to identify the caller
        logger.info(f"🤖 Generating with Gemini. Prompt start: {prompt[:50]}...")

        model = genai.GenerativeModel(
            model_name=self.model_name,
            # Honor the caller's max_tokens override
            generation_config={**self.generation_config, "max_output_tokens": max_tokens},
            safety_settings=self.safety_settings,
        )

        retries = 0
        max_retries = 5
        base_delay = 2

        # Rough input estimate (~4 chars/token), counted once rather than on
        # every retry; exact counts from usage_metadata are preferred below.
        estimated_input_tokens = len(prompt) // 4

        while retries <= max_retries:
            try:
                response = model.generate_content(prompt)

                # Update stats
                self.stats["successful_requests"] += 1
                usage = getattr(response, "usage_metadata", None)
                if usage:
                    # Prefer the exact token counts reported by the API
                    self.stats["input_tokens"] += usage.prompt_token_count
                    self.stats["output_tokens"] += usage.candidates_token_count
                else:
                    self.stats["input_tokens"] += estimated_input_tokens
                    if response.text:
                        self.stats["output_tokens"] += len(response.text) // 4
                self._save_stats()
                return response.text.strip()
            except Exception as e:
                error_str = str(e)
                if "429" in error_str or "quota" in error_str.lower():
                    self.stats["rate_limit_hits"] += 1
                    self._save_stats()
                    # Exponential backoff with jitter
                    wait_time = (base_delay * (2 ** retries)) + random.uniform(0, 1)
                    logger.warning(f"⚠️ Rate limit hit. Retrying in {wait_time:.2f}s... (Attempt {retries + 1}/{max_retries + 1})")
                    time.sleep(wait_time)
                    retries += 1
                else:
                    self.stats["errors"] += 1
                    self._save_stats()
                    logger.error(f"Gemini generation failed: {e}")
                    return ""

        self.stats["errors"] += 1
        self._save_stats()
        logger.error("❌ Max retries reached. Request failed.")
        return ""

    def generate_text(self, prompt: str) -> str:
        """Helper for non-JSON text generation (like guides)."""
        self._check_daily_reset()
        self._check_rpm_window()

        if not self.api_key:
            return "Error: Missing API Key."
        if self.stats["daily_requests_count"] >= DAILY_REQUEST_LIMIT:
            return "Error: Daily Quota Exceeded."

        try:
            self.requests_this_minute += 1
            self.stats["total_requests"] += 1
            self.stats["daily_requests_count"] += 1
            self._save_stats()

            # Override the JSON mime type for plain-text output
            config = self.generation_config.copy()
            config["response_mime_type"] = "text/plain"

            model = genai.GenerativeModel(
                model_name=self.model_name,
                generation_config=config,
                safety_settings=self.safety_settings,
            )
            response = model.generate_content(prompt)
            return response.text
        except Exception as e:
            logger.error(f"Gemini text generation failed: {e}")
            return f"Error generating content: {str(e)}"

# Module-level singleton shared by the rest of the application
llm_engine = LLMSingleton.get_instance()
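
# Minimal usage sketch (hypothetical caller; assumes GEMINI_API_KEY is set
# in .env). Importers elsewhere in the app use the shared llm_engine directly.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    result = llm_engine.generate('Respond with a JSON object: {"status": "ok"}')
    print(result or "generation failed or quota exhausted")
    print(llm_engine.get_usage_stats())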