MoneyPrinterV2 / src /utils.py
SeaWolf-AI's picture
Deploy MoneyPrinterV2 YouTube Shorts Generator to HF Spaces
a8fdab7 verified
import os
import random
import zipfile
import requests
import platform
from status import *
from config import *
DEFAULT_SONG_ARCHIVE_URLS = []
def close_running_selenium_instances() -> None:
"""
Closes any running Selenium instances.
Returns:
None
"""
try:
info(" => Closing running Selenium instances...")
# Kill all running Firefox instances
if platform.system() == "Windows":
os.system("taskkill /f /im firefox.exe")
else:
os.system("pkill firefox")
success(" => Closed running Selenium instances.")
except Exception as e:
error(f"Error occurred while closing running Selenium instances: {str(e)}")
def build_url(youtube_video_id: str) -> str:
"""
Builds the URL to the YouTube video.
Args:
youtube_video_id (str): The YouTube video ID.
Returns:
url (str): The URL to the YouTube video.
"""
return f"https://www.youtube.com/watch?v={youtube_video_id}"
def rem_temp_files() -> None:
"""
Removes temporary files in the `.mp` directory.
Returns:
None
"""
# Path to the `.mp` directory
mp_dir = os.path.join(ROOT_DIR, ".mp")
files = os.listdir(mp_dir)
for file in files:
if not file.endswith(".json"):
os.remove(os.path.join(mp_dir, file))
def fetch_songs() -> None:
"""
Downloads songs into songs/ directory to use with geneated videos.
Returns:
None
"""
try:
info(f" => Fetching songs...")
files_dir = os.path.join(ROOT_DIR, "Songs")
if not os.path.exists(files_dir):
os.mkdir(files_dir)
if get_verbose():
info(f" => Created directory: {files_dir}")
else:
existing_audio_files = [
name
for name in os.listdir(files_dir)
if os.path.isfile(os.path.join(files_dir, name))
and name.lower().endswith((".mp3", ".wav", ".m4a", ".aac", ".ogg"))
]
if len(existing_audio_files) > 0:
return
configured_url = get_zip_url().strip()
download_urls = [configured_url] if configured_url else []
download_urls.extend(DEFAULT_SONG_ARCHIVE_URLS)
archive_path = os.path.join(files_dir, "songs.zip")
downloaded = False
for download_url in download_urls:
try:
response = requests.get(download_url, timeout=60)
response.raise_for_status()
with open(archive_path, "wb") as file:
file.write(response.content)
SAFE_EXTENSIONS = (".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac")
with zipfile.ZipFile(archive_path, "r") as zf:
for member in zf.namelist():
basename = os.path.basename(member)
if not basename or not basename.lower().endswith(SAFE_EXTENSIONS):
warning(f"Skipping non-audio file in archive: {member}")
continue
if ".." in member or member.startswith("/"):
warning(f"Skipping suspicious path in archive: {member}")
continue
zf.extract(member, files_dir)
downloaded = True
break
except Exception as err:
warning(f"Failed to fetch songs from {download_url}: {err}")
if not downloaded:
raise RuntimeError(
"Could not download a valid songs archive from any configured URL"
)
# Remove the zip file
if os.path.exists(archive_path):
os.remove(archive_path)
success(" => Downloaded Songs to ../Songs.")
except Exception as e:
error(f"Error occurred while fetching songs: {str(e)}")
def choose_random_song() -> str:
"""
Chooses a random song from the songs/ directory.
Returns:
str: The path to the chosen song.
"""
try:
songs_dir = os.path.join(ROOT_DIR, "Songs")
songs = [
name
for name in os.listdir(songs_dir)
if os.path.isfile(os.path.join(songs_dir, name))
and name.lower().endswith((".mp3", ".wav", ".m4a", ".aac", ".ogg"))
]
if len(songs) == 0:
raise RuntimeError("No audio files found in Songs directory")
song = random.choice(songs)
success(f" => Chose song: {song}")
return os.path.join(ROOT_DIR, "Songs", song)
except Exception as e:
error(f"Error occurred while choosing random song: {str(e)}")
raise