| import aiohttp |
| import asyncio |
| from itertools import chain |
|
|
|
|
| class VideoGenerator: |
| def __init__(self): |
| self.base_urls = [f"https://yakova-depthflow-{i}.hf.space" for i in range(10)] |
| self.headers = {"accept": "application/json"} |
| self.default_params = { |
| "frame_rate": 30, |
| "duration": 3, |
| "quality": 1, |
| "ssaa": 0.8, |
| "raw": "false", |
| } |
|
|
| async def generate_video(self, base_url, params): |
| url = f"{base_url}/generate_video" |
|
|
| async with aiohttp.ClientSession() as session: |
| async with session.post( |
| url, params=params, headers=self.headers |
| ) as response: |
| if response.status == 200: |
| data = await response.json() |
| output_file = data.get("output_file") |
| return output_file |
| else: |
| print(f"Request to {url} failed with status: {response.status}") |
| return None |
|
|
| async def check_video_ready(self, base_url, output_file): |
| url = f"{base_url}/download/{output_file}" |
|
|
| async with aiohttp.ClientSession() as session: |
| while True: |
| async with session.get(url, headers=self.headers) as response: |
| if response.status == 200: |
| video_content = await response.read() |
| if len(video_content) > 0: |
| return url |
| else: |
| print( |
| f"Video {output_file} is ready but the file size is zero, retrying in 10 seconds..." |
| ) |
| await asyncio.sleep(10) |
| elif response.status == 404: |
| data = await response.json() |
| if data.get("detail") == "Video not found": |
| print( |
| f"Video {output_file} not ready yet, retrying in 10 seconds..." |
| ) |
| await asyncio.sleep(180) |
| else: |
| print(f"Unexpected response for {output_file}: {data}") |
| return None |
| else: |
| print(f"Request to {url} failed with status: {response.status}") |
| return None |
|
|
| async def process_image(self, base_url, image_link): |
| params = self.default_params.copy() |
| params["image_link"] = image_link |
|
|
| output_file = await self.generate_video(base_url, params) |
| if output_file: |
| print(f"Generated video file id: {output_file} for image {image_link}") |
| video_url = await self.check_video_ready(base_url, output_file) |
| if video_url: |
| print( |
| f"Video for {image_link} is ready and can be downloaded from: {video_url}" |
| ) |
| return video_url |
| else: |
| print(f"Failed to get the video URL for {image_link}") |
| return None |
| else: |
| print(f"Failed to generate the video for {image_link}") |
| return None |
|
|
| def flatten(self, nested_list): |
| return list(chain.from_iterable(nested_list)) |
|
|
| def nest(self, flat_list, nested_dims): |
| it = iter(flat_list) |
| return [[next(it) for _ in inner_list] for inner_list in nested_dims] |
|
|
| async def run(self, nested_image_links): |
| flat_image_links = self.flatten(nested_image_links) |
| tasks = [] |
| base_index = 0 |
|
|
| for image_link in flat_image_links: |
| base_url = self.base_urls[base_index % len(self.base_urls)] |
| tasks.append(self.process_image(base_url, image_link)) |
| base_index += 1 |
|
|
| flat_video_urls = await asyncio.gather(*tasks) |
| nested_video_urls = self.nest(flat_video_urls, nested_image_links) |
| return nested_video_urls |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
|
|