Spaces:
Runtime error
Runtime error
| """ | |
| Utility Functions and Constants | |
| Helper functions for file streaming and chunk management | |
| """ | |
| import uuid | |
| from typing import Tuple | |
| # Constants | |
| CHUNK_SIZE = 1024 * 1024 # 1MB chunks for streaming | |
| MAX_PART_SIZE = 2000 * 1024 * 1024 # 2000MB (2GB) per part for safety | |
| TELEGRAM_FILE_LIMIT = 2048 * 1024 * 1024 # Telegram's 2GB limit | |
| def generate_unique_id() -> str: | |
| """Generate a unique identifier for files""" | |
| return uuid.uuid4().hex[:16] | |
| def calculate_part_and_offset( | |
| byte_position: int, | |
| parts_info: list | |
| ) -> Tuple[int, int, int]: | |
| """ | |
| Calculate which Telegram part contains a byte position | |
| and the offset within that part | |
| Args: | |
| byte_position: Absolute byte position in the file | |
| parts_info: List of dicts with 'part_number', 'size', 'file_id' | |
| Returns: | |
| Tuple of (part_index, offset_in_part, part_file_id) | |
| """ | |
| current_position = 0 | |
| for i, part in enumerate(parts_info): | |
| part_start = current_position | |
| part_end = current_position + part["size"] | |
| if byte_position >= part_start and byte_position < part_end: | |
| offset = byte_position - part_start | |
| return (i, offset, part["file_id"]) | |
| current_position += part["size"] | |
| # If we reach here, position is beyond file size | |
| raise ValueError(f"Byte position {byte_position} exceeds file size") | |
| def format_size(size_bytes: int) -> str: | |
| """Format byte size to human-readable string""" | |
| for unit in ['B', 'KB', 'MB', 'GB', 'TB']: | |
| if size_bytes < 1024.0: | |
| return f"{size_bytes:.2f} {unit}" | |
| size_bytes /= 1024.0 | |
| return f"{size_bytes:.2f} PB" | |
| def validate_range(start: int, end: int, total_size: int) -> bool: | |
| """Validate HTTP range request parameters""" | |
| if start < 0 or end < 0: | |
| return False | |
| if start > end: | |
| return False | |
| if end >= total_size: | |
| return False | |
| return True | |
| def split_into_parts(total_size: int, part_size: int = MAX_PART_SIZE) -> list: | |
| """ | |
| Calculate how a file should be split into parts | |
| Args: | |
| total_size: Total file size in bytes | |
| part_size: Maximum size per part | |
| Returns: | |
| List of tuples (part_number, start_byte, end_byte) | |
| """ | |
| parts = [] | |
| current_position = 0 | |
| part_number = 1 | |
| while current_position < total_size: | |
| end_position = min(current_position + part_size, total_size) | |
| parts.append((part_number, current_position, end_position)) | |
| current_position = end_position | |
| part_number += 1 | |
| return parts | |
| class CircularBuffer: | |
| """Circular buffer for efficient memory management""" | |
| def __init__(self, size: int): | |
| self.size = size | |
| self.buffer = bytearray(size) | |
| self.write_pos = 0 | |
| self.read_pos = 0 | |
| self.available = 0 | |
| def write(self, data: bytes) -> int: | |
| """Write data to buffer, returns bytes written""" | |
| space = self.size - self.available | |
| to_write = min(len(data), space) | |
| if to_write == 0: | |
| return 0 | |
| # Handle wrap-around | |
| end_pos = self.write_pos + to_write | |
| if end_pos <= self.size: | |
| self.buffer[self.write_pos:end_pos] = data[:to_write] | |
| else: | |
| first_part = self.size - self.write_pos | |
| self.buffer[self.write_pos:] = data[:first_part] | |
| self.buffer[:to_write - first_part] = data[first_part:to_write] | |
| self.write_pos = (self.write_pos + to_write) % self.size | |
| self.available += to_write | |
| return to_write | |
| def read(self, length: int) -> bytes: | |
| """Read data from buffer""" | |
| to_read = min(length, self.available) | |
| if to_read == 0: | |
| return b'' | |
| # Handle wrap-around | |
| end_pos = self.read_pos + to_read | |
| if end_pos <= self.size: | |
| data = bytes(self.buffer[self.read_pos:end_pos]) | |
| else: | |
| first_part = self.size - self.read_pos | |
| data = ( | |
| bytes(self.buffer[self.read_pos:]) + | |
| bytes(self.buffer[:to_read - first_part]) | |
| ) | |
| self.read_pos = (self.read_pos + to_read) % self.size | |
| self.available -= to_read | |
| return data | |
| def is_full(self) -> bool: | |
| """Check if buffer is full""" | |
| return self.available == self.size | |
| def is_empty(self) -> bool: | |
| """Check if buffer is empty""" | |
| return self.available == 0 | |
| def clear(self): | |
| """Clear the buffer""" | |
| self.write_pos = 0 | |
| self.read_pos = 0 | |
| self.available = 0 | |