| import base64 |
| import io |
| import json |
| import os |
| import subprocess |
| from email.message import Message |
| from io import StringIO |
| from pathlib import Path |
| from typing import List |
| import av |
| import pandas as pd |
| import requests |
| import yt_dlp |
| from bs4 import BeautifulSoup |
| from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage |
| from langchain_core.tools import tool |
| from langchain_openai import ChatOpenAI |
| from langchain_tavily import TavilyExtract, TavilySearch |
| from pydantic import SecretStr |
|
|
# API credentials, read once at import time; an unset variable yields "".
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY", "")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")

# Model identifiers intended for the YouTube video tool (per-frame assessment
# and final confirmation); each can be overridden by the environment variable
# of the same name.
YOUTUBE_FRAME_ASSESSMENT_MODEL = os.environ.get(
    "YOUTUBE_FRAME_ASSESSMENT_MODEL", "google/gemini-2.5-flash-preview-05-20"
)
YOUTUBE_CONFIRMATION_MODEL = os.environ.get(
    "YOUTUBE_CONFIRMATION_MODEL", "google/gemini-2.5-pro-preview"
)
|
|
| |
@tool(parse_docstring=True)
def download_file_from_url(url: str, filename_override: str|None = None) -> str:
    """
    Downloads a file from a URL to a directory in the cwd. Prefer to use the filename associated with the URL, but can override if directed to.
    Filename Logic:
    1. If `filename_override` is provided, it is used, reduced to its final path component so the file always lands in the download directory.
    2. Otherwise, the filename is extracted from the 'Content-Disposition' HTTP header
    using Python's `email.message.Message` parser. The result is likewise reduced to its final path component.
    3. If no usable filename is provided via override and none can be determined from
    the header, a ValueError is raised.

    Args:
        url: The URL of the file to download.
        filename_override: Optional. If provided, this name is used for the downloaded file (any directory part is discarded). Using the name associated with the URL is recommended (but may require identifying the extension).

    Returns:
        A confirmation message ending in the local path of the downloaded file
        (inside the "temp_downloads" directory of the current working directory).

    Raises:
        requests.exceptions.RequestException: For HTTP errors (e.g., 404, network issues).
        IOError: If the file cannot be written. A partially written file is removed.
        ValueError: If no usable filename can be determined (neither provided via override
            nor found in the Content-Disposition header).
    """
    try:
        with requests.Session() as session:
            with session.get(url, stream=True, allow_redirects=True, timeout=30) as response:
                response.raise_for_status()

                final_filename = None
                if filename_override:
                    # Keep only the final path component: an override such as
                    # "../x" or "/abs/path/x" must not write outside temp_downloads.
                    final_filename = os.path.basename(filename_override)
                    print(f"Using provided filename: {final_filename}")
                else:
                    content_disposition = response.headers.get('content-disposition')
                    if content_disposition:
                        msg = Message()
                        msg['Content-Disposition'] = content_disposition
                        filename_from_header = msg.get_filename()
                        if filename_from_header:
                            # The header is server-controlled; strip any directory part.
                            final_filename = os.path.basename(filename_from_header)
                            print(f"Using filename from Content-Disposition: {final_filename}")

                # basename() can return "" (e.g. for "dir/"), and "." / ".." are not file names.
                if final_filename in (".", ".."):
                    final_filename = None
                if not final_filename:
                    if filename_override:
                        raise ValueError(
                            f"The provided filename_override {filename_override!r} "
                            "does not contain a usable file name."
                        )
                    raise ValueError(
                        "No filename could be determined. "
                        "None was provided as an override, and it could not be "
                        "extracted from the Content-Disposition header."
                    )

                temp_dir = Path.cwd() / "temp_downloads"
                temp_dir.mkdir(parents=True, exist_ok=True)
                local_filepath = os.path.join(temp_dir, final_filename)

                file_opened = False
                try:
                    with open(local_filepath, 'wb') as f:
                        file_opened = True
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:  # skip empty chunks
                                f.write(chunk)
                except Exception:
                    # Remove the truncated file so a failed download is not mistaken for
                    # a complete one. Only done once open() succeeded, so a pre-existing
                    # file we could not open is never deleted.
                    if file_opened:
                        try:
                            os.remove(local_filepath)
                        except OSError:
                            pass  # best-effort cleanup; the original error is re-raised below
                    raise

                return f"File downloaded successfully. Local File Path: {local_filepath}"

    except requests.exceptions.RequestException as e:
        print(f"Error during download from {url}: {e}")
        raise
    except IOError as e:
        print(f"Error writing file: {e}")
        raise
| |
|
|
@tool(parse_docstring=True)
def basic_web_search(query: str, search_domains: list[str]|None = None) -> str:
    """
    Perform a web search using Tavily. Useful for retrieving relevant URLs and content summaries based on a search query.
    The content returned by this tool is limited. For more detailed content extraction, use the `extract_url_content` tool.
    If you would like to limit the search to specific domains, pass them as a list via `search_domains` (e.g. ['wikipedia.org', 'example.com']).

    Args:
        query (str): The search query to perform.
        search_domains (None | list[str]): Optional. A list of domains (E.g., ['wikipedia.org', 'example.com']) to restrict the search to. If None, searches across all domains.

    Returns:
        str: a json formatted string containing the search results, including titles, content snippets, and URLs, a "no results" hint, or a message describing a search failure.
    """
    no_results_message = "No results found. If the query is too specific, try a more general search term."
    # Results whose Tavily relevance score does not exceed this are dropped as noise.
    min_score = 0.25

    # NOTE(review): confirm `api_key` is the keyword TavilySearch reads for the API key.
    search_tool = TavilySearch(
        api_key=SecretStr(TAVILY_API_KEY),
        max_results=5,
        include_raw_content=False,
        include_domains=search_domains,
    )
    results = search_tool.invoke({"query": query})

    if not isinstance(results, dict):
        return no_results_message

    raw_results = results.get("results") or []
    if not raw_results and "error" in results:
        # The tool may hand back an error payload instead of raising; report it
        # rather than crashing on the missing "results" key.
        return f"Search failed: {results['error']}"

    # Missing fields or a missing/None score disqualify a result instead of raising.
    filtered = [
        {"title": item["title"], "url": item["url"], "content": item["content"]}
        for item in raw_results
        if all(key in item for key in ("title", "url", "content"))
        and (item.get("score") or 0.0) > min_score
    ]
    if not filtered:
        return no_results_message
    return json.dumps({"results": filtered}, indent=2)
|
|
@tool(parse_docstring=True)
def extract_url_content(url_list: list[str]) -> str:
    """
    Extracts the content from URLs using Tavily's extract tool.
    This tool is useful for retrieving content from web pages.
    This tool will most likely be used after a web search to extract content from the URLs returned by the search.

    Args:
        url_list (list[str]): The URLs to extract content from.

    Returns:
        str: The extracted content (a JSON list with one entry per page, image lists removed) or an error message if extraction fails.
    """
    extract_tool = TavilyExtract(api_key=SecretStr(TAVILY_API_KEY))
    extract_results = extract_tool.invoke({'urls': url_list})

    # Only a dict payload carrying a non-empty "results" list counts as success.
    pages = extract_results.get('results') if isinstance(extract_results, dict) else None
    if not pages:
        return f"No content could be extracted from the provided URLs: {url_list}"

    for page in pages:
        # Image lists are dropped to keep the output compact; pop() with a default
        # avoids a KeyError for a page that has no 'images' entry.
        page.pop('images', None)
    return json.dumps(pages, indent=2)
| |
|
|
|
|
def bs_html_parser(url, timeout=30):
    """Fetch ``url`` and parse the response body with BeautifulSoup's "html.parser".

    Args:
        url: The page to fetch.
        timeout: Seconds ``requests`` waits for the server before raising. Without
            a timeout a stalled server would block the caller indefinitely; pass
            None to restore that behaviour.

    Returns:
        A ``BeautifulSoup`` document when the server answers with status 200,
        otherwise None. Network errors raised by ``requests`` propagate.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        return None
    return BeautifulSoup(response.text, "html.parser")
|
|
def _normalized_text(tag):
    """Return ``tag``'s text with every run of whitespace collapsed to one space.

    ``get_text(strip=True)`` strips each text node and then concatenates them, which
    turns ``List of <a>films</a> in 2020`` into "List offilmsin 2020". Collapsing
    whitespace instead keeps the spacing present in the markup.
    """
    return " ".join(tag.get_text().split())


def get_table_title(table_tag):
    """
    Extracts a title for a given table tag.
    It looks for a <caption>, then for the closest preceding <h1>-<h6> tag.

    Args:
        table_tag: A BeautifulSoup ``<table>`` tag.

    Returns:
        The caption text if non-empty, else the text of the nearest preceding
        heading (without its ``mw-editsection`` spans), else "Untitled Table".
    """
    caption = table_tag.find('caption')
    if caption:
        caption_text = _normalized_text(caption)
        if caption_text:
            return caption_text

    headings = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
    preceding_headings = table_tag.find_all_previous(headings, limit=1)
    if not preceding_headings:
        return "Untitled Table"
    heading_tag = preceding_headings[0]

    # MediaWiki headings may wrap their text in <span class="mw-headline">; prefer it when present.
    headline_span = heading_tag.find("span", class_="mw-headline")
    if headline_span:
        title_text = _normalized_text(headline_span)
    else:
        # Remove the edit-section spans (the "[edit]" links) from a re-parsed copy so
        # the caller's document is not mutated.
        heading_copy = BeautifulSoup(str(heading_tag), 'html.parser').find(heading_tag.name)
        if heading_copy:
            for span in heading_copy.find_all("span", class_="mw-editsection"):
                span.decompose()
            title_text = _normalized_text(heading_copy)
        else:
            title_text = _normalized_text(heading_tag)

    return title_text or "Untitled Table"
|
|
def _records_from_dataframe(df):
    """Convert a parsed table into JSON-serializable row dicts.

    ``pd.read_html`` returns tuple (MultiIndex) column labels for tables with
    multi-row headers, and ``json.dumps`` rejects tuple keys. Labels are therefore
    flattened to strings: consecutive repeated header levels are collapsed, and
    duplicate names get a ".N" suffix so ``to_dict`` does not drop columns.
    Float NaN cells become None so the serialized output is strict JSON.
    """
    import math

    flat_names = []
    for col in df.columns:
        if isinstance(col, tuple):
            parts = []
            for level in map(str, col):
                if not parts or parts[-1] != level:
                    parts.append(level)
            flat_names.append(" ".join(parts))
        else:
            flat_names.append(str(col))

    seen = {}
    unique_names = []
    for name in flat_names:
        count = seen.get(name, 0)
        seen[name] = count + 1
        unique_names.append(name if count == 0 else f"{name}.{count}")

    table = df.copy()
    table.columns = unique_names
    return [
        {
            key: (None if isinstance(value, float) and math.isnan(value) else value)
            for key, value in row.items()
        }
        for row in table.to_dict(orient="records")
    ]


@tool(parse_docstring=True)
def wikipedia_reader(url: str) -> str:
    """
    Extracts sections, paragraphs, and tables from a Wikipedia page.

    Args:
        url (str): The URL of the Wikipedia page to extract content from.

    Returns:
        str: A JSON string containing sections, paragraphs, and tables, or an empty string if the page could not be fetched.
    """
    soup = bs_html_parser(url)
    if not soup:
        return ""

    # In-page anchors are resolved against the URL without any fragment it already
    # has, so ".../Page#History" does not yield ".../Page#History#Career".
    base_url = url.split("#", 1)[0]
    section_links = [
        base_url + href
        for href in (anchor.get('href') for anchor in soup.find_all('a', href=True))
        if href and href.startswith("#") and "#cite_" not in href and len(href) > 1
    ]
    # The same anchor can be linked several times; keep one copy, in first-seen order.
    section_links = list(dict.fromkeys(section_links))

    # Very short fragments (10 characters or fewer) are treated as noise.
    paragraphs = [
        text
        for text in (p.get_text(strip=True) for p in soup.find_all("p"))
        if text and len(text) > 10
    ]

    tables = []
    for table_tag in soup.find_all("table", {"class": "wikitable"}):
        title = get_table_title(table_tag)
        try:
            df_list = pd.read_html(StringIO(str(table_tag)))
            if df_list:
                tables.append({"title": title, "table_data": _records_from_dataframe(df_list[0])})
            else:
                tables.append({"title": title, "table_data": None, "error": "pd.read_html returned empty list"})
        except Exception as e:
            # A table pandas cannot parse is reported per-table instead of failing the page.
            tables.append({"title": title, "table_data" : None, "error": str(e)})

    return_dict = {
        "sections": section_links,
        "paragraphs": paragraphs,
        "tables": tables,
    }
    return json.dumps(return_dict, indent=2, ensure_ascii=False)
|
|
|
|
| |
| |
class WhisperTranscriber:
    """Cache for a single Whisper speech-recognition pipeline.

    ``WhisperTranscriber()`` does not produce an instance of this class:
    ``__new__`` returns the cached Hugging Face ``pipeline`` object. The pipeline
    is built on the first call only; later calls return the stored object.
    """

    # The cached pipeline; None until the first call builds it.
    _instance = None

    def __new__(cls):
        if cls._instance is not None:
            return cls._instance

        # torch/transformers are imported here so they are only loaded once a
        # transcription is actually requested.
        import torch
        from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
        from transformers.pipelines import pipeline

        use_gpu = torch.cuda.is_available()
        target_device = "cuda:0" if use_gpu else "cpu"
        # Half precision on GPU, full precision on CPU.
        weights_dtype = torch.float16 if use_gpu else torch.float32
        checkpoint = "openai/whisper-large-v3"

        speech_model = AutoModelForSpeechSeq2Seq.from_pretrained(
            checkpoint,
            torch_dtype=weights_dtype,
            low_cpu_mem_usage=True,
            use_safetensors=True,
        )
        speech_model.to(target_device)
        speech_processor = AutoProcessor.from_pretrained(checkpoint)

        cls._instance = pipeline(
            "automatic-speech-recognition",
            model=speech_model,
            tokenizer=speech_processor.tokenizer,
            feature_extractor=speech_processor.feature_extractor,
            torch_dtype=weights_dtype,
            device=target_device,
        )
        return cls._instance
|
|
|
|
@tool(parse_docstring=True)
def transcribe_audio_file(file_path: str) -> str:
    """
    Transcribes an audio file to text using OpenAI's Whisper-large-v3 model, caching the model after the first load.
    Recordings longer than 30 seconds are supported.

    Args:
        file_path (str): The path to the audio file to transcribe.

    Returns:
        str: The transcription of the audio file.
    """
    pipe = WhisperTranscriber()
    # Whisper processes audio in 30-second windows. For longer inputs, recent
    # transformers versions refuse long-form generation unless timestamp
    # prediction is enabled. With timestamps on, "text" still holds the full
    # transcript.
    result = pipe(file_path, return_timestamps=True)
    transcription = result["text"]
    return transcription.strip() if transcription else "No transcription available."
|
|
|
|
def _extract_video_info(video_url: str) -> dict:
    """Fetch yt-dlp metadata for ``video_url`` without downloading any media.

    Raises:
        ValueError: If yt-dlp does not return an info dict.
    """
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "mp4[ext=mp4]+bestaudio/best",
        "forceurl": True,
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "subtitlesformat": "vtt",
        "subtitleslangs": ['en'],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(video_url, download=False)
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if not isinstance(info_dict, dict):
        raise ValueError("Failed to extract video information. Please check the video URL.")
    return info_dict


def _resolve_video_stream_url(info_dict: dict) -> str | None:
    """Return a direct media URL for the selected video track, or None if none is found.

    NOTE(review): this relies on yt-dlp's info-dict layout. It assumes a top-level
    "url" for single-file formats, and per-track "requested_formats" entries for
    merged selections such as "video+bestaudio", where audio-only tracks have
    vcodec == "none". Confirm against the installed yt-dlp.
    """
    top_level_url = info_dict.get("url")
    if top_level_url:
        return top_level_url
    for track in info_dict.get("requested_formats") or []:
        # Only frames are analysed, so any track carrying video will do.
        if track.get("url") and track.get("vcodec") != "none":
            return track["url"]
    return None


def _frame_to_base64_jpeg(frame) -> str:
    """Encode a decoded PyAV video frame as JPEG and return it base64-encoded as text."""
    buffer = io.BytesIO()
    frame.to_image().save(buffer, format="JPEG")
    return base64.b64encode(buffer.getvalue()).decode("utf-8")


def _stop_process(process: subprocess.Popen) -> None:
    """Close ffmpeg's output pipe, then make sure the process has exited and is reaped."""
    if process.stdout is not None:
        process.stdout.close()
    if process.poll() is None:
        process.terminate()
    try:
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()


def _answer_per_frame(stream_url: str, query: str, capture_interval: float) -> list[dict]:
    """Stream the video through ffmpeg and ask the frame model about sampled frames.

    A frame is sampled at most once per ``capture_interval`` seconds of stream time
    (every frame when the interval is <= 0). Each sampled frame is sent with the most
    recent non-empty answer, so the answer is refined as the video progresses.

    Returns:
        One ``{"timestamp": seconds, "answer": text}`` dict per sampled frame.

    Raises:
        ValueError: If the video stream has no time base.
        TypeError: If the model returns non-string content.
    """
    image_model = ChatOpenAI(
        model=YOUTUBE_FRAME_ASSESSMENT_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",
        verbose=True,
    )
    image_model_system_prompt = SystemMessage(
        content=(
            "You will be shown a frame from a video along with a question about that video and an answer based on the previous frames in the video. "
            "Your task is to analyze the frame and provide an answer to the question using both the current frame and the previous answer. "
            "If the previous answer is reasonable and the current frame can not answer the question return the previous answer. "
            "For example, if the question is about the color of a car and the previous answer is 'red' but the current frame shows no car, you should return 'red'. "
            "If the question is about the greatest number of something in the video, you should return the number counted in the current frame or the previous answer, whichever is greater. "
            "For example, if the current frame has 5 objects but the previous answer is 10 objects, you should return '10'. "
            "Be concise and clear in your answers, and do not repeat the question. "
        )
    )
    frame_instructions = (
        "\nProvide a concise answer based on the previous answer and the current frame. "
        "If the current frame does not answer the question but there is a previous answer, return the previous answer. "
        "REMEMBER: This question is not about the current frame! It is about the video as a whole. ALWAYS PAY ATTENTION TO THE PREVIOUS ANSWER!"
    )

    # Re-mux to Matroska on stdout so PyAV can decode from the pipe. stdin is detached
    # so ffmpeg never waits on interactive input. stderr is discarded: ffmpeg keeps
    # writing progress there, and an unread PIPE would eventually fill up and stall
    # both ffmpeg and this decoder.
    process = subprocess.Popen(
        ["ffmpeg", "-i", stream_url, "-f", "matroska", "-"],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    answers_list: list[dict] = []
    try:
        with av.open(process.stdout) as container:
            stream = container.streams.video[0]
            if stream.time_base is None:
                raise ValueError("Could not determine time base for the video stream. Please check the video URL and try again.")
            time_base = float(stream.time_base)

            next_capture_time = 0.0
            aggregated_answer = ''
            for frame in container.decode(stream):
                if frame.pts is None:
                    continue
                timestamp = float(frame.pts * time_base)
                if timestamp < next_capture_time:
                    continue

                frame_query = query
                if aggregated_answer:
                    frame_query += f"\nPrevious Answer: {aggregated_answer}"
                frame_query += frame_instructions

                msgs: list[BaseMessage] = [
                    image_model_system_prompt,
                    HumanMessage(content=[
                        {"type": "text", "text": frame_query},
                        {
                            "type": "image",
                            "source_type": "base64",
                            "mime_type": "image/jpeg",
                            "data": _frame_to_base64_jpeg(frame),
                        },
                    ]),
                ]
                response = image_model.invoke(msgs)
                if not isinstance(response.content, str):
                    raise TypeError("The model's response should be a string.")
                answer = response.content.strip()
                answers_list.append({"timestamp": timestamp, "answer": answer})
                if answer:
                    aggregated_answer = answer

                if capture_interval > 0:
                    # Move to the first sampling slot after this frame, so a late
                    # first frame or a gap in the stream does not trigger a burst of
                    # back-to-back captures.
                    while next_capture_time <= timestamp:
                        next_capture_time += capture_interval
    finally:
        _stop_process(process)
    return answers_list


def _confirm_final_answer(info_dict: dict, query: str, answers_list: list[dict]) -> str:
    """Ask the confirmation model to reconcile the per-frame answers into one final answer.

    Raises:
        TypeError: If the model returns non-string content.
    """
    final_answer_model = ChatOpenAI(
        model=YOUTUBE_CONFIRMATION_MODEL,
        api_key=SecretStr(OPENROUTER_API_KEY),
        base_url="https://openrouter.ai/api/v1",
        verbose=True,
    )
    final_answer_system_message = SystemMessage(
        "You are a brilliant assistant who is eager to help and extremely detailed oriented. "
        "A group of individuals have been asked the same question about a video. "
        "None of the individuals have seen the entire video. "
        "Each individual, when asked the question, was provided a frame from the video, as well as the previously reported answer based on the previous frame. "
        "Your job is to report a final answer for the question about the video. "
        "Ideally, the final answer has already been reported correctly by the last individual. "
        "However, this is similar to the game a telephone, where the true answer can become corrupted along the way. "
        "Assess all of the answers. If you can confirm the final answer is correct, simply return it. "
        "If you notice that the final answer is incorrect, then identify the correct answer and report that. "
        "You will also have access to the video title and description, which may help you identify the correct answer. "
        "Be concise and only respond with the correct final answer!"
    )

    answers_list_str = "\n".join(
        f"Answer {number} at {ans['timestamp']:.2f}s: {ans['answer']}"
        for number, ans in enumerate(answers_list, start=1)
    )
    final_query = (
        f"Video Title: {info_dict.get('title', 'No title found')}. "
        f"Video Description: {info_dict.get('description', 'No description found')}. "
        f"Question about video: {query} "
        f"Answers provided by individuals: \n{answers_list_str}\n\n "
        "Provide a concise final answer to the question about the video based on the previous answers. "
        "Include a short explanation of why you chose this answer. "
        "Format the answer like so: "
        "Explanation: <your explanation here>. "
        "Final Answer: <your answer here>. "
    )

    final_response = final_answer_model.invoke([
        final_answer_system_message,
        HumanMessage(content=[{"type": "text", "text": final_query}]),
    ])
    if not isinstance(final_response.content, str):
        raise TypeError("The final model's response should be a string.")
    return final_response.content.strip()


@tool(parse_docstring=True)
def question_youtube_video(video_url: str, query: str) -> str:
    """
    Answers a question about a YouTube video.
    The video is streamed and one frame is captured every x seconds, where x is read from the CAPTURE_INTERVAL_SEC environment variable (default 2).
    Captured frames are sent sequentially to a multimodal model to answer the question about the video.
    The final answer is aggregated from the answers to each frame.
    DOES NOT USE AUDIO! ONLY FRAMES FROM THE VIDEO ARE USED TO ANSWER THE QUESTION.

    Args:
        video_url (str): The URL of the video to capture frames from.
        query (str): The question to answer about the video.

    Returns:
        str: The answer to the question about the video.
    """
    # Seconds of stream time between sampled frames; fractional values are accepted.
    capture_interval = float(os.getenv("CAPTURE_INTERVAL_SEC", 2))

    info_dict = _extract_video_info(video_url)
    # Resolve and validate the stream URL before launching ffmpeg with it.
    stream_url = _resolve_video_stream_url(info_dict)
    if stream_url is None:
        raise ValueError("Could not retrieve video stream URL. Please check the video URL and try again.")

    answers_list = _answer_per_frame(stream_url, query, capture_interval)
    return _confirm_final_answer(info_dict, query, answers_list)
| |