Hugescoot's picture
Major upgrades: Enhanced web search + multimodal video analysis
e2540d1
import os
import re
import requests
from youtube_transcript_api import YouTubeTranscriptApi
import wikipedia
# Import configuration variables from your config.py
from agent.config import USER_AGENT, ATTACHMENTS, ATTACHMENT_BASE_URL
# Initialize wikipedia with the user agent from config
wikipedia.set_user_agent(USER_AGENT)
# Utility Functions
def extract_final_answer(response_text: str) -> str:
"""
Extracts the final answer from a response string based on a specific template.
"""
match = re.search(r"FINAL ANSWER: ?([^\n\.]+)", response_text)
if match:
return match.group(1).strip()
return "unknown"
def download_file(url: str) -> bytes:
"""
Downloads a file from a given URL.
"""
try:
response = requests.get(url, headers={"User-Agent": USER_AGENT}, timeout=15)
response.raise_for_status() # Raise an exception for HTTP errors
return response.content
except Exception as e:
print(f"Error downloading file from {url}: {str(e)}")
return None
def get_file_type(filename: str) -> str:
"""
Determines the type of file based on its extension.
"""
ext = os.path.splitext(filename)[1].lower()
if ext in (".mp3", ".wav", ".m4a"): # Added m4a for common audio
return "audio"
if ext in (".xls", ".xlsx", ".csv", ".json", ".py", ".txt"): # Added .txt for general text files
return "data"
if ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff"): # Added more image types
return "image"
if ext in (".mp4", ".mov", ".avi", ".mkv", ".webm"): # Added more video types
return "video"
return "unknown"
def fetch_task_attachment(task_id: str) -> str:
"""
Fetches an attachment for a given task ID from the scoring server.
The content is stored in the global ATTACHMENTS dictionary.
Returns the task_id if successful, None otherwise.
"""
try:
response = requests.get(
f"{ATTACHMENT_BASE_URL}{task_id}", headers={"User-Agent": USER_AGENT}, timeout=15
)
response.raise_for_status()
content_disposition = response.headers.get("content-disposition", "")
# Use re.findall to handle cases where filename might be quoted or not
filename_matches = re.findall(r'filename\*?=(?:UTF-8'')?([^;]+)', content_disposition)
filename = (
filename_matches[0].strip().strip('"') if filename_matches
else f"{task_id}_attachment"
)
# Decode URL-encoded characters if necessary (e.g., %20 to space)
filename = requests.utils.unquote(filename)
ATTACHMENTS[task_id] = {
"name": filename,
"content": response.content,
"type": get_file_type(filename),
}
print(f" Downloaded attachment '{filename}' for task {task_id}")
return task_id
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
print(f"No attachment found for task {task_id} (404 Not Found).")
else:
print(f"HTTP error fetching attachment for task {task_id}: {e}")
return None
except requests.exceptions.RequestException as e:
print(f"Network error fetching attachment for task {task_id}: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred fetching attachment for task {task_id}: {e}")
return None
# Alias for compatibility with test harness
def download_gaia_attachment_local(task_id: str) -> str:
"""Alias for fetch_task_attachment for compatibility"""
return fetch_task_attachment(task_id)
def get_youtube_transcript(video_url: str) -> str:
"""
Extracts the transcript from a YouTube video URL.
"""
try:
video_id_match = re.search(
r"(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]+)", video_url
)
if not video_id_match:
print(f"Could not extract video ID from URL: {video_url}")
return ""
video_id = video_id_match.group(1)
transcript = YouTubeTranscriptApi.get_transcript(video_id)
return " ".join([entry["text"] for entry in transcript])
except Exception as e:
print(f"Error getting YouTube transcript for {video_url}: {str(e)}")
return ""
def extract_audio_from_video(video_content: bytes) -> bytes:
"""Extract audio track from video content using ffmpeg"""
import subprocess
import tempfile
try:
# Create temporary files
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as video_file:
video_file.write(video_content)
video_path = video_file.name
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as audio_file:
audio_path = audio_file.name
# Use ffmpeg to extract audio
subprocess.run([
'ffmpeg', '-i', video_path,
'-vn', # No video
'-acodec', 'pcm_s16le', # Audio codec
'-ar', '16000', # Sample rate
'-ac', '1', # Mono
'-y', # Overwrite output
audio_path
], check=True, capture_output=True)
# Read the extracted audio
with open(audio_path, 'rb') as f:
audio_content = f.read()
# Cleanup
import os
os.unlink(video_path)
os.unlink(audio_path)
return audio_content
except Exception as e:
print(f"Error extracting audio from video: {e}")
return None