Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import requests | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| import wikipedia | |
| # Import configuration variables from your config.py | |
| from agent.config import USER_AGENT, ATTACHMENTS, ATTACHMENT_BASE_URL | |
| # Initialize wikipedia with the user agent from config | |
| wikipedia.set_user_agent(USER_AGENT) | |
| # Utility Functions | |
def extract_final_answer(response_text: str) -> str:
    """
    Extracts the final answer from a response string based on a specific template.

    The answer is the text following the first ``FINAL ANSWER:`` marker, up to
    the end of that line or the end of its first sentence, whichever comes
    first. A period only terminates the answer when it is followed by
    whitespace or the end of the text, so decimals such as ``3.14`` and dotted
    tokens such as ``example.com`` are kept intact.

    Args:
        response_text: The full model response to search.

    Returns:
        The stripped answer text, or ``"unknown"`` when the marker is missing
        or nothing usable follows it.
    """
    # A "." is consumed only when a non-whitespace character follows it; a
    # sentence-ending period (followed by whitespace/end of text) stops the match.
    match = re.search(r"FINAL ANSWER: ?((?:[^\n.]|\.(?=\S))+)", response_text)
    if not match:
        return "unknown"
    answer = match.group(1).strip()
    # A whitespace-only capture carries no answer; report it like a missing one.
    return answer or "unknown"
def download_file(url: str) -> bytes:
    """
    Fetch the raw bytes served at ``url``.

    The request is sent with the configured User-Agent and a 15 second
    timeout. Any failure (network error, HTTP error status, ...) is printed
    and reported to the caller as ``None`` instead of being raised.
    """
    request_headers = {"User-Agent": USER_AGENT}
    try:
        resp = requests.get(url, headers=request_headers, timeout=15)
        # Turn 4xx/5xx statuses into exceptions so they take the error path.
        resp.raise_for_status()
        payload = resp.content
    except Exception as exc:
        print(f"Error downloading file from {url}: {exc!s}")
        return None
    return payload
def get_file_type(filename: str) -> str:
    """
    Classify a file by its (case-insensitive) extension.

    Returns one of ``"audio"``, ``"data"``, ``"image"``, ``"video"``, or
    ``"unknown"`` when the extension is missing or not recognised.
    """
    categories = {
        "audio": (".mp3", ".wav", ".m4a"),
        "data": (".xls", ".xlsx", ".csv", ".json", ".py", ".txt"),
        "image": (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff"),
        "video": (".mp4", ".mov", ".avi", ".mkv", ".webm"),
    }
    _, suffix = os.path.splitext(filename)
    suffix = suffix.lower()
    for label, extensions in categories.items():
        if suffix in extensions:
            return label
    return "unknown"
def fetch_task_attachment(task_id: str) -> str:
    """
    Fetches an attachment for a given task ID from the scoring server.

    The file is requested from ``ATTACHMENT_BASE_URL + task_id``. On success
    its name, raw bytes and detected type are stored in the global
    ATTACHMENTS dictionary under ``task_id``. The file name is taken from the
    ``Content-Disposition`` response header when present, otherwise it falls
    back to ``"<task_id>_attachment"``.

    Args:
        task_id: Identifier of the task whose attachment should be fetched.

    Returns:
        The task_id if successful, None otherwise (HTTP error, network error,
        or any other unexpected failure; each is printed, none is raised).
    """
    try:
        response = requests.get(
            f"{ATTACHMENT_BASE_URL}{task_id}", headers={"User-Agent": USER_AGENT}, timeout=15
        )
        response.raise_for_status()
        content_disposition = response.headers.get("content-disposition", "")
        # Accept both `filename=` and `filename*=UTF-8''...` forms. The pattern
        # is double-quoted because it contains literal single quotes; a
        # single-quoted raw string would end at the first of them.
        filename_matches = re.findall(
            r"filename\*?=(?:UTF-8'')?([^;]+)", content_disposition, flags=re.IGNORECASE
        )
        filename = (
            filename_matches[0].strip().strip('"') if filename_matches
            else f"{task_id}_attachment"
        )
        # Decode URL-encoded characters if necessary (e.g., %20 to space)
        filename = requests.utils.unquote(filename)
        ATTACHMENTS[task_id] = {
            "name": filename,
            "content": response.content,
            "type": get_file_type(filename),
        }
        print(f" Downloaded attachment '{filename}' for task {task_id}")
        return task_id
    except requests.exceptions.HTTPError as e:
        # A 404 simply means the task has no attachment; report it separately
        # from genuine HTTP failures.
        if e.response.status_code == 404:
            print(f"No attachment found for task {task_id} (404 Not Found).")
        else:
            print(f"HTTP error fetching attachment for task {task_id}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Network error fetching attachment for task {task_id}: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred fetching attachment for task {task_id}: {e}")
        return None
| # Alias for compatibility with test harness | |
def download_gaia_attachment_local(task_id: str) -> str:
    """Compatibility wrapper that delegates directly to fetch_task_attachment."""
    fetched_id = fetch_task_attachment(task_id)
    return fetched_id
def get_youtube_transcript(video_url: str) -> str:
    """
    Return the transcript text of a YouTube video as one space-joined string.

    The video ID is read from ``youtube.com/watch?v=<id>`` or
    ``youtu.be/<id>`` style URLs. An empty string is returned when no ID can
    be found or when fetching the transcript fails; the reason is printed.
    """
    id_pattern = r"(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]+)"
    try:
        found = re.search(id_pattern, video_url)
        if found is None:
            print(f"Could not extract video ID from URL: {video_url}")
            return ""
        segments = YouTubeTranscriptApi.get_transcript(found.group(1))
        return " ".join(segment["text"] for segment in segments)
    except Exception as exc:
        print(f"Error getting YouTube transcript for {video_url}: {exc!s}")
        return ""
def extract_audio_from_video(video_content: bytes) -> bytes:
    """
    Extract audio track from video content using ffmpeg.

    The video bytes are written to a temporary ``.mp4`` file, ffmpeg converts
    it to a mono, 16 kHz, 16-bit PCM ``.wav`` file, and that file's bytes are
    returned. Both temporary files are removed whether or not the conversion
    succeeds.

    Args:
        video_content: Raw bytes of the video file.

    Returns:
        The extracted WAV audio bytes, or None if any step fails (for example
        ffmpeg is missing or cannot decode the input); the error is printed.
    """
    import os
    import subprocess
    import tempfile

    video_path = None
    audio_path = None
    try:
        # delete=False: ffmpeg has to reopen the files by path after we close them.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as video_file:
            # Record the path before writing so a failed write is still cleaned up.
            video_path = video_file.name
            video_file.write(video_content)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as audio_file:
            audio_path = audio_file.name
        # Use ffmpeg to extract audio
        subprocess.run([
            'ffmpeg', '-i', video_path,
            '-vn',  # No video
            '-acodec', 'pcm_s16le',  # Audio codec
            '-ar', '16000',  # Sample rate
            '-ac', '1',  # Mono
            '-y',  # Overwrite output
            audio_path
        ], check=True, capture_output=True)
        # Read the extracted audio
        with open(audio_path, 'rb') as f:
            return f.read()
    except Exception as e:
        print(f"Error extracting audio from video: {e}")
        return None
    finally:
        # Always remove the temporary files, including on the failure paths.
        for path in (video_path, audio_path):
            if path is None:
                continue
            try:
                os.unlink(path)
            except OSError:
                # Best-effort cleanup: a missing/locked temp file must not
                # mask the function's real result.
                pass