| import numpy as np |
| import pandas as pd |
| import requests |
| from io import StringIO |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics.pairwise import cosine_similarity |
| import speech_recognition as sr |
| import pyttsx3 |
| from googlesearch import search |
| from bs4 import BeautifulSoup |
| import urllib.request |
| from urllib.parse import quote |
|
|
| class HybridChatBot: |
| def __init__(self, dataset_url=None): |
| self.dataset_url = dataset_url |
| self.qa_pairs = {} |
| self.vectorizer = TfidfVectorizer() |
| self.X = None |
| self.recognizer = sr.Recognizer() |
| self.engine = pyttsx3.init() |
| |
| |
| voices = self.engine.getProperty('voices') |
| self.engine.setProperty('voice', voices[0].id) |
| self.engine.setProperty('rate', 150) |
| |
| if dataset_url: |
| self.load_dataset() |
| self.train() |
|
|
| def load_dataset(self): |
| """Load dataset from web resource""" |
| try: |
| response = requests.get(self.dataset_url) |
| response.raise_for_status() |
| |
| if self.dataset_url.endswith('.csv'): |
| data = pd.read_csv(StringIO(response.text)) |
| elif self.dataset_url.endswith('.json'): |
| data = pd.read_json(StringIO(response.text)) |
| else: |
| print("File format not supported") |
| return |
| |
| for _, row in data.iterrows(): |
| self.qa_pairs[row["question"].lower()] = row["answer"] |
| |
| print(f"Loaded {len(self.qa_pairs)} question-answer pairs") |
| |
| except Exception as e: |
| print(f"Error loading dataset: {e}") |
|
|
| def train(self): |
| """Train the model on loaded data""" |
| if not self.qa_pairs: |
| print("No data available for training!") |
| return |
| |
| questions = list(self.qa_pairs.keys()) |
| self.X = self.vectorizer.fit_transform(questions) |
| print("Model trained on loaded data") |
|
|
| def add_qa_pair(self, question, answer): |
| """Add new question-answer pair""" |
| self.qa_pairs[question.lower()] = answer |
| self.train() |
|
|
| def web_search(self, query, num_results=3): |
| """Perform web search and extract information""" |
| try: |
| print(f"\nSearching the web: {query}") |
| search_results = [] |
| |
| |
| for url in search(query, num_results=num_results, lang='en'): |
| try: |
| |
| req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) |
| with urllib.request.urlopen(req, timeout=5) as response: |
| html = response.read() |
| |
| |
| soup = BeautifulSoup(html, 'html.parser') |
| |
| |
| for script in soup(["script", "style", "iframe", "nav", "footer"]): |
| script.extract() |
| |
| |
| text = soup.get_text(separator=' ', strip=True) |
| text = ' '.join(text.split()[:200]) |
| |
| search_results.append({ |
| 'url': url, |
| 'content': text |
| }) |
| |
| except Exception as e: |
| print(f"Error processing {url}: {e}") |
| continue |
| |
| return search_results |
| |
| except Exception as e: |
| print(f"Search error: {e}") |
| return None |
|
|
| def get_response(self, user_input): |
| """Get response to user input""" |
| if not self.qa_pairs: |
| return "I'm not trained yet. Please add questions and answers." |
| |
| |
| if "search the web for" in user_input.lower() or "find online" in user_input.lower(): |
| query = user_input.replace("search the web for", "").replace("find online", "").strip() |
| search_results = self.web_search(query) |
| if search_results: |
| response = "Here's what I found online:\n" |
| for i, result in enumerate(search_results, 1): |
| response += f"\n{i}. {result['content']}\n(Source: {result['url']})\n" |
| return response[:2000] |
| else: |
| return "Couldn't find any information online." |
| |
| |
| user_vec = self.vectorizer.transform([user_input.lower()]) |
| similarities = cosine_similarity(user_vec, self.X) |
| best_match_idx = np.argmax(similarities) |
| best_match_score = similarities[0, best_match_idx] |
| |
| if best_match_score > 0.5: |
| best_question = list(self.qa_pairs.keys())[best_match_idx] |
| return self.qa_pairs[best_question] |
| else: |
| return "I don't know the answer to this question. Would you like me to search online? (Say 'search the web for...')" |
|
|
| def text_to_speech(self, text): |
| """Convert text to speech""" |
| self.engine.say(text) |
| self.engine.runAndWait() |
|
|
| def speech_to_text(self): |
| """Convert speech from microphone to text""" |
| with sr.Microphone() as source: |
| print("\nSpeak now...") |
| self.recognizer.adjust_for_ambient_noise(source) |
| try: |
| audio = self.recognizer.listen(source, timeout=5) |
| text = self.recognizer.recognize_google(audio, language="en-US") |
| print(f"Recognized: {text}") |
| return text |
| except sr.UnknownValueError: |
| print("Speech not recognized") |
| return None |
| except sr.RequestError: |
| print("Recognition service error") |
| return None |
| except sr.WaitTimeoutError: |
| print("Timeout expired") |
| return None |
|
|
| def run(self): |
| """Improved interaction interface""" |
| print("\n" + "="*50) |
| print("WELCOME TO INTELLIGENT CHATBOT".center(50)) |
| print("="*50) |
| |
| current_mode = "text" |
| while True: |
| print("\n" + "-"*50) |
| print(f"Current input mode: {current_mode.upper()}") |
| print("[1] Send text message") |
| print("[2] Speak to the bot") |
| print("[3] Switch input mode") |
| print("[4] Teach the bot a new answer") |
| print("[5] Web search") |
| print("[6] Exit") |
| |
| try: |
| choice = input("Choose action (1-6): ").strip() |
| |
| if choice == "1": |
| user_input = input("\nYour message: ") |
| if user_input.lower() in ["exit", "stop"]: |
| break |
| |
| response = self.get_response(user_input) |
| if response: |
| print(f"\nBot: {response}") |
| self.text_to_speech(response) |
| else: |
| print("\nBot: I don't know what to say. Would you like to teach me?") |
| |
| elif choice == "2": |
| user_input = self.speech_to_text() |
| if user_input: |
| if user_input.lower() in ["exit", "stop"]: |
| break |
| |
| response = self.get_response(user_input) |
| if response: |
| print(f"\nBot: {response}") |
| self.text_to_speech(response) |
| else: |
| print("\nBot: I don't know how to respond to that.") |
| self.text_to_speech("I don't know how to respond to that") |
| |
| elif choice == "3": |
| current_mode = "voice" if current_mode == "text" else "text" |
| print(f"\nMode changed to: {current_mode.upper()}") |
| |
| elif choice == "4": |
| print("\nTeaching the bot:") |
| question = input("Enter question: ") |
| answer = input("Enter answer: ") |
| self.add_qa_pair(question, answer) |
| print("Bot successfully trained!") |
| |
| elif choice == "5": |
| query = input("\nEnter search query: ") |
| search_results = self.web_search(query) |
| if search_results: |
| print("\nSearch results:") |
| for i, result in enumerate(search_results, 1): |
| print(f"\n{i}. {result['content']}\n(Source: {result['url']})\n") |
| else: |
| print("\nNothing found.") |
| |
| elif choice == "6": |
| print("\nShutting down...") |
| break |
| |
| else: |
| print("\nPlease choose an option between 1 and 6") |
| |
| except KeyboardInterrupt: |
| print("\nShutting down...") |
| break |
|
|
| if __name__ == "__main__": |
| |
| DATASET_URL = "https://raw.githubusercontent.com/user/repo/main/qa_data.csv" |
| |
| bot = HybridChatBot(DATASET_URL) |
| bot.run() |