Spaces:
Sleeping
Sleeping
File size: 5,593 Bytes
1b07c47 5087885 1b07c47 e2540d1 1b07c47 e2540d1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | import os
import re
import requests
from youtube_transcript_api import YouTubeTranscriptApi
import wikipedia
# Import configuration variables from your config.py
from agent.config import USER_AGENT, ATTACHMENTS, ATTACHMENT_BASE_URL
# Runs at import time: hand the USER_AGENT value imported from agent.config to
# the wikipedia package so calls made through that package use it.
wikipedia.set_user_agent(USER_AGENT)
# Utility Functions
def extract_final_answer(response_text: str) -> str:
    """
    Extract the answer that follows a "FINAL ANSWER:" marker in a response.

    The answer runs from the marker up to the end of the line or the first
    sentence-ending period (a period followed by whitespace, another period,
    or the end of the text), whichever comes first. A period that is directly
    followed by an ordinary character is kept, so decimals ("3.14") and dotted
    tokens ("example.com") are returned whole instead of being cut at the dot.

    Args:
        response_text: Full response text to search.

    Returns:
        The stripped answer belonging to the first marker found, or "unknown"
        when the marker does not occur.
    """
    # `\.(?=[^\s.])` admits a period only when it is glued to a following
    # non-space, non-period character; a trailing "." still ends the capture.
    match = re.search(
        r"FINAL ANSWER: ?((?:[^\n.]|\.(?=[^\s.]))+)", response_text
    )
    if match:
        return match.group(1).strip()
    return "unknown"
def download_file(url: str) -> bytes:
    """
    Retrieve the raw bytes served at the given URL.

    The request is sent with the configured USER_AGENT header and a 15 second
    timeout. Any failure (including a non-2xx HTTP status) is printed and
    reported to the caller as None rather than raised.
    """
    try:
        request_headers = {"User-Agent": USER_AGENT}
        reply = requests.get(url, headers=request_headers, timeout=15)
        # Turn 4xx/5xx statuses into exceptions so they take the failure path.
        reply.raise_for_status()
        payload = reply.content
        return payload
    except Exception as e:
        print(f"Error downloading file from {url}: {str(e)}")
        return None
def get_file_type(filename: str) -> str:
    """
    Classify a file as "audio", "data", "image" or "video" from its extension.

    The extension is compared case-insensitively; anything not listed below
    (including names without an extension) is reported as "unknown".
    """
    # Category label paired with the extensions that belong to it.
    categories = (
        ("audio", (".mp3", ".wav", ".m4a")),
        ("data", (".xls", ".xlsx", ".csv", ".json", ".py", ".txt")),
        ("image", (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff")),
        ("video", (".mp4", ".mov", ".avi", ".mkv", ".webm")),
    )
    _, suffix = os.path.splitext(filename)
    suffix = suffix.lower()
    for label, extensions in categories:
        if suffix in extensions:
            return label
    return "unknown"
def fetch_task_attachment(task_id: str) -> str:
    """
    Fetch the attachment for a task from the scoring server.

    The file is requested from ATTACHMENT_BASE_URL + task_id. On success an
    entry is stored in the global ATTACHMENTS dictionary under task_id with
    the keys "name" (file name), "content" (raw response bytes) and "type"
    (the category returned by get_file_type).

    The file name is taken from the Content-Disposition header, accepting both
    the plain `filename=` form and the extended `filename*=UTF-8''...` form;
    when the header carries no file name, "<task_id>_attachment" is used.

    Args:
        task_id: Identifier appended to ATTACHMENT_BASE_URL.

    Returns:
        task_id when the attachment was stored, otherwise None. Failures are
        printed, not raised.
    """
    try:
        response = requests.get(
            f"{ATTACHMENT_BASE_URL}{task_id}",
            headers={"User-Agent": USER_AGENT},
            timeout=15,
        )
        response.raise_for_status()
        content_disposition = response.headers.get("content-disposition", "")
        # The pattern is double-quoted so the two single quotes of the
        # extended-form prefix (UTF-8'') are part of the regex; written inside
        # a single-quoted literal they would split the string and the quotes
        # would end up in the captured file name. Matching ignores case
        # because servers also emit the charset as "utf-8".
        filename_matches = re.findall(
            r"filename\*?=(?:UTF-8'')?([^;]+)",
            content_disposition,
            flags=re.IGNORECASE,
        )
        filename = (
            filename_matches[0].strip().strip('"') if filename_matches
            else f"{task_id}_attachment"
        )
        # Decode URL-encoded characters if necessary (e.g., %20 to space)
        filename = requests.utils.unquote(filename)
        ATTACHMENTS[task_id] = {
            "name": filename,
            "content": response.content,
            "type": get_file_type(filename),
        }
        print(f" Downloaded attachment '{filename}' for task {task_id}")
        return task_id
    except requests.exceptions.HTTPError as e:
        # A 404 is reported with its own message; other statuses show the error.
        if e.response.status_code == 404:
            print(f"No attachment found for task {task_id} (404 Not Found).")
        else:
            print(f"HTTP error fetching attachment for task {task_id}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Network error fetching attachment for task {task_id}: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred fetching attachment for task {task_id}: {e}")
        return None
# Alias for compatibility with test harness
def download_gaia_attachment_local(task_id: str) -> str:
    """Compatibility alias: delegates to fetch_task_attachment and returns its result."""
    outcome = fetch_task_attachment(task_id)
    return outcome
def get_youtube_transcript(video_url: str) -> str:
    """
    Return the transcript of a YouTube video as a single string.

    The video ID is read from a "youtube.com/watch?v=<id>" or "youtu.be/<id>"
    URL, the transcript is requested through YouTubeTranscriptApi, and the
    "text" field of every transcript entry is joined with single spaces.
    An empty string is returned (after printing the reason) when no video ID
    can be found in the URL or when anything goes wrong while fetching.
    """
    id_pattern = r"(?:youtube\.com/watch\?v=|youtu\.be/)([a-zA-Z0-9_-]+)"
    try:
        found = re.search(id_pattern, video_url)
        if found is None:
            print(f"Could not extract video ID from URL: {video_url}")
            return ""
        segments = YouTubeTranscriptApi.get_transcript(found.group(1))
        return " ".join(segment["text"] for segment in segments)
    except Exception as e:
        print(f"Error getting YouTube transcript for {video_url}: {str(e)}")
        return ""
def extract_audio_from_video(video_content: bytes) -> bytes:
    """
    Extract the audio track from video bytes by running ffmpeg.

    The video is written to a temporary ".mp4" file and ffmpeg converts it to
    a temporary ".wav" file (16-bit PCM, 16 kHz, mono), whose bytes are
    returned. Both temporary files are removed before returning, whether the
    extraction succeeded or failed.

    Args:
        video_content: Raw bytes of the video file.

    Returns:
        The bytes of the extracted WAV audio, or None when anything fails
        (for example ffmpeg is missing or rejects the input); the error is
        printed.
    """
    import os
    import subprocess
    import tempfile

    # Track the paths up front so the cleanup below knows what was created,
    # even when a later step raises.
    video_path = None
    audio_path = None
    try:
        # delete=False: the files must outlive the `with` so ffmpeg can open them.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as video_file:
            video_path = video_file.name
            video_file.write(video_content)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as audio_file:
            audio_path = audio_file.name
        # Use ffmpeg to extract audio
        subprocess.run([
            'ffmpeg', '-i', video_path,
            '-vn',  # No video
            '-acodec', 'pcm_s16le',  # Audio codec
            '-ar', '16000',  # Sample rate
            '-ac', '1',  # Mono
            '-y',  # Overwrite output
            audio_path
        ], check=True, capture_output=True)
        # Read the extracted audio
        with open(audio_path, 'rb') as f:
            return f.read()
    except Exception as e:
        print(f"Error extracting audio from video: {e}")
        return None
    finally:
        # Always remove the temporary files; previously a failed ffmpeg run
        # left both of them behind.
        for path in (video_path, audio_path):
            if path is not None:
                try:
                    os.unlink(path)
                except OSError:
                    pass