import gradio as gr
import openai
import requests
import json
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os

class MCPClient:
    """MCP Client for communicating with the MCP server"""

    def __init__(self, server_url: str):
        self.server_url = server_url.rstrip('/')

    def call_tool_sync(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]:
        """Synchronous tool call using requests instead of aiohttp"""
        if arguments is None:
            arguments = {}

        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            }
        }

        try:
            response = requests.post(
                f"{self.server_url}/mcp",
                json=mcp_request,
                headers={
                    "Content-Type": "application/json",
                    "ngrok-skip-browser-warning": "true"
                },
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                if "result" in result and "content" in result["result"]:
                    content = result["result"]["content"][0]["text"]
                    return json.loads(content)
                return result
            else:
                return {
                    "success": False,
                    "error": f"HTTP {response.status_code}: {response.text}"
                }
        except Exception as e:
            return {
                "success": False,
                "error": f"Connection error: {str(e)}"
            }

    def list_tools_sync(self) -> List[Dict[str, Any]]:
        """Synchronous tool listing using requests"""
        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list"
        }

        try:
            response = requests.post(
                f"{self.server_url}/mcp",
                json=mcp_request,
                headers={
                    "Content-Type": "application/json",
                    "ngrok-skip-browser-warning": "true"
                },
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                return result.get("result", {}).get("tools", [])
            return []
        except Exception as e:
            print(f"Error listing tools: {str(e)}")
            return []
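
# A minimal usage sketch for MCPClient. The server URL and the
# "get_purchase_orders" tool name below are illustrative placeholders, not part
# of any guaranteed server contract; adjust them to your own MCP deployment.
def _example_mcp_client_usage() -> None:
    client = MCPClient("https://abc123.ngrok.io")  # hypothetical MCP server URL
    print("Tools:", [t.get("name") for t in client.list_tools_sync()])
    result = client.call_tool_sync("get_purchase_orders", {"top": 5})
    print(json.dumps(result, indent=2))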


class AIAssistant:
    """AI Assistant with MCP integration"""

    def __init__(self, openai_api_key: str, mcp_client: MCPClient):
        try:
            self.openai_client = openai.OpenAI(
                api_key=openai_api_key,
                timeout=30.0
            )
        except Exception:
            # Fall back to the legacy (pre-1.0) module-level OpenAI API
            openai.api_key = openai_api_key
            self.openai_client = openai
        self.mcp_client = mcp_client
        self.available_tools = []

    def initialize(self):
        """Initialize the assistant by fetching available tools"""
        self.available_tools = self.mcp_client.list_tools_sync()

    def get_system_prompt(self) -> str:
        """Generate system prompt with available tools"""
        tools_description = "\n".join([
            f"- {tool['name']}: {tool['description']}"
            for tool in self.available_tools
        ])

        return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools.

Available tools:
{tools_description}

When a user asks for information that can be retrieved using these tools, you should:
1. Identify which tool(s) would be helpful
2. Call the appropriate tool(s) with the right parameters
3. Wait for the results before providing your final response

For SAP-related queries (purchase orders, requisitions), use the SAP tools.
For news-related queries, use the news tools.

To call a tool, use this exact format:
CALL_TOOL: tool_name
or
CALL_TOOL: tool_name(parameter1=value1, parameter2=value2)

Examples:
- For "show me purchase orders": CALL_TOOL: get_purchase_orders
- For "get 20 purchase orders": CALL_TOOL: get_purchase_orders(top=20)
- For "latest tech news": CALL_TOOL: get_news_headlines(category=technology)
- For "get news from BBC": CALL_TOOL: get_news_by_source(source_id=bbc-news)
- For "get news from CNN": CALL_TOOL: get_news_by_source(source_id=cnn)
- For "get news from Reuters": CALL_TOOL: get_news_by_source(source_id=reuters)

IMPORTANT: For news-by-source queries, always include the source_id parameter:
- BBC: source_id=bbc-news
- CNN: source_id=cnn
- Reuters: source_id=reuters
- Associated Press: source_id=associated-press
- The Guardian: source_id=the-guardian
- Washington Post: source_id=the-washington-post

After calling a tool, I will provide you with the results to interpret for the user.
"""

    def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Extract tool calls from AI response"""
        tool_calls = []
        lines = response.split('\n')

        for line in lines:
            line = line.strip()
            if line.startswith('CALL_TOOL:'):
                try:
                    # Everything after the "CALL_TOOL:" prefix
                    tool_part = line[10:].strip()

                    # Tool call with parameters, e.g. get_news_headlines(category=technology)
                    if '(' in tool_part and ')' in tool_part:
                        tool_name = tool_part.split('(')[0].strip()
                        params_str = tool_part.split('(')[1].split(')')[0]

                        params = {}
                        if params_str.strip():
                            for param in params_str.split(','):
                                if '=' in param:
                                    key, value = param.split('=', 1)
                                    key = key.strip()
                                    value = value.strip().strip('"\'')
                                    # Convert obvious integers and booleans
                                    if value.isdigit():
                                        value = int(value)
                                    elif value.lower() in ['true', 'false']:
                                        value = value.lower() == 'true'
                                    params[key] = value

                        tool_calls.append({
                            'name': tool_name,
                            'arguments': params
                        })
                    else:
                        # Tool call without parameters, e.g. get_purchase_orders
                        tool_name = tool_part.strip()
                        tool_calls.append({
                            'name': tool_name,
                            'arguments': {}
                        })

                except Exception as e:
                    print(f"Error parsing tool call '{line}': {e}")
                    continue

        return tool_calls

    def truncate_tool_result(self, result: Dict[str, Any], max_chars: int = 2000) -> Dict[str, Any]:
        """Truncate tool results to prevent context overflow"""
        if not isinstance(result, dict):
            return result

        result_copy = result.copy()
        result_str = json.dumps(result_copy, indent=2)

        if len(result_str) > max_chars:
            # First pass: shorten long lists and strings in place
            for key, value in result_copy.items():
                if isinstance(value, list) and len(value) > 3:
                    result_copy[key] = value[:3] + [f"... ({len(value) - 3} more items truncated)"]
                elif isinstance(value, str) and len(value) > 500:
                    result_copy[key] = value[:500] + "... (truncated)"

            # Second pass: if still too large, fall back to a compact summary
            result_str = json.dumps(result_copy, indent=2)
            if len(result_str) > max_chars:
                result_copy = {
                    "success": result.get("success", False),
                    "truncated": True,
                    "message": f"Result truncated due to size. Original had {len(result_str)} characters.",
                    "sample_data": str(result)[:1000] + "..." if len(str(result)) > 1000 else str(result)
                }

        return result_copy

    def process_message(self, user_message: str) -> Tuple[str, str]:
        """Process user message and handle tool calls"""
        tool_info = ""

        try:
            messages = [
                {"role": "system", "content": self.get_system_prompt()},
                {"role": "user", "content": user_message}
            ]

            # Use the new client interface if available, otherwise the legacy module
            if hasattr(self.openai_client, 'chat'):
                response = self.openai_client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=messages,
                    temperature=0.7,
                    max_tokens=800
                )
                ai_response = response.choices[0].message.content
            else:
                response = self.openai_client.ChatCompletion.create(
                    model="gpt-3.5-turbo",
                    messages=messages,
                    temperature=0.7,
                    max_tokens=800
                )
                ai_response = response.choices[0].message.content

            tool_calls = self.extract_tool_calls(ai_response)

            print(f"AI Response: {ai_response}")
            print(f"Extracted tool calls: {tool_calls}")

            if tool_calls:
                tool_results = []

                for tool_call in tool_calls:
                    tool_info += f"🔧 Calling: {tool_call['name']}\n"

                    # Execute the tool against the MCP server
                    result = self.mcp_client.call_tool_sync(
                        tool_call['name'],
                        tool_call['arguments']
                    )

                    # Truncate large payloads before sending them back to the model
                    truncated_result = self.truncate_tool_result(result)

                    tool_results.append({
                        'tool': tool_call['name'],
                        'result': truncated_result
                    })

                    if result.get('success'):
                        tool_info += f"✅ {tool_call['name']} completed\n"
                    else:
                        tool_info += f"❌ {tool_call['name']} failed: {result.get('error', 'Unknown error')}\n"

                # Format tool results for the follow-up prompt
                tool_results_text = "\n\n".join([
                    f"Tool: {tr['tool']}\nResult: {json.dumps(tr['result'], indent=2)[:1500]}{'...(truncated)' if len(json.dumps(tr['result'], indent=2)) > 1500 else ''}"
                    for tr in tool_results
                ])

                final_messages = messages + [
                    {"role": "assistant", "content": ai_response},
                    {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."}
                ]

                # Ask the model to interpret the tool results
                if hasattr(self.openai_client, 'chat'):
                    final_response = self.openai_client.chat.completions.create(
                        model="gpt-3.5-turbo",
                        messages=final_messages,
                        temperature=0.7,
                        max_tokens=800
                    )
                    return final_response.choices[0].message.content, tool_info
                else:
                    final_response = self.openai_client.ChatCompletion.create(
                        model="gpt-3.5-turbo",
                        messages=final_messages,
                        temperature=0.7,
                        max_tokens=800
                    )
                    return final_response.choices[0].message.content, tool_info
            else:
                return ai_response, ""

        except Exception as e:
            return f"❌ Error processing your request: {str(e)}", ""
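

# A minimal sketch of how the CALL_TOOL text protocol is parsed by
# AIAssistant.extract_tool_calls. The tool name and parameters below are
# illustrative only; they do not need to exist on the MCP server to run this.
def _example_tool_call_parsing(assistant: AIAssistant) -> None:
    sample_response = "CALL_TOOL: get_news_headlines(category=technology, top=5)"
    calls = assistant.extract_tool_calls(sample_response)
    # Expected: [{'name': 'get_news_headlines',
    #             'arguments': {'category': 'technology', 'top': 5}}]
    print(calls)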


# Global state, created lazily by initialize_assistant()
assistant = None
mcp_client = None


def test_connection(mcp_url):
    """Test MCP server connection"""
    if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io":
        return "❌ Please enter a valid MCP server URL"

    try:
        # First check the plain health endpoint
        response = requests.get(f"{mcp_url.rstrip('/')}/health", timeout=10)
        if response.status_code == 200:
            data = response.json()

            # Then verify the MCP endpoint by listing tools
            mcp_request = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/list"
            }

            mcp_response = requests.post(
                f"{mcp_url.rstrip('/')}/mcp",
                json=mcp_request,
                headers={
                    "Content-Type": "application/json",
                    "ngrok-skip-browser-warning": "true"
                },
                timeout=10
            )

            if mcp_response.status_code == 200:
                mcp_data = mcp_response.json()
                tools = mcp_data.get("result", {}).get("tools", [])
                tool_names = [tool.get("name", "Unknown") for tool in tools]

                return f"✅ Connected successfully!\nHealth Status: {data.get('status', 'Unknown')}\nMCP Tools: {len(tools)}\nAvailable: {', '.join(tool_names)}"
            else:
                return f"⚠️ Health OK, but MCP endpoint failed: HTTP {mcp_response.status_code}"
        else:
            return f"❌ Connection failed: HTTP {response.status_code}"
    except Exception as e:
        return f"❌ Connection error: {str(e)}"

def initialize_assistant(openai_key, mcp_url):
    """Initialize the AI assistant"""
    global assistant, mcp_client

    if not openai_key:
        return "❌ Please enter your OpenAI API key"

    if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io":
        return "❌ Please enter a valid MCP server URL"

    try:
        mcp_client = MCPClient(mcp_url)
        assistant = AIAssistant(openai_key, mcp_client)
        assistant.initialize()
        return f"✅ AI Assistant initialized with {len(assistant.available_tools)} tools available"
    except Exception as e:
        return f"❌ Failed to initialize: {str(e)}"

def chat_interface(message, history, openai_key, mcp_url):
    """Main chat interface"""
    global assistant

    if not assistant:
        init_result = initialize_assistant(openai_key, mcp_url)
        if "❌" in init_result:
            history.append([message, init_result])
            return history, ""

    try:
        print(f"Calling process_message with: {message}")

        # Keep the chat history short to limit memory and context growth
        if len(history) > 10:
            history = history[-10:]

        result = assistant.process_message(message)
        print(f"process_message returned: {type(result)} - {result}")

        # process_message normally returns a (response, tool_info) tuple
        if isinstance(result, tuple) and len(result) == 2:
            response, tool_info = result
            print(f"Unpacked: response={response}, tool_info={tool_info}")
        else:
            response = str(result)
            tool_info = ""
            print(f"Single result: {response}")

        # Show the tool execution trace alongside the model's answer
        if tool_info:
            full_response = f"**Tool Execution:**\n{tool_info}\n\n**Response:**\n{response}"
        else:
            full_response = response

        history.append([message, full_response])
        return history, ""
    except Exception as e:
        import traceback
        error_response = f"❌ Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        print(f"Error in chat_interface: {error_response}")
        history.append([message, error_response])
        return history, ""


# Build the Gradio UI
with gr.Blocks(title="AI Assistant with SAP & News Integration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 AI Assistant with SAP & News Integration")
    gr.Markdown("Chat with an AI that can access SAP business data and news through natural language queries.")

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
                container=True,
                bubble_full_width=False
            )

            msg = gr.Textbox(
                placeholder="Ask me about SAP data, news, or anything else...",
                show_label=False,
                container=False
            )

            with gr.Row():
                submit_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear", variant="secondary")

        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ Configuration")

            openai_key = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                placeholder="sk-..."
            )

            mcp_url = gr.Textbox(
                label="MCP Server URL",
                value="https://your-ngrok-url.ngrok.io",
                placeholder="https://abc123.ngrok.io"
            )

            test_btn = gr.Button("Test Connection", variant="secondary")
            connection_status = gr.Textbox(label="Connection Status", interactive=False)

            gr.Markdown("### 📝 Example Queries")
            gr.Markdown("""
            - "Show me recent purchase orders"
            - "Get purchase requisitions"
            - "What's the latest tech news?"
            - "Get news from BBC"
            - "Show me business news from the US"
            """)

    def respond(message, history, openai_key, mcp_url):
        return chat_interface(message, history, openai_key, mcp_url)

    submit_btn.click(
        respond,
        [msg, chatbot, openai_key, mcp_url],
        [chatbot, msg]
    )

    msg.submit(
        respond,
        [msg, chatbot, openai_key, mcp_url],
        [chatbot, msg]
    )

    clear_btn.click(lambda: ([], ""), outputs=[chatbot, msg])

    test_btn.click(
        test_connection,
        [mcp_url],
        [connection_status]
    )


if __name__ == "__main__":
    demo.launch()
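    # Note: demo.launch(share=True) would instead expose the UI through a
    # temporary public Gradio link, if it needs to be reachable from another machine.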