#!/usr/bin/env python3 """ Example Client for Telegram Multi-Part File Streamer Demonstrates how to upload and download files programmatically """ import asyncio import os import time from pathlib import Path import httpx class TelegramStreamerClient: """Client for interacting with Telegram File Streamer API""" def __init__(self, base_url: str = "http://localhost:8000"): self.base_url = base_url self.client = httpx.AsyncClient(timeout=300.0) async def close(self): """Close the HTTP client""" await self.client.aclose() async def upload_file( self, file_path: str, filename: str = None, chunk_size: int = 1024 * 1024 # 1MB chunks ) -> dict: """ Upload a file to the streamer Args: file_path: Path to the file to upload filename: Optional custom filename chunk_size: Size of chunks for streaming upload Returns: Upload response with unique_id and download_url """ file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if filename is None: filename = file_path.name file_size = file_path.stat().st_size print(f"šŸ“¤ Uploading: {filename}") print(f" Size: {self._format_size(file_size)}") async def file_stream(): """Stream file in chunks""" with open(file_path, "rb") as f: uploaded = 0 start_time = time.time() while True: chunk = f.read(chunk_size) if not chunk: break uploaded += len(chunk) # Progress elapsed = time.time() - start_time if elapsed > 0: speed = uploaded / elapsed progress = (uploaded / file_size) * 100 print( f"\r Progress: {progress:.1f}% " f"({self._format_size(uploaded)}/{self._format_size(file_size)}) " f"Speed: {self._format_size(speed)}/s", end="", flush=True ) yield chunk print() # New line after progress start_time = time.time() response = await self.client.post( f"{self.base_url}/upload", params={"filename": filename}, content=file_stream() ) elapsed = time.time() - start_time response.raise_for_status() result = response.json() print(f"āœ… Upload completed in {elapsed:.2f}s") print(f" Unique ID: {result['unique_id']}") print(f" Parts: {result['parts']}") print(f" Download URL: {self.base_url}{result['download_url']}") return result async def download_file( self, unique_id: str, output_path: str, chunk_size: int = 1024 * 1024 # 1MB chunks ): """ Download a file from the streamer Args: unique_id: Unique ID of the file output_path: Path to save the downloaded file chunk_size: Size of chunks for streaming download """ output_path = Path(output_path) # Get file info first info = await self.get_file_info(unique_id) total_size = info["total_size"] print(f"šŸ“„ Downloading: {info['filename']}") print(f" Size: {self._format_size(total_size)}") start_time = time.time() downloaded = 0 async with self.client.stream( "GET", f"{self.base_url}/dl/{unique_id}" ) as response: response.raise_for_status() with open(output_path, "wb") as f: async for chunk in response.aiter_bytes(chunk_size): f.write(chunk) downloaded += len(chunk) # Progress elapsed = time.time() - start_time if elapsed > 0: speed = downloaded / elapsed progress = (downloaded / total_size) * 100 print( f"\r Progress: {progress:.1f}% " f"({self._format_size(downloaded)}/{self._format_size(total_size)}) " f"Speed: {self._format_size(speed)}/s", end="", flush=True ) print() # New line after progress elapsed = time.time() - start_time print(f"āœ… Download completed in {elapsed:.2f}s") print(f" Saved to: {output_path}") async def download_range( self, unique_id: str, start: int, end: int, output_path: str ): """ Download a specific byte range from a file Args: unique_id: Unique ID of the file start: Start byte position end: End byte position (inclusive) output_path: Path to save the downloaded chunk """ output_path = Path(output_path) print(f"šŸ“„ Downloading range: bytes {start}-{end}") response = await self.client.get( f"{self.base_url}/dl/{unique_id}", headers={"Range": f"bytes={start}-{end}"} ) response.raise_for_status() if response.status_code != 206: print(f"āš ļø Warning: Expected 206 Partial Content, got {response.status_code}") with open(output_path, "wb") as f: f.write(response.content) print(f"āœ… Downloaded {len(response.content)} bytes to {output_path}") async def get_file_info(self, unique_id: str) -> dict: """Get file metadata""" response = await self.client.get(f"{self.base_url}/info/{unique_id}") response.raise_for_status() return response.json() async def delete_file(self, unique_id: str) -> dict: """Delete a file""" response = await self.client.delete(f"{self.base_url}/delete/{unique_id}") response.raise_for_status() return response.json() async def health_check(self) -> dict: """Check server health""" response = await self.client.get(f"{self.base_url}/health") response.raise_for_status() return response.json() @staticmethod def _format_size(size_bytes: int) -> str: """Format byte size to human-readable string""" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size_bytes < 1024.0: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.2f} PB" async def example_upload(): """Example: Upload a file""" client = TelegramStreamerClient() try: # Create a test file test_file = "test_upload.bin" print(f"Creating test file: {test_file} (10MB)") with open(test_file, "wb") as f: f.write(os.urandom(10 * 1024 * 1024)) # 10MB # Upload result = await client.upload_file(test_file) unique_id = result["unique_id"] # Get info print("\nšŸ“Š File Info:") info = await client.get_file_info(unique_id) for key, value in info.items(): print(f" {key}: {value}") # Cleanup os.remove(test_file) return unique_id finally: await client.close() async def example_download(unique_id: str): """Example: Download a file""" client = TelegramStreamerClient() try: output_file = "downloaded_file.bin" await client.download_file(unique_id, output_file) # Cleanup if os.path.exists(output_file): os.remove(output_file) finally: await client.close() async def example_range_request(unique_id: str): """Example: Download a specific range""" client = TelegramStreamerClient() try: # Download first 1MB output_file = "range_chunk.bin" await client.download_range(unique_id, 0, 1024 * 1024 - 1, output_file) # Cleanup if os.path.exists(output_file): os.remove(output_file) finally: await client.close() async def main(): """Main example""" print("=" * 60) print("Telegram Multi-Part File Streamer - Example Client") print("=" * 60) print() # Check server health client = TelegramStreamerClient() try: health = await client.health_check() print(f"šŸ„ Server Status: {health['status']}") print(f" Sessions: {health['sessions']}") print(f" Database: {health['database']}") print() except Exception as e: print(f"āŒ Server not available: {str(e)}") print(" Make sure the server is running!") return finally: await client.close() # Example 1: Upload print("\n" + "=" * 60) print("Example 1: Upload") print("=" * 60) unique_id = await example_upload() # Example 2: Download print("\n" + "=" * 60) print("Example 2: Download") print("=" * 60) await example_download(unique_id) # Example 3: Range Request print("\n" + "=" * 60) print("Example 3: Range Request") print("=" * 60) await example_range_request(unique_id) # Cleanup: Delete the file print("\n" + "=" * 60) print("Cleanup") print("=" * 60) client = TelegramStreamerClient() try: result = await client.delete_file(unique_id) print(f"šŸ—‘ļø Deleted file: {unique_id}") print(f" Deleted parts: {result['deleted_parts']}/{result['total_parts']}") finally: await client.close() print("\nāœ… All examples completed!") if __name__ == "__main__": asyncio.run(main())