# TelegramSteamBot / utils.py
# mrpoddaa's picture
# Upload 13 files
# 18b952c verified
"""
Utility Functions and Constants
Helper functions for file streaming and chunk management
"""
import uuid
from typing import Tuple
# Constants (all sizes are in bytes)
CHUNK_SIZE = 1024 ** 2  # 1 MiB chunks for streaming
MAX_PART_SIZE = 2000 * 1024 ** 2  # 2000 MiB per part, kept under the 2 GiB limit for safety
TELEGRAM_FILE_LIMIT = 2 * 1024 ** 3  # Telegram's 2 GiB file size limit
def generate_unique_id() -> str:
    """Return a random 16-character hex identifier for a file.

    The identifier is the first 16 hex digits of a freshly generated UUID4.
    """
    full_hex = uuid.uuid4().hex
    return full_hex[:16]
def calculate_part_and_offset(
    byte_position: int,
    parts_info: list
) -> Tuple[int, int, int]:
    """
    Locate the part that holds an absolute byte position.

    Parts are treated as laid out back to back in list order, so each part
    covers the half-open range [start, start + size).

    Args:
        byte_position: Absolute byte position in the file
        parts_info: List of dicts; each must provide 'size' and 'file_id'

    Returns:
        Tuple of (part_index, offset_in_part, part_file_id), where
        part_index is the zero-based index into parts_info.

    Raises:
        ValueError: If no part contains byte_position (this includes a
            negative position and an empty parts_info).
    """
    # NOTE(review): the return annotation declares the file id as int, but the
    # value is whatever part["file_id"] holds -- confirm its type with callers.
    part_start = 0
    for index, part in enumerate(parts_info):
        part_end = part_start + part["size"]
        if part_start <= byte_position < part_end:
            return index, byte_position - part_start, part["file_id"]
        # The next part begins where this one ends.
        part_start = part_end

    # No part matched, so the position lies outside the file
    raise ValueError(f"Byte position {byte_position} exceeds file size")
def format_size(size_bytes: int) -> str:
    """Render a byte count as a string with two decimals and a unit suffix.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (PB) is reached; values beyond that are still reported in PB.
    """
    value = size_bytes
    for suffix in ("B", "KB", "MB", "GB", "TB"):
        if value < 1024.0:
            break
        value /= 1024.0
    else:
        # Still >= 1024 after exhausting every smaller unit.
        suffix = "PB"
    return f"{value:.2f} {suffix}"
def validate_range(start: int, end: int, total_size: int) -> bool:
    """Check HTTP range request bounds against the total size.

    A range is accepted only when both bounds are non-negative, start does
    not exceed end, and end is strictly below total_size.
    """
    return 0 <= start <= end < total_size
def split_into_parts(total_size: int, part_size: int = MAX_PART_SIZE) -> list:
    """
    Calculate how a file should be split into parts

    Args:
        total_size: Total file size in bytes
        part_size: Maximum size per part; must be positive when there is
            anything to split

    Returns:
        List of tuples (part_number, start_byte, end_byte). part_number
        starts at 1 and end_byte is exclusive: it equals the start_byte of
        the following part, and total_size for the last part. The list is
        empty when total_size is zero or negative.

    Raises:
        ValueError: If total_size is positive but part_size is not
    """
    if total_size <= 0:
        return []
    if part_size <= 0:
        # A non-positive step never advances towards total_size, so the loop
        # below would run forever while growing the list.
        raise ValueError(f"part_size must be positive, got {part_size}")

    parts = []
    current_position = 0
    part_number = 1
    while current_position < total_size:
        # The final part may be shorter than part_size.
        end_position = min(current_position + part_size, total_size)
        parts.append((part_number, current_position, end_position))
        current_position = end_position
        part_number += 1
    return parts
class CircularBuffer:
    """Fixed-capacity circular byte buffer.

    Bytes are stored at ``write_pos`` and handed back in the same order from
    ``read_pos``; both positions wrap to the start of the backing
    ``bytearray`` once they reach its end.
    """

    def __init__(self, size: int):
        # Capacity in bytes; the backing storage is allocated once, up front.
        self.size = size
        self.buffer = bytearray(size)
        # Index where the next write / next read begins.
        self.write_pos = 0
        self.read_pos = 0
        # Number of bytes written but not yet read.
        self.available = 0

    def write(self, data: bytes) -> int:
        """Store as much of ``data`` as fits and return the byte count stored.

        Bytes beyond the free space are not stored; the caller must check the
        return value and retry the remainder after reading.
        """
        space = self.size - self.available
        to_write = min(len(data), space)
        if to_write == 0:
            return 0

        end_pos = self.write_pos + to_write
        if end_pos <= self.size:
            # Fits without wrapping.
            self.buffer[self.write_pos:end_pos] = data[:to_write]
        else:
            # Fill up to the end of the storage, then continue at index 0.
            first_part = self.size - self.write_pos
            self.buffer[self.write_pos:] = data[:first_part]
            self.buffer[:to_write - first_part] = data[first_part:to_write]

        self.write_pos = end_pos % self.size
        self.available += to_write
        return to_write

    def read(self, length: int) -> bytes:
        """Remove and return up to ``length`` bytes from the buffer.

        Returns fewer bytes than requested when less data is available, and
        ``b''`` when the buffer is empty or ``length`` is zero or negative.
        """
        to_read = min(length, self.available)
        if to_read <= 0:
            # A negative length must not move read_pos backwards or inflate
            # ``available``; treat it as an empty read.
            return b''

        end_pos = self.read_pos + to_read
        if end_pos <= self.size:
            # Contiguous region.
            data = bytes(self.buffer[self.read_pos:end_pos])
        else:
            # Tail of the storage followed by the wrapped head.
            first_part = self.size - self.read_pos
            data = (
                bytes(self.buffer[self.read_pos:]) +
                bytes(self.buffer[:to_read - first_part])
            )

        self.read_pos = end_pos % self.size
        self.available -= to_read
        return data

    def is_full(self) -> bool:
        """Check if buffer is full"""
        return self.available == self.size

    def is_empty(self) -> bool:
        """Check if buffer is empty"""
        return self.available == 0

    def clear(self):
        """Discard all buffered data by resetting positions and the count.

        The backing storage is not zeroed; stale bytes are simply overwritten
        by later writes.
        """
        self.write_pos = 0
        self.read_pos = 0
        self.available = 0