#!/usr/bin/env python3 """ Test Script for Telegram Multi-Part File Streamer Validates setup and performs basic functionality tests """ import asyncio import os import sys import time from io import BytesIO import httpx from dotenv import load_dotenv # Load environment variables load_dotenv() class Colors: """ANSI color codes""" GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BLUE = '\033[94m' ENDC = '\033[0m' BOLD = '\033[1m' def print_header(text: str): """Print section header""" print(f"\n{Colors.BOLD}{Colors.BLUE}{'=' * 60}{Colors.ENDC}") print(f"{Colors.BOLD}{Colors.BLUE}{text.center(60)}{Colors.ENDC}") print(f"{Colors.BOLD}{Colors.BLUE}{'=' * 60}{Colors.ENDC}\n") def print_success(text: str): """Print success message""" print(f"{Colors.GREEN}✓ {text}{Colors.ENDC}") def print_error(text: str): """Print error message""" print(f"{Colors.RED}✗ {text}{Colors.ENDC}") def print_warning(text: str): """Print warning message""" print(f"{Colors.YELLOW}⚠ {text}{Colors.ENDC}") def print_info(text: str): """Print info message""" print(f"{Colors.BLUE}ℹ {text}{Colors.ENDC}") async def test_environment_variables(): """Test 1: Check environment variables""" print_header("Test 1: Environment Variables") required_vars = { "API_ID": "Telegram API ID", "API_HASH": "Telegram API Hash", "BOT_TOKEN": "Telegram Bot Token", "MONGO_URI": "MongoDB Connection String" } optional_vars = { "SESSION_STRINGS": "Pyrogram Session Strings (for load balancing)" } all_valid = True print_info("Checking required variables...") for var, description in required_vars.items(): value = os.getenv(var) if value: masked_value = value[:8] + "..." if len(value) > 8 else value print_success(f"{var}: {masked_value} ({description})") else: print_error(f"{var}: Missing! ({description})") all_valid = False print() print_info("Checking optional variables...") for var, description in optional_vars.items(): value = os.getenv(var) if value: count = len(value.split(",")) print_success(f"{var}: {count} session(s) configured ({description})") else: print_warning(f"{var}: Not set ({description})") print() return all_valid async def test_mongodb_connection(): """Test 2: MongoDB connection""" print_header("Test 2: MongoDB Connection") try: from motor.motor_asyncio import AsyncIOMotorClient mongo_uri = os.getenv("MONGO_URI") if not mongo_uri: print_error("MONGO_URI not set") return False print_info(f"Connecting to MongoDB...") client = AsyncIOMotorClient(mongo_uri, serverSelectionTimeoutMS=5000) # Test connection await client.admin.command('ping') # Get database info db_name = os.getenv("MONGO_DATABASE", "telegram_streamer") db = client[db_name] print_success(f"Connected to MongoDB!") print_info(f"Database: {db_name}") # List collections collections = await db.list_collection_names() print_info(f"Collections: {collections if collections else 'None (fresh database)'}") client.close() return True except Exception as e: print_error(f"MongoDB connection failed: {str(e)}") return False async def test_api_server(): """Test 3: API server availability""" print_header("Test 3: API Server") base_url = "http://localhost:8000" print_info(f"Testing API server at {base_url}...") try: async with httpx.AsyncClient(timeout=10.0) as client: # Test root endpoint response = await client.get(base_url) if response.status_code == 200: data = response.json() print_success(f"Root endpoint: {data.get('status', 'unknown')}") else: print_error(f"Root endpoint returned {response.status_code}") return False # Test health endpoint response = await client.get(f"{base_url}/health") if response.status_code == 200: data = response.json() print_success(f"Health check: {data.get('status', 'unknown')}") print_info(f"Active sessions: {data.get('sessions', 0)}") print_info(f"Database: {data.get('database', 'unknown')}") else: print_warning(f"Health endpoint returned {response.status_code}") return True except httpx.ConnectError: print_error("Cannot connect to API server!") print_info("Make sure the server is running:") print_info(" uvicorn main:app --reload") return False except Exception as e: print_error(f"API test failed: {str(e)}") return False async def test_upload_download(): """Test 4: Upload and download functionality""" print_header("Test 4: Upload/Download") base_url = "http://localhost:8000" print_info("Creating test file (1MB)...") test_data = os.urandom(1024 * 1024) # 1MB random data test_filename = "test_file.bin" try: async with httpx.AsyncClient(timeout=60.0) as client: # Upload print_info("Uploading test file...") start_time = time.time() response = await client.post( f"{base_url}/upload", params={"filename": test_filename}, content=test_data ) upload_time = time.time() - start_time if response.status_code != 200: print_error(f"Upload failed: {response.status_code}") print_error(f"Response: {response.text}") return False result = response.json() unique_id = result.get("unique_id") print_success(f"Upload completed in {upload_time:.2f}s") print_info(f"Unique ID: {unique_id}") print_info(f"Parts: {result.get('parts', 0)}") print_info(f"Size: {result.get('total_size', 0)} bytes") # Download print_info("Downloading test file...") start_time = time.time() response = await client.get(f"{base_url}/dl/{unique_id}") download_time = time.time() - start_time if response.status_code != 200: print_error(f"Download failed: {response.status_code}") return False downloaded_data = response.content print_success(f"Download completed in {download_time:.2f}s") # Verify if downloaded_data == test_data: print_success("Data integrity verified! ✓") else: print_error("Data integrity check failed!") return False # Test range request print_info("Testing range request (bytes 0-1023)...") response = await client.get( f"{base_url}/dl/{unique_id}", headers={"Range": "bytes=0-1023"} ) if response.status_code == 206: print_success("Range request supported! ✓") if len(response.content) == 1024: print_success("Range request data correct! ✓") else: print_error(f"Range request returned {len(response.content)} bytes, expected 1024") else: print_warning(f"Range request returned {response.status_code} (expected 206)") # Cleanup print_info("Cleaning up test file...") response = await client.delete(f"{base_url}/delete/{unique_id}") if response.status_code == 200: print_success("Test file deleted") return True except Exception as e: print_error(f"Upload/Download test failed: {str(e)}") return False async def main(): """Run all tests""" print() print(f"{Colors.BOLD}{Colors.BLUE}") print("╔════════════════════════════════════════════════════════════╗") print("║ Telegram Multi-Part File Streamer - Test Suite ║") print("╚════════════════════════════════════════════════════════════╝") print(f"{Colors.ENDC}") tests = [ ("Environment Variables", test_environment_variables), ("MongoDB Connection", test_mongodb_connection), ("API Server", test_api_server), ("Upload/Download", test_upload_download) ] results = [] for name, test_func in tests: try: result = await test_func() results.append((name, result)) except Exception as e: print_error(f"Test '{name}' crashed: {str(e)}") results.append((name, False)) # Summary print_header("Test Summary") passed = sum(1 for _, result in results if result) total = len(results) for name, result in results: if result: print_success(f"{name}") else: print_error(f"{name}") print() if passed == total: print_success(f"All tests passed! ({passed}/{total})") print() print_info("Your setup is ready! 🚀") print_info("You can now start uploading and streaming files.") print() return 0 else: print_error(f"Some tests failed: {passed}/{total} passed") print() print_info("Please fix the issues above before proceeding.") print() return 1 if __name__ == "__main__": try: exit_code = asyncio.run(main()) sys.exit(exit_code) except KeyboardInterrupt: print(f"\n\n{Colors.YELLOW}Tests cancelled by user.{Colors.ENDC}\n") sys.exit(1)