| import html |
| import json |
| import mimetypes |
| import os |
| import re |
| import time |
| import traceback |
| from pathlib import Path |
| from typing import Dict, List |
| from urllib.parse import quote_plus, urlparse |
|
|
| import chromadb |
| import chromadb.utils.embedding_functions as embedding_functions |
| import fitz  # PyMuPDF |
| import pandas as pd |
| import requests |
| from bs4 import BeautifulSoup |
| from dotenv import load_dotenv |
| from duckduckgo_search import DDGS |
| from duckduckgo_search.exceptions import ( |
| ConversationLimitException, |
| DuckDuckGoSearchException, |
| RatelimitException, |
| TimeoutException, |
| ) |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_community.document_loaders import ( |
| BSHTMLLoader, |
| JSONLoader, |
| PyPDFLoader, |
| TextLoader, |
| UnstructuredFileLoader, |
| ) |
| from langchain_community.tools import BraveSearch |
| from markdownify import markdownify |
| from ollama import chat |
| from PIL import Image |
| from smolagents import Tool, tool |
| from smolagents.utils import truncate_content |
|
|
| load_dotenv() |
|
|
|
|
| class ReadFileContentTool(Tool): |
| name = "read_file_content" |
| description = """Reads local files in various formats (text, CSV, Excel, PDF, HTML, etc.) and returns their content as readable text. Automatically detects and processes the appropriate file format.""" |
|
|
| inputs = { |
| "file_path": { |
| "type": "string", |
| "description": "The full path to the file from which the content should be read.", |
| } |
| } |
| output_type = "string" |
|
|
| def forward(self, file_path: str) -> str: |
| if not os.path.exists(file_path): |
| return f"❌ File does not exist: {file_path}" |
|
|
| ext = os.path.splitext(file_path)[1].lower() |
|
|
| try: |
| if ext == ".txt": |
| with open(file_path, "r", encoding="utf-8") as f: |
| return truncate_content(f.read()) |
|
|
| elif ext == ".csv": |
| df = pd.read_csv(file_path) |
| return truncate_content( |
| f"CSV Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}" |
| ) |
|
|
| elif ext in [".xlsx", ".xls"]: |
| df = pd.read_excel(file_path) |
| return truncate_content( |
| f"Excel Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}" |
| ) |
|
|
| elif ext == ".pdf": |
| doc = fitz.open(file_path) |
| text = "".join([page.get_text() for page in doc]) |
| doc.close() |
| return truncate_content( |
| text.strip() or "⚠️ PDF contains no readable text." |
| ) |
|
|
| elif ext == ".json": |
| with open(file_path, "r", encoding="utf-8") as f: |
| return truncate_content(f.read()) |
|
|
| elif ext == ".py": |
| with open(file_path, "r", encoding="utf-8") as f: |
| return truncate_content(f.read()) |
|
|
| elif ext in [".html", ".htm"]: |
| with open(file_path, "r", encoding="utf-8") as f: |
| html = f.read() |
| try: |
| markdown = markdownify(html).strip() |
| markdown = re.sub(r"\n{3,}", "\n\n", markdown) |
| return f"📄 HTML content (converted to Markdown):\n\n{truncate_content(markdown)}" |
| except Exception: |
| soup = BeautifulSoup(html, "html.parser") |
| text = soup.get_text(separator="\n").strip() |
| return f"📄 HTML content (raw text fallback):\n\n{truncate_content(text)}" |
|
|
| elif ext in [".mp3", ".wav"]: |
| return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use transcribe_audio tool to process the audio content." |
|
|
| elif ext in [".mp4", ".mov", ".avi"]: |
| return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use transcribe_video tool to process the video content." |
|
|
| else: |
| return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}" |
|
|
| except Exception as e: |
| return f"❌ Could not read {file_path}: {e}" |
|
|
|
|
| class WikipediaSearchTool(Tool): |
| name = "wikipedia_search" |
| description = """Searches Wikipedia for a specific topic and returns a concise summary. Useful for background information on subjects, concepts, historical events, or scientific topics.""" |
|
|
| inputs = { |
| "query": { |
| "type": "string", |
| "description": "The query or subject to search for on Wikipedia.", |
| } |
| } |
| output_type = "string" |
|
|
| def forward(self, query: str) -> str: |
| print(f"EXECUTING TOOL: wikipedia_search(query='{query}')") |
| try: |
| search_link = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={query}&format=json" |
| search_response = requests.get(search_link, timeout=10) |
| search_response.raise_for_status() |
| search_data = search_response.json() |
|
|
| if not search_data.get("query", {}).get("search", []): |
| return f"No Wikipedia info for '{query}'." |
|
|
| page_id = search_data["query"]["search"][0]["pageid"] |
|
|
| content_link = ( |
| f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&" |
| f"exintro=1&explaintext=1&pageids={page_id}&format=json" |
| ) |
| content_response = requests.get(content_link, timeout=10) |
| content_response.raise_for_status() |
| content_data = content_response.json() |
|
|
| extract = content_data["query"]["pages"][str(page_id)]["extract"] |
| if len(extract) > 1500: |
| extract = extract[:1500] + "..." |
|
|
| result = f"Wikipedia summary for '{query}':\n{extract}" |
| print(f"-> Tool Result (Wikipedia): {result[:100]}...") |
| return result |
|
|
| except Exception as e: |
| print(f"❌ Error in wikipedia_search: {e}") |
| traceback.print_exc() |
| return f"Error wiki: {e}" |
|
|
|
|
| class TranscribeAudioTool(Tool): |
| name = "transcribe_audio" |
| description = """Converts spoken content in audio files to text. Handles various audio formats and produces a transcript of the spoken content for analysis.""" |
|
|
| inputs = { |
| "file_path": { |
| "type": "string", |
| "description": "The full path to the audio file that needs to be transcribed.", |
| } |
| } |
| output_type = "string" |
|
|
| def forward(self, file_path: str) -> str: |
| try: |
| import os |
| import tempfile |
|
|
| import speech_recognition as sr |
| from pydub import AudioSegment |
|
|
| |
| if not os.path.exists(file_path): |
| return ( |
| f"❌ Audio file not found at: {file_path}. Download the file first." |
| ) |
|
|
| |
| recognizer = sr.Recognizer() |
|
|
| |
| file_ext = os.path.splitext(file_path)[1].lower() |
|
|
| if file_ext != ".wav": |
| |
| temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name |
|
|
| |
| audio = AudioSegment.from_file(file_path) |
| audio.export(temp_wav, format="wav") |
| audio_path = temp_wav |
| else: |
| audio_path = file_path |
|
|
| |
|             # Transcribe with Google's free web speech recognition API (requires internet access) |
|             with sr.AudioFile(audio_path) as source: |
|                 audio_data = recognizer.record(source) |
|             transcript = recognizer.recognize_google(audio_data) |
|
|
| |
| if file_ext != ".wav" and os.path.exists(temp_wav): |
| os.remove(temp_wav) |
|
|
| return transcript.strip() |
|
|
| except Exception as e: |
| return f"❌ Transcription failed: {str(e)}" |
|
|
|
|
| class TranscribeVideoFileTool(Tool): |
| name = "transcribe_video" |
| description = """Extracts and transcribes speech from video files. Converts the audio portion of videos into readable text for analysis or reference.""" |
|
|
| inputs = { |
| "file_path": { |
| "type": "string", |
| "description": "The full path to the video file that needs to be transcribed.", |
| } |
| } |
| output_type = "string" |
|
|
| def forward(self, file_path: str) -> str: |
| try: |
| |
| if not os.path.exists(file_path): |
| return ( |
| f"❌ Video file not found at: {file_path}. Download the file first." |
| ) |
|
|
| import os |
| import tempfile |
|
|
| import moviepy.editor as mp |
| import speech_recognition as sr |
|
|
| |
|             video = mp.VideoFileClip(file_path) |
|             if video.audio is None: |
|                 video.close() |
|                 return "❌ Video has no audio track to transcribe." |
|
|
| |
| temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name |
|
|
| |
| video.audio.write_audiofile(temp_audio, verbose=False, logger=None) |
| video.close() |
|
|
| |
| recognizer = sr.Recognizer() |
|
|
| |
| with sr.AudioFile(temp_audio) as source: |
| audio_data = recognizer.record(source) |
| transcript = recognizer.recognize_google(audio_data) |
|
|
| |
| if os.path.exists(temp_audio): |
| os.remove(temp_audio) |
|
|
| return transcript.strip() |
|
|
| except Exception as e: |
| return f"❌ Video processing failed: {str(e)}" |
|
|
|
|
| class BraveWebSearchTool(Tool): |
| name = "web_search" |
| description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query.""" |
|
|
| inputs = { |
| "query": { |
| "type": "string", |
| "description": "A web search query string (e.g., a question or query).", |
| } |
| } |
| output_type = "string" |
|
|
| |
| api_key = "asdasfd" |
| count = 3 |
| char_limit = 4000 |
| tool = BraveSearch.from_api_key(api_key=api_key, search_kwargs={"count": count}) |
|
|
| def extract_main_text(self, url: str, char_limit: int) -> str: |
| try: |
| headers = {"User-Agent": "Mozilla/5.0"} |
|             response = requests.get(url, headers=headers, timeout=10) |
|             response.raise_for_status() |
|             soup = BeautifulSoup(response.text, "html.parser") |
|
|
| |
| for tag in soup(["script", "style", "noscript"]): |
| tag.extract() |
|
|
| |
| body = soup.body |
| if not body: |
| return "⚠️ Could not extract content." |
|
|
| text = " ".join(t.strip() for t in body.stripped_strings) |
| return text[:char_limit].strip() |
| except Exception as e: |
| return f"⚠️ Failed to extract article: {e}" |
|
|
| def forward(self, query: str) -> str: |
| try: |
| results_json = self.tool.run(query) |
| results = ( |
| json.loads(results_json) |
| if isinstance(results_json, str) |
| else results_json |
| ) |
|
|
| output_parts = [] |
| for i, r in enumerate(results[: self.count], start=1): |
| title = html.unescape(r.get("title", "").strip()) |
| link = r.get("link", "").strip() |
|
|
| article_text = self.extract_main_text(link, self.char_limit) |
|
|
| result_block = ( |
| f"Result {i}:\n" |
| f"Title: {title}\n" |
| f"URL: {link}\n" |
| f"Extracted Content:\n{article_text}\n" |
| ) |
| output_parts.append(result_block) |
|
|
| return "\n\n".join(output_parts).strip() |
|
|
| except Exception as e: |
| return f"Search failed: {str(e)}" |
|
|
|
|
| class DescribeImageTool(Tool): |
| name = "describe_image" |
| description = """Analyzes images and generates detailed text descriptions. Identifies objects, scenes, text, and visual elements within the image to provide context or understanding.""" |
|
|
| inputs = { |
| "image_path": { |
| "type": "string", |
| "description": "The full path to the image file to describe.", |
| } |
| } |
| output_type = "string" |
|
|
| def forward(self, image_path: str) -> str: |
| import os |
|
|
| from PIL import Image |
| from transformers import BlipForConditionalGeneration, BlipProcessor |
|
|
| if not os.path.exists(image_path): |
| return f"❌ Image file does not exist: {image_path}" |
|
|
| try: |
| processor = BlipProcessor.from_pretrained( |
| "Salesforce/blip-image-captioning-base", use_fast=True |
| ) |
| model = BlipForConditionalGeneration.from_pretrained( |
| "Salesforce/blip-image-captioning-base" |
| ) |
|
|
| image = Image.open(image_path).convert("RGB") |
| inputs = processor(images=image, return_tensors="pt") |
| output_ids = model.generate(**inputs) |
|
|
| caption = processor.decode(output_ids[0], skip_special_tokens=True) |
| return caption.strip() or "⚠️ No caption could be generated." |
| except Exception as e: |
| return f"❌ Failed to describe image: {e}" |
|
|
|
|
| class DownloadFileFromLinkTool(Tool): |
| name = "download_file_from_link" |
| description = "Downloads files from a URL and saves them locally. Supports various formats including PDFs, documents, images, and data files. Returns the local file path for further processing." |
|
|
| inputs = { |
| "link": {"type": "string", "description": "The URL to download the file from."}, |
| "file_name": { |
| "type": "string", |
| "description": "Desired name of the saved file, without extension.", |
| "nullable": True, |
| }, |
| } |
|
|
| output_type = "string" |
| SUPPORTED_EXTENSIONS = { |
| ".xlsx", |
| ".pdf", |
| ".txt", |
| ".csv", |
| ".json", |
| ".xml", |
| ".html", |
| ".jpg", |
| ".jpeg", |
| ".png", |
| ".mp4", |
| ".mp3", |
| ".wav", |
| ".zip", |
| } |
|
|
|     def forward(self, link: str, file_name: str = None) -> str: |
|         file_name = file_name or "taskfile"  # the input is nullable, so fall back to a default stem |
|         print(f"⬇️ Downloading file from: {link}") |
|         dir_path = "./downloads" |
|         os.makedirs(dir_path, exist_ok=True) |
|
|
| try: |
| response = requests.get(link, stream=True, timeout=30) |
| except requests.RequestException as e: |
| return f"❌ Error: Request failed - {e}" |
|
|
| if response.status_code != 200: |
| return ( |
| f"❌ Error: Unable to fetch file. Status code: {response.status_code}" |
| ) |
|
|
| |
| base_name, provided_ext = os.path.splitext(file_name) |
| provided_ext = provided_ext.lower() |
|
|
| |
| if provided_ext and provided_ext in self.SUPPORTED_EXTENSIONS: |
| ext = provided_ext |
| else: |
| |
| content_type = ( |
| response.headers.get("Content-Type", "").split(";")[0].strip() |
| ) |
| guessed_ext = mimetypes.guess_extension(content_type or "") or "" |
|
|
| |
| if guessed_ext in ("", ".bin"): |
| parsed_link = urlparse(link) |
| _, url_ext = os.path.splitext(parsed_link.path) |
| if url_ext.lower() in self.SUPPORTED_EXTENSIONS: |
| ext = url_ext.lower() |
| else: |
| return f"⚠️ Warning: Cannot determine a valid file extension from '{content_type}' or URL. Please retry with an explicit valid filename and extension." |
| else: |
| ext = guessed_ext |
|
|
| |
| file_path = os.path.join(dir_path, base_name + ext) |
| downloaded = 0 |
|
|
| with open(file_path, "wb") as f: |
| for chunk in response.iter_content(chunk_size=1024): |
| if chunk: |
| f.write(chunk) |
| downloaded += len(chunk) |
|
|
| return file_path |
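|
| # Usage sketch: the extension comes from file_name when valid, else from the Content-Type |
| # header, else from the URL path. The URL below is illustrative. |
| #   saved = DownloadFileFromLinkTool().forward("https://example.com/report.pdf", "report") |
| #   # saved == "./downloads/report.pdf" |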
|
|
|
|
| class DuckDuckGoSearchTool(Tool): |
| name = "web_search" |
| description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query.""" |
|
|
| inputs = { |
| "query": { |
| "type": "string", |
| "description": "The search query to run on DuckDuckGo", |
| }, |
| } |
| output_type = "string" |
|
|
| def _configure(self, max_retries: int = 5, retry_sleep: int = 2): |
| self._max_retries = max_retries |
| self._retry_sleep = retry_sleep |
|
|
| def forward(self, query: str) -> str: |
| self._configure() |
|
|
| top_results = 5 |
|
|
|         retries = 0 |
|         max_retries = getattr(self, "_max_retries", 5) |
|         retry_sleep = getattr(self, "_retry_sleep", 2) |
|
|
| while retries < max_retries: |
| try: |
| results = DDGS().text( |
| keywords=query, |
| region="wt-wt", |
| safesearch="moderate", |
| max_results=top_results, |
| ) |
|
|
| if not results: |
| return "No results found." |
|
|
| output_lines = [] |
| for idx, res in enumerate(results[:top_results], start=1): |
| title = res.get("title", "N/A") |
| url = res.get("href", "N/A") |
| snippet = res.get("body", "N/A") |
|
|
| output_lines.append( |
| f"Result {idx}:\n" |
| f"Title: {title}\n" |
| f"URL: {url}\n" |
| f"Snippet: {snippet}\n" |
| ) |
|
|
| output = "\n".join(output_lines) |
|
|
| print(f"-> Tool Result (DuckDuckGo): {output[:1500]}...") |
| return output |
|
|
| except ( |
| DuckDuckGoSearchException, |
| TimeoutException, |
| RatelimitException, |
| ConversationLimitException, |
| ) as e: |
|                 retries += 1 |
|                 retry_sleep += 2  # linear backoff between attempts |
|                 print( |
|                     f"⚠️ DuckDuckGo Exception (Attempt {retries}/{max_retries}): {type(e).__name__}: {e}" |
|                 ) |
|                 traceback.print_exc() |
|                 time.sleep(retry_sleep) |
|
|
| except Exception as e: |
| print(f"❌ Unexpected Error: {e}") |
| traceback.print_exc() |
| return f"Unhandled exception during DuckDuckGo search: {e}" |
|
|
| return f"❌ Failed to retrieve results after {max_retries} retries." |
|
|
|
|
| # Embeddings are computed via the Hugging Face Inference API, which needs an API token; |
| # HUGGINGFACE_API_KEY is an assumed environment variable name. |
| huggingface_ef = embedding_functions.HuggingFaceEmbeddingFunction( |
|     api_key=os.getenv("HUGGINGFACE_API_KEY", ""), |
|     model_name="sentence-transformers/all-mpnet-base-v2", |
| ) |
| SUPPORTED_EXTENSIONS = [ |
| ".txt", |
| ".md", |
| ".py", |
| ".pdf", |
| ".json", |
| ".jsonl", |
| ".html", |
| ".htm", |
| ] |
|
|
|
|
| class AddDocumentToVectorStoreTool(Tool): |
| name = "add_document_to_vector_store" |
| description = "Processes a document and adds it to the vector database for semantic search. Automatically chunks files and creates text embeddings to enable powerful content retrieval." |
|
|
| inputs = { |
| "file_path": { |
| "type": "string", |
| "description": "Absolute path to the file to be indexed.", |
| } |
| } |
|
|
| output_type = "string" |
|
|
| def _load_file(self, path: Path): |
| """Select the right loader for the file extension.""" |
| if path.suffix == ".pdf": |
| return PyPDFLoader(str(path)).load() |
| elif path.suffix == ".json": |
| return JSONLoader(str(path), jq_schema=".").load() |
| elif path.suffix in [".md"]: |
| return UnstructuredFileLoader(str(path)).load() |
| elif path.suffix in [".html", ".htm"]: |
| return BSHTMLLoader(str(path)).load() |
| else: |
| return TextLoader(str(path)).load() |
|
|
| def forward(self, file_path: str) -> str: |
| print(f"📄 Adding document to vector store: {file_path}") |
| try: |
| collection_name = "vectorstore" |
| path = Path(file_path) |
| if not path.exists() or path.suffix not in SUPPORTED_EXTENSIONS: |
| return f"Unsupported or missing file: {file_path}" |
|
|
| docs = self._load_file(path) |
| text_splitter = RecursiveCharacterTextSplitter( |
| chunk_size=500, chunk_overlap=50 |
| ) |
| split_docs = text_splitter.split_documents(docs) |
|
|
|             client = chromadb.PersistentClient(path="./chroma_store") |
|
|             collection = client.get_or_create_collection( |
|                 name=collection_name, |
|                 embedding_function=huggingface_ef, |
|             ) |
|
|
| texts = [doc.page_content for doc in split_docs] |
| metadatas = [doc.metadata for doc in split_docs] |
|
|
| collection.add( |
| documents=texts, |
| metadatas=metadatas, |
| ids=[f"{path.stem}_{i}" for i in range(len(texts))], |
| ) |
|
|
| return f"✅ Successfully added {len(texts)} chunks from '{file_path}' to collection '{collection_name}'." |
|
|
| except Exception as e: |
| print(f"❌ Error in add_to_vector_store: {e}") |
| traceback.print_exc() |
| return f"Error: {e}" |
|
|
|
|
| class QueryVectorStoreTool(Tool): |
| name = "query_downloaded_documents" |
| description = "Performs semantic searches across your downloaded documents. Use detailed queries to find specific information, concepts, or answers from your collected resources." |
|
|
| inputs = { |
| "query": { |
| "type": "string", |
| "description": "The search query. Ensure this is constructed intelligently so to retrieve the most relevant outputs.", |
| } |
| } |
| output_type = "string" |
|
|
| def forward(self, query: str) -> str: |
| collection_name = "vectorstore" |
|
|
| k = 5 |
|
|
| print(f"🔎 Querying vector store '{collection_name}' with: '{query}'") |
| try: |
|             client = chromadb.PersistentClient(path="./chroma_store") |
|             collection = client.get_collection( |
|                 name=collection_name, embedding_function=huggingface_ef |
|             ) |
|
|
| results = collection.query( |
| query_texts=[query], |
| n_results=k, |
| ) |
|
|
| formatted = [] |
| for i in range(len(results["documents"][0])): |
| doc = results["documents"][0][i] |
| metadata = results["metadatas"][0][i] |
| formatted.append( |
| f"Result {i+1}:\n" f"Content: {doc}\n" f"Metadata: {metadata}\n" |
| ) |
|
|
| return "\n".join(formatted) or "No relevant documents found." |
|
|
| except Exception as e: |
| print(f"❌ Error in query_vector_store: {e}") |
| traceback.print_exc() |
| return f"Error querying vector store: {e}" |
|
|
|
|
| @tool |
| def image_question_answering(image_path: str, prompt: str) -> str: |
| """ |
| Analyzes images and answers specific questions about their content. Can identify objects, read text, describe scenes, or interpret visual information based on your questions. |
| |
| Args: |
| image_path: The path to the image file |
| prompt: The question to ask about the image |
| |
| Returns: |
| A string answer generated by the local Ollama model |
| """ |
| |
| file_extension = image_path.lower().split(".")[-1] |
| if file_extension not in ["jpg", "jpeg", "png", "bmp", "gif", "webp"]: |
| return "Unsupported file type. Please provide an image." |
|
|
| path = Path(image_path) |
| if not path.exists(): |
| return f"File not found at: {image_path}" |
|
|
| |
| response = chat( |
| model="llava", |
| messages=[ |
| { |
| "role": "user", |
| "content": prompt, |
| "images": [path], |
| }, |
| ], |
| options={"temperature": 0.2}, |
| ) |
|
|
| return response.message.content.strip() |
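|
| # Usage sketch: requires a local Ollama server with the llava model pulled |
| # (`ollama pull llava`). The image path is hypothetical. |
| #   print(image_question_answering("downloads/chart.png", "What does the y-axis show?")) |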
|
|
|
|
| class VisitWebpageTool(Tool): |
| name = "visit_webpage" |
| description = "Loads a webpage from a URL and converts its content to markdown format. Use this to browse websites, extract information, or identify downloadable resources from a specific web address." |
| inputs = { |
| "url": { |
| "type": "string", |
| "description": "The url of the webpage to visit.", |
| } |
| } |
| output_type = "string" |
|
|
| def forward(self, url: str) -> str: |
| try: |
| from urllib.parse import urlparse |
|
|
| import requests |
| from bs4 import BeautifulSoup |
| from markdownify import markdownify |
| from requests.exceptions import RequestException |
| from smolagents.utils import truncate_content |
| except ImportError as e: |
| raise ImportError( |
| "You must install packages `markdownify`, `requests`, and `beautifulsoup4` to run this tool: for instance run `pip install markdownify requests beautifulsoup4`." |
| ) from e |
|
|
| try: |
| |
| headers = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" |
| } |
| response = requests.get(url, headers=headers, timeout=20) |
| response.raise_for_status() |
|
|
| |
| soup = BeautifulSoup(response.text, "html.parser") |
|
|
| |
| domain = urlparse(url).netloc |
|
|
| |
| self._remove_clutter(soup) |
|
|
| |
| main_content = self._extract_main_content(soup) |
|
|
| if main_content: |
| |
| markdown_content = markdownify(str(main_content)).strip() |
| else: |
| |
| markdown_content = markdownify(str(soup)).strip() |
|
|
| |
| markdown_content = self._clean_markdown(markdown_content) |
|
|
| |
| result = f"Content from {domain}:\n\n{markdown_content}" |
|
|
| return truncate_content(result, 40000) |
|
|
| except requests.exceptions.Timeout: |
| return "The request timed out. Please try again later or check the URL." |
| except RequestException as e: |
| return f"Error fetching the webpage: {str(e)}" |
| except Exception as e: |
| return f"An unexpected error occurred: {str(e)}" |
|
|
| def _remove_clutter(self, soup): |
| """Remove common elements that clutter web pages.""" |
| |
| clutter_selectors = [ |
| "header", |
| "footer", |
| "nav", |
| ".nav", |
| ".navigation", |
| ".menu", |
| ".sidebar", |
| ".footer", |
| ".header", |
| "#footer", |
| "#header", |
| "#nav", |
| "#sidebar", |
| ".widget", |
| ".cookie", |
| ".cookies", |
| ".ad", |
| ".ads", |
| ".advertisement", |
| "script", |
| "style", |
| "noscript", |
| "iframe", |
| ".social", |
| ".share", |
| ".comment", |
| ".comments", |
| ".subscription", |
| ".newsletter", |
| '[role="banner"]', |
| '[role="navigation"]', |
| '[role="complementary"]', |
| ] |
|
|
| for selector in clutter_selectors: |
| for element in soup.select(selector): |
| element.decompose() |
|
|
| |
| for hidden in soup.select( |
| '[style*="display: none"], [style*="display:none"], [style*="visibility: hidden"], [style*="visibility:hidden"], [hidden]' |
| ): |
| hidden.decompose() |
|
|
| def _extract_main_content(self, soup): |
| """Try to identify and extract the main content of the page.""" |
| |
| main_content_selectors = [ |
| "main", |
| '[role="main"]', |
| "article", |
| ".content", |
| ".main-content", |
| ".post-content", |
| "#content", |
| "#main", |
| "#main-content", |
| ".article", |
| ".post", |
| ".entry", |
| ".page-content", |
| ".entry-content", |
| ] |
|
|
| |
| for selector in main_content_selectors: |
| main_content = soup.select(selector) |
| if main_content: |
| |
| if len(main_content) > 1: |
| return max(main_content, key=lambda x: len(x.get_text())) |
| return main_content[0] |
|
|
| |
| paragraphs = soup.find_all("p") |
| if paragraphs: |
| |
| parents = {} |
| for p in paragraphs: |
| if p.parent: |
| if p.parent not in parents: |
| parents[p.parent] = 0 |
| parents[p.parent] += 1 |
|
|
| if parents: |
| |
| return max(parents.items(), key=lambda x: x[1])[0] |
|
|
| |
| return None |
|
|
| def _clean_markdown(self, content): |
| """Clean up the markdown content.""" |
| |
| content = re.sub(r"\n{3,}", "\n\n", content) |
|
|
| |
| content = re.sub(r"(\[.*?\]\(.*?\))\s*\1+", r"\1", content) |
|
|
| |
| lines = content.split("\n") |
| filtered_lines = [] |
|
|
| |
| short_line_threshold = 40 |
| consecutive_short_lines = 0 |
| max_consecutive_short_lines = 3 |
|
|
| for line in lines: |
| stripped_line = line.strip() |
|             if len(stripped_line) < short_line_threshold and not stripped_line.startswith("#"): |
| consecutive_short_lines += 1 |
| if consecutive_short_lines > max_consecutive_short_lines: |
| continue |
| else: |
| consecutive_short_lines = 0 |
|
|
| filtered_lines.append(line) |
|
|
| content = "\n".join(filtered_lines) |
|
|
| |
| seen_headers = set() |
| lines = content.split("\n") |
| filtered_lines = [] |
|
|
| for line in lines: |
| if line.startswith("#"): |
| header_text = line.strip() |
| if header_text in seen_headers: |
| continue |
| seen_headers.add(header_text) |
| filtered_lines.append(line) |
|
|
| content = "\n".join(filtered_lines) |
|
|
| |
| footer_patterns = [ |
| r"^copyright", |
| r"^©", |
| r"^all rights reserved", |
| r"^terms", |
| r"^privacy policy", |
| r"^contact us", |
| r"^follow us", |
| r"^social media", |
| r"^disclaimer", |
| ] |
|
|
| footer_pattern = "|".join(footer_patterns) |
| lines = content.split("\n") |
| filtered_lines = [] |
|
|
| for line in lines: |
| if not re.search(footer_pattern, line.lower()): |
| filtered_lines.append(line) |
|
|
| content = "\n".join(filtered_lines) |
|
|
| return content |
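|
| # Usage sketch: clutter (navigation, ads, scripts) is stripped, the main content node is |
| # located heuristically, and the page is returned as markdown truncated to 40,000 chars. |
| #   print(VisitWebpageTool().forward("https://en.wikipedia.org/wiki/Web_scraping")) |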
|
|
|
|
| class ArxivSearchTool(Tool): |
| name = "arxiv_search" |
| description = """Searches arXiv for academic papers and returns structured information including titles, authors, publication dates, abstracts, and download links.""" |
|
|
| inputs = { |
| "query": { |
| "type": "string", |
| "description": "A research-related query (e.g., 'AI regulation')", |
| }, |
| "from_date": { |
| "type": "string", |
| "description": "Optional search start date in format (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')", |
| "nullable": True, |
| }, |
| "to_date": { |
| "type": "string", |
| "description": "Optional search end date in (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')", |
| "nullable": True, |
| }, |
| } |
|
|
| output_type = "string" |
|
|
| def forward( |
| self, |
| query: str, |
| from_date: str = None, |
| to_date: str = None, |
| ) -> str: |
| |
| url = build_arxiv_url(query, from_date, to_date, size=50) |
|
|
| |
| try: |
| papers = fetch_and_parse_arxiv(url) |
| except Exception as e: |
| return f"❌ Failed to fetch or parse arXiv results: {e}" |
|
|
| if not papers: |
| return "No results found for your query." |
|
|
| |
| output_lines = [] |
| for idx, p in enumerate(papers, start=1): |
| output_lines += [ |
| f"🔍 RESULT {idx}", |
| f"Title : {p['title']}", |
| f"Authors : {p['authors']}", |
| f"Published : {p['published']}", |
| f"Summary : {p['abstract'][:500]}{'...' if len(p['abstract'])>500 else ''}", |
| f"Entry ID : {p['entry_link']}", |
| f"Download link: {p['download_link']}", |
| "", |
| ] |
|
|
| return "\n".join(output_lines).strip() |
|
|
|
|
| def fetch_and_parse_arxiv(url: str) -> List[Dict[str, str]]: |
| """ |
| Fetches the given arXiv advanced‐search URL, parses the HTML, |
| and returns a list of results. Each result is a dict containing: |
| - title |
| - authors |
| - published |
| - abstract |
| - entry_link |
| - doi (or "[N/A]" if none) |
| """ |
|     resp = requests.get(url, timeout=30) |
| resp.raise_for_status() |
| soup = BeautifulSoup(resp.text, "html.parser") |
|
|
| results = [] |
| for li in soup.find_all("li", class_="arxiv-result"): |
| |
| t = li.find("p", class_="title") |
| title = t.get_text(strip=True) if t else "" |
|
|
| |
| a = li.find("p", class_="authors") |
| authors = a.get_text(strip=True).replace("Authors:", "").strip() if a else "" |
|
|
| |
| ab = li.find("span", class_="abstract-full") |
| abstract = ( |
| ab.get_text(strip=True).replace("Abstract:", "").strip() if ab else "" |
| ) |
|
|
| |
| d = li.find("p", class_="is-size-7") |
| published = d.get_text(strip=True) if d else "" |
|
|
| |
| lt = li.find("p", class_="list-title") |
| entry_link = lt.find("a")["href"] if lt and lt.find("a") else "" |
|
|
| |
|         # Extract the DOI link if one is present |
|         doi = "[N/A]" |
|         idblock = li.find("p", class_="list-identifier") |
|         if idblock: |
|             for a_tag in idblock.find_all("a", href=True): |
|                 if "doi.org" in a_tag["href"]: |
|                     doi = a_tag["href"] |
|                     break |
|
|         results.append( |
|             { |
|                 "title": title, |
|                 "authors": authors, |
|                 "published": published, |
|                 "abstract": abstract, |
|                 "entry_link": entry_link, |
|                 "doi": doi, |
|                 "download_link": ( |
|                     entry_link.replace("abs", "pdf") if "abs" in entry_link else "N/A" |
|                 ), |
|             } |
|         ) |
|
|
| return results |
|
|
|
|
| def build_arxiv_url( |
| query: str, from_date: str = None, to_date: str = None, size: int = 50 |
| ) -> str: |
| """ |
| Build an arXiv advanced-search URL matching the exact segment order: |
| 1) ?advanced |
| 2) terms-0-operator=AND |
| 3) terms-0-term=… |
| 4) terms-0-field=all |
| 5) classification-physics_archives=all |
| 6) classification-include_cross_list=include |
|     [ optional date-range block ] |
|     7) abstracts=show |
|     8) size=… |
|     9) order=-announced_date_first |
|     If from_date or to_date is None, the date-range block is omitted. |
| """ |
| base = "https://arxiv.org/search/advanced?advanced=" |
| parts = [ |
| "&terms-0-operator=AND", |
| f"&terms-0-term={quote_plus(query)}", |
| "&terms-0-field=all", |
| "&classification-physics_archives=all", |
| "&classification-include_cross_list=include", |
| ] |
|
|
| |
| if from_date and to_date: |
| parts += [ |
| "&date-year=", |
| "&date-filter_by=date_range", |
| f"&date-from_date={from_date}", |
| f"&date-to_date={to_date}", |
| "&date-date_type=submitted_date", |
| ] |
|
|
| parts += [ |
| "&abstracts=show", |
| f"&size={size}", |
| "&order=-announced_date_first", |
| ] |
|
|
| return base + "".join(parts) |
|
|