# Source: TelegramSteamBot / test_setup.py
# Page header captured with the file: "mrpoddaa's picture",
# commit message "Upload 13 files", revision 18b952c (verified).
#!/usr/bin/env python3
"""
Test Script for Telegram Multi-Part File Streamer
Validates setup and performs basic functionality tests
"""
import asyncio
import os
import sys
import time
from io import BytesIO
import httpx
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class Colors:
    """ANSI escape sequences used to colorize and style terminal output."""

    GREEN = '\033[92m'   # bright green foreground
    YELLOW = '\033[93m'  # bright yellow foreground
    RED = '\033[91m'     # bright red foreground
    BLUE = '\033[94m'    # bright blue foreground
    ENDC = '\033[0m'     # reset every attribute back to the terminal default
    BOLD = '\033[1m'     # bold / increased intensity
def print_header(text: str, width: int = 60) -> None:
    """Print a bold blue section header framed by two rules of ``=``.

    A blank line is emitted before the top rule and after the bottom rule.

    Args:
        text: Title to display; it is centered within ``width`` columns.
        width: Length of both rules and of the centering field. Defaults
            to 60, the width this header has always used.
    """
    style = f"{Colors.BOLD}{Colors.BLUE}"
    rule = "=" * width
    print(f"\n{style}{rule}{Colors.ENDC}")
    print(f"{style}{text.center(width)}{Colors.ENDC}")
    print(f"{style}{rule}{Colors.ENDC}\n")
def print_success(text: str) -> None:
    """Print ``text`` in green, prefixed with a check mark."""
    # \u2713 is the CHECK MARK glyph; written as an escape so the marker
    # survives the file being re-saved or served with the wrong encoding.
    print(f"{Colors.GREEN}\u2713 {text}{Colors.ENDC}")
def print_error(text: str) -> None:
    """Print ``text`` in red, prefixed with a ballot X."""
    # \u2717 is the BALLOT X glyph; written as an escape so the marker
    # survives the file being re-saved or served with the wrong encoding.
    print(f"{Colors.RED}\u2717 {text}{Colors.ENDC}")
def print_warning(text: str) -> None:
    """Print ``text`` in yellow, prefixed with a warning sign."""
    # \u26a0 is the WARNING SIGN glyph; written as an escape so the marker
    # survives the file being re-saved or served with the wrong encoding.
    print(f"{Colors.YELLOW}\u26a0 {text}{Colors.ENDC}")
def print_info(text: str) -> None:
    """Print ``text`` in blue, prefixed with an information sign."""
    # \u2139 is the INFORMATION SOURCE glyph; written as an escape so the
    # marker survives the file being re-saved or served with the wrong encoding.
    print(f"{Colors.BLUE}\u2139 {text}{Colors.ENDC}")
async def test_environment_variables():
    """Test 1: report which configuration environment variables are set.

    Each required variable is printed as a success (with a masked preview
    of its value) or as an error when it is missing or blank. Optional
    variables only produce a warning when absent.

    Returns:
        bool: True when every required variable has a non-blank value,
        False otherwise. Optional variables never affect the result.
    """
    print_header("Test 1: Environment Variables")

    required_vars = {
        "API_ID": "Telegram API ID",
        "API_HASH": "Telegram API Hash",
        "BOT_TOKEN": "Telegram Bot Token",
        "MONGO_URI": "MongoDB Connection String",
    }
    optional_vars = {
        "SESSION_STRINGS": "Pyrogram Session Strings (for load balancing)",
    }

    all_valid = True

    print_info("Checking required variables...")
    for var, description in required_vars.items():
        # A value consisting only of whitespace is as unusable as no value.
        value = (os.getenv(var) or "").strip()
        if value:
            # Echo at most an 8-character prefix. Values that are not longer
            # than the prefix are hidden completely; otherwise the "mask"
            # would print the whole value.
            masked_value = value[:8] + "..." if len(value) > 8 else "***"
            print_success(f"{var}: {masked_value} ({description})")
        else:
            print_error(f"{var}: Missing! ({description})")
            all_valid = False

    print()
    print_info("Checking optional variables...")
    for var, description in optional_vars.items():
        value = os.getenv(var) or ""
        # The value is a comma-separated list; ignore empty entries left by
        # trailing or doubled commas so they are not counted as sessions.
        count = sum(1 for entry in value.split(",") if entry.strip())
        if count:
            print_success(f"{var}: {count} session(s) configured ({description})")
        else:
            print_warning(f"{var}: Not set ({description})")

    print()
    return all_valid
async def test_mongodb_connection():
    """Test 2: verify that MongoDB is reachable with the configured URI.

    Reads ``MONGO_URI`` (required) and ``MONGO_DATABASE`` (defaults to
    ``"telegram_streamer"``) from the environment, pings the server and
    lists the collections of the selected database.

    Returns:
        bool: True when the ping and the collection listing succeed;
        False when ``MONGO_URI`` is unset or any step raises (including a
        failed import of ``motor``).
    """
    print_header("Test 2: MongoDB Connection")

    client = None
    try:
        # Imported lazily so a missing driver is reported as a failed test
        # instead of breaking the import of this script.
        from motor.motor_asyncio import AsyncIOMotorClient

        mongo_uri = os.getenv("MONGO_URI")
        if not mongo_uri:
            print_error("MONGO_URI not set")
            return False

        print_info("Connecting to MongoDB...")
        client = AsyncIOMotorClient(mongo_uri, serverSelectionTimeoutMS=5000)

        # The ping is the first command sent, so connection problems
        # surface here.
        await client.admin.command('ping')

        db_name = os.getenv("MONGO_DATABASE", "telegram_streamer")
        db = client[db_name]
        print_success("Connected to MongoDB!")
        print_info(f"Database: {db_name}")

        collections = await db.list_collection_names()
        print_info(f"Collections: {collections if collections else 'None (fresh database)'}")
        return True
    except Exception as e:
        print_error(f"MongoDB connection failed: {str(e)}")
        return False
    finally:
        # Close the client on every path; previously it was only closed
        # after a fully successful run and leaked whenever a step raised.
        if client is not None:
            client.close()
async def test_api_server():
    """Test 3: check that the API server answers on ``/`` and ``/health``.

    The server address is taken from the ``API_BASE_URL`` environment
    variable when it is set, otherwise ``http://localhost:8000`` is used.

    Returns:
        bool: False when the root endpoint does not return 200, when the
        server cannot be reached, or when any other exception occurs.
        True otherwise; a non-200 ``/health`` response only produces a
        warning.
    """
    print_header("Test 3: API Server")

    # Trailing slashes are dropped so that f"{base_url}/health" never
    # contains a double slash.
    base_url = (os.getenv("API_BASE_URL") or "http://localhost:8000").rstrip("/")
    print_info(f"Testing API server at {base_url}...")

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            # Root endpoint: must respond with 200.
            response = await client.get(base_url)
            if response.status_code != 200:
                print_error(f"Root endpoint returned {response.status_code}")
                return False
            data = response.json()
            print_success(f"Root endpoint: {data.get('status', 'unknown')}")

            # Health endpoint: informational only.
            response = await client.get(f"{base_url}/health")
            if response.status_code == 200:
                data = response.json()
                print_success(f"Health check: {data.get('status', 'unknown')}")
                print_info(f"Active sessions: {data.get('sessions', 0)}")
                print_info(f"Database: {data.get('database', 'unknown')}")
            else:
                print_warning(f"Health endpoint returned {response.status_code}")

            return True
    except httpx.ConnectError:
        print_error("Cannot connect to API server!")
        print_info("Make sure the server is running:")
        print_info(" uvicorn main:app --reload")
        return False
    except Exception as e:
        print_error(f"API test failed: {str(e)}")
        return False
async def _verify_download(client, base_url, unique_id, test_data):
    """Download the uploaded file, compare it, then probe Range support.

    Returns True when the full download matches ``test_data`` and the range
    request is either unsupported (non-206, warning only) or returns exactly
    the first 1024 bytes of ``test_data``.
    """
    print_info("Downloading test file...")
    start_time = time.perf_counter()
    response = await client.get(f"{base_url}/dl/{unique_id}")
    download_time = time.perf_counter() - start_time

    if response.status_code != 200:
        print_error(f"Download failed: {response.status_code}")
        return False
    print_success(f"Download completed in {download_time:.2f}s")

    if response.content != test_data:
        print_error("Data integrity check failed!")
        return False
    print_success("Data integrity verified! \u2713")

    print_info("Testing range request (bytes 0-1023)...")
    response = await client.get(
        f"{base_url}/dl/{unique_id}",
        headers={"Range": "bytes=0-1023"},
    )
    if response.status_code != 206:
        # Range support is treated as optional: warn, but do not fail.
        print_warning(f"Range request returned {response.status_code} (expected 206)")
        return True

    print_success("Range request supported! \u2713")
    if len(response.content) != 1024:
        print_error(f"Range request returned {len(response.content)} bytes, expected 1024")
        return False
    if response.content != test_data[:1024]:
        print_error("Range request data does not match the uploaded bytes")
        return False
    print_success("Range request data correct! \u2713")
    return True


async def _delete_test_file(client, base_url, unique_id):
    """Best-effort removal of the uploaded test file; never raises on HTTP errors."""
    print_info("Cleaning up test file...")
    try:
        response = await client.delete(f"{base_url}/delete/{unique_id}")
    except httpx.HTTPError as e:
        print_warning(f"Could not delete test file: {str(e)}")
        return
    if response.status_code == 200:
        print_success("Test file deleted")
    else:
        print_warning(f"Delete returned {response.status_code}; test file may remain on the server")


async def test_upload_download():
    """Test 4: upload 1 MiB of random data, download it back and verify it.

    The server address is taken from the ``API_BASE_URL`` environment
    variable when it is set, otherwise ``http://localhost:8000`` is used.
    Steps: POST ``/upload``, GET ``/dl/<unique_id>`` (full and ranged),
    then DELETE ``/delete/<unique_id>``. Once the upload has produced an
    id, the delete is attempted even when a later step fails.

    Returns:
        bool: True when the upload, the download and the byte-for-byte
        comparison succeed and a 206 range response (if any) carries the
        correct bytes; False otherwise, including on any exception.
    """
    print_header("Test 4: Upload/Download")

    base_url = (os.getenv("API_BASE_URL") or "http://localhost:8000").rstrip("/")

    print_info("Creating test file (1MB)...")
    test_data = os.urandom(1024 * 1024)  # 1MB random data
    test_filename = "test_file.bin"

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            print_info("Uploading test file...")
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments.
            start_time = time.perf_counter()
            response = await client.post(
                f"{base_url}/upload",
                params={"filename": test_filename},
                content=test_data,
            )
            upload_time = time.perf_counter() - start_time

            if response.status_code != 200:
                print_error(f"Upload failed: {response.status_code}")
                print_error(f"Response: {response.text}")
                return False

            result = response.json()
            unique_id = result.get("unique_id")
            if not unique_id:
                # Without an id the later requests would target "/dl/None".
                print_error("Upload response did not include a unique_id")
                print_error(f"Response: {response.text}")
                return False

            print_success(f"Upload completed in {upload_time:.2f}s")
            print_info(f"Unique ID: {unique_id}")
            print_info(f"Parts: {result.get('parts', 0)}")
            print_info(f"Size: {result.get('total_size', 0)} bytes")

            try:
                return await _verify_download(client, base_url, unique_id, test_data)
            finally:
                # Remove the uploaded file whether or not verification passed.
                await _delete_test_file(client, base_url, unique_id)
    except Exception as e:
        print_error(f"Upload/Download test failed: {str(e)}")
        return False
async def main():
    """Run every setup test in order and print a summary.

    A test that raises is reported as crashed and counted as failed; the
    remaining tests still run.

    Returns:
        int: 0 when all tests passed, 1 otherwise (used as the process
        exit code by the script entry point).
    """
    banner_width = 60
    title = "Telegram Multi-Part File Streamer - Test Suite"

    # Box-drawing characters are written as escapes so the banner survives
    # the file being re-saved or served with the wrong encoding.
    print()
    print(f"{Colors.BOLD}{Colors.BLUE}")
    print("\u2554" + "\u2550" * banner_width + "\u2557")
    print("\u2551" + title.center(banner_width) + "\u2551")
    print("\u255a" + "\u2550" * banner_width + "\u255d")
    print(f"{Colors.ENDC}")

    tests = [
        ("Environment Variables", test_environment_variables),
        ("MongoDB Connection", test_mongodb_connection),
        ("API Server", test_api_server),
        ("Upload/Download", test_upload_download),
    ]

    results = []
    for name, test_func in tests:
        try:
            result = await test_func()
            results.append((name, result))
        except Exception as e:
            # Top-level boundary: one crashing test must not abort the run.
            print_error(f"Test '{name}' crashed: {str(e)}")
            results.append((name, False))

    # Summary
    print_header("Test Summary")
    passed = sum(1 for _, result in results if result)
    total = len(results)

    for name, result in results:
        if result:
            print_success(f"{name}")
        else:
            print_error(f"{name}")
    print()

    if passed == total:
        print_success(f"All tests passed! ({passed}/{total})")
        print()
        print_info("Your setup is ready! \U0001F680")
        print_info("You can now start uploading and streaming files.")
        print()
        return 0

    print_error(f"Some tests failed: {passed}/{total} passed")
    print()
    print_info("Please fix the issues above before proceeding.")
    print()
    return 1
if __name__ == "__main__":
    # Script entry point: run the async test suite and use its return value
    # (0 = all passed, 1 = failures) as the process exit code.
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except KeyboardInterrupt:
        # Ctrl+C while the tests run: report it and exit with a failure code.
        print(f"\n\n{Colors.YELLOW}Tests cancelled by user.{Colors.ENDC}\n")
        sys.exit(1)