| |
| import base64 |
| from typing import Optional |
| import os |
| import requests |
| from pathlib import Path |
|
|
| |
| import pandas as pd |
| from langchain_core.messages import HumanMessage |
| from langchain_openai import ChatOpenAI |
| from tavily import TavilyClient |
| import wikipedia |
| from youtube_transcript_api import YouTubeTranscriptApi |
|
|
|
|
# API credentials are read from the environment; either may be None when the
# corresponding variable is unset.
openai_token = os.getenv("HF_FINAL_ASSIGNMENT_OPENAI")
tavily_api_key = os.getenv("HF_FINAL_ASSIGNMENT_TAVILY")

# Shared clients used by the tool functions below.
tavily_client = TavilyClient(api_key=tavily_api_key)
# NOTE(review): model id "gpt-5.2" — confirm this is a valid/available OpenAI
# model name; temperature=0 for deterministic OCR/FEN extraction.
vision_llm = ChatOpenAI(model="gpt-5.2", api_key=openai_token, temperature=0)
|
|
|
|
def extract_text_from_image(img_path: str) -> str:
    """
    Run OCR on a single image via the module-level multimodal model.

    Only suitable for image files; the image is inlined as a base64 data URL.

    Args:
        img_path: Local path of the image file to read.

    Returns:
        The extracted text, stripped of surrounding whitespace; "" if reading
        the file or invoking the model fails (the error is printed).
    """
    try:
        # Read raw bytes and embed them as a PNG data URL for the vision model.
        raw_bytes = Path(img_path).read_bytes()
        encoded = base64.b64encode(raw_bytes).decode("utf-8")

        prompt = HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": (
                        "Extract all the text from this image. "
                        "Return only the extracted text, no explanations."
                    ),
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{encoded}"},
                },
            ]
        )

        reply = vision_llm.invoke([prompt])

        # Trailing separator then strip — matches the original accumulation.
        return (reply.content + "\n\n").strip()
    except Exception as e:
        print(f"Error extracting text: {str(e)}")
        return ""
|
|
|
|
def tavily_search(query: str) -> dict:
    """
    Search the web with the Tavily API (advanced search depth).

    Args:
        query: The search query string.

    Returns:
        The raw Tavily response as a dict (not plain text — the previous
        docstring was misleading about the return type).
    """
    response = tavily_client.search(query=query, search_depth="advanced")
    return response
|
|
|
|
def wikipedia_get_suggested_title_for_query(query: str) -> str:
    """
    Return Wikipedia's suggested page title for the query, or "" when the
    library offers no suggestion or raises.
    """
    try:
        # wikipedia.suggest() may return None — normalize that to "".
        return wikipedia.suggest(query) or ""
    except Exception as e:
        print(f"Error getting Wikipedia suggestion: {str(e)}")
        return ""
|
|
|
|
def wikipedia_search_pages(query: str) -> str:
    """
    Search Wikipedia for a query.

    Args:
        query: The search query string.

    Returns:
        Matching page titles joined into a single newline-separated string
        (one title per line) — not a Python list, which the old docstring
        incorrectly claimed. Returns "" on any error.
    """
    try:
        search_results = wikipedia.search(query)
        return "\n".join(search_results)
    except Exception as e:
        print(f"Error searching Wikipedia: {str(e)}")
        return ""
|
|
|
|
def wikipedia_get_page_summary(page_title: str, lang: str = "en") -> str:
    """
    Get the summary of a Wikipedia page given its title.

    Args:
        page_title: Title of the Wikipedia page.
        lang: Wikipedia language-edition prefix (e.g. "en", "fr"). This
            parameter was previously accepted but silently ignored; it is now
            applied via wikipedia.set_lang(). The default "en" preserves the
            old behavior.

    Returns:
        The page summary, or "" on any error.
    """
    try:
        # NOTE: set_lang mutates the wikipedia module's global language setting.
        wikipedia.set_lang(lang)
        summary = wikipedia.summary(page_title)
        return summary
    except Exception as e:
        print(f"Error getting Wikipedia page summary: {str(e)}")
        return ""
|
|
|
|
def wikipedia_get_page_full_content(page_title: str):
    """
    Fetch the full plain-text content of a Wikipedia page by title.

    The returned value is the page's ``content`` attribute. The page object
    also exposes ``title``, ``url``, ``images`` and ``links`` if more
    properties are ever needed.

    Args:
        page_title: Title of the Wikipedia page.

    Returns:
        The page's full text content as a string, or "" on any error.
    """
    try:
        return wikipedia.page(page_title).content
    except Exception as e:
        print(f"Error getting Wikipedia page content: {str(e)}")
        return ""
|
|
|
|
def youtube_get_transcript_of_video(video_url: str):
    """
    Get the transcript of a YouTube video given its URL.

    Args:
        video_url: A YouTube URL such as
            "https://www.youtube.com/watch?v=VIDEO_ID&t=42" or
            "https://youtu.be/VIDEO_ID"; a bare video id also works.

    Returns:
        A FetchedTranscript object whose snippets each carry
        ``text`` / ``start`` / ``duration`` fields.

    Note: YouTubeTranscriptApi().fetch() expects the bare video id
    (e.g. "1234"), NOT the full URL — the id is extracted below.
    """
    ytt_api = YouTubeTranscriptApi()

    # Bug fix: the previous `split("v=")[-1]` kept any trailing query
    # parameters (e.g. "&t=42") attached to the id and could not handle
    # youtu.be short links at all.
    if "v=" in video_url:
        video_id = video_url.split("v=")[-1]
    else:
        # youtu.be/<id>, /shorts/<id>, or already a bare id.
        video_id = video_url.rstrip("/").split("/")[-1]
    video_id = video_id.split("&")[0].split("?")[0]

    fetched_transcript = ytt_api.fetch(video_id)
    return fetched_transcript
|
|
|
|
def chessboard_image_to_text_description_to_fen_notation(
    image_path: str, color_to_move: str
) -> str:
    """
    Converts a chessboard image into a textual description of the position and its FEN notation.

    Args:
        image_path: A local image file path (string) representing the chessboard position.
        color_to_move: A string indicating which color is to move ("white" or "black").

    Returns:
        A string indicating the FEN notation of the chess position.
        NOTE: the model's full answer (8x8 table + description + FEN) is
        returned verbatim, so the caller must extract the FEN line from it.
        Returns "" (and prints the error) on any failure.
    """
    all_text = ""
    try:
        # Read the image and base64-encode it so it can be inlined as a data URL.
        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()

        image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Step-by-step prompt: the model first fills an 8x8 table (using '1'
        # for empty squares) before deriving the FEN — making runs of empty
        # squares explicit, which is what FEN rank encoding needs.
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            "Draw a 8x8 table representing the chessboard."
                            "Describe the chess position rank by rank from rank 8 to rank 1. "
                            "For each rank, list what occupies each square from file a to file h. "
                            "One square at a time, complete the table with the piece occupying that square if any, or with '1' if the square is empty. "
                            "Once the table is complete, provide a textual description of the chessboard : uppercase letters for white pieces, lowercase letters for black pieces, and '1' for empty squares. "
                            "the values '1' in the table are helpful to determine the number of consecutive empty squares in a row, which is necessary to determine the FEN notation. "
                            "Based on this description, determine the FEN notation of the position."
                            "Reminder: for the FEN notation, start counting from rank 8 to rank 1, and for each rank, count from file a to file h."
                            "And if it is white to move, the FEN notation should end with 'w', and if it is black to move, the FEN notation should end with 'b'."
                            "Finally, the FEN notation should finish with the string '- - 0 1'"
                        ),
                    },
                    {
                        "type": "text",
                        "text": (f"It is {color_to_move} to move in this position."),
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{image_base64}"},
                    },
                ]
            )
        ]

        # Module-level vision model instance (configured at import time).
        response = vision_llm.invoke(message)

        all_text += response.content + "\n\n"

        print(f"Extracted table description: {all_text.strip()}")
        return all_text.strip()
    except Exception as e:
        error_msg = f"Error extracting text: {str(e)}"
        print(error_msg)
        return ""
|
|
|
|
def chessboard_get_fen_notation(image_path: str, color_to_move: str) -> str:
    """
    Convert a digital chessboard image into Forsyth-Edwards Notation (FEN).

    Args:
        image_path: Local path to an image of the chessboard position.
        color_to_move: Which side moves next, "white" or "black".

    Returns:
        The model's answer containing the FEN string, or "" (with the error
        printed) on any failure.
    """
    try:
        # Inline the board image as a base64 data URL for the vision model.
        with open(image_path, "rb") as board_image:
            encoded_board = base64.b64encode(board_image.read()).decode("utf-8")

        instructions = (
            "Describe the chess position rank by rank from rank 8 to rank 1. "
            "For each rank, list what occupies each square from a to h. "
            "Then convert your description to FEN notation."
            "Reminder: for the FEN notation, start counting from rank 8 to rank 1, and for each rank, count from file a to file h."
            "And if it is white to move, the FEN notation should end with 'w', and if it is black to move, the FEN notation should end with 'b'."
            "Finally, the FEN notation should finish with the string '- - 0 1'"
        )

        prompt = HumanMessage(
            content=[
                {"type": "text", "text": instructions},
                {
                    "type": "text",
                    "text": (f"It is {color_to_move} to move in this position."),
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{encoded_board}"},
                },
            ]
        )

        answer = vision_llm.invoke([prompt])

        fen_text = (answer.content + "\n\n").strip()
        print(f"Extracted FEN notation: {fen_text}")
        return fen_text
    except Exception as e:
        print(f"Error extracting text: {str(e)}")
        return ""
|
|
|
|
def get_best_next_move_from_fen(fen: str):
    """
    Query the Lichess cloud-eval API for the best next move in a position.

    Args:
        fen: The chess position in Forsyth-Edwards Notation (FEN).

    Returns:
        The first move of the top principal variation (UCI string such as
        "e2e4"), or "" when the position is not in the cloud database or any
        error occurs.
    """
    lichess_api_url = "https://lichess.org/api/cloud-eval"

    try:
        # Let requests URL-encode the FEN (it contains spaces and slashes)
        # instead of interpolating it raw into the query string; also bound
        # the request with a timeout so the tool cannot hang indefinitely.
        response = requests.get(lichess_api_url, params={"fen": fen}, timeout=30)
        if response.status_code == 200:
            data = response.json()
            pvs = data.get("pvs", [])
            if pvs and isinstance(pvs, list):
                moves = pvs[0].get("moves", "").split()
                # Guard against an empty "moves" field (old code raised IndexError).
                return moves[0] if moves else ""
            # 200 response but no principal variations: previously this path
            # fell through and returned None instead of "".
            return ""
        else:
            print(f"Error fetching best move from Lichess API: {response.status_code}")
            return ""
    except Exception as e:
        print(f"Exception occurred while fetching best move from Lichess API: {str(e)}")
        return ""
|
|
|
|
def execute_python_code_with_subprocess(code: str) -> str:
    """
    Executes Python code in a subprocess and returns the output as a string.
    This can be used to execute code from the GAIA level 1 tasks in a safe environment.

    Args:
        code: A string containing the Python code to execute.

    Returns:
        The standard output of the executed code (stripped). If the script
        exits with a non-zero status, an "Error executing code: ..." message
        carrying its stderr is returned — previously the stderr was silently
        discarded and "" came back for any failing script. Timeouts and
        launch failures also return an error message.
    """
    import subprocess
    import sys

    try:
        result = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=60,  # hard cap so runaway scripts cannot block the agent
        )
        if result.returncode != 0:
            # Surface the traceback so the caller (an LLM agent) can react.
            return f"Error executing code: {result.stderr.strip()}"
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out."
    except Exception as e:
        return f"Error executing code: {str(e)}"
|
|
|
|
def transcribe_audio_file(audio_file_path: str) -> str:
    """
    Transcribe an audio file to text using OpenAI's gpt-4o-transcribe model.

    Args:
        audio_file_path: Local path to the audio file.

    Returns:
        The stripped transcription text, or an
        "Error transcribing audio: ..." message when transcription fails.
    """
    from openai import OpenAI

    client = OpenAI(api_key=openai_token)

    try:
        with open(audio_file_path, "rb") as audio_file:
            text = client.audio.transcriptions.create(
                model="gpt-4o-transcribe",
                file=audio_file,
                response_format="text",
            )
        return text.strip()
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
|
|
|
|
def read_excel_file(file_path: str) -> str:
    """
    Reads an Excel file and returns its content as a string.

    Args:
        file_path: A string representing the local path to the Excel file.

    Returns:
        The sheet rendered via DataFrame.to_string (index omitted), or an
        "Error reading Excel file: ..." message if reading fails.
    """
    try:
        df = pd.read_excel(file_path)
        # Bug fix: the log line claimed to show the shape but printed
        # df.head() — now it actually reports df.shape.
        print(f"Excel file read successfully. DataFrame shape: {df.shape}")
        return df.to_string(index=False)
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
|
|
|
|
def divide(a: float, b: float) -> float:
    """Return the quotient of a divided by b (ZeroDivisionError when b == 0)."""
    quotient = a / b
    return quotient
|
|
|
|
def multiply(a: float, b: float) -> float:
    """Return the product of a and b."""
    product = a * b
    return product
|
|
|
|
def add(a: float, b: float) -> float:
    """Return the sum of a and b."""
    total = a + b
    return total
|
|
|
|
def subtract(a: float, b: float) -> float:
    """Return a minus b."""
    difference = a - b
    return difference
|
|
|
|
# Full tool registry handed to the agent; select_tools_for_input() below
# narrows it per attached-file type.
# NOTE(review): chessboard_get_fen_notation is defined above but not
# registered here (only the table-based variant is) — confirm intentional.
tools = [
    extract_text_from_image,
    divide,
    multiply,
    add,
    subtract,
    tavily_search,
    wikipedia_get_suggested_title_for_query,
    wikipedia_search_pages,
    wikipedia_get_page_summary,
    wikipedia_get_page_full_content,
    youtube_get_transcript_of_video,
    get_best_next_move_from_fen,
    chessboard_image_to_text_description_to_fen_notation,
    execute_python_code_with_subprocess,
    transcribe_audio_file,
    read_excel_file,
]
|
|
|
|
def select_tools_for_input(input_file: Optional[str]):
    """
    Pick the subset of tools relevant to the given input file.

    Args:
        input_file: Path of the task's attached file, or None when the task
            has no attachment.

    Returns:
        A list of tool callables tailored to the file extension; the full
        general-purpose set when the extension is unrecognized or no file
        was provided.
    """
    suffix = Path(input_file).suffix.lower() if input_file else ""

    if suffix in (".xls", ".xlsx"):
        print("Selecting tools for Excel file input.")
        return [
            read_excel_file,
            execute_python_code_with_subprocess,
            add,
            subtract,
            multiply,
            divide,
        ]

    if suffix == ".py":
        print("Selecting tools for Python code input.")
        return [
            execute_python_code_with_subprocess,
            add,
            subtract,
            multiply,
            divide,
        ]

    if suffix in (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif"):
        # Consistency fix: this branch was the only one without a selection log.
        print("Selecting tools for image file input.")
        return [
            extract_text_from_image,
            chessboard_image_to_text_description_to_fen_notation,
            get_best_next_move_from_fen,
        ]

    # Fallback: the full general-purpose tool set.
    # NOTE(review): extract_text_from_image is absent from this default list —
    # confirm that is intentional.
    print("Selecting the default tool set.")
    return [
        tavily_search,
        wikipedia_get_suggested_title_for_query,
        wikipedia_search_pages,
        wikipedia_get_page_summary,
        wikipedia_get_page_full_content,
        youtube_get_transcript_of_video,
        get_best_next_move_from_fen,
        chessboard_image_to_text_description_to_fen_notation,
        execute_python_code_with_subprocess,
        transcribe_audio_file,
        read_excel_file,
        add,
        subtract,
        multiply,
        divide,
    ]
|
|