Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3
"""
Test Script for Telegram Multi-Part File Streamer
Validates setup and performs basic functionality tests
"""
| import asyncio | |
| import os | |
| import sys | |
| import time | |
| from io import BytesIO | |
| import httpx | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file (if present) into the process
# environment before any test below reads them through os.getenv().
load_dotenv()
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    ENDC = '\033[0m'   # resets all attributes
    BOLD = '\033[1m'
def print_header(text: str) -> None:
    """Print ``text`` centered between two 60-character ``=`` rules.

    The whole header is rendered in bold blue and surrounded by blank lines.
    """
    rule = '=' * 60
    print(f"\n{Colors.BOLD}{Colors.BLUE}{rule}{Colors.ENDC}")
    print(f"{Colors.BOLD}{Colors.BLUE}{text.center(60)}{Colors.ENDC}")
    print(f"{Colors.BOLD}{Colors.BLUE}{rule}{Colors.ENDC}\n")
def print_success(text: str) -> None:
    """Print ``text`` in green, prefixed with a check mark."""
    print(f"{Colors.GREEN}✓ {text}{Colors.ENDC}")
def print_error(text: str) -> None:
    """Print ``text`` in red, prefixed with a cross mark."""
    print(f"{Colors.RED}✗ {text}{Colors.ENDC}")
def print_warning(text: str) -> None:
    """Print ``text`` in yellow, prefixed with a warning sign."""
    print(f"{Colors.YELLOW}⚠ {text}{Colors.ENDC}")
def print_info(text: str) -> None:
    """Print ``text`` in blue, prefixed with an information sign."""
    print(f"{Colors.BLUE}ℹ {text}{Colors.ENDC}")
async def test_environment_variables() -> bool:
    """Test 1: Check environment variables.

    Reports each required and optional variable. Required values are shown
    truncated to their first 8 characters so full secrets are not echoed.

    Returns:
        True when every required variable is set to a non-empty value;
        optional variables never affect the result.
    """
    print_header("Test 1: Environment Variables")

    required_vars = {
        "API_ID": "Telegram API ID",
        "API_HASH": "Telegram API Hash",
        "BOT_TOKEN": "Telegram Bot Token",
        "MONGO_URI": "MongoDB Connection String",
    }
    optional_vars = {
        "SESSION_STRINGS": "Pyrogram Session Strings (for load balancing)",
    }

    all_valid = True

    print_info("Checking required variables...")
    for var, description in required_vars.items():
        value = os.getenv(var)
        if value:
            masked_value = (value[:8] + "...") if len(value) > 8 else value
            print_success(f"{var}: {masked_value} ({description})")
        else:
            print_error(f"{var}: Missing! ({description})")
            all_valid = False

    print()
    print_info("Checking optional variables...")
    for var, description in optional_vars.items():
        value = os.getenv(var) or ""
        # Count only non-empty entries so a trailing or doubled comma does
        # not inflate the number of configured sessions.
        count = sum(1 for entry in value.split(",") if entry.strip())
        if count:
            print_success(f"{var}: {count} session(s) configured ({description})")
        else:
            print_warning(f"{var}: Not set ({description})")

    print()
    return all_valid
async def test_mongodb_connection() -> bool:
    """Test 2: MongoDB connection.

    Connects to the server named by ``MONGO_URI``, pings it, and lists the
    collections of the database named by ``MONGO_DATABASE`` (default
    ``telegram_streamer``).

    Returns:
        True when the ping and the collection listing both succeed, False
        when ``MONGO_URI`` is unset or any step raises.
    """
    print_header("Test 2: MongoDB Connection")

    client = None
    try:
        # Imported lazily so a missing driver is reported as a failed test
        # instead of crashing the whole script at import time.
        from motor.motor_asyncio import AsyncIOMotorClient

        mongo_uri = os.getenv("MONGO_URI")
        if not mongo_uri:
            print_error("MONGO_URI not set")
            return False

        print_info("Connecting to MongoDB...")
        client = AsyncIOMotorClient(mongo_uri, serverSelectionTimeoutMS=5000)

        # Test connection
        await client.admin.command('ping')

        # Get database info
        db_name = os.getenv("MONGO_DATABASE", "telegram_streamer")
        db = client[db_name]
        print_success("Connected to MongoDB!")
        print_info(f"Database: {db_name}")

        # List collections
        collections = await db.list_collection_names()
        print_info(f"Collections: {collections if collections else 'None (fresh database)'}")
        return True
    except Exception as e:
        print_error(f"MongoDB connection failed: {e}")
        return False
    finally:
        # Close the client on every path, including a failed ping.
        if client is not None:
            client.close()
async def test_api_server(base_url: str = "http://localhost:8000") -> bool:
    """Test 3: API server availability.

    Args:
        base_url: Root URL of the API server under test.

    Returns:
        True when the root endpoint answers 200. A non-200 answer from
        ``/health`` only produces a warning. False when the root endpoint
        answers anything else, the connection fails, or any step raises.
    """
    print_header("Test 3: API Server")

    base_url = base_url.rstrip("/")
    print_info(f"Testing API server at {base_url}...")

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            # Test root endpoint
            response = await client.get(base_url)
            if response.status_code != 200:
                print_error(f"Root endpoint returned {response.status_code}")
                return False
            data = response.json()
            print_success(f"Root endpoint: {data.get('status', 'unknown')}")

            # Test health endpoint
            response = await client.get(f"{base_url}/health")
            if response.status_code == 200:
                data = response.json()
                print_success(f"Health check: {data.get('status', 'unknown')}")
                print_info(f"Active sessions: {data.get('sessions', 0)}")
                print_info(f"Database: {data.get('database', 'unknown')}")
            else:
                print_warning(f"Health endpoint returned {response.status_code}")

            return True
    except httpx.ConnectError:
        print_error("Cannot connect to API server!")
        print_info("Make sure the server is running:")
        print_info(" uvicorn main:app --reload")
        return False
    except Exception as e:
        print_error(f"API test failed: {e}")
        return False
async def _verify_download_roundtrip(client, base_url: str, unique_id: str, expected: bytes) -> bool:
    """Download ``unique_id`` and compare the result with ``expected``.

    Performs a full download, then a ``Range: bytes=0-1023`` request.

    Returns:
        False when the download fails or any returned data differs from
        ``expected``. A range request answered with a status other than 206
        only produces a warning and still counts as success.
    """
    file_url = f"{base_url}/dl/{unique_id}"

    # Download
    print_info("Downloading test file...")
    start_time = time.perf_counter()
    response = await client.get(file_url)
    download_time = time.perf_counter() - start_time
    if response.status_code != 200:
        print_error(f"Download failed: {response.status_code}")
        return False
    print_success(f"Download completed in {download_time:.2f}s")

    # Verify
    if response.content != expected:
        print_error("Data integrity check failed!")
        return False
    print_success("Data integrity verified! ✓")

    # Test range request
    print_info("Testing range request (bytes 0-1023)...")
    response = await client.get(file_url, headers={"Range": "bytes=0-1023"})
    if response.status_code != 206:
        print_warning(f"Range request returned {response.status_code} (expected 206)")
        return True
    print_success("Range request supported! ✓")

    if len(response.content) != 1024:
        print_error(f"Range request returned {len(response.content)} bytes, expected 1024")
        return False
    # Check the bytes themselves, not just the length.
    if response.content != expected[:1024]:
        print_error("Range request data does not match the uploaded file")
        return False
    print_success("Range request data correct! ✓")
    return True


async def test_upload_download(base_url: str = "http://localhost:8000") -> bool:
    """Test 4: Upload and download functionality.

    Uploads 1 MiB of random data, downloads it again (fully and through a
    range request), compares the bytes, and finally deletes the file.

    Args:
        base_url: Root URL of the API server under test.

    Returns:
        True when upload, download and verification succeed; False on any
        failed step or raised exception.
    """
    print_header("Test 4: Upload/Download")

    base_url = base_url.rstrip("/")
    print_info("Creating test file (1MB)...")
    test_data = os.urandom(1024 * 1024)  # 1MB random data
    test_filename = "test_file.bin"

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            # Upload
            print_info("Uploading test file...")
            start_time = time.perf_counter()
            response = await client.post(
                f"{base_url}/upload",
                params={"filename": test_filename},
                content=test_data,
            )
            upload_time = time.perf_counter() - start_time

            if response.status_code != 200:
                print_error(f"Upload failed: {response.status_code}")
                print_error(f"Response: {response.text}")
                return False

            result = response.json()
            unique_id = result.get("unique_id")
            if not unique_id:
                # Without an id there is nothing to download or delete.
                print_error("Upload response did not include a unique_id")
                return False

            print_success(f"Upload completed in {upload_time:.2f}s")
            print_info(f"Unique ID: {unique_id}")
            print_info(f"Parts: {result.get('parts', 0)}")
            print_info(f"Size: {result.get('total_size', 0)} bytes")

            try:
                return await _verify_download_roundtrip(client, base_url, unique_id, test_data)
            finally:
                # Cleanup runs on failure too, so a failed check does not
                # leave the test file on the server.
                print_info("Cleaning up test file...")
                response = await client.delete(f"{base_url}/delete/{unique_id}")
                if response.status_code == 200:
                    print_success("Test file deleted")
                else:
                    print_warning(f"Cleanup returned {response.status_code}")
    except Exception as e:
        print_error(f"Upload/Download test failed: {e}")
        return False
async def main() -> int:
    """Run all tests in order and print a summary.

    A test that raises is reported as crashed and counted as failed; the
    remaining tests still run.

    Returns:
        0 when every test passed, 1 otherwise (used as the process exit code).
    """
    banner_width = 60
    title = "Telegram Multi-Part File Streamer - Test Suite"

    print()
    print(f"{Colors.BOLD}{Colors.BLUE}")
    print(f"╔{'═' * banner_width}╗")
    print(f"║{title.center(banner_width)}║")
    print(f"╚{'═' * banner_width}╝")
    print(f"{Colors.ENDC}")

    tests = [
        ("Environment Variables", test_environment_variables),
        ("MongoDB Connection", test_mongodb_connection),
        ("API Server", test_api_server),
        ("Upload/Download", test_upload_download),
    ]

    results = []
    for name, test_func in tests:
        try:
            result = await test_func()
            results.append((name, result))
        except Exception as e:
            print_error(f"Test '{name}' crashed: {e}")
            results.append((name, False))

    # Summary
    print_header("Test Summary")
    passed = sum(1 for _, result in results if result)
    total = len(results)

    for name, result in results:
        if result:
            print_success(name)
        else:
            print_error(name)

    print()
    if passed == total:
        print_success(f"All tests passed! ({passed}/{total})")
        print()
        print_info("Your setup is ready! 🎉")
        print_info("You can now start uploading and streaming files.")
        print()
        return 0

    print_error(f"Some tests failed: {passed}/{total} passed")
    print()
    print_info("Please fix the issues above before proceeding.")
    print()
    return 1
if __name__ == "__main__":
    # Script entry point: run the async test suite and use its return value
    # as the process exit status; Ctrl+C exits with status 1.
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except KeyboardInterrupt:
        print(f"\n\n{Colors.YELLOW}Tests cancelled by user.{Colors.ENDC}\n")
        sys.exit(1)