File size: 4,726 Bytes
18b952c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Utility Functions and Constants
Helper functions for file streaming and chunk management
"""

import uuid
from typing import Tuple

# Constants
CHUNK_SIZE = 1024 * 1024  # 1MB chunks for streaming
MAX_PART_SIZE = 2000 * 1024 * 1024  # 2000MB (2GB) per part for safety
TELEGRAM_FILE_LIMIT = 2048 * 1024 * 1024  # Telegram's 2GB limit


def generate_unique_id() -> str:
    """Generate a unique identifier for files"""
    return uuid.uuid4().hex[:16]


def calculate_part_and_offset(
    byte_position: int,
    parts_info: list
) -> Tuple[int, int, int]:
    """
    Calculate which Telegram part contains a byte position
    and the offset within that part
    
    Args:
        byte_position: Absolute byte position in the file
        parts_info: List of dicts with 'part_number', 'size', 'file_id'
    
    Returns:
        Tuple of (part_index, offset_in_part, part_file_id)
    """
    current_position = 0
    
    for i, part in enumerate(parts_info):
        part_start = current_position
        part_end = current_position + part["size"]
        
        if byte_position >= part_start and byte_position < part_end:
            offset = byte_position - part_start
            return (i, offset, part["file_id"])
        
        current_position += part["size"]
    
    # If we reach here, position is beyond file size
    raise ValueError(f"Byte position {byte_position} exceeds file size")


def format_size(size_bytes: int) -> str:
    """Format byte size to human-readable string"""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} PB"


def validate_range(start: int, end: int, total_size: int) -> bool:
    """Validate HTTP range request parameters"""
    if start < 0 or end < 0:
        return False
    if start > end:
        return False
    if end >= total_size:
        return False
    return True


def split_into_parts(total_size: int, part_size: int = MAX_PART_SIZE) -> list:
    """
    Calculate how a file should be split into parts
    
    Args:
        total_size: Total file size in bytes
        part_size: Maximum size per part
    
    Returns:
        List of tuples (part_number, start_byte, end_byte)
    """
    parts = []
    current_position = 0
    part_number = 1
    
    while current_position < total_size:
        end_position = min(current_position + part_size, total_size)
        parts.append((part_number, current_position, end_position))
        current_position = end_position
        part_number += 1
    
    return parts


class CircularBuffer:
    """Circular buffer for efficient memory management"""
    
    def __init__(self, size: int):
        self.size = size
        self.buffer = bytearray(size)
        self.write_pos = 0
        self.read_pos = 0
        self.available = 0
    
    def write(self, data: bytes) -> int:
        """Write data to buffer, returns bytes written"""
        space = self.size - self.available
        to_write = min(len(data), space)
        
        if to_write == 0:
            return 0
        
        # Handle wrap-around
        end_pos = self.write_pos + to_write
        if end_pos <= self.size:
            self.buffer[self.write_pos:end_pos] = data[:to_write]
        else:
            first_part = self.size - self.write_pos
            self.buffer[self.write_pos:] = data[:first_part]
            self.buffer[:to_write - first_part] = data[first_part:to_write]
        
        self.write_pos = (self.write_pos + to_write) % self.size
        self.available += to_write
        
        return to_write
    
    def read(self, length: int) -> bytes:
        """Read data from buffer"""
        to_read = min(length, self.available)
        
        if to_read == 0:
            return b''
        
        # Handle wrap-around
        end_pos = self.read_pos + to_read
        if end_pos <= self.size:
            data = bytes(self.buffer[self.read_pos:end_pos])
        else:
            first_part = self.size - self.read_pos
            data = (
                bytes(self.buffer[self.read_pos:]) +
                bytes(self.buffer[:to_read - first_part])
            )
        
        self.read_pos = (self.read_pos + to_read) % self.size
        self.available -= to_read
        
        return data
    
    def is_full(self) -> bool:
        """Check if buffer is full"""
        return self.available == self.size
    
    def is_empty(self) -> bool:
        """Check if buffer is empty"""
        return self.available == 0
    
    def clear(self):
        """Clear the buffer"""
        self.write_pos = 0
        self.read_pos = 0
        self.available = 0