""" Utility Functions and Constants Helper functions for file streaming and chunk management """ import uuid from typing import Tuple # Constants CHUNK_SIZE = 1024 * 1024 # 1MB chunks for streaming MAX_PART_SIZE = 2000 * 1024 * 1024 # 2000MB (2GB) per part for safety TELEGRAM_FILE_LIMIT = 2048 * 1024 * 1024 # Telegram's 2GB limit def generate_unique_id() -> str: """Generate a unique identifier for files""" return uuid.uuid4().hex[:16] def calculate_part_and_offset( byte_position: int, parts_info: list ) -> Tuple[int, int, int]: """ Calculate which Telegram part contains a byte position and the offset within that part Args: byte_position: Absolute byte position in the file parts_info: List of dicts with 'part_number', 'size', 'file_id' Returns: Tuple of (part_index, offset_in_part, part_file_id) """ current_position = 0 for i, part in enumerate(parts_info): part_start = current_position part_end = current_position + part["size"] if byte_position >= part_start and byte_position < part_end: offset = byte_position - part_start return (i, offset, part["file_id"]) current_position += part["size"] # If we reach here, position is beyond file size raise ValueError(f"Byte position {byte_position} exceeds file size") def format_size(size_bytes: int) -> str: """Format byte size to human-readable string""" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size_bytes < 1024.0: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.2f} PB" def validate_range(start: int, end: int, total_size: int) -> bool: """Validate HTTP range request parameters""" if start < 0 or end < 0: return False if start > end: return False if end >= total_size: return False return True def split_into_parts(total_size: int, part_size: int = MAX_PART_SIZE) -> list: """ Calculate how a file should be split into parts Args: total_size: Total file size in bytes part_size: Maximum size per part Returns: List of tuples (part_number, start_byte, end_byte) """ parts = [] current_position = 0 part_number = 1 while current_position < total_size: end_position = min(current_position + part_size, total_size) parts.append((part_number, current_position, end_position)) current_position = end_position part_number += 1 return parts class CircularBuffer: """Circular buffer for efficient memory management""" def __init__(self, size: int): self.size = size self.buffer = bytearray(size) self.write_pos = 0 self.read_pos = 0 self.available = 0 def write(self, data: bytes) -> int: """Write data to buffer, returns bytes written""" space = self.size - self.available to_write = min(len(data), space) if to_write == 0: return 0 # Handle wrap-around end_pos = self.write_pos + to_write if end_pos <= self.size: self.buffer[self.write_pos:end_pos] = data[:to_write] else: first_part = self.size - self.write_pos self.buffer[self.write_pos:] = data[:first_part] self.buffer[:to_write - first_part] = data[first_part:to_write] self.write_pos = (self.write_pos + to_write) % self.size self.available += to_write return to_write def read(self, length: int) -> bytes: """Read data from buffer""" to_read = min(length, self.available) if to_read == 0: return b'' # Handle wrap-around end_pos = self.read_pos + to_read if end_pos <= self.size: data = bytes(self.buffer[self.read_pos:end_pos]) else: first_part = self.size - self.read_pos data = ( bytes(self.buffer[self.read_pos:]) + bytes(self.buffer[:to_read - first_part]) ) self.read_pos = (self.read_pos + to_read) % self.size self.available -= to_read return data def is_full(self) -> bool: """Check if buffer is full""" return self.available == self.size def is_empty(self) -> bool: """Check if buffer is empty""" return self.available == 0 def clear(self): """Clear the buffer""" self.write_pos = 0 self.read_pos = 0 self.available = 0