# webssh / handler.py
# airsltd's picture
# hi
# 1d2aa43
import os
import sys
import json
import asyncio
import pty
import select
import fcntl
import struct
import termios
import signal
import logging
import hashlib
import time
import base64
import urllib.parse
from datetime import datetime
from functools import wraps
# Directory that holds the application log file; main() also looks here for
# cert.pem / key.pem when deciding whether to serve over TLS.
LOG_DIR = "./logs"
# Created at import time so the logging FileHandler configured further down
# can open its file on a fresh checkout.
os.makedirs(LOG_DIR, exist_ok=True)
import tornado.log
import tornado.web
import tornado.websocket
import tornado.ioloop
import tornado.autoreload
# Root logging setup: records at INFO and above, timestamped, are sent to the
# log file under LOG_DIR and mirrored to the console stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler(f"{LOG_DIR}/webssh.log"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by the handlers and helpers in this file.
logger = logging.getLogger(__name__)
def _env_or_default(variable, default, what):
    """Return the value of environment variable *variable*, else *default*.

    The built-in defaults are spelled out in this source file, so falling
    back to one is logged as a warning: this service hands out shell access
    and should not run on publicly known credentials unnoticed.

    Args:
        variable: Name of the environment variable to read.
        default: Value used when the variable is not set at all.
        what: Short human-readable label used in the warning.

    Returns:
        The environment value (even when empty), or *default*.
    """
    value = os.environ.get(variable)
    if value is None:
        logging.getLogger(__name__).warning(
            "%s is not set; using the built-in default %s", variable, what
        )
        return default
    return value


# Username -> plaintext password accepted by the authentication code paths.
USERS = {
    "admin": _env_or_default("WEBSSH_ADMIN_PASSWORD", "admin123", "admin password"),
    "user": _env_or_default("WEBSSH_USER_PASSWORD", "user123", "user password"),
}

# NOTE(review): nothing visible in this file reads SESSION_SECRET; it can now
# be overridden with WEBSSH_SESSION_SECRET instead of editing the source.
SESSION_SECRET = _env_or_default(
    "WEBSSH_SESSION_SECRET", "your-secret-key-change-this", "session secret"
)
# Command names regarded as routine.
# NOTE(review): nothing visible in this file consults this list.
COMMAND_WHITELIST = [
    # navigation and file inspection
    "ls", "cd", "pwd", "cat", "echo", "grep", "find",
    # process and system status
    "ps", "top", "df", "free", "whoami", "uname",
    # developer tooling
    "pip", "python", "git",
]

# Substrings that check_command() refuses when they occur anywhere in a
# lower-cased command line.
BLOCKED_COMMANDS = [
    "rm -rf",
    "mkfs",
    "dd if=",
    ":(){:|:&};:",  # shell fork bomb
    "chown",
    "chmod 777",
]
def require_auth(func):
    """Protect a request-handler method with HTTP Basic authentication.

    The wrapped method runs only when the ``Authorization`` header carries
    ``Basic <base64(username:password)>`` and the password matches the entry
    for that user in ``USERS``.  On success the authenticated name is stored
    on the handler as ``self.username``.

    Failure responses (the wrapped method is not called, ``None`` is
    returned):
        * header missing or not ``Basic``: 401 with a ``WWW-Authenticate``
          challenge and ``{"error": "Authentication required"}``;
        * undecodable header, unknown user or wrong password: 401 with
          ``{"error": "Invalid credentials"}``.

    Args:
        func: Handler method taking ``self`` plus arbitrary arguments.

    Returns:
        The wrapping function, carrying ``func``'s metadata.
    """
    # Local import, like the original's function-scope imports; used for a
    # comparison whose duration does not depend on where the strings differ.
    import hmac

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        auth_header = self.request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Basic "):
            self.set_status(401)
            self.set_header("WWW-Authenticate", 'Basic realm="WebSSH"')
            self.finish({"error": "Authentication required"})
            return None

        try:
            credentials = base64.b64decode(auth_header[6:]).decode("utf-8")
            username, password = credentials.split(":", 1)
        except ValueError:
            # Covers bad base64 (binascii.Error), bad UTF-8
            # (UnicodeDecodeError) and a payload without ":" (unpacking).
            username = password = None

        expected = USERS.get(username) if username is not None else None
        authenticated = expected is not None and hmac.compare_digest(
            expected.encode("utf-8"), password.encode("utf-8")
        )
        if not authenticated:
            self.set_status(401)
            self.finish({"error": "Invalid credentials"})
            return None

        self.username = username
        return func(self, *args, **kwargs)

    return wrapper
def log_command(username, command):
    """Write one audit record for *command* as sent by *username*.

    The record is a JSON object with ``timestamp`` (``datetime.now()`` in
    ISO 8601 form), ``user`` and ``command`` keys, logged at INFO level
    behind a ``COMMAND: `` prefix.
    """
    record = json.dumps(
        dict(
            timestamp=datetime.now().isoformat(),
            user=username,
            command=command,
        )
    )
    logger.info(f"COMMAND: {record}")
def check_command(command):
    """Screen *command* against the ``BLOCKED_COMMANDS`` deny-list.

    The command is lower-cased and every run of whitespace is collapsed to
    a single space before matching, so variants such as ``rm   -rf`` or
    ``rm<TAB>-rf`` are caught as well as the literal spelling.

    Args:
        command: The command line to inspect.

    Returns:
        ``(True, "")`` when no blocked substring occurs, otherwise
        ``(False, message)`` where *message* quotes the original command.
    """
    normalized = " ".join(command.lower().split())
    if any(blocked in normalized for blocked in BLOCKED_COMMANDS):
        return False, f"Command '{command}' is blocked"
    return True, ""
class ShellHandler(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that connects one client to one ``/bin/sh`` on a pty.

    The client authenticates with a base64 ``username:password`` token passed
    as the ``auth`` query argument or the ``X-Auth-Token`` header.  After
    that, incoming frames are either control messages (JSON objects with a
    ``resize`` or ``ping`` key) or input that is written to the shell as is.
    Shell output is not read here; the module's background reader polls
    ``ptyslave`` on every handler in ``connections``.
    """

    # Authenticated handlers with a running shell.
    connections = set()

    # Class-level defaults so on_message()/on_close() stay safe when open()
    # rejected the client before any per-connection state was assigned.
    username = None
    ptyslave = None  # fd returned to the parent by pty.fork()
    shell_pid = None
    session_start = None

    # Seconds to wait before escalating to SIGKILL / re-trying the reap.
    _REAP_DELAY = 2.0

    def open(self):
        """Authenticate the client and, on success, start its shell."""
        token = self.get_argument("auth", None)
        if not token:
            token = self.request.headers.get("X-Auth-Token", "")
        if token:
            # URL decode if needed (handles %3D -> =)
            token = urllib.parse.unquote(token)
        # The token is a reversible encoding of the password and the log file
        # is served over HTTP, so only its presence is recorded.
        logger.info("Auth token present: %s", bool(token))

        username = self._authenticate(token)
        if username is None:
            self.close(reason="Authentication required")
            return

        try:
            self.start_shell()
        except OSError:
            logger.exception(f"Could not start a shell for user {username}")
            self.close(code=1011, reason="Shell unavailable")
            return

        self.username = username
        self.session_start = time.time()
        ShellHandler.connections.add(self)
        logger.info(f"User {username} connected")

    @staticmethod
    def _authenticate(token):
        """Return the username carried by *token* if its password is valid.

        Returns ``None`` for an empty or undecodable token, an unknown user
        or a wrong password.
        """
        import hmac  # local import, as elsewhere in this file

        if not token:
            return None
        try:
            credentials = base64.b64decode(token).decode("utf-8")
            username, password = credentials.split(":", 1)
        except ValueError as exc:
            # Bad base64, bad UTF-8 or no ":" separator.
            logger.error(f"Auth failed: {exc}")
            return None

        logger.info(f"Login attempt: {username}")
        expected = USERS.get(username)
        if expected is None:
            return None
        if not hmac.compare_digest(
            expected.encode("utf-8"), password.encode("utf-8")
        ):
            return None
        return username

    def start_shell(self):
        """Fork a child on a new pty running ``/bin/sh``.

        Stores the parent's pty descriptor in ``ptyslave`` and the child's
        pid in ``shell_pid``.

        Raises:
            OSError: if ``pty.fork()`` fails.
        """
        pid, fd = pty.fork()
        if pid == 0:
            # Child process.  argv[0] is "-sh", as in the original code.
            try:
                os.execv("/bin/sh", ["-sh"])
            finally:
                # Reached only if execv raised: leave immediately instead of
                # letting a copy of the server keep running in the child.
                os._exit(127)
        self.ptyslave = fd
        self.shell_pid = pid

    def on_message(self, data):
        """Handle one frame: a resize/ping control message or shell input."""
        if self.ptyslave is None:
            return

        try:
            msg = json.loads(data)
        except (ValueError, RecursionError):
            msg = None  # not JSON: ordinary terminal input

        if isinstance(msg, dict):
            if "resize" in msg:
                self._resize(msg["resize"])
                return
            if "ping" in msg:
                try:
                    self.write_message(json.dumps({"pong": True}))
                except tornado.websocket.WebSocketClosedError:
                    pass  # client already gone; nothing to answer
                return

        payload = data if isinstance(data, bytes) else data.encode()
        try:
            os.write(self.ptyslave, payload)
        except OSError as exc:
            logger.warning(f"Write to shell failed for {self.username}: {exc}")
            self.close()
            return

        # NOTE(review): as before, only input that happens to parse as a JSON
        # object, array or string gets an audit record; plain keystrokes are
        # forwarded without one.  Confirm the intended audit scope.
        if (
            self.username
            and isinstance(data, str)
            and isinstance(msg, (dict, list, str))
        ):
            log_command(self.username, data)

    def _resize(self, size):
        """Apply a ``[cols, rows]`` window size to the pty; ignore bad input."""
        try:
            cols, rows = size
            winsize = struct.pack("HHHH", rows, cols, 0, 0)
            fcntl.ioctl(self.ptyslave, termios.TIOCSWINSZ, winsize)
        except (TypeError, ValueError, struct.error, OSError) as exc:
            # A malformed resize must never be typed into the shell.
            logger.debug("Ignoring resize request %r: %s", size, exc)

    def on_close(self):
        """Release the pty, stop and reap the shell, and log the session."""
        ShellHandler.connections.discard(self)

        fd, self.ptyslave = self.ptyslave, None
        if fd is not None:
            try:
                os.close(fd)
            except OSError:
                pass  # already closed elsewhere

        pid, self.shell_pid = self.shell_pid, None
        if pid is not None:
            self._stop_shell(pid)

        if self.username and self.session_start is not None:
            duration = time.time() - self.session_start
            logger.info(
                f"User {self.username} disconnected, session duration: {duration:.2f}s"
            )

    @classmethod
    def _stop_shell(cls, pid):
        """Send SIGTERM to *pid* and make sure it is eventually reaped."""
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            pass  # gone already, or no longer ours to signal
        if not cls._reap(pid):
            # Not exited yet: check again later without blocking the loop.
            tornado.ioloop.IOLoop.current().call_later(
                cls._REAP_DELAY, cls._force_stop, pid
            )

    @staticmethod
    def _reap(pid):
        """Collect *pid*'s exit status without blocking; True once it is gone."""
        try:
            reaped, _status = os.waitpid(pid, os.WNOHANG)
        except ChildProcessError:
            return True
        return reaped == pid

    @classmethod
    def _force_stop(cls, pid):
        """Escalate to SIGKILL if *pid* is still alive, then reap it later."""
        if cls._reap(pid):
            return
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            return
        tornado.ioloop.IOLoop.current().call_later(
            cls._REAP_DELAY, cls._reap, pid
        )

    def check_origin(self, origin):
        """Accept WebSocket connections from any origin.

        NOTE(review): this disables Tornado's same-origin check; access then
        rests entirely on the token verified in open().
        """
        return True
class MainHandler(tornado.web.RequestHandler):
    """Handler for ``/``: sends the client on to ``/index.html``."""

    def get(self):
        """Answer a GET with a redirect to ``/index.html``."""
        landing_page = "/index.html"
        self.redirect(landing_page)
class AuthHandler(tornado.web.RequestHandler):
    """Exchange a username/password pair for the token the WebSocket expects."""

    def post(self):
        """Validate JSON credentials and return a token.

        Request body: a JSON object with string ``username`` and ``password``.

        Responses:
            200 ``{"status": "ok", "token": ...}`` where the token is
                base64 of ``username:password``;
            400 when the body is not a JSON object;
            401 when the user is unknown or the password does not match.
        """
        import hmac  # local import, as elsewhere in this file

        try:
            data = json.loads(self.request.body)
        except ValueError:
            data = None
        if not isinstance(data, dict):
            self.set_status(400)
            self.write({"status": "error", "message": "Invalid JSON body"})
            return

        username = data.get("username")
        password = data.get("password")

        # Both fields must be real strings and the user must exist; comparing
        # a missing password (None) with a missing user (None) must not pass.
        expected = USERS.get(username) if isinstance(username, str) else None
        authenticated = (
            expected is not None
            and isinstance(password, str)
            and hmac.compare_digest(
                expected.encode("utf-8"), password.encode("utf-8")
            )
        )

        if authenticated:
            token = base64.b64encode(f"{username}:{password}".encode()).decode()
            self.write({"status": "ok", "token": token})
        else:
            self.set_status(401)
            self.write({"status": "error", "message": "Invalid credentials"})
class LogHandler(tornado.web.RequestHandler):
    """Serve the tail of the application log to authenticated users."""

    @require_auth
    def get(self):
        """Return the last 100 lines of ``webssh.log`` as ``{"logs": [...]}``.

        An absent log file yields an empty list.
        """
        from collections import deque  # local import, as elsewhere in this file

        try:
            # errors="replace": a stray undecodable byte in the log must not
            # turn this endpoint into a 500.
            with open(f"{LOG_DIR}/webssh.log", "r", errors="replace") as f:
                # A bounded deque keeps only the tail while streaming the
                # file, instead of holding every line in memory.
                tail = list(deque(f, maxlen=100))
        except FileNotFoundError:
            tail = []
        self.write({"logs": tail})
class HealthHandler(tornado.web.RequestHandler):
    """Unauthenticated liveness endpoint."""

    # Monotonic clock reading taken when this class is created (module
    # import); the baseline for the reported uptime.
    _started_at = time.monotonic()

    def get(self):
        """Report status, open shell connections and uptime in seconds.

        ``uptime`` is the time elapsed since this module was loaded; it
        previously carried the current epoch timestamp.
        """
        self.write(
            {
                "status": "ok",
                "connections": len(ShellHandler.connections),
                "uptime": time.monotonic() - self._started_at,
            }
        )
def make_app():
    """Build the Tornado application with this module's URL routes."""
    static_files = tornado.web.StaticFileHandler
    routes = [
        (r"/", MainHandler),
        (r"/auth", AuthHandler),
        (r"/logs", LogHandler),
        (r"/health", HealthHandler),
        (r"/ws", ShellHandler),
        (r"/static/(.*)", static_files, {"path": "./static"}),
        # Catch-all: must remain the last entry so it cannot shadow the
        # specific routes above.
        (r"/(.*)", static_files, {"path": "./templates"}),
    ]
    return tornado.web.Application(routes)
def _retire_shell(handler, fd):
    """Close a dead pty descriptor and the WebSocket that was using it."""
    handler.ptyslave = None  # stops further polling of this handler
    try:
        os.close(fd)
    except OSError:
        pass  # already closed
    handler.close()


def _relay_shell_output(handler):
    """Forward whatever one shell has ready to its client, without blocking."""
    fd = handler.ptyslave
    if not fd:
        return

    try:
        # Timeout 0: never stall the event loop waiting on an idle shell.
        readable, _, _ = select.select([fd], [], [], 0)
        if not readable:
            return
        chunk = os.read(fd, 4096)
    except (OSError, ValueError):
        chunk = b""  # descriptor unusable: treat like end of output

    if not chunk:
        # The shell has gone away; end the session instead of polling forever.
        _retire_shell(handler, fd)
        return

    # One incremental decoder per connection, so a multi-byte character that
    # straddles two reads is reassembled rather than replaced.
    decoder = getattr(handler, "_output_decoder", None)
    if decoder is None:
        import codecs  # local import, as elsewhere in this file

        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        handler._output_decoder = decoder

    text = decoder.decode(chunk)
    if not text:
        return  # only a partial character so far
    try:
        handler.write_message(text)
    except tornado.websocket.WebSocketClosedError:
        pass  # client vanished; on_close will clean up


async def read_from_shells():
    """Relay shell output to WebSocket clients until cancelled.

    Every 10 ms each handler in ``ShellHandler.connections`` is polled once.
    A failure on one connection ends that session only; it never stops the
    relay for the others.
    """
    while True:
        for handler in list(ShellHandler.connections):
            try:
                _relay_shell_output(handler)
            except Exception:
                logger.exception(
                    "Unexpected error relaying shell output; closing session"
                )
                fd = handler.ptyslave
                if fd:
                    _retire_shell(handler, fd)
        await asyncio.sleep(0.01)
async def monitor_connections(max_session_seconds=3600, check_interval=60):
    """Periodically close sessions that have been open too long.

    Args:
        max_session_seconds: Session age, in seconds, beyond which the
            connection is closed.
        check_interval: Seconds to sleep between passes.

    Runs until cancelled.  A failure while handling one connection is logged
    and does not prevent the remaining connections from being checked.
    """
    while True:
        now = time.time()
        for handler in list(ShellHandler.connections):
            try:
                started = handler.session_start
                if started and now - started > max_session_seconds:
                    logger.warning(
                        f"Closing stale connection for user {handler.username}"
                    )
                    handler.close()
            except Exception:
                logger.exception(
                    "Session check failed for user %s",
                    getattr(handler, "username", None),
                )
        await asyncio.sleep(check_interval)
def _report_task_failure(task):
    """Log the exception of a background task that ended unexpectedly."""
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error("Background task %r stopped", task, exc_info=error)


async def main():
    """Start the server on 0.0.0.0:7860 and run until the process is stopped.

    TLS is enabled when both ``cert.pem`` and ``key.pem`` exist in
    ``LOG_DIR``; otherwise the server speaks plain HTTP.
    """
    app = make_app()

    certfile = f"{LOG_DIR}/cert.pem"
    keyfile = f"{LOG_DIR}/key.pem"
    ssl_options = None
    if os.path.exists(certfile) and os.path.exists(keyfile):
        ssl_options = {"certfile": certfile, "keyfile": keyfile}
        logger.info("HTTPS mode enabled")
    else:
        logger.info("HTTP mode (no SSL certificates found)")

    app.listen(7860, "0.0.0.0", ssl_options=ssl_options)

    # Keep strong references: the event loop holds tasks only weakly, so an
    # unreferenced task may be garbage-collected while it is still running.
    background_tasks = [
        asyncio.create_task(read_from_shells()),
        asyncio.create_task(monitor_connections()),
    ]
    for task in background_tasks:
        task.add_done_callback(_report_task_failure)

    # Park here forever; the tasks above and Tornado do the actual work.
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())