| import os |
| import sys |
| import json |
| import asyncio |
| import pty |
| import select |
| import fcntl |
| import struct |
| import termios |
| import signal |
| import logging |
| import hashlib |
| import time |
| import base64 |
| import urllib.parse |
| from datetime import datetime |
| from functools import wraps |
|
|
| LOG_DIR = "./logs" |
| os.makedirs(LOG_DIR, exist_ok=True) |
|
|
| import tornado.log |
| import tornado.web |
| import tornado.websocket |
| import tornado.ioloop |
| import tornado.autoreload |
|
|
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s - %(levelname)s - %(message)s", |
| handlers=[logging.FileHandler(f"{LOG_DIR}/webssh.log"), logging.StreamHandler()], |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| USERS = { |
| "admin": os.environ.get("WEBSSH_ADMIN_PASSWORD", "admin123"), |
| "user": os.environ.get("WEBSSH_USER_PASSWORD", "user123"), |
| } |
|
|
| SESSION_SECRET = "your-secret-key-change-this" |
| COMMAND_WHITELIST = [ |
| "ls", |
| "cd", |
| "pwd", |
| "cat", |
| "echo", |
| "grep", |
| "find", |
| "ps", |
| "top", |
| "df", |
| "free", |
| "whoami", |
| "uname", |
| "pip", |
| "python", |
| "git", |
| ] |
| BLOCKED_COMMANDS = ["rm -rf", "mkfs", "dd if=", ":(){:|:&};:", "chown", "chmod 777"] |
|
|
|
|
| def require_auth(func): |
| @wraps(func) |
| def wrapper(self, *args, **kwargs): |
| auth_header = self.request.headers.get("Authorization") |
| if not auth_header or not auth_header.startswith("Basic "): |
| self.set_status(401) |
| self.set_header("WWW-Authenticate", 'Basic realm="WebSSH"') |
| self.finish({"error": "Authentication required"}) |
| return |
| try: |
| import base64 |
|
|
| credentials = base64.b64decode(auth_header[6:]).decode("utf-8") |
| username, password = credentials.split(":", 1) |
| if USERS.get(username) != password: |
| self.set_status(401) |
| self.finish({"error": "Invalid credentials"}) |
| return |
| self.username = username |
| except: |
| self.set_status(401) |
| self.finish({"error": "Invalid credentials"}) |
| return |
| return func(self, *args, **kwargs) |
|
|
| return wrapper |
|
|
|
|
| def log_command(username, command): |
| log_entry = { |
| "timestamp": datetime.now().isoformat(), |
| "user": username, |
| "command": command, |
| } |
| logger.info(f"COMMAND: {json.dumps(log_entry)}") |
|
|
|
|
| def check_command(command): |
| cmd_lower = command.lower().strip() |
| for blocked in BLOCKED_COMMANDS: |
| if blocked in cmd_lower: |
| return False, f"Command '{command}' is blocked" |
| return True, "" |
|
|
|
|
| class ShellHandler(tornado.websocket.WebSocketHandler): |
| connections = set() |
| username = None |
|
|
| def open(self): |
| auth_header = self.get_argument("auth", None) |
| if not auth_header: |
| auth_header = self.request.headers.get("X-Auth-Token", "") |
|
|
| |
| if auth_header: |
| auth_header = urllib.parse.unquote(auth_header) |
|
|
| logger.info(f"Auth: {auth_header[:50] if auth_header else 'None'}") |
|
|
| if auth_header: |
| try: |
| credentials = base64.b64decode(auth_header).decode("utf-8") |
| username, password = credentials.split(":", 1) |
| logger.info(f"Login attempt: {username}") |
| if USERS.get(username) == password: |
| self.username = username |
| ShellHandler.connections.add(self) |
| self.ptyslave = None |
| self.shell_pid = None |
| self.session_start = time.time() |
| logger.info(f"User {username} connected") |
| self.start_shell() |
| return |
| except Exception as e: |
| logger.error(f"Auth failed: {e}") |
|
|
| self.close(reason=b"Authentication required") |
|
|
| def start_shell(self): |
| pid, fd = pty.fork() |
| if pid == 0: |
| os.execv("/bin/sh", ["-sh"]) |
| else: |
| self.ptyslave = fd |
| self.shell_pid = pid |
|
|
| def on_message(self, data): |
| if not self.ptyslave: |
| return |
| try: |
| msg = json.loads(data) |
| if "resize" in msg: |
| cols, rows = msg["resize"] |
| try: |
| fcntl.ioctl( |
| self.ptyslave, |
| termios.TIOCSWINSZ, |
| struct.pack("HHHH", rows, cols, 0, 0), |
| ) |
| except: |
| pass |
| elif "ping" in msg: |
| self.write_message(json.dumps({"pong": True})) |
| else: |
| os.write(self.ptyslave, data.encode()) |
| if self.username: |
| log_command(self.username, data) |
| except: |
| os.write(self.ptyslave, data.encode()) |
|
|
| def on_close(self): |
| ShellHandler.connections.discard(self) |
| if self.ptyslave: |
| try: |
| os.close(self.ptyslave) |
| except: |
| pass |
| if self.shell_pid: |
| try: |
| os.kill(self.shell_pid, signal.SIGTERM) |
| except: |
| pass |
| if self.username: |
| duration = time.time() - self.session_start |
| logger.info( |
| f"User {self.username} disconnected, session duration: {duration:.2f}s" |
| ) |
|
|
| def check_origin(self, origin): |
| return True |
|
|
|
|
| class MainHandler(tornado.web.RequestHandler): |
| def get(self): |
| self.redirect("/index.html") |
|
|
|
|
| class AuthHandler(tornado.web.RequestHandler): |
| def post(self): |
| data = json.loads(self.request.body) |
| username = data.get("username") |
| password = data.get("password") |
|
|
| if USERS.get(username) == password: |
| import base64 |
|
|
| token = base64.b64encode(f"{username}:{password}".encode()).decode() |
| self.write({"status": "ok", "token": token}) |
| else: |
| self.set_status(401) |
| self.write({"status": "error", "message": "Invalid credentials"}) |
|
|
|
|
| class LogHandler(tornado.web.RequestHandler): |
| @require_auth |
| def get(self): |
| if os.path.exists(f"{LOG_DIR}/webssh.log"): |
| with open(f"{LOG_DIR}/webssh.log", "r") as f: |
| lines = f.readlines() |
| self.write({"logs": lines[-100:]}) |
| else: |
| self.write({"logs": []}) |
|
|
|
|
| class HealthHandler(tornado.web.RequestHandler): |
| def get(self): |
| self.write( |
| { |
| "status": "ok", |
| "connections": len(ShellHandler.connections), |
| "uptime": time.time(), |
| } |
| ) |
|
|
|
|
| def make_app(): |
| return tornado.web.Application( |
| [ |
| (r"/", MainHandler), |
| (r"/auth", AuthHandler), |
| (r"/logs", LogHandler), |
| (r"/health", HealthHandler), |
| (r"/ws", ShellHandler), |
| (r"/static/(.*)", tornado.web.StaticFileHandler, {"path": "./static"}), |
| (r"/(.*)", tornado.web.StaticFileHandler, {"path": "./templates"}), |
| ] |
| ) |
|
|
|
|
| async def read_from_shells(): |
| while True: |
| for handler in list(ShellHandler.connections): |
| if handler.ptyslave: |
| try: |
| ready, _, _ = select.select([handler.ptyslave], [], [], 0.05) |
| if handler.ptyslave in ready: |
| data = os.read(handler.ptyslave, 4096) |
| if data: |
| handler.write_message( |
| data.decode("utf-8", errors="replace") |
| ) |
| except: |
| pass |
| await asyncio.sleep(0.01) |
|
|
|
|
| async def monitor_connections(): |
| while True: |
| try: |
| for handler in list(ShellHandler.connections): |
| if handler.session_start: |
| duration = time.time() - handler.session_start |
| if duration > 3600: |
| logger.warning( |
| f"Closing stale connection for user {handler.username}" |
| ) |
| handler.close() |
| except: |
| pass |
| await asyncio.sleep(60) |
|
|
|
|
| async def main(): |
| app = make_app() |
|
|
| ssl_options = None |
| if os.path.exists(f"{LOG_DIR}/cert.pem") and os.path.exists(f"{LOG_DIR}/key.pem"): |
| ssl_options = { |
| "certfile": f"{LOG_DIR}/cert.pem", |
| "keyfile": f"{LOG_DIR}/key.pem", |
| } |
| logger.info("HTTPS mode enabled") |
| else: |
| logger.info("HTTP mode (no SSL certificates found)") |
|
|
| app.listen(7860, "0.0.0.0", ssl_options=ssl_options) |
| asyncio.create_task(read_from_shells()) |
| asyncio.create_task(monitor_connections()) |
| await asyncio.Event().wait() |
|
|
|
|
| if __name__ == "__main__": |
| asyncio.run(main()) |
|
|