TelegramSteamBot / example_client.py
mrpoddaa's picture
Upload 13 files
18b952c verified
#!/usr/bin/env python3
"""
Example Client for Telegram Multi-Part File Streamer
Demonstrates how to upload and download files programmatically
"""
import asyncio
import os
import time
from pathlib import Path
import httpx
class TelegramStreamerClient:
"""Client for interacting with Telegram File Streamer API"""
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=300.0)
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
async def upload_file(
self,
file_path: str,
filename: str = None,
chunk_size: int = 1024 * 1024 # 1MB chunks
) -> dict:
"""
Upload a file to the streamer
Args:
file_path: Path to the file to upload
filename: Optional custom filename
chunk_size: Size of chunks for streaming upload
Returns:
Upload response with unique_id and download_url
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
if filename is None:
filename = file_path.name
file_size = file_path.stat().st_size
print(f"πŸ“€ Uploading: {filename}")
print(f" Size: {self._format_size(file_size)}")
async def file_stream():
"""Stream file in chunks"""
with open(file_path, "rb") as f:
uploaded = 0
start_time = time.time()
while True:
chunk = f.read(chunk_size)
if not chunk:
break
uploaded += len(chunk)
# Progress
elapsed = time.time() - start_time
if elapsed > 0:
speed = uploaded / elapsed
progress = (uploaded / file_size) * 100
print(
f"\r Progress: {progress:.1f}% "
f"({self._format_size(uploaded)}/{self._format_size(file_size)}) "
f"Speed: {self._format_size(speed)}/s",
end="",
flush=True
)
yield chunk
print() # New line after progress
start_time = time.time()
response = await self.client.post(
f"{self.base_url}/upload",
params={"filename": filename},
content=file_stream()
)
elapsed = time.time() - start_time
response.raise_for_status()
result = response.json()
print(f"βœ… Upload completed in {elapsed:.2f}s")
print(f" Unique ID: {result['unique_id']}")
print(f" Parts: {result['parts']}")
print(f" Download URL: {self.base_url}{result['download_url']}")
return result
async def download_file(
self,
unique_id: str,
output_path: str,
chunk_size: int = 1024 * 1024 # 1MB chunks
):
"""
Download a file from the streamer
Args:
unique_id: Unique ID of the file
output_path: Path to save the downloaded file
chunk_size: Size of chunks for streaming download
"""
output_path = Path(output_path)
# Get file info first
info = await self.get_file_info(unique_id)
total_size = info["total_size"]
print(f"πŸ“₯ Downloading: {info['filename']}")
print(f" Size: {self._format_size(total_size)}")
start_time = time.time()
downloaded = 0
async with self.client.stream(
"GET",
f"{self.base_url}/dl/{unique_id}"
) as response:
response.raise_for_status()
with open(output_path, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size):
f.write(chunk)
downloaded += len(chunk)
# Progress
elapsed = time.time() - start_time
if elapsed > 0:
speed = downloaded / elapsed
progress = (downloaded / total_size) * 100
print(
f"\r Progress: {progress:.1f}% "
f"({self._format_size(downloaded)}/{self._format_size(total_size)}) "
f"Speed: {self._format_size(speed)}/s",
end="",
flush=True
)
print() # New line after progress
elapsed = time.time() - start_time
print(f"βœ… Download completed in {elapsed:.2f}s")
print(f" Saved to: {output_path}")
async def download_range(
self,
unique_id: str,
start: int,
end: int,
output_path: str
):
"""
Download a specific byte range from a file
Args:
unique_id: Unique ID of the file
start: Start byte position
end: End byte position (inclusive)
output_path: Path to save the downloaded chunk
"""
output_path = Path(output_path)
print(f"πŸ“₯ Downloading range: bytes {start}-{end}")
response = await self.client.get(
f"{self.base_url}/dl/{unique_id}",
headers={"Range": f"bytes={start}-{end}"}
)
response.raise_for_status()
if response.status_code != 206:
print(f"⚠️ Warning: Expected 206 Partial Content, got {response.status_code}")
with open(output_path, "wb") as f:
f.write(response.content)
print(f"βœ… Downloaded {len(response.content)} bytes to {output_path}")
async def get_file_info(self, unique_id: str) -> dict:
"""Get file metadata"""
response = await self.client.get(f"{self.base_url}/info/{unique_id}")
response.raise_for_status()
return response.json()
async def delete_file(self, unique_id: str) -> dict:
"""Delete a file"""
response = await self.client.delete(f"{self.base_url}/delete/{unique_id}")
response.raise_for_status()
return response.json()
async def health_check(self) -> dict:
"""Check server health"""
response = await self.client.get(f"{self.base_url}/health")
response.raise_for_status()
return response.json()
@staticmethod
def _format_size(size_bytes: int) -> str:
"""Format byte size to human-readable string"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} PB"
async def example_upload():
"""Example: Upload a file"""
client = TelegramStreamerClient()
try:
# Create a test file
test_file = "test_upload.bin"
print(f"Creating test file: {test_file} (10MB)")
with open(test_file, "wb") as f:
f.write(os.urandom(10 * 1024 * 1024)) # 10MB
# Upload
result = await client.upload_file(test_file)
unique_id = result["unique_id"]
# Get info
print("\nπŸ“Š File Info:")
info = await client.get_file_info(unique_id)
for key, value in info.items():
print(f" {key}: {value}")
# Cleanup
os.remove(test_file)
return unique_id
finally:
await client.close()
async def example_download(unique_id: str):
"""Example: Download a file"""
client = TelegramStreamerClient()
try:
output_file = "downloaded_file.bin"
await client.download_file(unique_id, output_file)
# Cleanup
if os.path.exists(output_file):
os.remove(output_file)
finally:
await client.close()
async def example_range_request(unique_id: str):
"""Example: Download a specific range"""
client = TelegramStreamerClient()
try:
# Download first 1MB
output_file = "range_chunk.bin"
await client.download_range(unique_id, 0, 1024 * 1024 - 1, output_file)
# Cleanup
if os.path.exists(output_file):
os.remove(output_file)
finally:
await client.close()
async def main():
"""Main example"""
print("=" * 60)
print("Telegram Multi-Part File Streamer - Example Client")
print("=" * 60)
print()
# Check server health
client = TelegramStreamerClient()
try:
health = await client.health_check()
print(f"πŸ₯ Server Status: {health['status']}")
print(f" Sessions: {health['sessions']}")
print(f" Database: {health['database']}")
print()
except Exception as e:
print(f"❌ Server not available: {str(e)}")
print(" Make sure the server is running!")
return
finally:
await client.close()
# Example 1: Upload
print("\n" + "=" * 60)
print("Example 1: Upload")
print("=" * 60)
unique_id = await example_upload()
# Example 2: Download
print("\n" + "=" * 60)
print("Example 2: Download")
print("=" * 60)
await example_download(unique_id)
# Example 3: Range Request
print("\n" + "=" * 60)
print("Example 3: Range Request")
print("=" * 60)
await example_range_request(unique_id)
# Cleanup: Delete the file
print("\n" + "=" * 60)
print("Cleanup")
print("=" * 60)
client = TelegramStreamerClient()
try:
result = await client.delete_file(unique_id)
print(f"πŸ—‘οΈ Deleted file: {unique_id}")
print(f" Deleted parts: {result['deleted_parts']}/{result['total_parts']}")
finally:
await client.close()
print("\nβœ… All examples completed!")
if __name__ == "__main__":
asyncio.run(main())