Spaces:
Runtime error
Runtime error
File size: 4,726 Bytes
18b952c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 | """
Utility Functions and Constants
Helper functions for file streaming and chunk management
"""
import uuid
from typing import Tuple
# Constants
CHUNK_SIZE = 1024 * 1024 # 1MB chunks for streaming
MAX_PART_SIZE = 2000 * 1024 * 1024 # 2000MB (2GB) per part for safety
TELEGRAM_FILE_LIMIT = 2048 * 1024 * 1024 # Telegram's 2GB limit
def generate_unique_id() -> str:
"""Generate a unique identifier for files"""
return uuid.uuid4().hex[:16]
def calculate_part_and_offset(
byte_position: int,
parts_info: list
) -> Tuple[int, int, int]:
"""
Calculate which Telegram part contains a byte position
and the offset within that part
Args:
byte_position: Absolute byte position in the file
parts_info: List of dicts with 'part_number', 'size', 'file_id'
Returns:
Tuple of (part_index, offset_in_part, part_file_id)
"""
current_position = 0
for i, part in enumerate(parts_info):
part_start = current_position
part_end = current_position + part["size"]
if byte_position >= part_start and byte_position < part_end:
offset = byte_position - part_start
return (i, offset, part["file_id"])
current_position += part["size"]
# If we reach here, position is beyond file size
raise ValueError(f"Byte position {byte_position} exceeds file size")
def format_size(size_bytes: int) -> str:
"""Format byte size to human-readable string"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} PB"
def validate_range(start: int, end: int, total_size: int) -> bool:
"""Validate HTTP range request parameters"""
if start < 0 or end < 0:
return False
if start > end:
return False
if end >= total_size:
return False
return True
def split_into_parts(total_size: int, part_size: int = MAX_PART_SIZE) -> list:
"""
Calculate how a file should be split into parts
Args:
total_size: Total file size in bytes
part_size: Maximum size per part
Returns:
List of tuples (part_number, start_byte, end_byte)
"""
parts = []
current_position = 0
part_number = 1
while current_position < total_size:
end_position = min(current_position + part_size, total_size)
parts.append((part_number, current_position, end_position))
current_position = end_position
part_number += 1
return parts
class CircularBuffer:
"""Circular buffer for efficient memory management"""
def __init__(self, size: int):
self.size = size
self.buffer = bytearray(size)
self.write_pos = 0
self.read_pos = 0
self.available = 0
def write(self, data: bytes) -> int:
"""Write data to buffer, returns bytes written"""
space = self.size - self.available
to_write = min(len(data), space)
if to_write == 0:
return 0
# Handle wrap-around
end_pos = self.write_pos + to_write
if end_pos <= self.size:
self.buffer[self.write_pos:end_pos] = data[:to_write]
else:
first_part = self.size - self.write_pos
self.buffer[self.write_pos:] = data[:first_part]
self.buffer[:to_write - first_part] = data[first_part:to_write]
self.write_pos = (self.write_pos + to_write) % self.size
self.available += to_write
return to_write
def read(self, length: int) -> bytes:
"""Read data from buffer"""
to_read = min(length, self.available)
if to_read == 0:
return b''
# Handle wrap-around
end_pos = self.read_pos + to_read
if end_pos <= self.size:
data = bytes(self.buffer[self.read_pos:end_pos])
else:
first_part = self.size - self.read_pos
data = (
bytes(self.buffer[self.read_pos:]) +
bytes(self.buffer[:to_read - first_part])
)
self.read_pos = (self.read_pos + to_read) % self.size
self.available -= to_read
return data
def is_full(self) -> bool:
"""Check if buffer is full"""
return self.available == self.size
def is_empty(self) -> bool:
"""Check if buffer is empty"""
return self.available == 0
def clear(self):
"""Clear the buffer"""
self.write_pos = 0
self.read_pos = 0
self.available = 0
|