| import os |
| from indexer import indexer |
| import re |
| import urllib.parse |
| from tvdb import fetch_and_cache_json |
| from threading import Event, Thread |
| import asyncio |
| import time |
| import logging |
| from utils import convert_to_gb |
| from api import InstancesAPI |
|
|
# Metadata cache directory; None when the CACHE_DIR env var is unset.
CACHE_DIR = os.environ.get("CACHE_DIR")
|
|
class LoadBalancer:
    """Coordinate a pool of media-cache instances.

    Responsibilities:
      * index the media library (via ``indexer``) and cache film/TV listings,
      * prefetch per-title metadata JSON into ``CACHE_DIR``,
      * poll registered instances for their content reports,
      * route download requests to the instance with the most free space.
    """

    def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        """Build the balancer and start its background workers.

        Args:
            cache_dir: Directory for cached metadata JSON files (created if missing).
            token: API token (stored for collaborators; unused directly here).
            repo: Repository identifier (stored for collaborators; unused directly here).
            polling_interval: Seconds between instance-report polls.
            max_retries: Retained retry budget for API calls.
            initial_delay: Retained initial backoff delay for API calls.
        """
        self.version = "0.0.0.8 Alpha Debug"
        self.instances = []          # registered instance base URLs
        self.instances_health = {}   # url -> {"used": <size str>, "total": <size str>}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()    # signals polling thread and periodic task to stop
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE = {}         # film title -> serving URL
        self.TV_STORE = {}           # show -> season -> episode -> serving URL
        self.file_structure = None
        self.previous_file_structure = None

        # Last known-good listings; only overwritten by non-empty re-index data.
        self.cached_films = []
        self.cached_tv_shows = {}

        # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
        os.makedirs(self.CACHE_DIR, exist_ok=True)

        self.file_structure = indexer()
        self.update_media_cache()
        self.start_prefetching()

        # Daemon thread so polling never blocks interpreter shutdown.
        polling_thread = Thread(target=self.start_polling)
        polling_thread.daemon = True
        polling_thread.start()

        # NOTE(review): requires a running event loop at construction time
        # (e.g. FastAPI startup); raises RuntimeError otherwise — confirm callers.
        asyncio.create_task(self.run_periodic_tasks())

    def update_media_cache(self):
        """Rebuild ``cached_films``/``cached_tv_shows`` from ``file_structure``.

        Each cache is replaced only when the freshly scanned data is non-empty,
        so a transient empty index never wipes the last valid listing.
        """
        new_films = []
        new_tv_shows = {}

        for directory in self.file_structure:
            if directory.get('type') != 'directory':
                continue
            if directory.get('path') == 'films':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') == 'directory':
                        new_films.append(sub_directory.get('path'))
            elif directory.get('path') == 'tv':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') != 'directory':
                        continue
                    show_title = sub_directory.get('path').split('/')[-1]
                    episodes_list = []
                    for season_directory in sub_directory.get('contents', []):
                        if season_directory.get('type') != 'directory':
                            continue
                        season = season_directory.get('path').split('/')[-1]
                        for episode in season_directory.get('contents', []):
                            if episode.get('type') == 'file':
                                episodes_list.append({
                                    "season": season,
                                    "episode": episode.get('path').split('/')[-1],
                                    "path": episode.get('path')
                                })
                    if episodes_list:
                        new_tv_shows[show_title] = episodes_list

        if new_films:
            self.cached_films = new_films
        if new_tv_shows:
            self.cached_tv_shows = new_tv_shows

    async def run_periodic_tasks(self):
        """Re-index the library and re-prefetch metadata every 5 minutes."""
        while not self.stop_event.is_set():
            new_file_structure = indexer()
            # Ignore an empty re-index so the last good structure survives.
            if new_file_structure:
                self.file_structure = new_file_structure
                self.update_media_cache()
            await self.start_prefetching()
            await asyncio.sleep(300)

    def start_prefetching(self):
        """Schedule metadata prefetching on the running event loop; return the task."""
        return asyncio.create_task(self.prefetch_metadata())

    async def prefetch_metadata(self):
        """Fetch and cache metadata JSON for every title found in the file structure.

        Titles whose JSON is already present in ``CACHE_DIR`` are skipped; the
        remaining fetches run concurrently via ``asyncio.gather``.
        """
        tasks = []
        for item in self.file_structure:
            if 'contents' not in item:
                continue
            for sub_item in item['contents']:
                original_title = sub_item['path'].split('/')[-1]
                media_type = 'series' if item['path'].startswith('tv') else 'movie'
                title = original_title
                year = None

                # Prefer "Title (YYYY)"; fall back to a bare trailing "Title YYYY".
                match = re.search(r'\((\d{4})\)', original_title)
                if match:
                    # \d{4} already guarantees a 4-digit number; no re-validation needed.
                    title = original_title[:match.start()].strip()
                    year = int(match.group(1))
                else:
                    parts = original_title.rsplit(' ', 1)
                    if len(parts) > 1 and parts[-1].isdigit() and len(parts[-1]) == 4:
                        title = parts[0].strip()
                        year = int(parts[-1])

                json_cache_path = os.path.join(self.CACHE_DIR, f"{urllib.parse.quote(original_title)}.json")
                if not os.path.exists(json_cache_path):
                    tasks.append(fetch_and_cache_json(original_title, title, media_type, year))
                else:
                    logging.info(f"Skipping.. {original_title} metadata already cached")

        await asyncio.gather(*tasks)

    def get_reports(self):
        """Poll every registered instance and rebuild the film/TV URL stores.

        Instances that fail to report are deregistered. The stores are swapped
        in at the end so readers never observe a half-built mapping.
        """
        reports = self.instances_api.fetch_reports()
        temp_film_store = {}
        temp_tv_store = {}

        # Iterate over a copy: remove_instance mutates self.instances.
        for instance_url in self.instances[:]:
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_film_store, temp_tv_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)

        self.FILM_STORE = temp_film_store
        self.TV_STORE = temp_tv_store

    def process_report(self, instance_url, report, temp_film_store, temp_tv_store):
        """Merge one instance's content report into the temp stores.

        Args:
            instance_url: Base URL of the reporting instance.
            report: Dict with 'film_store', 'tv_store' and 'cache_size' entries.
            temp_film_store: Mutated in place; film title -> serving URL.
            temp_tv_store: Mutated in place; show -> season -> episode -> URL.
        """
        film_store = report.get('film_store', {})
        tv_store = report.get('tv_store', {})
        cache_size = report.get('cache_size')

        logging.info(f"Processing report from {instance_url}")

        # Only the titles matter here; the reported local paths are unused.
        for title in film_store:
            url = f"{instance_url}/api/get/film/{title.replace(' ', '%20')}"
            temp_film_store[title] = url

        for title, seasons in tv_store.items():
            show_entry = temp_tv_store.setdefault(title, {})
            for season, episodes in seasons.items():
                season_entry = show_entry.setdefault(season, {})
                for episode in episodes:
                    url = f"{instance_url}/api/get/tv/{title.replace(' ', '%20')}/{season.replace(' ', '%20')}/{episode.replace(' ', '%20')}"
                    season_entry[episode] = url

        logging.info("Film and TV Stores processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        """Poll instance reports until ``stop_event`` is set (runs on the daemon thread)."""
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            self.get_reports()
            time.sleep(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        """Signal the polling thread and periodic task to stop."""
        logging.info("Stopping polling.")
        self.stop_event.set()

    def register_instance(self, instance_url):
        """Add an instance URL to the pool (no-op if already registered)."""
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        """Remove an instance and its health record (no-op if unknown)."""
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def update_instances_health(self, instance, cache_size):
        """Record an instance's reported cache usage against a fixed 50 GB quota."""
        # NOTE(review): cache_size appears to be {"cache_size": "<used>"} —
        # confirm against the instance report format.
        self.instances_health[instance] = {"used": cache_size["cache_size"],
                                           "total": "50 GB"}
        logging.info(f"Updated instance {instance} with cache size {cache_size}")

    def _find_best_instance(self):
        """Return the instance URL with the most free cache space, or None if none known."""
        best_instance = None
        max_free_space = -1
        for instance_url, space_info in self.instances_health.items():
            free_space = convert_to_gb(space_info['total']) - convert_to_gb(space_info['used'])
            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url
        return best_instance

    def download_film_to_best_instance(self, title):
        """Ask the instance with the most free space to download a film.

        Returns:
            Dict with 'film_id', 'status' and 'progress_url', or an 'error'
            dict when no instance is available.
        """
        best_instance = self._find_best_instance()
        if not best_instance:
            logging.error("No suitable instance found for downloading the film.")
            return {"error": "No suitable instance found for downloading the film."}

        result = self.instances_api.download_film(best_instance, title)
        film_id = result["film_id"]
        return {
            "film_id": film_id,
            "status": result["status"],
            "progress_url": f'{best_instance}/api/get/progress/{film_id}'
        }

    def download_episode_to_best_instance(self, title, season, episode):
        """Ask the instance with the most free space to download one episode.

        Returns:
            Dict with 'episode_id', 'status' and 'progress_url', or an 'error'
            dict when no instance is available.
        """
        best_instance = self._find_best_instance()
        if not best_instance:
            # Fixed copy-paste: this path concerns an episode, not a film.
            logging.error("No suitable instance found for downloading the episode.")
            return {"error": "No suitable instance found for downloading the episode."}

        result = self.instances_api.download_episode(best_instance, title, season, episode)
        episode_id = result["episode_id"]
        return {
            "episode_id": episode_id,
            "status": result["status"],
            "progress_url": f'{best_instance}/api/get/progress/{episode_id}'
        }

    def find_movie_path(self, title):
        """Return the path of the first film file whose path contains *title* (case-insensitive), or None."""
        for directory in self.file_structure:
            if directory.get('type') == 'directory' and directory.get('path') == 'films':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') == 'directory':
                        for item in sub_directory.get('contents', []):
                            if item.get('type') == 'file' and title.lower() in item.get('path').lower():
                                return item.get('path')
        return None

    def find_tv_path(self, title):
        """Return the directory path of the first TV show matching *title* (case-insensitive), or None."""
        for directory in self.file_structure:
            if directory.get('type') == 'directory' and directory.get('path') == 'tv':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') == 'directory' and title.lower() in sub_directory.get('path').lower():
                        return sub_directory.get('path')
        return None

    def get_tv_structure(self, title):
        """Return the full directory node of the first TV show matching *title*, or None."""
        for directory in self.file_structure:
            if directory.get('type') == 'directory' and directory.get('path') == 'tv':
                for sub_directory in directory.get('contents', []):
                    if sub_directory.get('type') == 'directory' and title.lower() in sub_directory.get('path').lower():
                        return sub_directory
        return None

    def get_film_id(self, title):
        """Derive a stable film ID: lower-case with spaces replaced by underscores."""
        return title.replace(" ", "_").lower()

    def get_all_tv_shows(self):
        """Return the cached show -> episodes mapping."""
        return self.cached_tv_shows

    def get_all_films(self):
        """Return the cached list of film directory paths."""
        return self.cached_films
|
|