import os import sys import json import asyncio import pty import select import fcntl import struct import termios import signal import logging import hashlib import time import base64 import urllib.parse from datetime import datetime from functools import wraps LOG_DIR = "./logs" os.makedirs(LOG_DIR, exist_ok=True) import tornado.log import tornado.web import tornado.websocket import tornado.ioloop import tornado.autoreload logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler(f"{LOG_DIR}/webssh.log"), logging.StreamHandler()], ) logger = logging.getLogger(__name__) USERS = { "admin": os.environ.get("WEBSSH_ADMIN_PASSWORD", "admin123"), "user": os.environ.get("WEBSSH_USER_PASSWORD", "user123"), } SESSION_SECRET = "your-secret-key-change-this" COMMAND_WHITELIST = [ "ls", "cd", "pwd", "cat", "echo", "grep", "find", "ps", "top", "df", "free", "whoami", "uname", "pip", "python", "git", ] BLOCKED_COMMANDS = ["rm -rf", "mkfs", "dd if=", ":(){:|:&};:", "chown", "chmod 777"] def require_auth(func): @wraps(func) def wrapper(self, *args, **kwargs): auth_header = self.request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Basic "): self.set_status(401) self.set_header("WWW-Authenticate", 'Basic realm="WebSSH"') self.finish({"error": "Authentication required"}) return try: import base64 credentials = base64.b64decode(auth_header[6:]).decode("utf-8") username, password = credentials.split(":", 1) if USERS.get(username) != password: self.set_status(401) self.finish({"error": "Invalid credentials"}) return self.username = username except: self.set_status(401) self.finish({"error": "Invalid credentials"}) return return func(self, *args, **kwargs) return wrapper def log_command(username, command): log_entry = { "timestamp": datetime.now().isoformat(), "user": username, "command": command, } logger.info(f"COMMAND: {json.dumps(log_entry)}") def check_command(command): cmd_lower = command.lower().strip() for blocked in BLOCKED_COMMANDS: if blocked in cmd_lower: return False, f"Command '{command}' is blocked" return True, "" class ShellHandler(tornado.websocket.WebSocketHandler): connections = set() username = None def open(self): auth_header = self.get_argument("auth", None) if not auth_header: auth_header = self.request.headers.get("X-Auth-Token", "") # URL decode if needed (handles %3D -> =) if auth_header: auth_header = urllib.parse.unquote(auth_header) logger.info(f"Auth: {auth_header[:50] if auth_header else 'None'}") if auth_header: try: credentials = base64.b64decode(auth_header).decode("utf-8") username, password = credentials.split(":", 1) logger.info(f"Login attempt: {username}") if USERS.get(username) == password: self.username = username ShellHandler.connections.add(self) self.ptyslave = None self.shell_pid = None self.session_start = time.time() logger.info(f"User {username} connected") self.start_shell() return except Exception as e: logger.error(f"Auth failed: {e}") self.close(reason=b"Authentication required") def start_shell(self): pid, fd = pty.fork() if pid == 0: os.execv("/bin/sh", ["-sh"]) else: self.ptyslave = fd self.shell_pid = pid def on_message(self, data): if not self.ptyslave: return try: msg = json.loads(data) if "resize" in msg: cols, rows = msg["resize"] try: fcntl.ioctl( self.ptyslave, termios.TIOCSWINSZ, struct.pack("HHHH", rows, cols, 0, 0), ) except: pass elif "ping" in msg: self.write_message(json.dumps({"pong": True})) else: os.write(self.ptyslave, data.encode()) if self.username: log_command(self.username, data) except: os.write(self.ptyslave, data.encode()) def on_close(self): ShellHandler.connections.discard(self) if self.ptyslave: try: os.close(self.ptyslave) except: pass if self.shell_pid: try: os.kill(self.shell_pid, signal.SIGTERM) except: pass if self.username: duration = time.time() - self.session_start logger.info( f"User {self.username} disconnected, session duration: {duration:.2f}s" ) def check_origin(self, origin): return True class MainHandler(tornado.web.RequestHandler): def get(self): self.redirect("/index.html") class AuthHandler(tornado.web.RequestHandler): def post(self): data = json.loads(self.request.body) username = data.get("username") password = data.get("password") if USERS.get(username) == password: import base64 token = base64.b64encode(f"{username}:{password}".encode()).decode() self.write({"status": "ok", "token": token}) else: self.set_status(401) self.write({"status": "error", "message": "Invalid credentials"}) class LogHandler(tornado.web.RequestHandler): @require_auth def get(self): if os.path.exists(f"{LOG_DIR}/webssh.log"): with open(f"{LOG_DIR}/webssh.log", "r") as f: lines = f.readlines() self.write({"logs": lines[-100:]}) else: self.write({"logs": []}) class HealthHandler(tornado.web.RequestHandler): def get(self): self.write( { "status": "ok", "connections": len(ShellHandler.connections), "uptime": time.time(), } ) def make_app(): return tornado.web.Application( [ (r"/", MainHandler), (r"/auth", AuthHandler), (r"/logs", LogHandler), (r"/health", HealthHandler), (r"/ws", ShellHandler), (r"/static/(.*)", tornado.web.StaticFileHandler, {"path": "./static"}), (r"/(.*)", tornado.web.StaticFileHandler, {"path": "./templates"}), ] ) async def read_from_shells(): while True: for handler in list(ShellHandler.connections): if handler.ptyslave: try: ready, _, _ = select.select([handler.ptyslave], [], [], 0.05) if handler.ptyslave in ready: data = os.read(handler.ptyslave, 4096) if data: handler.write_message( data.decode("utf-8", errors="replace") ) except: pass await asyncio.sleep(0.01) async def monitor_connections(): while True: try: for handler in list(ShellHandler.connections): if handler.session_start: duration = time.time() - handler.session_start if duration > 3600: logger.warning( f"Closing stale connection for user {handler.username}" ) handler.close() except: pass await asyncio.sleep(60) async def main(): app = make_app() ssl_options = None if os.path.exists(f"{LOG_DIR}/cert.pem") and os.path.exists(f"{LOG_DIR}/key.pem"): ssl_options = { "certfile": f"{LOG_DIR}/cert.pem", "keyfile": f"{LOG_DIR}/key.pem", } logger.info("HTTPS mode enabled") else: logger.info("HTTP mode (no SSL certificates found)") app.listen(7860, "0.0.0.0", ssl_options=ssl_options) asyncio.create_task(read_from_shells()) asyncio.create_task(monitor_connections()) await asyncio.Event().wait() if __name__ == "__main__": asyncio.run(main())