Spaces:
Sleeping
Sleeping
"""
Chatbot engine with RAG pipeline and proactive/reactive logic.
Adapted for the AI Trading Experiment with parameter-aware responses.
Supports multiple LLM providers:
- HuggingFace Inference API (free tier available)
- DeepSeek API
- Fallback rule-based responses
"""
| import os | |
| import sys | |
| import random | |
| import requests | |
| from typing import Optional, List, Tuple, Dict, Any | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| # Attempt pysqlite3 workaround for HF Spaces | |
| try: | |
| __import__('pysqlite3') | |
| sys.modules['sqlite3'] = sys.modules.pop('pysqlite3') | |
| except ImportError: | |
| pass # Not in HF Spaces, use default sqlite3 | |
| # LangChain imports - using langchain_community for newer versions | |
| from langchain_community.document_loaders import TextLoader | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_text_splitters import CharacterTextSplitter | |
| from langchain_core.language_models.llms import LLM | |
| from config import Scenario, ResearcherControlledParams, ParticipantVisibleParams | |
# Configuration
# Directory scanned for ``*.txt`` knowledge-base documents at chatbot start-up.
KNOWLEDGE_BASE_DIR = "knowledge_base"
# Directory where the Chroma vector store is persisted (created if missing).
VECTOR_DB_DIR = "db/vectorstore"
class LLMProvider(Enum):
    """Available LLM providers."""

    # Hosted model reached through the HuggingFace Inference API.
    HUGGINGFACE = "huggingface"
    # DeepSeek chat-completions API.
    DEEPSEEK = "deepseek"
    # Rule-based canned answers; selected when no API key is configured.
    FALLBACK = "fallback"
| # ==================== LLM Provider Selection ==================== | |
| # Check which API keys are available and select provider | |
def get_llm_provider() -> LLMProvider:
    """Pick the LLM provider from whichever credentials are in the environment.

    HuggingFace wins when any of its token variables is set, DeepSeek is the
    second choice, and the rule-based fallback is used (with a printed
    warning) when neither is configured.
    """
    hf_token_vars = ("HF_TOKEN", "HUGGINGFACE_TOKEN", "HF_API_KEY")
    if any(os.getenv(var_name) for var_name in hf_token_vars):
        return LLMProvider.HUGGINGFACE

    if os.getenv("DEEPSEEK_API_KEY"):
        return LLMProvider.DEEPSEEK

    # No credentials at all: tell the operator how to enable a real model.
    print("Warning: No LLM API key found. Using fallback responses.")
    print("Set HF_TOKEN for HuggingFace or DEEPSEEK_API_KEY for DeepSeek.")
    return LLMProvider.FALLBACK
| # ==================== HuggingFace LLM ==================== | |
| # Recommended free/cheap models (smallest to largest): | |
| # - "microsoft/Phi-3-mini-4k-instruct" # 3.8B params, very fast | |
| # - "Qwen/Qwen2-1.5B-Instruct" # 1.5B params, smallest | |
| # - "HuggingFaceH4/zephyr-7b-beta" # 7B params, good quality | |
| # - "mistralai/Mistral-7B-Instruct-v0.2" # 7B params, popular | |
| # - "meta-llama/Llama-2-7b-chat-hf" # 7B params, requires approval | |
DEFAULT_HF_MODEL = "HuggingFaceH4/zephyr-7b-beta"


class HuggingFaceLLM(LLM):
    """LLM wrapper for HuggingFace Inference API (free tier available).

    The API token is taken from the first non-empty environment variable among
    ``HF_TOKEN``, ``HUGGINGFACE_TOKEN`` and ``HF_API_KEY``. When no token is
    set, the request fails, or the API returns no text, the rule-based
    ``FallbackLLM`` answers instead, so ``_call`` always returns a string.
    """

    api_key: str = ""
    model_id: str = DEFAULT_HF_MODEL
    temperature: float = 0.7
    max_tokens: int = 512

    def __init__(self, model_id: Optional[str] = None, **kwargs):
        """Create the wrapper.

        Args:
            model_id: HuggingFace model repository id; ``None`` or an empty
                string keeps the class default.
            **kwargs: Forwarded to the LangChain ``LLM`` base constructor.
        """
        super().__init__(**kwargs)
        # Try multiple possible env var names
        self.api_key = (
            os.getenv("HF_TOKEN")
            or os.getenv("HUGGINGFACE_TOKEN")
            or os.getenv("HF_API_KEY")
            or ""
        )
        if model_id:
            self.model_id = model_id
        if self.api_key:
            print(f"Using HuggingFace model: {self.model_id}")
        else:
            print("Warning: No HuggingFace token found.")

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str:
        """Send ``prompt`` to the Inference API and return the generated text.

        Args:
            prompt: User-level prompt; it is wrapped in a system/user/assistant
                chat template before being sent.
            stop: Accepted for interface compatibility; not forwarded to the API.
            **kwargs: Ignored.

        Returns:
            The stripped generated text, or the rule-based fallback answer when
            there is no token, the call fails, or the reply is empty.
        """
        if not self.api_key:
            return self._fallback_response(prompt)

        # HuggingFace Inference API endpoint
        api_url = f"https://api-inference.huggingface.co/models/{self.model_id}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        # Format prompt for instruction-tuned models
        formatted_prompt = (
            "<|system|>\n"
            "You are an AI trading advisor in the TradeVerse financial ecosystem. "
            "Provide helpful, concise advice.</s>\n"
            "<|user|>\n"
            f"{prompt}</s>\n"
            "<|assistant|>\n"
        )
        payload = {
            "inputs": formatted_prompt,
            "parameters": {
                "max_new_tokens": self.max_tokens,
                "temperature": self.temperature,
                "do_sample": True,
                "return_full_text": False,
            },
        }

        # Best-effort boundary: any failure below degrades to the rule-based
        # answer instead of propagating to the caller.
        try:
            response = requests.post(api_url, headers=headers, json=payload, timeout=60)

            # Handle model loading (HF free tier may need to load model)
            if response.status_code == 503:
                wait_time = 20
                try:
                    loading_info = response.json()
                    if isinstance(loading_info, dict):
                        wait_time = loading_info.get("estimated_time", 20)
                except ValueError:
                    # Non-JSON 503 body: still retry once after the default wait.
                    pass
                if isinstance(wait_time, bool) or not isinstance(wait_time, (int, float)):
                    wait_time = 20
                print(f"Model loading, waiting {wait_time}s...")
                import time

                # Cap the wait at 30s and never pass a negative value to sleep().
                time.sleep(max(0, min(wait_time, 30)))
                response = requests.post(api_url, headers=headers, json=payload, timeout=60)

            response.raise_for_status()
            data = response.json()

            # Handle different response formats
            if isinstance(data, list) and len(data) > 0:
                generated = data[0].get("generated_text", "")
            elif isinstance(data, dict):
                generated = data.get("generated_text", "")
            else:
                generated = ""

            generated = (generated or "").strip()
            # An empty completion is useless to the participant; use the fallback.
            if generated:
                return generated
            return self._fallback_response(prompt)
        except Exception as e:
            print(f"HuggingFace API error: {e}")
            return self._fallback_response(prompt)

    def _fallback_response(self, prompt: str) -> str:
        """Generate a basic response when API is unavailable."""
        return FallbackLLM()._call(prompt)

    @property
    def _llm_type(self) -> str:
        """Identifier LangChain uses for this LLM implementation."""
        return "huggingface_inference"
| # ==================== DeepSeek LLM ==================== | |
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"


class DeepSeekLLM(LLM):
    """LLM wrapper for DeepSeek API.

    The key is read from ``DEEPSEEK_API_KEY``. Without a key, or when the
    request fails, the rule-based ``FallbackLLM`` answers instead.
    """

    api_key: str = ""
    temperature: float = 0.7
    max_tokens: int = 512

    def __init__(self, **kwargs):
        """Create the wrapper; ``**kwargs`` go to the LangChain ``LLM`` base."""
        super().__init__(**kwargs)
        self.api_key = os.getenv("DEEPSEEK_API_KEY", "")
        if self.api_key:
            print("Using DeepSeek API")

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: Text sent as the user message.
            stop: Accepted for interface compatibility; not forwarded to the API.
            **kwargs: Ignored.

        Returns:
            The stripped content of the first choice, or the rule-based
            fallback answer when there is no key or the call fails.
        """
        if not self.api_key:
            return FallbackLLM()._call(prompt)

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": "You are an AI trading advisor in the TradeVerse financial ecosystem."},
                {"role": "user", "content": prompt},
            ],
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
        }

        # Best-effort boundary: network, HTTP, and response-shape errors all
        # degrade to the rule-based answer.
        try:
            response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=30)
            response.raise_for_status()
            data = response.json()
            return data["choices"][0]["message"]["content"].strip()
        except Exception as e:
            print(f"DeepSeek API error: {e}")
            return FallbackLLM()._call(prompt)

    @property
    def _llm_type(self) -> str:
        """Identifier LangChain uses for this LLM implementation."""
        return "deepseek_api"
| # ==================== Fallback LLM (Rule-based) ==================== | |
class FallbackLLM(LLM):
    """
    Rule-based fallback when no API is available.
    Generates canned responses chosen by keywords found in the prompt.
    """

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str:
        """Generate a keyword-driven response without an LLM.

        Args:
            prompt: Full prompt text; only its lower-cased content is inspected.
            stop: Accepted for interface compatibility; unused.
            **kwargs: Ignored.

        Returns:
            One of a fixed set of canned sentences.
        """
        prompt_lower = prompt.lower()

        # Detect recommendation requests.
        # NOTE: these are plain substring checks tested in buy/sell/hold order,
        # so a prompt that merely mentions "buy" (or a word containing it)
        # together with "recommend" takes the buy branch.
        if "buy" in prompt_lower and "recommend" in prompt_lower:
            return self._generate_buy_response(prompt)
        elif "sell" in prompt_lower and "recommend" in prompt_lower:
            return self._generate_sell_response(prompt)
        elif "hold" in prompt_lower and "recommend" in prompt_lower:
            return self._generate_hold_response(prompt)

        # Detect question types
        if "risk" in prompt_lower:
            return "When evaluating risk, consider the company's debt levels, market volatility, and any red flags like insider selling or unusual trading volume. The current scenario presents factors that warrant careful consideration."
        if "insider" in prompt_lower or "trading volume" in prompt_lower:
            return "Unusual insider activity or trading volume can signal that informed parties have information not yet public. This is often a warning sign that warrants caution."
        if "sector" in prompt_lower or "industry" in prompt_lower:
            return "Sector trends significantly impact individual companies. Consider broader market conditions, regulatory environment, and competitive dynamics when making your decision."

        # Default analytical response
        return "Based on the available information, I'd encourage you to weigh the key factors mentioned in the scenario. Consider both the potential opportunities and the risk factors before making your decision."

    def _generate_buy_response(self, prompt: str) -> str:
        """Return the canned buy-leaning answer (``prompt`` is not inspected)."""
        return "Based on my analysis, buying could be appropriate here. The positive signals suggest potential upside, though you should consider your risk tolerance and the size of your position carefully."

    def _generate_sell_response(self, prompt: str) -> str:
        """Return the canned sell-leaning answer (``prompt`` is not inspected)."""
        return "Based on my analysis, selling may be prudent. The risk factors present suggest potential downside that could outweigh staying invested. Consider protecting your capital."

    def _generate_hold_response(self, prompt: str) -> str:
        """Return the canned hold-leaning answer (``prompt`` is not inspected)."""
        return "Based on my analysis, holding your position seems reasonable. The situation shows mixed signals, and waiting for more clarity before acting could be the wisest approach."

    @property
    def _llm_type(self) -> str:
        """Identifier LangChain uses for this LLM implementation."""
        return "fallback_rules"
| # ==================== LLM Factory ==================== | |
def create_llm(provider: LLMProvider = None, model_id: str = None) -> LLM:
    """Build the LLM wrapper that matches ``provider``.

    When ``provider`` is ``None`` it is auto-detected from the environment.
    ``model_id`` is only forwarded to the HuggingFace wrapper; any provider
    other than HuggingFace or DeepSeek yields the rule-based fallback.
    """
    selected = get_llm_provider() if provider is None else provider

    if selected == LLMProvider.HUGGINGFACE:
        return HuggingFaceLLM(model_id=model_id)
    if selected == LLMProvider.DEEPSEEK:
        return DeepSeekLLM()
    return FallbackLLM()
| # ==================== Chat Response ==================== | |
@dataclass
class ChatResponse:
    """Response from the chatbot.

    Declared as a dataclass so it can be constructed with keyword arguments
    (``ChatResponse(message=..., is_proactive=..., ...)``).
    """

    # Text shown to the participant.
    message: str
    # True when the message was volunteered rather than requested.
    is_proactive: bool
    confidence_level: str  # "low", "medium", "high"
    # Labels of the knowledge sources attributed to this answer.
    sources_used: List[str]
| # ==================== Trading Chatbot ==================== | |
class TradingChatbot:
    """
    AI Chatbot for the trading experiment.
    Supports both proactive advice and reactive queries.

    On construction it builds an LLM wrapper and indexes the ``*.txt`` files
    found in ``KNOWLEDGE_BASE_DIR`` into a persisted Chroma vector store. All
    generation methods build a prompt from the scenario plus parameter-driven
    instructions and return a ``ChatResponse``.
    """

    def __init__(self, llm_provider: Optional[LLMProvider] = None, model_id: Optional[str] = None):
        """Create the chatbot.

        Args:
            llm_provider: Provider to use; ``None`` auto-detects from the
                environment.
            model_id: Optional model id, forwarded to ``create_llm``.
        """
        self.llm = create_llm(llm_provider, model_id)
        self.vectorstore = None
        self.chat_history: List[Tuple[str, str]] = []
        self._initialize_knowledge_base()

    def _initialize_knowledge_base(self) -> None:
        """Load and index the knowledge base documents.

        Failures are printed and leave the chatbot usable without retrieval.
        """
        docs = []
        if os.path.exists(KNOWLEDGE_BASE_DIR):
            for filename in sorted(os.listdir(KNOWLEDGE_BASE_DIR)):
                if not filename.endswith(".txt"):
                    continue
                filepath = os.path.join(KNOWLEDGE_BASE_DIR, filename)
                try:
                    loader = TextLoader(filepath)
                    docs.extend(loader.load())
                except Exception as e:
                    print(f"Error loading {filename}: {e}")

        if not docs:
            print("Warning: No knowledge base documents found.")
            return

        # Split documents into chunks
        splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        split_docs = splitter.split_documents(docs)

        # Give every chunk a deterministic id derived from its source and
        # text. The store is persisted on disk and this method runs on every
        # start-up; random ids would add the same chunks again each time and
        # crowd the top-k results with duplicates. Identical chunks within one
        # run collapse to a single entry, which also keeps ids unique per batch.
        import hashlib

        unique_chunks: Dict[str, Tuple[str, Dict[str, Any]]] = {}
        for doc in split_docs:
            source = doc.metadata.get("source", "unknown")
            digest = hashlib.sha256(
                f"{source}\n{doc.page_content}".encode("utf-8")
            ).hexdigest()
            unique_chunks.setdefault(digest, (doc.page_content, {"source": source}))

        ids = list(unique_chunks)
        texts = [text for text, _ in unique_chunks.values()]
        metadatas = [meta for _, meta in unique_chunks.values()]

        # Create embeddings and vectorstore
        try:
            embedding_function = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
            os.makedirs(VECTOR_DB_DIR, exist_ok=True)
            self.vectorstore = Chroma(
                persist_directory=VECTOR_DB_DIR,
                embedding_function=embedding_function,
            )
            self.vectorstore.add_texts(texts=texts, metadatas=metadatas, ids=ids)
            self.vectorstore.persist()
            print(f"Knowledge base initialized with {len(texts)} chunks.")
        except Exception as e:
            print(f"Error initializing vectorstore: {e}")

    def _get_confidence_framing(self, level: int) -> Dict[str, str]:
        """Get language framing based on confidence parameter.

        Args:
            level: Confidence setting (presumably on a 0-100 scale); below 34
                is "low", below 67 is "medium", anything else is "high".

        Returns:
            Dict with ``prefix``, ``verb``, ``qualifier`` and ``level`` keys.
        """
        if level < 34:
            return {
                "prefix": "Based on the available information, one possibility is that",
                "verb": "might consider",
                "qualifier": "though there is considerable uncertainty",
                "level": "low",
            }
        elif level < 67:
            return {
                "prefix": "Looking at the situation,",
                "verb": "suggests",
                "qualifier": "while noting some risk factors",
                "level": "medium",
            }
        else:
            return {
                "prefix": "Based on my analysis,",
                "verb": "strongly recommend",
                "qualifier": "with high confidence",
                "level": "high",
            }

    def _get_depth_instructions(self, level: int) -> str:
        """Get explanation depth instructions based on parameter (34/67 cut-offs)."""
        if level < 34:
            return "Provide a very brief response (1-2 sentences maximum). Focus only on the key point."
        elif level < 67:
            return "Provide a moderate explanation (3-4 sentences). Include the main reasoning and key factors."
        else:
            return "Provide a detailed analysis. Cover all relevant factors, risks, and opportunities comprehensively."

    def _get_risk_framing(self, level: int) -> str:
        """Get risk perspective based on parameter (34/67 cut-offs)."""
        if level < 34:
            return "Emphasize potential risks and downside scenarios. Favor capital preservation over potential gains."
        elif level < 67:
            return "Balance potential risks and opportunities. Present a measured risk-reward analysis."
        else:
            return "Emphasize potential opportunities and upside. Be willing to tolerate higher risk for potential gains."

    def _get_style_instructions(self, level: int) -> str:
        """Get communication style instructions based on parameter (34/67 cut-offs)."""
        if level < 34:
            return "Use formal, professional language. Be precise and measured in your statements."
        elif level < 67:
            return "Use clear, accessible language. Be professional but approachable."
        else:
            return "Use conversational, friendly language. Be direct and engaging."

    def _retrieve_context(self, query: str, k: int = 4) -> str:
        """Retrieve relevant context from the knowledge base.

        Args:
            query: Text to search for.
            k: Number of chunks to request from the similarity search.

        Returns:
            The matching chunks joined by blank lines, or ``""`` when there is
            no vector store or the search fails.
        """
        if not self.vectorstore:
            return ""
        try:
            docs = self.vectorstore.similarity_search(query, k=k)
            return "\n\n".join([doc.page_content for doc in docs])
        except Exception as e:
            print(f"Error retrieving context: {e}")
            return ""

    def generate_proactive_advice(
        self,
        scenario: Scenario,
        visible_params: ParticipantVisibleParams,
        hidden_params: ResearcherControlledParams,
    ) -> Optional[ChatResponse]:
        """
        Generate proactive advice for a scenario.
        Returns None if proactive advice should not be shown.

        Advice is shown with probability ``proactivity_level / 100``.
        """
        # Check if we should show proactive advice based on proactivity level.
        # ">=" (not ">") so that a level of 0 can never emit advice, even when
        # random.random() returns exactly 0.0.
        proactive_threshold = hidden_params.proactivity_level / 100
        if random.random() >= proactive_threshold:
            return None

        # Build the prompt
        confidence = self._get_confidence_framing(hidden_params.confidence_framing)
        depth = self._get_depth_instructions(visible_params.explanation_depth)
        risk = self._get_risk_framing(hidden_params.risk_bias)
        style = self._get_style_instructions(visible_params.communication_style)

        # Retrieve relevant context
        context = self._retrieve_context(
            f"{scenario.company_name} {scenario.sector} trading analysis"
        )

        # Determine what factors to highlight: a cautious bias surfaces red
        # flags, otherwise positive signals; either falls back to key factors.
        if hidden_params.risk_bias < 50:
            factors_to_mention = scenario.red_flags[:2] if scenario.red_flags else scenario.key_factors[:2]
        else:
            factors_to_mention = scenario.positive_signals[:2] if scenario.positive_signals else scenario.key_factors[:2]

        prompt = f"""
{style}
{depth}
{risk}
You are an AI trading advisor. A participant is viewing this trading scenario:
Company: {scenario.company_name} ({scenario.company_symbol})
Sector: {scenario.sector}
Country: {scenario.country}
Current Price: {scenario.current_price} credits
Situation:
{scenario.situation_description}
Key factors to consider: {', '.join(factors_to_mention)}
Relevant knowledge:
{context}
You should proactively offer some initial observations about this situation.
{confidence['prefix']} the situation {confidence['verb']} careful attention {confidence['qualifier']}.
Your recommendation should lean toward: {scenario.ai_recommendation}
Generate a brief proactive message offering your initial take on this situation.
Do NOT explicitly tell them to BUY, SELL, or HOLD yet - this is an initial observation.
Keep it natural, as if you're an advisor noticing something they should be aware of.
"""
        response_text = self.llm._call(prompt)
        return ChatResponse(
            message=response_text,
            is_proactive=True,
            confidence_level=confidence["level"],
            sources_used=["market_context", "company_profile"],
        )

    def generate_ai_recommendation(
        self,
        scenario: Scenario,
        visible_params: ParticipantVisibleParams,
        hidden_params: ResearcherControlledParams,
    ) -> ChatResponse:
        """
        Generate the AI's recommendation for a scenario.
        This is the main advice given before the participant decides.
        """
        confidence = self._get_confidence_framing(hidden_params.confidence_framing)
        depth = self._get_depth_instructions(visible_params.explanation_depth)
        risk = self._get_risk_framing(hidden_params.risk_bias)
        style = self._get_style_instructions(visible_params.communication_style)

        context = self._retrieve_context(
            f"{scenario.company_name} {scenario.sector} {scenario.ai_recommendation}"
        )

        prompt = f"""
{style}
{depth}
{risk}
You are an AI trading advisor. Analyze this situation and provide your recommendation:
Company: {scenario.company_name} ({scenario.company_symbol})
Sector: {scenario.sector}
Country: {scenario.country}
Current Price: {scenario.current_price} credits
Situation:
{scenario.situation_description}
Key factors: {', '.join(scenario.key_factors)}
Warning signs: {', '.join(scenario.red_flags) if scenario.red_flags else 'None identified'}
Positive signals: {', '.join(scenario.positive_signals) if scenario.positive_signals else 'None identified'}
Relevant market knowledge:
{context}
{confidence['prefix']} I {confidence['verb']} the participant to {scenario.ai_recommendation} {confidence['qualifier']}.
Generate your recommendation. Be clear about your suggested action ({scenario.ai_recommendation}).
Explain your reasoning according to the depth level specified.
Frame risks according to the risk perspective specified.
"""
        response_text = self.llm._call(prompt)
        return ChatResponse(
            message=response_text,
            is_proactive=False,
            confidence_level=confidence["level"],
            sources_used=["market_context", "company_profile", "trading_basics"],
        )

    def answer_query(
        self,
        query: str,
        scenario: Optional[Scenario],
        visible_params: ParticipantVisibleParams,
        hidden_params: ResearcherControlledParams,
    ) -> ChatResponse:
        """
        Answer a participant's question (reactive query).

        The last three exchanges are included in the prompt, and the new
        exchange is appended to ``chat_history``.
        """
        depth = self._get_depth_instructions(visible_params.explanation_depth)
        style = self._get_style_instructions(visible_params.communication_style)
        risk = self._get_risk_framing(hidden_params.risk_bias)
        confidence = self._get_confidence_framing(hidden_params.confidence_framing)

        # Retrieve context based on the query
        context = self._retrieve_context(query)

        # Build scenario context if available
        scenario_context = ""
        if scenario:
            scenario_context = f"""
Current scenario:
Company: {scenario.company_name} ({scenario.company_symbol})
Sector: {scenario.sector}
Situation: {scenario.situation_description}
"""

        # Include chat history for context
        history_context = ""
        if self.chat_history:
            recent_history = self.chat_history[-3:]  # Last 3 exchanges
            history_context = "Recent conversation:\n" + "\n".join(
                [f"User: {q}\nAI: {a}" for q, a in recent_history]
            )

        prompt = f"""
{style}
{depth}
{risk}
You are an AI trading advisor in the TradeVerse. Answer the participant's question.
{scenario_context}
{history_context}
Relevant knowledge from your database:
{context}
User question: {query}
Guidelines:
- Only use information from the TradeVerse (fictional universe)
- If asked about real-world companies or markets, politely redirect to TradeVerse
- {confidence['prefix'].lower()} frame your response {confidence['qualifier']}
- Be helpful but don't make decisions for the participant
Provide your response:
"""
        response_text = self.llm._call(prompt)

        # Update chat history
        self.chat_history.append((query, response_text))

        return ChatResponse(
            message=response_text,
            is_proactive=False,
            confidence_level=confidence["level"],
            sources_used=["knowledge_base"],
        )

    def clear_history(self) -> None:
        """Clear the chat history for a new session."""
        self.chat_history = []
# Singleton instance (uses auto-detected provider).
# NOTE: built at import time, so importing this module selects an LLM provider
# and loads/indexes the knowledge base via TradingChatbot.__init__.
chatbot = TradingChatbot()