| | from selenium import webdriver |
| | from selenium.webdriver.common.by import By |
| | from selenium.webdriver.support.ui import WebDriverWait |
| | from selenium.webdriver.support import expected_conditions as EC |
| | from selenium.common.exceptions import TimeoutException, StaleElementReferenceException |
| | from typing import Dict, Any, List, Optional, Tuple, Union |
| | from pydantic import Field |
| | from .tool import Tool,Toolkit |
| | from ..core.module import BaseModule |
| | from evoagentx.core.logging import logger |
| | import html2text |
| | import time |
| |
|
| | |
# Maps the short selector-type names accepted by the browser tool's public
# methods ('css', 'xpath', ...) to the matching Selenium ``By`` locator
# strategy. Lookups are done with the lower-cased name, so keys stay lowercase.
SELECTOR_MAP: Dict[str, str] = {
    "css": By.CSS_SELECTOR,
    "xpath": By.XPATH,
    "id": By.ID,
    "class": By.CLASS_NAME,
    "name": By.NAME,
    "tag": By.TAG_NAME,
}
| |
|
class BrowserBase(BaseModule):
    """
    A tool for interacting with web browsers using Selenium.
    Allows agents to navigate to URLs, interact with elements, extract information,
    and more from web pages.

    Key Features:
    - Auto-initialization: Browser is automatically initialized when any method is first called
    - Auto-cleanup: Browser is automatically closed when the instance is destroyed
    - No manual initialization or cleanup required
    """

    # Declared configuration fields. ``timeout`` used to be declared twice with
    # identical settings; the redundant second declaration has been dropped.
    timeout: int = Field(default=10, description="Default timeout in seconds for browser operations")
    browser_type: str = Field(default="chrome", description="Type of browser to use ('chrome', 'firefox', 'safari', 'edge')")
    headless: bool = Field(default=False, description="Whether to run the browser in headless mode")
| | |
def __init__(
    self,
    name: str = "Browser Tool",
    browser_type: str = "chrome",
    headless: bool = False,
    timeout: int = 10,
    **kwargs
):
    """
    Set up the browser tool without launching a browser yet.

    Args:
        name (str): Name of the tool
        browser_type (str): Browser to drive ('chrome', 'firefox', 'safari', 'edge')
        headless (bool): Run the browser without a visible window
        timeout (int): Default timeout in seconds for browser operations
        **kwargs: Extra keyword arguments forwarded to the parent initializer
    """
    base_settings = dict(
        name=name,
        timeout=timeout,
        browser_type=browser_type,
        headless=headless,
    )
    super().__init__(**base_settings, **kwargs)

    # No WebDriver exists until a method needs one (see _check_driver_initialized).
    self.driver = None

    # Snapshot reference id -> "selector_type:selector" string
    # (the format _parse_element_reference expects).
    self.element_references = {}
| | |
| | |
| | |
| | def _check_driver_initialized(self) -> Union[None, Dict[str, Any]]: |
| | """ |
| | Check if the browser driver is initialized. If not, initialize it automatically. |
| | |
| | Returns: |
| | Union[None, Dict[str, Any]]: None if driver is initialized, error response if initialization fails |
| | """ |
| | if not self.driver: |
| | |
| | init_result = self.initialize_browser() |
| | if init_result["status"] == "error": |
| | return init_result |
| | return None |
| | |
def _get_selector_by_type(self, selector_type: str) -> Union[str, Dict[str, Any]]:
    """
    Translate a selector-type name into a Selenium By strategy.

    Args:
        selector_type (str): One of 'css', 'xpath', 'id', 'class', 'name', 'tag'
            (matched case-insensitively)

    Returns:
        Union[str, Dict[str, Any]]: The By strategy, or an error response
        when the name is not recognised
    """
    strategy = SELECTOR_MAP.get(selector_type.lower())
    if strategy:
        return strategy
    return {"status": "error", "message": f"Invalid selector type: {selector_type}"}
| | |
def _wait_for_page_load(self, timeout: Optional[int] = None) -> bool:
    """
    Block until the document reports readyState "complete".

    Args:
        timeout (int, optional): Seconds to wait; falls back to self.timeout
            when omitted (or falsy)

    Returns:
        bool: True once the page is fully loaded, False if the wait timed out
    """
    wait_seconds = timeout or self.timeout

    def _document_ready(driver):
        return driver.execute_script("return document.readyState") == "complete"

    try:
        WebDriverWait(self.driver, wait_seconds).until(_document_ready)
    except TimeoutException:
        return False
    return True
| | |
| | def _parse_element_reference(self, ref: str) -> Tuple[Optional[str], Optional[str], Optional[str]]: |
| | """ |
| | Parse an element reference into selector type and selector. |
| | |
| | Args: |
| | ref (str): Element reference ID from the page snapshot |
| | |
| | Returns: |
| | Tuple[Optional[str], Optional[str], Optional[str]]: |
| | (selector_type, selector, error_message) - error_message is None if successful |
| | """ |
| | if not self.element_references: |
| | return None, None, "No page snapshot available. Use browser_snapshot or navigate_to_url first." |
| | |
| | stored_ref = self.element_references.get(ref) |
| | if not stored_ref: |
| | return None, None, f"Element reference '{ref}' not found. Use browser_snapshot or navigate_to_url first." |
| | |
| | |
| | if ":" in stored_ref: |
| | ref_parts = stored_ref.split(":", 1) |
| | if len(ref_parts) != 2: |
| | return None, None, f"Invalid stored reference format: {stored_ref}" |
| | |
| | selector_type, selector = ref_parts |
| | return selector_type, selector, None |
| | |
| | return None, None, f"Invalid stored reference format: {stored_ref}" |
| | |
def _find_element_with_wait(self, by_type: str, selector: str,
                            timeout: Optional[int] = None,
                            wait_condition=EC.presence_of_element_located) -> Tuple[Optional[Any], Optional[str]]:
    """
    Wait until an element satisfies a condition and return it.

    Args:
        by_type (str): Selenium By strategy
        selector (str): Selector string for that strategy
        timeout (int, optional): Seconds to wait; falls back to self.timeout
        wait_condition: Expected-condition factory called with the
            (by_type, selector) locator tuple

    Returns:
        Tuple[Optional[Any], Optional[str]]: (element, None) on success,
        (None, error_message) when the wait times out or fails
    """
    wait_seconds = timeout or self.timeout
    try:
        waiter = WebDriverWait(self.driver, wait_seconds)
        located = waiter.until(wait_condition((by_type, selector)))
    except TimeoutException:
        return None, f"Element not found or condition not met with selector: {selector}"
    except Exception as exc:
        logger.error(f"Error finding element {selector}: {str(exc)}")
        return None, str(exc)
    return located, None
| | |
| | def _handle_function_params(self, function_params: Optional[list], |
| | function_name: str, |
| | param_mapping: Dict[str, str]) -> Dict[str, Any]: |
| | """ |
| | Extract parameters from nested function_params format. |
| | |
| | Args: |
| | function_params (list, optional): Nested function parameters |
| | function_name (str): The function name to look for |
| | param_mapping (Dict[str, str]): Mapping of parameter names |
| | |
| | Returns: |
| | Dict[str, Any]: Extracted parameters |
| | """ |
| | result = {} |
| | if not function_params: |
| | return result |
| | |
| | for param in function_params: |
| | fn_name = param.get("function_name", "") |
| | if fn_name == function_name or fn_name in param_mapping.get("alt_names", []): |
| | args = param.get("function_args", {}) |
| | for param_name, result_name in param_mapping.items(): |
| | if param_name == "alt_names": |
| | continue |
| | if param_name in args: |
| | result[result_name] = args[param_name] |
| | break |
| | |
| | return result |
| | |
| | |
| | |
def initialize_browser(self, function_params: list = None) -> Dict[str, Any]:
    """
    Start or restart a browser session. This method is called automatically when needed.

    Any existing session is quit first and its reference is cleared, so a
    failed restart never leaves self.driver pointing at a dead session
    (which would stop auto-initialization from retrying).

    This function supports multiple parameter styles:
    1. Standard style: no parameters
    2. Nested function_params style:
       function_params=[{"function_name": "initialize_browser", "function_args": {}}]

    Args:
        function_params (list, optional): Accepted for call-style
            compatibility; not used by this method

    Returns:
        Dict[str, Any]: {"status": "success" | "error", "message": ...}
    """
    try:
        if self.driver:
            try:
                self.driver.quit()
            except Exception as e:
                logger.warning(f"Error closing existing browser session: {str(e)}")
            finally:
                # The old session is unusable whether or not quit() succeeded.
                self.driver = None

        # Match the browser name case-insensitively ("Chrome" == "chrome").
        browser = str(self.browser_type).lower()

        options = None
        if browser == "chrome":
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.chrome.service import Service
            from webdriver_manager.chrome import ChromeDriverManager
            options = Options()
            if self.headless:
                options.add_argument("--headless")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")

            # webdriver_manager downloads/locates a matching chromedriver binary.
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=options)
        elif browser == "firefox":
            from selenium.webdriver.firefox.options import Options
            options = Options()
            if self.headless:
                options.add_argument("--headless")
            self.driver = webdriver.Firefox(options=options)
        elif browser == "safari":
            self.driver = webdriver.Safari()
        elif browser == "edge":
            from selenium.webdriver.edge.options import Options
            options = Options()
            if self.headless:
                options.add_argument("--headless")
            self.driver = webdriver.Edge(options=options)
        else:
            return {"status": "error", "message": f"Unsupported browser type: {self.browser_type}"}

        self.driver.set_page_load_timeout(self.timeout)
        return {"status": "success", "message": f"Browser {self.browser_type} initialized successfully"}
    except Exception as e:
        logger.error(f"Error initializing browser: {str(e)}")
        return {"status": "error", "message": str(e)}
| | |
def navigate_to_url(self, url: str = None, timeout: int = None,
                    function_params: list = None) -> Dict[str, Any]:
    """
    Open a URL and take a snapshot of the resulting page.

    The snapshot supplies the element references used by the interaction
    methods.

    This function supports multiple parameter styles:
    1. Standard style: url parameter
    2. Nested function_params style:
       function_params=[{"function_name": "navigate_to_url", "function_args": {"url": "..."}}]

    Args:
        url (str, optional): The complete URL (with https://) to navigate to
        timeout (int, optional): Seconds to wait for the document to finish
            loading; falls back to self.timeout
        function_params (list, optional): Nested function parameters, only
            consulted when url is not given directly

    Returns:
        Dict[str, Any]: "success" with the snapshot's interactive elements,
        "partial_success" when the page opened but the snapshot failed,
        "timeout" or "error" otherwise
    """
    init_error = self._check_driver_initialized()
    if init_error:
        return init_error

    if function_params and not url:
        nested = self._handle_function_params(
            function_params,
            "navigate_to_url",
            {"url": "url", "timeout": "timeout", "alt_names": ["browser_navigate"]}
        )
        url = nested.get("url")
        timeout = nested.get("timeout", timeout)

    if not url:
        return {"status": "error", "message": "URL parameter is required"}

    wait_seconds = timeout or self.timeout
    try:
        self.driver.get(url)

        # A slow page is not fatal: snapshot whatever has loaded so far.
        if not self._wait_for_page_load(wait_seconds):
            logger.warning(f"Page load timeout for URL: {url}, but continuing with snapshot")

        snapshot = self.browser_snapshot()
        snapshot_ok = snapshot["status"] == "success"

        response = {
            "status": "success" if snapshot_ok else "partial_success",
            "url": url,
            "title": self.driver.title,
            "current_url": self.driver.current_url,
        }
        if snapshot_ok:
            response["snapshot"] = {
                "interactive_elements": snapshot.get("interactive_elements", [])
            }
        else:
            response["snapshot_error"] = snapshot.get("message", "Unknown error capturing snapshot")
        return response

    except TimeoutException:
        return {"status": "timeout", "message": f"Timed out loading URL: {url}"}
    except Exception as exc:
        logger.error(f"Error navigating to URL {url}: {str(exc)}")
        return {"status": "error", "message": str(exc)}
| | |
def find_element(self, selector: str, selector_type: str = "css", timeout: int = None) -> Dict[str, Any]:
    """
    Locate a single element on the current page and describe it.

    Args:
        selector (str): Selector used to locate the element
        selector_type (str): One of 'css', 'xpath', 'id', 'class', 'name', 'tag'
        timeout (int, optional): Seconds to wait for the element; falls back
            to self.timeout

    Returns:
        Dict[str, Any]: "success" with the element's properties, "not_found"
        when the wait fails, or "error" with a message
    """
    init_error = self._check_driver_initialized()
    if init_error:
        return init_error

    wait_seconds = timeout or self.timeout

    locator = self._get_selector_by_type(selector_type)
    if isinstance(locator, dict):
        # An error response, not a By strategy.
        return locator

    try:
        found, lookup_error = self._find_element_with_wait(
            locator, selector, wait_seconds, EC.presence_of_element_located
        )
        if lookup_error:
            return {"status": "not_found", "message": f"Element not found with {selector_type}: {selector}"}

        return {
            "status": "success",
            "element": self._extract_element_properties(found, selector)
        }
    except Exception as exc:
        logger.error(f"Error finding element {selector}: {str(exc)}")
        return {"status": "error", "message": str(exc)}
| | |
| | def _extract_element_properties(self, element, selector: str) -> Dict[str, Any]: |
| | """ |
| | Extract common properties from a WebElement. |
| | |
| | Args: |
| | element: The Selenium WebElement |
| | selector (str): The selector used to find the element (for error messages) |
| | |
| | Returns: |
| | Dict[str, Any]: Element properties |
| | """ |
| | element_properties = { |
| | "text": element.text, |
| | "tag_name": element.tag_name, |
| | "is_displayed": element.is_displayed(), |
| | "is_enabled": element.is_enabled(), |
| | } |
| | |
| | |
| | for attr in ["href", "id", "class"]: |
| | try: |
| | value = element.get_attribute(attr) |
| | if value: |
| | element_properties[attr] = value |
| | except StaleElementReferenceException: |
| | logger.warning(f"Element became stale when trying to get {attr} attribute for {selector}") |
| | except Exception as e: |
| | logger.warning(f"Could not get {attr} attribute for {selector}: {str(e)}") |
| | |
| | return element_properties |
| | |
def find_multiple_elements(self, selector: str, selector_type: str = "css", timeout: int = None) -> Dict[str, Any]:
    """
    Locate every element matching a selector and describe each one.

    Args:
        selector (str): Selector used to locate the elements
        selector_type (str): One of 'css', 'xpath', 'id', 'class', 'name', 'tag'
        timeout (int, optional): Seconds to wait for the first match; falls
            back to self.timeout

    Returns:
        Dict[str, Any]: "success" with "count" and "elements" (each carrying
        its "index"), "not_found" when nothing appears in time, or "error"
    """
    init_error = self._check_driver_initialized()
    if init_error:
        return init_error

    wait_seconds = timeout or self.timeout

    locator = self._get_selector_by_type(selector_type)
    if isinstance(locator, dict):
        return locator

    try:
        # Wait for at least one match before collecting the full list.
        _first_match, wait_error = self._find_element_with_wait(
            locator, selector, wait_seconds, EC.presence_of_element_located
        )
        if wait_error:
            return {"status": "not_found", "message": f"No elements found with {selector_type}: {selector}"}

        described = []
        for position, candidate in enumerate(self.driver.find_elements(locator, selector)):
            try:
                details = self._extract_element_properties(candidate, f"{selector}[{position}]")
                details["index"] = position
                described.append(details)
            except StaleElementReferenceException:
                # Elements that vanish mid-scan are skipped, not fatal.
                logger.warning(f"Element {position} became stale while extracting properties")
            except Exception as exc:
                logger.warning(f"Error extracting properties for element {position}: {str(exc)}")

        return {
            "status": "success",
            "count": len(described),
            "elements": described
        }
    except Exception as exc:
        logger.error(f"Error finding elements {selector}: {str(exc)}")
        return {"status": "error", "message": str(exc)}
| | |
def click_element(self, selector: str, selector_type: str = "css", timeout: int = None) -> Dict[str, Any]:
    """
    Click an element located by selector, then wait for the page to settle.

    Args:
        selector (str): Selector used to locate the element
        selector_type (str): One of 'css', 'xpath', 'id', 'class', 'name', 'tag'
        timeout (int, optional): Seconds to wait for clickability and for the
            subsequent page load; falls back to self.timeout

    Returns:
        Dict[str, Any]: "success", "partial_success" (clicked but the page
        load timed out), "not_found", or "error"
    """
    init_error = self._check_driver_initialized()
    if init_error:
        return init_error

    wait_seconds = timeout or self.timeout

    locator = self._get_selector_by_type(selector_type)
    if isinstance(locator, dict):
        return locator

    try:
        target, lookup_error = self._find_element_with_wait(
            locator, selector, wait_seconds, EC.element_to_be_clickable
        )
        if lookup_error:
            return {"status": "not_found", "message": f"Element not clickable with {selector_type}: {selector}"}

        target.click()

        # The click may have triggered navigation; give the page time to load.
        if self._wait_for_page_load(wait_seconds):
            return {
                "status": "success",
                "message": f"Clicked element with {selector_type}: {selector}",
                "current_url": self.driver.current_url,
                "title": self.driver.title
            }

        return {
            "status": "partial_success",
            "message": "Element clicked, but page load timed out",
            "selector": selector,
            "current_url": self.driver.current_url
        }
    except Exception as exc:
        logger.error(f"Error clicking element {selector}: {str(exc)}")
        return {"status": "error", "message": str(exc)}
| | |
def input_text(self, element: str = None, ref: str = None, text: str = None,
               submit: bool = False, slowly: bool = True,
               function_params: list = None) -> Dict[str, Any]:
    """
    Type text into an input element identified by a snapshot reference.

    The element must come from a snapshot: call browser_snapshot or
    navigate_to_url first so that the reference can be resolved.

    This function supports multiple parameter styles:
    1. Standard style: element (description), ref (element ID), text
    2. Nested function_params style:
       function_params=[{"function_name": "browser_type", "function_args": {...}}]

    Args:
        element (str, optional): Human-readable description of the element
            (e.g., 'Search field'); only used in messages
        ref (str, optional): Element ID from the page snapshot (e.g., 'e0') - NOT a CSS selector
        text (str, optional): Text to type; must be non-empty
        submit (bool): Press Enter after typing, then wait for the page and
            re-snapshot it (default: false)
        slowly (bool): Send one character at a time with a short pause so
            per-key JS handlers fire (default: true)
        function_params (list, optional): Nested function parameters; values
            found there override the direct arguments

    Returns:
        Dict[str, Any]: "success", "partial_success" (submitted but the page
        load timed out), "not_found", or "error"
    """
    init_error = self._check_driver_initialized()
    if init_error:
        return init_error

    if function_params:
        nested = self._handle_function_params(
            function_params,
            "input_text",
            {"element": "element", "ref": "ref", "text": "text",
             "submit": "submit", "slowly": "slowly", "alt_names": ["browser_type"]}
        )
        element = nested.get("element", element)
        ref = nested.get("ref", ref)
        text = nested.get("text", text)
        submit = nested.get("submit", submit)
        slowly = nested.get("slowly", slowly)

    if not (ref and text):
        return {"status": "error", "message": "Both ref and text parameters are required"}

    selector_type, selector, ref_error = self._parse_element_reference(ref)
    if ref_error:
        return {"status": "error", "message": ref_error}

    # Prefer the human-readable description in messages when one was given.
    element_desc = element or ref

    locator = self._get_selector_by_type(selector_type)
    if isinstance(locator, dict):
        return locator

    try:
        field, lookup_error = self._find_element_with_wait(
            locator, selector, self.timeout, EC.element_to_be_clickable
        )
        if lookup_error:
            return {"status": "not_found", "message": f"Element not found: {element_desc}"}

        # Start from an empty field so the new text replaces any old value.
        field.clear()

        if slowly:
            for keystroke in text:
                field.send_keys(keystroke)
                time.sleep(0.05)
        else:
            field.send_keys(text)

        if submit:
            from selenium.webdriver.common.keys import Keys
            field.send_keys(Keys.ENTER)

            if not self._wait_for_page_load(self.timeout):
                # Still refresh the element references for whatever loaded.
                self.browser_snapshot()
                return {
                    "status": "partial_success",
                    "message": "Text entered and submitted, but page load timed out",
                    "element": element_desc,
                    "text": text
                }

            refreshed = self.browser_snapshot()
            if refreshed["status"] != "success":
                logger.warning(f"Failed to capture snapshot after form submission: {refreshed.get('message')}")

        submitted_suffix = " and submitted" if submit else ""
        return {
            "status": "success",
            "message": f"Successfully input text into {element_desc}" + submitted_suffix,
            "element": element_desc,
            "text": text
        }

    except TimeoutException:
        return {"status": "not_found", "message": f"Element not found: {element_desc}"}
    except Exception as exc:
        logger.error(f"Error inputting text to element {element_desc}: {str(exc)}")
        return {"status": "error", "message": str(exc)}
| | |
def get_page_content(self) -> Dict[str, Any]:
    """
    Return the current page's title, URL, body HTML and an element summary.

    The summary is collected in the browser by a JavaScript snippet and
    lists up to 20 links, 20 buttons, 20 inputs and 10 forms.

    Returns:
        Dict[str, Any]: "success" with title, url, body_content and
        element_summary, or "error" with a message
    """
    init_error = self._check_driver_initialized()
    if init_error:
        return init_error

    # Script returning the outer HTML of <body> (empty string when absent).
    body_script = """
        var body = document.body;
        return body ? body.outerHTML : "";
        """

    # Script collecting a capped overview of the page's interactive elements.
    summary_script = """
        // Get common interactive elements
        var summary = {
        links: [],
        buttons: [],
        inputs: [],
        forms: []
        };

        // Get links
        var links = document.querySelectorAll('a');
        for (var i = 0; i < Math.min(links.length, 20); i++) {
        var link = links[i];
        summary.links.push({
        text: link.textContent.trim().substring(0, 50),
        href: link.getAttribute('href'),
        id: link.id,
        class: link.className
        });
        }

        // Get buttons
        var buttons = document.querySelectorAll('button, input[type="button"], input[type="submit"]');
        for (var i = 0; i < Math.min(buttons.length, 20); i++) {
        var button = buttons[i];
        summary.buttons.push({
        text: button.textContent ? button.textContent.trim().substring(0, 50) : button.value,
        id: button.id,
        class: button.className,
        type: button.type
        });
        }

        // Get inputs
        var inputs = document.querySelectorAll('input:not([type="button"]):not([type="submit"]), textarea, select');
        for (var i = 0; i < Math.min(inputs.length, 20); i++) {
        var input = inputs[i];
        summary.inputs.push({
        type: input.type,
        name: input.name,
        id: input.id,
        placeholder: input.placeholder
        });
        }

        // Get forms
        var forms = document.querySelectorAll('form');
        for (var i = 0; i < Math.min(forms.length, 10); i++) {
        var form = forms[i];
        summary.forms.push({
        id: form.id,
        action: form.action,
        method: form.method
        });
        }

        return summary;
        """

    try:
        page_title = self.driver.title
        page_url = self.driver.current_url
        body_html = self.driver.execute_script(body_script)
        overview = self.driver.execute_script(summary_script)
    except Exception as exc:
        logger.error(f"Error getting page content: {str(exc)}")
        return {"status": "error", "message": str(exc)}

    return {
        "status": "success",
        "title": page_title,
        "url": page_url,
        "body_content": body_html,
        "element_summary": overview
    }
| | |
def switch_to_frame(self, frame_reference: str, reference_type: str = "index") -> Dict[str, Any]:
    """
    Switch to a frame on the page.

    For reference_type "element", frame_reference must look like
    "selector_type:selector" (split on the first ':'). The frame element is
    resolved once through the shared selector helpers and then switched to.

    Args:
        frame_reference (str): Reference to the frame (index, name, ID, or
            "selector_type:selector")
        reference_type (str): Type of reference ('index', 'name', 'id', 'element')

    Returns:
        Dict[str, Any]: "success" with a message, or "error" with the reason
    """
    driver_check = self._check_driver_initialized()
    if driver_check:
        return driver_check

    try:
        if reference_type == "index":
            # Only the conversion is guarded, so errors raised by the switch
            # itself are not misreported as a bad index.
            try:
                index = int(frame_reference)
            except (TypeError, ValueError):
                return {"status": "error", "message": f"Invalid frame index: {frame_reference}"}
            self.driver.switch_to.frame(index)
        elif reference_type in ("name", "id"):
            self.driver.switch_to.frame(frame_reference)
        elif reference_type == "element":
            selector_type, separator, selector = frame_reference.partition(":")
            if not separator:
                return {"status": "error", "message": "Element reference must be in format 'selector_type:selector'"}

            # Reuse the shared selector mapping rather than a local copy.
            by_type = self._get_selector_by_type(selector_type)
            if isinstance(by_type, dict):
                return {"status": "error", "message": f"Could not find frame element: {by_type['message']}"}

            # One waited lookup yields the WebElement to switch into.
            frame_element, lookup_error = self._find_element_with_wait(by_type, selector)
            if lookup_error:
                return {
                    "status": "error",
                    "message": f"Could not find frame element: Element not found with {selector_type}: {selector}"
                }

            self.driver.switch_to.frame(frame_element)
        else:
            return {"status": "error", "message": f"Invalid reference type: {reference_type}"}

        return {
            "status": "success",
            "message": f"Switched to frame using {reference_type}: {frame_reference}"
        }
    except Exception as e:
        logger.error(f"Error switching to frame {frame_reference}: {str(e)}")
        return {"status": "error", "message": str(e)}
| | |
def switch_to_window(self, window_reference: str, reference_type: str = "index") -> Dict[str, Any]:
    """
    Switch to a window or tab.

    A title search visits each window handle in turn until one has a title
    equal to window_reference. If none matches, focus is returned to the
    window that was current when the search started.

    Args:
        window_reference (str): Reference to the window (index, handle, or title)
        reference_type (str): Type of reference ('index', 'handle', 'title')

    Returns:
        Dict[str, Any]: "success" with the new window's title and url, or
        "error" with the reason
    """
    driver_check = self._check_driver_initialized()
    if driver_check:
        return driver_check

    try:
        window_handles = self.driver.window_handles

        if not window_handles:
            return {"status": "error", "message": "No window handles available"}

        if reference_type == "index":
            try:
                index = int(window_reference)
                if index < 0 or index >= len(window_handles):
                    return {"status": "error", "message": f"Window index out of range: {index}"}

                self.driver.switch_to.window(window_handles[index])
            except ValueError:
                return {"status": "error", "message": f"Invalid window index: {window_reference}"}
        elif reference_type == "handle":
            if window_reference not in window_handles:
                return {"status": "error", "message": f"Window handle not found: {window_reference}"}

            self.driver.switch_to.window(window_reference)
        elif reference_type == "title":
            current_handle = self.driver.current_window_handle
            window_found = False

            for handle in window_handles:
                try:
                    self.driver.switch_to.window(handle)
                    if self.driver.title == window_reference:
                        window_found = True
                        break
                except Exception as e:
                    # Best effort: a window that cannot be inspected is
                    # skipped, but logged rather than silently ignored.
                    logger.warning(f"Skipping window {handle} while searching by title: {str(e)}")

            if not window_found:
                # Restore focus to the window we started from.
                self.driver.switch_to.window(current_handle)
                return {"status": "error", "message": f"No window with title '{window_reference}' found"}
        else:
            return {"status": "error", "message": f"Invalid reference type: {reference_type}"}

        return {
            "status": "success",
            "message": f"Switched to window using {reference_type}: {window_reference}",
            "title": self.driver.title,
            "url": self.driver.current_url
        }
    except Exception as e:
        logger.error(f"Error switching to window {window_reference}: {str(e)}")
        return {"status": "error", "message": str(e)}
| | |
def select_dropdown_option(self, select_selector: str,
                           option_value: str,
                           select_by: str = "value",
                           selector_type: str = "css") -> Dict[str, Any]:
    """
    Choose an option in a dropdown element.

    Args:
        select_selector (str): Selector used to locate the dropdown element
        option_value (str): What to select; interpreted according to select_by
        select_by (str): 'value', 'text' (visible text) or 'index'
            (matched case-insensitively)
        selector_type (str): Selector type used for the dropdown

    Returns:
        Dict[str, Any]: "success", "not_found" when the dropdown is missing,
        or "error" with a message
    """
    init_error = self._check_driver_initialized()
    if init_error:
        return init_error

    try:
        from selenium.webdriver.support.ui import Select

        locator = self._get_selector_by_type(selector_type)
        if isinstance(locator, dict):
            return locator

        dropdown_element, lookup_error = self._find_element_with_wait(
            locator, select_selector, self.timeout, EC.presence_of_element_located
        )
        if lookup_error:
            return {"status": "not_found", "message": f"Dropdown element not found with {selector_type}: {select_selector}"}

        dropdown = Select(dropdown_element)
        mode = select_by.lower()

        if mode == "value":
            dropdown.select_by_value(option_value)
        elif mode == "text":
            dropdown.select_by_visible_text(option_value)
        elif mode == "index":
            # The index arrives as a string and must convert cleanly.
            try:
                dropdown.select_by_index(int(option_value))
            except ValueError:
                return {"status": "error", "message": f"Invalid index value: {option_value}. Must be an integer."}
        else:
            return {"status": "error", "message": f"Invalid select_by option: {select_by}"}

        return {"status": "success", "message": f"Selected option with {select_by}: {option_value}"}
    except Exception as exc:
        logger.error(f"Error selecting dropdown option: {str(exc)}")
        return {"status": "error", "message": str(exc)}
| |
|
def close_browser(self) -> Dict[str, Any]:
    """
    Close the browser and end the session. Call this when you're done to free resources.

    The driver reference is cleared even when quitting fails, so a broken
    session is never reused and the next browser operation starts a fresh
    one through auto-initialization.

    Returns:
        Dict[str, Any]: "success" (including when no browser was open) or
        "error" with the failure message
    """
    if not self.driver:
        return {"status": "success", "message": "Browser already closed"}

    try:
        self.driver.quit()
        return {"status": "success", "message": "Browser closed successfully"}
    except Exception as e:
        logger.error(f"Error closing browser: {str(e)}")
        return {"status": "error", "message": str(e)}
    finally:
        # Forget the driver regardless of the outcome: after a failed quit()
        # the session is in an unknown state and must not be reused.
        self.driver = None
| | |
def browser_click(self, element: str = None, ref: str = None,
                  function_params: list = None) -> Dict[str, Any]:
    """
    Click a button, link, or other clickable element identified by a snapshot reference ID.

    Only element references produced by a snapshot are accepted, so
    browser_snapshot() or navigate_to_url() must have been called beforehand.

    Typical flow:
        1. Capture the page: browser_snapshot() or navigate_to_url()
        2. Pick a reference (e.g. 'e0', 'e1') from the snapshot's interactive_elements
        3. Click it: browser_click(element='Login button', ref='e0')

    Two calling conventions are supported:
        1. Direct: element (description) and ref (element ID)
        2. Nested function_params:
           function_params=[{"function_name": "browser_click", "function_args": {...}}]
           (only consulted when ``ref`` was not passed directly)

    Args:
        element (str, optional): Human-readable description of what is being clicked
            (e.g., 'Login button', 'Next page link'); used only in messages
        ref (str, optional): Element ID from the page snapshot (e.g., 'e0', 'e1', 'e2') - NOT a CSS selector
        function_params (list, optional): Nested function parameters

    Returns:
        Dict[str, Any]: Outcome of the click. ``status`` is one of "success",
        "partial_success", "not_found", "not_clickable", "timeout" or "error";
        the remaining keys carry messages, suggestions and, after a click, the
        current URL and a fresh snapshot.
    """
    not_ready = self._check_driver_initialized()
    if not_ready:
        return not_ready

    # Nested-call style: recover element/ref from function_params when no ref was given.
    if function_params and not ref:
        unpacked = self._handle_function_params(
            function_params,
            "browser_click",
            {"element": "element", "ref": "ref"},
        )
        element = unpacked.get("element", element)
        ref = unpacked.get("ref", ref)

    # Guard: a reference is mandatory.
    if not ref:
        return {
            "status": "error",
            "message": "Element reference (ref) parameter is required. You must first call browser_snapshot() or navigate_to_url() to get element references.",
            "required_steps": [
                "1. Call browser_snapshot() or navigate_to_url() to get page elements",
                "2. Find the element reference (e.g. 'e0') in the response's interactive_elements",
                "3. Use that reference to click: browser_click(element='Button name', ref='e0')",
            ],
        }

    # Guard: references only exist after a snapshot has been processed.
    if not self.element_references:
        return {
            "status": "error",
            "message": "No element references found. You must first capture a page snapshot.",
            "required_steps": [
                "1. Call browser_snapshot() or navigate_to_url() to capture the page state",
                "2. Use the element references returned in the snapshot",
            ],
        }

    # Translate the snapshot ID into a (selector type, selector) pair.
    selector_type, selector, parse_error = self._parse_element_reference(ref)
    if parse_error:
        return {
            "status": "error",
            "message": parse_error,
            "help": "Make sure you're using a valid element reference from a recent snapshot",
        }

    # Human-readable label used in every message below.
    element_desc = element or ref

    # A dict here is an error payload produced by the selector lookup.
    locator = self._get_selector_by_type(selector_type)
    if isinstance(locator, dict):
        return locator

    try:
        # Step 1: the element must at least be present in the DOM right now.
        try:
            present_element = self.driver.find_element(locator, selector)
        except Exception:
            return {
                "status": "not_found",
                "message": f"Element not found: {element_desc}",
                "suggestion": "The page may have changed. Try getting a new snapshot with browser_snapshot()",
            }

        # Step 2: wait until it can actually be clicked.
        clickable_element, wait_error = self._find_element_with_wait(
            locator, selector, self.timeout, EC.element_to_be_clickable
        )
        if wait_error:
            # Best effort: describe the element's state; fall back to a bare report.
            try:
                element_state = {
                    "visible": present_element.is_displayed(),
                    "enabled": present_element.is_enabled(),
                    "tag": present_element.tag_name,
                    "classes": present_element.get_attribute("class"),
                }
            except Exception:
                return {
                    "status": "not_clickable",
                    "message": f"Element found but not clickable: {element_desc}",
                    "suggestion": "The element might be disabled, hidden, or covered by another element",
                }
            return {
                "status": "not_clickable",
                "message": f"Element found but not clickable: {element_desc}",
                "element_state": element_state,
                "suggestion": "The element might be disabled, hidden, or covered by another element",
            }

        # Step 3: click, then wait for whatever the click triggered.
        clickable_element.click()
        page_loaded = self._wait_for_page_load(self.timeout)

        # Step 4: a fresh snapshot is taken whether or not the load finished.
        snapshot_result = self.browser_snapshot()

        if not page_loaded:
            return {
                "status": "partial_success",
                "message": "Element clicked, but page load timed out",
                "element": element_desc,
                "current_url": self.driver.current_url,
                "snapshot": snapshot_result if snapshot_result["status"] == "success" else None,
                "suggestion": "The page might still be loading. You may want to wait and take another snapshot.",
            }

        if snapshot_result["status"] != "success":
            # The click worked; only the follow-up snapshot failed.
            return {
                "status": "success",
                "message": f"Successfully clicked on {element_desc} but snapshot failed",
                "element": element_desc,
                "current_url": self.driver.current_url,
                "title": self.driver.title,
                "snapshot_error": snapshot_result.get("message", "Unknown error capturing snapshot"),
                "suggestion": "You may want to take another snapshot with browser_snapshot()",
            }

        return {
            "status": "success",
            "message": f"Successfully clicked on {element_desc}",
            "element": element_desc,
            "current_url": self.driver.current_url,
            "title": self.driver.title,
            "snapshot": {
                "interactive_elements": snapshot_result.get("interactive_elements", []),
            },
        }

    except TimeoutException:
        return {
            "status": "timeout",
            "message": f"Timed out waiting for element to be clickable: {element_desc}",
            "suggestion": "The element might be taking too long to load or become clickable",
        }
    except Exception as click_error:
        logger.error(f"Error clicking element: {str(click_error)}")
        return {
            "status": "error",
            "message": str(click_error),
            "element": element_desc,
            "suggestion": "Try getting a new snapshot of the page with browser_snapshot()",
        }
| | |
def _classify_element_interactivity(self, element_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Classify an element's interactivity based on its properties.
    This method contains all rules for determining if an element is interactive or editable.

    The dictionary is updated in place: ``interactable`` and ``editable`` are
    always (re)set, starting from ``False``, and only evaluated when the element
    is visible and not disabled (``disabled``, ``aria-disabled="true"`` or
    ``aria-hidden="true"``).

    Args:
        element_data (Dict[str, Any]): Element data. The keys read are
            ``properties`` (for ``tag``), ``attributes`` and ``visible``
            (treated as True when absent).

    Returns:
        Dict[str, Any]: The same dictionary with interactivity classifications added
    """
    element_data["interactable"] = False
    element_data["editable"] = False

    # `or {}` keeps the lookups below safe when a key is present but None.
    attributes = element_data.get("attributes") or {}
    properties = element_data.get("properties") or {}

    tag_name = properties.get("tag", "").upper()
    role = attributes.get("role", "").lower()

    is_disabled = (
        attributes.get("disabled") is not None or
        attributes.get("aria-disabled") == "true" or
        attributes.get("aria-hidden") == "true"
    )
    is_visible = element_data.get("visible", True)

    if is_disabled or not is_visible:
        return element_data

    # Tags that are interactive by nature.
    interactive_tags = {
        'A', 'BUTTON', 'INPUT', 'SELECT', 'TEXTAREA',
        'DETAILS', 'AUDIO', 'VIDEO', 'IFRAME', 'EMBED',
        'OBJECT', 'SUMMARY', 'MENU'
    }

    # ARIA roles that mark an element as interactive.
    interactive_roles = {
        'button', 'link', 'checkbox', 'menuitem',
        'menuitemcheckbox', 'menuitemradio', 'option',
        'radio', 'searchbox', 'slider', 'spinbutton',
        'switch', 'tab', 'textbox', 'combobox',
        'listbox', 'menu', 'menubar', 'radiogroup',
        'tablist', 'toolbar', 'tree', 'treegrid'
    }

    # Any event handler or explicit tabindex also counts as interactive.
    has_interactive_attrs = any(
        attributes.get(attr) is not None
        for attr in ('onclick', 'onkeydown', 'onkeyup', 'onmousedown',
                     'onmouseup', 'tabindex')
    )

    element_data["interactable"] = (
        tag_name in interactive_tags or
        role in interactive_roles or
        has_interactive_attrs
    )

    editable_input_types = {'text', 'search', 'email', 'number', 'tel',
                            'url', 'password'}
    editable_roles = {'textbox', 'searchbox', 'spinbutton'}

    # A bare `contenteditable` attribute arrives as "" and means editable, as do
    # "true" and "plaintext-only"; the keyword is matched case-insensitively.
    # Only "false" (or an unrecognised value) leaves the element non-editable.
    content_editable = attributes.get("contenteditable")
    is_content_editable = (
        content_editable is not None and
        str(content_editable).strip().lower() in {"", "true", "plaintext-only"}
    )

    element_data["editable"] = (
        # Text-like <input> elements; a missing type defaults to "text".
        (tag_name == 'INPUT' and
         attributes.get("type", "text").lower() in editable_input_types) or
        tag_name == 'TEXTAREA' or
        is_content_editable or
        role in editable_roles
    )

    return element_data
| |
|
def _process_accessibility_tree(self, accessibility_tree):
    """
    Process the accessibility tree to extract all elements and store their references.

    Every node is visited depth-first (parent before children), given a
    sequential ID ('e0', 'e1', ...), classified through
    ``_classify_element_interactivity`` and, when it carries ``all_refs``,
    registered in ``self.element_references`` under that ID.

    Two safeguards apply to the stored references:
      * References left over from an earlier snapshot are discarded first, so
        an ID that no longer exists on the current page cannot resolve to a
        stale selector.
      * The most specific reference is stored rather than simply the first
        one: id, then xpath, css, name, class, and finally the bare tag. A
        class string or tag name frequently matches a different element than
        the one the snapshot described.

    Args:
        accessibility_tree (dict): The accessibility tree from JavaScript

    Returns:
        list: A list of all elements with their IDs and properties
    """
    # Lower rank = more specific locator; unknown prefixes rank last.
    ref_priority = {"id": 0, "xpath": 1, "css": 2, "name": 3, "class": 4, "tag": 5}

    def ref_rank(candidate):
        prefix = str(candidate).split(":", 1)[0]
        return ref_priority.get(prefix, len(ref_priority))

    all_elements = []

    # IDs restart at e0 on every call, so earlier entries describe another page state.
    self.element_references.clear()

    def extract_elements(node, path="", index=0):
        """Record ``node`` and its descendants; return the next unused index."""
        if not node:
            return index

        current_path = path + "/" + (node.get("name") or node.get("role") or "element")
        element_id = f"e{index}"
        semantic_info = node.get("semantic_info", {})

        element_info = {
            "id": element_id,
            "description": current_path.strip("/"),
            "purpose": semantic_info.get("purpose", ""),
            "label": semantic_info.get("label", ""),
            "category": semantic_info.get("category", ""),
            "isPrimary": semantic_info.get("isPrimary", False),
            "visible": node.get("visible", True),
            "properties": node.get("properties", {}),
            "attributes": node.get("attributes", {})
        }

        candidate_refs = node.get("all_refs")
        if candidate_refs:
            # min() keeps the first candidate among equally ranked ones.
            self.element_references[element_id] = min(candidate_refs, key=ref_rank)

        element_info = self._classify_element_interactivity(element_info)

        all_elements.append(element_info)
        index += 1

        for child in node.get("children", []):
            index = extract_elements(child, current_path, index)

        return index

    extract_elements(accessibility_tree)

    return all_elements
| |
|
def browser_snapshot(self, function_params: list = None) -> Dict[str, Any]:
    """
    Capture a fresh snapshot of the current page with all interactive elements.
    Use after page state changes not caused by navigation or clicking.

    A script is injected into the page that walks the DOM from ``document.body``
    (at most 10 levels deep) and, for every element, records its role, name,
    visibility, identifying properties, attributes, semantic hints and the list
    of locator references (``all_refs``: id, class, tag, name, css, xpath).
    The resulting tree is handed to ``_process_accessibility_tree``, which
    assigns the 'eN' reference IDs.

    The script defines its own ``getCssPath`` / ``getXPath`` helpers. Without
    them the calls inside the ``try`` block throw a ReferenceError that the
    empty ``catch`` swallows, and no css/xpath reference is ever produced.

    This function supports multiple parameter styles:
    1. Standard style: no parameters
    2. Nested function_params style:
       function_params=[{"function_name": "browser_snapshot", "function_args": {}}]

    Args:
        function_params (list, optional): Nested function parameters. Accepted
            for interface compatibility; this method does not read it.

    Returns:
        Dict[str, Any]: On success, ``status`` "success" plus ``title``, ``url``,
        ``accessibility_tree``, ``page_content`` (page source converted with
        html2text) and ``interactive_elements`` (elements flagged interactable
        or editable). On failure, ``status`` "error" and the exception text in
        ``message``.
    """
    driver_check = self._check_driver_initialized()
    if driver_check:
        return driver_check

    try:
        title = self.driver.title
        current_url = self.driver.current_url

        accessibility_tree = self.driver.execute_script("""
            // 1-based position of an element among its preceding siblings of the
            // same local name (and, when requested, the same namespace).
            function getSiblingPosition(element, matchNamespace) {
                let position = 1;
                let sibling = element.previousElementSibling;
                while (sibling) {
                    if (sibling.localName === element.localName &&
                        (!matchNamespace || sibling.namespaceURI === element.namespaceURI)) {
                        position++;
                    }
                    sibling = sibling.previousElementSibling;
                }
                return position;
            }

            // Positional CSS selector from the root element down to `element`.
            function getCssPath(element) {
                const parts = [];
                let current = element;
                while (current && current.nodeType === 1) {
                    const tag = (window.CSS && CSS.escape)
                        ? CSS.escape(current.localName)
                        : current.localName;
                    if (!current.parentElement) {
                        parts.unshift(tag);
                        break;
                    }
                    parts.unshift(`${tag}:nth-of-type(${getSiblingPosition(current, true)})`);
                    current = current.parentElement;
                }
                return parts.join(' > ');
            }

            // Absolute positional XPath for `element`. Elements outside the HTML
            // namespace (e.g. inline SVG) or with unusual names are addressed
            // through local-name(), since a plain name test would not match them.
            function getXPath(element) {
                const parts = [];
                let current = element;
                while (current && current.nodeType === 1) {
                    const plainHtml = current.namespaceURI === 'http://www.w3.org/1999/xhtml' &&
                        /^[a-z][a-z0-9-]*$/.test(current.localName);
                    if (plainHtml) {
                        parts.unshift(`${current.localName}[${getSiblingPosition(current, true)}]`);
                    } else {
                        parts.unshift(`*[local-name()='${current.localName}'][${getSiblingPosition(current, false)}]`);
                    }
                    current = current.parentElement;
                }
                return parts.length ? '/' + parts.join('/') : '';
            }

            function getAccessibilityTree(node, depth = 0, maxDepth = 10) {
                if (!node || depth > maxDepth) return null;

                let result = {
                    role: node.role || node.tagName,
                    name: node.name || '',
                    type: node.type || '',
                    value: node.value || '',
                    description: node.description || '',
                    properties: {},
                    visible: isElementVisible(node)
                };

                // Helper function for element visibility
                function isElementVisible(element) {
                    if (!element.getBoundingClientRect) return true;
                    const style = window.getComputedStyle(element);
                    const rect = element.getBoundingClientRect();

                    // Check basic visibility
                    const isVisible = style.display !== 'none' &&
                        style.visibility !== 'hidden' &&
                        style.opacity !== '0' &&
                        rect.width > 0 &&
                        rect.height > 0;

                    // Check if element is in viewport
                    const isInViewport = rect.top >= 0 &&
                        rect.left >= 0 &&
                        rect.bottom <= window.innerHeight &&
                        rect.right <= window.innerWidth;

                    return isVisible && isInViewport;
                }

                // Add text content
                if (node.textContent) {
                    result.text_content = node.textContent.trim();
                }

                // Add identifier properties for references
                if (node.id) result.properties.id = node.id;
                if (node.className) result.properties.class = node.className;
                if (node.tagName) result.properties.tag = node.tagName.toLowerCase();

                // Add attributes
                if (node.attributes) {
                    result.attributes = {};
                    for (let attr of node.attributes) {
                        result.attributes[attr.name] = attr.value;
                    }
                }

                // Add custom ref property that combines selector types
                let refs = [];
                // Store all possible selectors, but don't use them as primary ref
                if (node.id) refs.push(`id:${node.id}`);
                if (node.className && typeof node.className === 'string')
                    refs.push(`class:${node.className}`);
                if (node.tagName) refs.push(`tag:${node.tagName.toLowerCase()}`);

                // For inputs, add name attribute
                if (node.getAttribute && node.getAttribute('name')) {
                    result.properties.name = node.getAttribute('name');
                    refs.push(`name:${node.getAttribute('name')}`);
                }

                // Create XPath and CSS selectors
                try {
                    // CSS selector
                    let cssPath = getCssPath(node);
                    if (cssPath) refs.push(`css:${cssPath}`);

                    // XPath
                    let xpath = getXPath(node);
                    if (xpath) refs.push(`xpath:${xpath}`);
                } catch (e) {}

                // Store all refs but don't set primary ref here
                if (refs.length > 0) {
                    result.all_refs = refs;
                }

                // Add semantic information about the element
                result.semantic_info = {
                    // What the element represents
                    purpose: (function() {
                        if (node.tagName === 'INPUT') {
                            if (node.type === 'submit') return 'submit button';
                            if (node.type === 'search') return 'search box';
                            if (node.type === 'text') return 'text input';
                            return `${node.type || 'text'} input`;
                        }
                        if (node.tagName === 'BUTTON') return 'button';
                        if (node.tagName === 'A') return 'link';
                        if (node.tagName === 'SELECT') return 'dropdown';
                        if (node.tagName === 'TEXTAREA') return 'text area';
                        if (node.getAttribute('role')) return node.getAttribute('role');
                        return 'interactive element';
                    })(),

                    // The visible or accessible text
                    label: (function() {
                        return node.getAttribute('aria-label') ||
                            node.getAttribute('title') ||
                            node.getAttribute('placeholder') ||
                            node.getAttribute('alt') ||
                            (node.tagName === 'INPUT' ? node.value : node.textContent.trim());
                    })(),

                    // Is this a primary action?
                    isPrimary: !!(
                        node.classList.contains('primary') ||
                        node.getAttribute('aria-label')?.toLowerCase().includes('search') ||
                        node.getAttribute('title')?.toLowerCase().includes('search') ||
                        node.type === 'search' ||
                        node.getAttribute('role') === 'main' ||
                        node.id?.toLowerCase().includes('main') ||
                        node.classList.contains('main')
                    ),

                    // Basic category
                    category: (function() {
                        if (node.type === 'search' ||
                            node.getAttribute('role') === 'searchbox') return 'search';
                        if (node.type === 'submit' ||
                            node.tagName === 'BUTTON' ||
                            node.getAttribute('role') === 'button') return 'action';
                        if (node.tagName === 'A' ||
                            node.getAttribute('role') === 'link') return 'navigation';
                        if (node.tagName === 'INPUT' ||
                            node.tagName === 'TEXTAREA' ||
                            node.getAttribute('role') === 'textbox') return 'input';
                        if (node.tagName === 'SELECT' ||
                            ['listbox', 'combobox'].includes(node.getAttribute('role'))) return 'selection';
                        return 'interactive';
                    })()
                };

                // Process children
                result.children = [];
                if (node.children) {
                    for (let i = 0; i < node.children.length; i++) {
                        const childTree = getAccessibilityTree(node.children[i], depth + 1, maxDepth);
                        if (childTree) {
                            result.children.push(childTree);
                        }
                    }
                }

                return result;
            }

            return getAccessibilityTree(document.body);
        """)

        # Assign 'eN' IDs and register each element's reference for later interaction.
        all_elements = self._process_accessibility_tree(accessibility_tree)
        page_content = html2text.html2text(self.driver.page_source)

        return {
            "status": "success",
            "title": title,
            "url": current_url,
            "accessibility_tree": accessibility_tree,
            "page_content": page_content,
            "interactive_elements": [e for e in all_elements if e.get("interactable") or e.get("editable")]
        }

    except Exception as e:
        logger.error(f"Error generating accessibility snapshot: {str(e)}")
        return {"status": "error", "message": str(e)}
| | |
def browser_console_messages(self, function_params: list = None) -> Dict[str, Any]:
    """
    Return the JavaScript console output (logs, warnings, errors) of the browser for debugging.

    Collection itself is delegated to ``_collect_browser_logs``; this method
    only wraps the result in a status dictionary.

    Accepted parameter styles:
    1. Standard style: no parameters
    2. Nested function_params style:
       function_params=[{"function_name": "browser_console_messages", "function_args": {}}]

    Args:
        function_params (list, optional): Nested function parameters; not read by this method

    Returns:
        Dict[str, Any]: ``{"status": "success", "console_messages": [...]}``, or
        ``{"status": "error", "message": ...}`` if collection raised. When the
        driver check returns a payload, that payload is returned unchanged.
    """
    not_ready = self._check_driver_initialized()
    if not_ready:
        return not_ready

    try:
        collected = self._collect_browser_logs()
    except Exception as exc:
        logger.error(f"Error retrieving console messages: {str(exc)}")
        return {"status": "error", "message": str(exc)}

    return {
        "status": "success",
        "console_messages": collected,
    }
| | |
def _collect_browser_logs(self) -> List[Dict[str, Any]]:
    """
    Gather console output from the WebDriver 'browser' log and from an in-page console hook.

    Two sources are combined, in this order:
      1. ``driver.get_log('browser')`` entries, with level "SEVERE" reported as
         "ERROR" and "INFO" reported as "LOG".
      2. Messages captured by a hook that wraps ``console.log/info/warn/error/debug``
         and appends to ``window._consoleLogs``. The hook is installed only when
         ``window._consoleLogs`` does not exist yet; the method then sleeps for
         2 seconds before reading the buffer.

    A failure in either source does not abort the call; it is reported as a
    "WARNING" entry in the returned list instead.

    Returns:
        List[Dict[str, Any]]: Combined entries, each with ``level``, ``message``
        and ``timestamp``. Hook entries equal to one already collected are skipped.
    """
    level_aliases = {"SEVERE": "ERROR", "INFO": "LOG"}
    collected = []

    # Source 1: the driver's own browser log.
    try:
        for entry in self.driver.get_log('browser'):
            raw_level = entry.get("level", "").upper()
            collected.append({
                "level": level_aliases.get(raw_level, raw_level),
                "message": entry.get("message", ""),
                "timestamp": entry.get("timestamp", ""),
            })
    except Exception as driver_log_error:
        # Not every driver exposes this log type; report and carry on.
        collected.append({
            "level": "WARNING",
            "message": f"Could not retrieve browser logs: {str(driver_log_error)}",
            "timestamp": "",
        })

    # Source 2: messages captured inside the page.
    try:
        # Install the console hook (a no-op when it is already present).
        self.driver.execute_script("""
            if (!window._consoleLogs) {
                window._consoleLogs = [];

                // Store original console methods
                const originalConsole = {
                    log: console.log,
                    info: console.info,
                    warn: console.warn,
                    error: console.error,
                    debug: console.debug
                };

                // Helper function to add message with proper level
                function addMessage(level, args) {
                    window._consoleLogs.push({
                        level: level.toUpperCase(),
                        message: Array.from(args).join(' '),
                        timestamp: new Date().toISOString()
                    });
                }

                // Override console methods to capture logs
                console.log = function() {
                    addMessage('LOG', arguments);
                    originalConsole.log.apply(console, arguments);
                };

                console.info = function() {
                    addMessage('INFO', arguments);
                    originalConsole.info.apply(console, arguments);
                };

                console.warn = function() {
                    addMessage('WARN', arguments);
                    originalConsole.warn.apply(console, arguments);
                };

                console.error = function() {
                    addMessage('ERROR', arguments);
                    originalConsole.error.apply(console, arguments);
                };

                console.debug = function() {
                    addMessage('DEBUG', arguments);
                    originalConsole.debug.apply(console, arguments);
                };
            }
        """)

        # Fixed pause before the buffer is read.
        time.sleep(2)

        captured = self.driver.execute_script("return window._consoleLogs || [];")

        # Append only entries that are not already in the combined list.
        for captured_entry in captured:
            if captured_entry in collected:
                continue
            collected.append(captured_entry)

    except Exception as hook_error:
        collected.append({
            "level": "WARNING",
            "message": f"Could not retrieve JavaScript console logs: {str(hook_error)}",
            "timestamp": "",
        })

    return collected
| |
|
def __del__(self):
    """
    Quit the browser when the instance is being destroyed.

    Does nothing when the instance has no ``driver`` attribute or the driver is
    unset. Any error raised while quitting is logged as a warning, not propagated.
    """
    active_driver = getattr(self, 'driver', None)
    if not active_driver:
        return

    try:
        active_driver.quit()
        logger.info("Browser automatically closed on cleanup")
    except Exception as cleanup_error:
        logger.warning(f"Error during automatic browser cleanup: {str(cleanup_error)}")
| |
|
| |
|
class NavigateToUrlTool(Tool):
    """Tool wrapper that forwards calls to ``BrowserBase.navigate_to_url``."""

    name: str = "navigate_to_url"
    description: str = "Navigate to a URL and capture a snapshot of all page elements"
    inputs: Dict[str, Dict[str, str]] = {
        "url": {
            "type": "string",
            "description": "The complete URL (with https://) to navigate to",
        },
        "timeout": {
            "type": "integer",
            "description": "Custom timeout in seconds (default: 10)",
        },
    }
    required: Optional[List[str]] = ["url"]

    def __init__(self, browser_tool: BrowserBase = None):
        """Keep a handle to the shared BrowserBase instance this tool drives."""
        super().__init__()
        self.browser_tool = browser_tool

    def __call__(self, url: str, timeout: int = None, function_params: list = None) -> Dict[str, Any]:
        """
        Navigate to ``url`` through the BrowserBase instance.

        Raises:
            RuntimeError: If no browser instance was supplied.

        Returns:
            Dict[str, Any]: The browser's result, or an error dictionary when
            the underlying call raised.
        """
        browser = self.browser_tool
        if not browser:
            raise RuntimeError("Browser tool instance not initialized")

        try:
            outcome = browser.navigate_to_url(url, timeout, function_params)
        except Exception as exc:
            return {"status": "error", "message": f"Error navigating to URL: {str(exc)}"}
        return outcome
| |
|
| |
|
class InputTextTool(Tool):
    """Tool wrapper that forwards calls to ``BrowserBase.input_text``."""

    name: str = "input_text"
    description: str = "Type text into a form field, search box, or other input element using a reference ID from a snapshot"
    inputs: Dict[str, Dict[str, str]] = {
        "element": {
            "type": "string",
            "description": "Human-readable description of the element (e.g., 'Search field', 'Username input')",
        },
        "ref": {
            "type": "string",
            "description": "Element ID from the page snapshot (e.g., 'e0', 'e1', 'e2'). Must refer to an editable element.",
        },
        "text": {
            "type": "string",
            "description": "Text to input into the element",
        },
        "submit": {
            "type": "boolean",
            "description": "Press Enter after typing to submit forms (default: false)",
        },
        "slowly": {
            "type": "boolean",
            "description": "Type one character at a time to trigger JS events (default: true)",
        },
    }
    required: Optional[List[str]] = ["element", "ref", "text"]

    def __init__(self, browser_tool: BrowserBase = None):
        """Keep a handle to the shared BrowserBase instance this tool drives."""
        super().__init__()
        self.browser_tool = browser_tool

    def __call__(self, element: str, ref: str, text: str, submit: bool = False, slowly: bool = True, function_params: list = None) -> Dict[str, Any]:
        """
        Type ``text`` into the referenced element through the BrowserBase instance.

        Raises:
            RuntimeError: If no browser instance was supplied.

        Returns:
            Dict[str, Any]: The browser's result, or an error dictionary when
            the underlying call raised.
        """
        browser = self.browser_tool
        if not browser:
            raise RuntimeError("Browser tool instance not initialized")

        try:
            outcome = browser.input_text(element, ref, text, submit, slowly, function_params)
        except Exception as exc:
            return {"status": "error", "message": f"Error inputting text: {str(exc)}"}
        return outcome
| |
|
| |
|
class BrowserClickTool(Tool):
    """
    Tool wrapper that forwards calls to ``BrowserBase.browser_click``.

    The schema declares no required inputs, and ``browser_click`` itself accepts
    a missing ``element`` / ``ref`` (it can take them from ``function_params`` or
    answer with a structured error). ``__call__`` therefore gives both
    parameters a ``None`` default, so a schema-valid call that omits one of
    them reaches ``browser_click`` instead of failing with a ``TypeError``
    before the browser is ever consulted.
    """

    name: str = "browser_click"
    description: str = "Click on a button, link, or other clickable element using a reference ID from a snapshot"
    inputs: Dict[str, Dict[str, str]] = {
        "element": {
            "type": "string",
            "description": "Human-readable description of what you're clicking (e.g., 'Login button', 'Next page link', 'Submit button')"
        },
        "ref": {
            "type": "string",
            "description": "Element ID from the page snapshot (e.g., 'e0', 'e1', 'e2'). You MUST get this ID from a previous snapshot's interactive_elements."
        }
    }
    required: Optional[List[str]] = []

    def __init__(self, browser_tool: BrowserBase = None):
        """Keep a handle to the shared BrowserBase instance this tool drives."""
        super().__init__()
        self.browser_tool = browser_tool

    def __call__(self, element: str = None, ref: str = None, function_params: list = None) -> Dict[str, Any]:
        """
        Click the referenced element through the BrowserBase instance.

        Args:
            element (str, optional): Human-readable description of the target
            ref (str, optional): Element ID from a page snapshot (e.g., 'e0')
            function_params (list, optional): Nested function parameters

        Raises:
            RuntimeError: If no browser instance was supplied.

        Returns:
            Dict[str, Any]: The browser's result, or an error dictionary when
            the underlying call raised.
        """
        if not self.browser_tool:
            raise RuntimeError("Browser tool instance not initialized")

        try:
            return self.browser_tool.browser_click(element, ref, function_params)
        except Exception as e:
            return {"status": "error", "message": f"Error clicking element: {str(e)}"}
| |
|
| |
|
class BrowserSnapshotTool(Tool):
    """Tool wrapper that forwards calls to ``BrowserBase.browser_snapshot``."""

    name: str = "browser_snapshot"
    description: str = "Capture a fresh snapshot of the current page, including all elements"
    inputs: Dict[str, Dict[str, str]] = {}
    required: Optional[List[str]] = []

    def __init__(self, browser_tool: BrowserBase = None):
        """Keep a handle to the shared BrowserBase instance this tool drives."""
        super().__init__()
        self.browser_tool = browser_tool

    def __call__(self, function_params: list = None) -> Dict[str, Any]:
        """
        Take a page snapshot through the BrowserBase instance.

        Raises:
            RuntimeError: If no browser instance was supplied.

        Returns:
            Dict[str, Any]: The browser's result, or an error dictionary when
            the underlying call raised.
        """
        browser = self.browser_tool
        if not browser:
            raise RuntimeError("Browser tool instance not initialized")

        try:
            outcome = browser.browser_snapshot(function_params)
        except Exception as exc:
            return {"status": "error", "message": f"Error taking snapshot: {str(exc)}"}
        return outcome
| |
|
| |
|
class BrowserConsoleMessagesTool(Tool):
    """Tool wrapper that forwards calls to ``BrowserBase.browser_console_messages``."""

    name: str = "browser_console_messages"
    description: str = "Retrieve JavaScript console messages (logs, warnings, errors) from the browser for debugging"
    inputs: Dict[str, Dict[str, str]] = {}
    required: Optional[List[str]] = []

    def __init__(self, browser_tool: BrowserBase = None):
        """Keep a handle to the shared BrowserBase instance this tool drives."""
        super().__init__()
        self.browser_tool = browser_tool

    def __call__(self, function_params: list = None) -> Dict[str, Any]:
        """
        Fetch console messages through the BrowserBase instance.

        Raises:
            RuntimeError: If no browser instance was supplied.

        Returns:
            Dict[str, Any]: The browser's result, or an error dictionary when
            the underlying call raised.
        """
        browser = self.browser_tool
        if not browser:
            raise RuntimeError("Browser tool instance not initialized")

        try:
            outcome = browser.browser_console_messages(function_params)
        except Exception as exc:
            return {"status": "error", "message": f"Error getting console messages: {str(exc)}"}
        return outcome
| |
|
| |
|
class BrowserToolkit(Toolkit):
    """
    Toolkit bundling the browser tools around one shared BrowserBase instance.

    A single BrowserBase is created here and handed to every tool
    (navigate, input text, click, snapshot, console messages), so all tools
    operate on the same browser. The instance is also kept on the toolkit
    as ``browser_tool``.
    """

    def __init__(
        self,
        name: str = "BrowserToolkit",
        browser_type: str = "chrome",
        headless: bool = False,
        timeout: int = 10,
        **kwargs
    ):
        """
        Build the shared browser and register one tool instance per capability.

        Args:
            name (str): Name passed to the Toolkit base class
            browser_type (str): Forwarded to BrowserBase
            headless (bool): Forwarded to BrowserBase
            timeout (int): Forwarded to BrowserBase
            **kwargs: Extra keyword arguments forwarded to BrowserBase
        """
        # One browser shared by every tool in the kit.
        shared_browser = BrowserBase(
            name="BrowserBase",
            browser_type=browser_type,
            headless=headless,
            timeout=timeout,
            **kwargs
        )

        # Registration order is the order in which the tools are exposed.
        tool_classes = (
            NavigateToUrlTool,
            InputTextTool,
            BrowserClickTool,
            BrowserSnapshotTool,
            BrowserConsoleMessagesTool,
        )
        tools = [tool_cls(browser_tool=shared_browser) for tool_cls in tool_classes]

        super().__init__(name=name, tools=tools)

        # Assigned after the base initialiser has run, as the instance attribute.
        self.browser_tool = shared_browser
| | |