| import streamlit as st |
| from config import SERVER_CONFIG |
| import uuid |
| import json |
| import os |
| from services.logging_service import get_logger |
| from services.mcp_service import connect_to_mcp_servers |
|
|
| |
| def init_session(): |
| defaults = { |
| "params": {}, |
| "current_chat_id": None, |
| "current_chat_index": 0, |
| "history_chats": get_history(), |
| "messages": [], |
| "client": None, |
| "agent": None, |
| "tools": [], |
| "tool_executions": [], |
| "servers": SERVER_CONFIG['mcpServers'], |
| "auto_connect_attempted": False |
| } |
| |
| for key, val in defaults.items(): |
| if key not in st.session_state: |
| st.session_state[key] = val |
|
|
|
|
| def auto_connect_to_mcp(): |
| """Automatically connect to MCP servers on first page load""" |
| try: |
| logger = get_logger() |
| logger.log_system_status("Auto-connecting to MCP servers on first load") |
| |
| |
| params = st.session_state.get('params', {}) |
| if not params.get('model_id') or not params.get('temperature'): |
| logger.log_system_status("Skipping auto-connect: LLM parameters not configured yet") |
| st.session_state["auto_connect_attempted"] = True |
| return |
| |
| |
| connect_to_mcp_servers() |
| |
| |
| st.session_state["auto_connect_attempted"] = True |
| |
| |
| if st.session_state.get("agent"): |
| logger.log_system_status("Successfully auto-connected to MCP servers", { |
| 'servers_count': len(st.session_state.servers), |
| 'tools_count': len(st.session_state.tools) |
| }) |
| else: |
| logger.log_system_status("Auto-connect attempted but no agent available") |
| |
| except Exception as e: |
| |
| logger = get_logger() |
| logger.log_error( |
| "Auto_MCP_Connection_Error", |
| str(e), |
| {'servers': list(st.session_state.servers.keys())} |
| ) |
| logger.log_system_status(f"Auto-connect failed: {str(e)}") |
| |
| |
| st.session_state["auto_connect_attempted"] = True |
|
|
|
|
| def load_example_chats(): |
| """Load example chat histories from JSON files""" |
| example_chats = [] |
| |
| |
| example_files = [ |
| { |
| "file": "chat_Bio_QA_mcp_agent_20250908_122027.json", |
| "display_name": "Bio QA Example: What is DNA?" |
| }, |
| { |
| "file": "chat_Review_mcp_agent_20250908_121128.json", |
| "display_name": "Review Example: AML Risk Stratification" |
| } |
| ] |
| |
| |
| current_dir = os.path.dirname(os.path.abspath(__file__)) |
| chat_history_dir = os.path.join(current_dir, "..", "chat_history") |
| |
| for example in example_files: |
| file_path = os.path.join(chat_history_dir, example["file"]) |
| if os.path.exists(file_path): |
| try: |
| with open(file_path, 'r', encoding='utf-8') as f: |
| chat_data = json.load(f) |
| |
| chat_data['chat_name'] = example["display_name"] |
| example_chats.append(chat_data) |
| except Exception as e: |
| logger = get_logger() |
| logger.log_error("LoadExampleChat", str(e), {"file": example["file"]}) |
| |
| return example_chats |
|
|
|
|
| def get_history(): |
| if "history_chats" in st.session_state and st.session_state["history_chats"]: |
| return st.session_state["history_chats"] |
| else: |
| |
| example_chats = load_example_chats() |
| |
| |
| chat_id = str(uuid.uuid4()) |
| new_chat = {'chat_id': chat_id, |
| 'chat_name': 'New chat', |
| 'messages': []} |
| |
| |
| all_chats = example_chats + [new_chat] |
| |
| st.session_state["current_chat_index"] = 0 |
| st.session_state["current_chat_id"] = chat_id |
| |
| return all_chats |
|
|
| def get_current_chat(chat_id): |
| """Get messages for the current chat.""" |
| for chat in st.session_state["history_chats"]: |
| if chat['chat_id'] == chat_id: |
| return chat['messages'] |
| return [] |
|
|
| def _append_message_to_session(msg: dict) -> None: |
| """ |
| Append *msg* to the current chat’s message list **and** |
| keep history_chats in-sync. |
| """ |
| chat_id = st.session_state["current_chat_id"] |
| st.session_state["messages"].append(msg) |
| for chat in st.session_state["history_chats"]: |
| if chat["chat_id"] == chat_id: |
| chat["messages"] = st.session_state["messages"] |
| if chat["chat_name"] == "New chat": |
| chat["chat_name"] = " ".join(msg["content"].split()[:5]) or "Empty" |
| break |
|
|
| def create_chat(): |
| """Create a new chat session.""" |
| logger = get_logger() |
| chat_id = str(uuid.uuid4()) |
| new_chat = {'chat_id': chat_id, |
| 'chat_name': 'New chat', |
| 'messages': []} |
| |
| st.session_state["history_chats"].append(new_chat) |
| st.session_state["current_chat_index"] = 0 |
| st.session_state["current_chat_id"] = chat_id |
| |
| |
| logger.log_user_action("create_chat", { |
| 'chat_id': chat_id, |
| 'total_chats': len(st.session_state["history_chats"]) |
| }) |
| |
| return new_chat |
|
|
| def delete_chat(chat_id: str): |
| """Delete a chat from history.""" |
| if not chat_id: |
| return |
|
|
| logger = get_logger() |
| |
| |
| chat_to_delete = None |
| for chat in st.session_state["history_chats"]: |
| if chat["chat_id"] == chat_id: |
| chat_to_delete = chat |
| break |
| |
| if chat_to_delete: |
| logger.log_user_action("delete_chat", { |
| 'chat_id': chat_id, |
| 'chat_name': chat_to_delete.get('chat_name'), |
| 'messages_count': len(chat_to_delete.get('messages', [])) |
| }) |
|
|
| |
| st.session_state["history_chats"] = [ |
| c for c in st.session_state["history_chats"] |
| if c["chat_id"] != chat_id |
| ] |
|
|
| |
| if st.session_state["current_chat_id"] == chat_id: |
| if st.session_state["history_chats"]: |
| first = st.session_state["history_chats"][0] |
| st.session_state["current_chat_id"] = first["chat_id"] |
| st.session_state["current_chat_index"] = 0 |
| st.session_state["messages"] = first["messages"] |
| else: |
| new_chat = create_chat() |
| st.session_state["messages"] = new_chat["messages"] |
| return |