Spaces:
Running
Running
| import os | |
| import random | |
| import zipfile | |
| import requests | |
| import platform | |
| from status import * | |
| from config import * | |
| DEFAULT_SONG_ARCHIVE_URLS = [] | |
| def close_running_selenium_instances() -> None: | |
| """ | |
| Closes any running Selenium instances. | |
| Returns: | |
| None | |
| """ | |
| try: | |
| info(" => Closing running Selenium instances...") | |
| # Kill all running Firefox instances | |
| if platform.system() == "Windows": | |
| os.system("taskkill /f /im firefox.exe") | |
| else: | |
| os.system("pkill firefox") | |
| success(" => Closed running Selenium instances.") | |
| except Exception as e: | |
| error(f"Error occurred while closing running Selenium instances: {str(e)}") | |
| def build_url(youtube_video_id: str) -> str: | |
| """ | |
| Builds the URL to the YouTube video. | |
| Args: | |
| youtube_video_id (str): The YouTube video ID. | |
| Returns: | |
| url (str): The URL to the YouTube video. | |
| """ | |
| return f"https://www.youtube.com/watch?v={youtube_video_id}" | |
| def rem_temp_files() -> None: | |
| """ | |
| Removes temporary files in the `.mp` directory. | |
| Returns: | |
| None | |
| """ | |
| # Path to the `.mp` directory | |
| mp_dir = os.path.join(ROOT_DIR, ".mp") | |
| files = os.listdir(mp_dir) | |
| for file in files: | |
| if not file.endswith(".json"): | |
| os.remove(os.path.join(mp_dir, file)) | |
| def fetch_songs() -> None: | |
| """ | |
| Downloads songs into songs/ directory to use with geneated videos. | |
| Returns: | |
| None | |
| """ | |
| try: | |
| info(f" => Fetching songs...") | |
| files_dir = os.path.join(ROOT_DIR, "Songs") | |
| if not os.path.exists(files_dir): | |
| os.mkdir(files_dir) | |
| if get_verbose(): | |
| info(f" => Created directory: {files_dir}") | |
| else: | |
| existing_audio_files = [ | |
| name | |
| for name in os.listdir(files_dir) | |
| if os.path.isfile(os.path.join(files_dir, name)) | |
| and name.lower().endswith((".mp3", ".wav", ".m4a", ".aac", ".ogg")) | |
| ] | |
| if len(existing_audio_files) > 0: | |
| return | |
| configured_url = get_zip_url().strip() | |
| download_urls = [configured_url] if configured_url else [] | |
| download_urls.extend(DEFAULT_SONG_ARCHIVE_URLS) | |
| archive_path = os.path.join(files_dir, "songs.zip") | |
| downloaded = False | |
| for download_url in download_urls: | |
| try: | |
| response = requests.get(download_url, timeout=60) | |
| response.raise_for_status() | |
| with open(archive_path, "wb") as file: | |
| file.write(response.content) | |
| SAFE_EXTENSIONS = (".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac") | |
| with zipfile.ZipFile(archive_path, "r") as zf: | |
| for member in zf.namelist(): | |
| basename = os.path.basename(member) | |
| if not basename or not basename.lower().endswith(SAFE_EXTENSIONS): | |
| warning(f"Skipping non-audio file in archive: {member}") | |
| continue | |
| if ".." in member or member.startswith("/"): | |
| warning(f"Skipping suspicious path in archive: {member}") | |
| continue | |
| zf.extract(member, files_dir) | |
| downloaded = True | |
| break | |
| except Exception as err: | |
| warning(f"Failed to fetch songs from {download_url}: {err}") | |
| if not downloaded: | |
| raise RuntimeError( | |
| "Could not download a valid songs archive from any configured URL" | |
| ) | |
| # Remove the zip file | |
| if os.path.exists(archive_path): | |
| os.remove(archive_path) | |
| success(" => Downloaded Songs to ../Songs.") | |
| except Exception as e: | |
| error(f"Error occurred while fetching songs: {str(e)}") | |
| def choose_random_song() -> str: | |
| """ | |
| Chooses a random song from the songs/ directory. | |
| Returns: | |
| str: The path to the chosen song. | |
| """ | |
| try: | |
| songs_dir = os.path.join(ROOT_DIR, "Songs") | |
| songs = [ | |
| name | |
| for name in os.listdir(songs_dir) | |
| if os.path.isfile(os.path.join(songs_dir, name)) | |
| and name.lower().endswith((".mp3", ".wav", ".m4a", ".aac", ".ogg")) | |
| ] | |
| if len(songs) == 0: | |
| raise RuntimeError("No audio files found in Songs directory") | |
| song = random.choice(songs) | |
| success(f" => Chose song: {song}") | |
| return os.path.join(ROOT_DIR, "Songs", song) | |
| except Exception as e: | |
| error(f"Error occurred while choosing random song: {str(e)}") | |
| raise | |