import json import os import random import threading import time from dataclasses import dataclass from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Set from urllib.parse import urlencode, urlparse import requests from huggingface_hub import InferenceClient from chat.handler import ChatHandler from ui.controller import UIController from ui.layout import create_demo class ConfigError(Exception): pass @dataclass class Config: moltbook_base_url: str hf_token: str hf_model: str hf_temperature: float hf_max_tokens: int heartbeat_interval_s: int heartbeat_jitter_s: int request_timeout_s: int dry_run: bool max_feed_items: int max_replies_per_cycle: int max_upvotes_per_cycle: int memory_notes_limit: int moltbook_token: str moltbook_feed_path: str moltbook_post_path: str moltbook_reply_path_template: str moltbook_upvote_path_template: str moltbook_me_activity_path: str enforce_https: bool block_unknown_network: bool @property def allowed_hosts(self) -> Set[str]: return { urlparse(self.moltbook_base_url).netloc.lower(), "api-inference.huggingface.co", } def _to_bool(value: Optional[str], default: bool = False) -> bool: if value is None or value == "": return default return str(value).strip().lower() in {"1", "true", "yes", "on"} def _to_int(value: Optional[str], default: int) -> int: if value is None or value == "": return default try: parsed = int(value) return parsed if parsed > 0 else default except ValueError: return default def _to_float(value: Optional[str], default: float) -> float: if value is None or value == "": return default try: parsed = float(value) return parsed if 0 <= parsed <= 2 else default except ValueError: return default def _require_env(name: str) -> str: value = os.getenv(name, "").strip() if not value: raise ConfigError(f"Missing required env var: {name}") return value def _validate_url(name: str, value: str, enforce_https: bool) -> str: parsed = urlparse(value) if parsed.scheme not in {"http", "https"}: raise ConfigError(f"{name} must be http/https URL") if enforce_https and parsed.scheme != "https": raise ConfigError(f"{name} must be https when ENFORCE_HTTPS=true") if not parsed.netloc: raise ConfigError(f"{name} missing host") return value.rstrip("/") def _normalize_path(value: str, fallback: str) -> str: path = (value or fallback).strip() if not path: path = fallback if not path.startswith("/"): path = "/" + path return path def load_config() -> Config: enforce_https = _to_bool(os.getenv("ENFORCE_HTTPS"), True) moltbook_base_url = _validate_url( "MOLTBOOK_BASE_URL", os.getenv("MOLTBOOK_BASE_URL", "https://www.moltbook.com"), enforce_https, ) if urlparse(moltbook_base_url).netloc.lower() == "moltbook.com": raise ConfigError("Use https://www.moltbook.com (www required)") return Config( moltbook_base_url=moltbook_base_url, hf_token=_require_env("HF_TOKEN"), hf_model=os.getenv("HF_MODEL", "meta-llama/Llama-3.1-8B-Instruct"), hf_temperature=_to_float(os.getenv("HF_TEMPERATURE"), 0.4), hf_max_tokens=_to_int(os.getenv("HF_MAX_TOKENS"), 450), heartbeat_interval_s=_to_int(os.getenv("HEARTBEAT_INTERVAL_S"), 90), heartbeat_jitter_s=_to_int(os.getenv("HEARTBEAT_JITTER_S"), 3), request_timeout_s=_to_int(os.getenv("REQUEST_TIMEOUT_S"), 15), dry_run=_to_bool(os.getenv("DRY_RUN"), True), max_feed_items=_to_int(os.getenv("MAX_FEED_ITEMS"), 20), max_replies_per_cycle=_to_int(os.getenv("MAX_REPLIES_PER_CYCLE"), 1), max_upvotes_per_cycle=_to_int(os.getenv("MAX_UPVOTES_PER_CYCLE"), 2), memory_notes_limit=_to_int(os.getenv("MEMORY_NOTES_LIMIT"), 20), moltbook_token=os.getenv("MOLTBOOK_API_TOKEN", ""), moltbook_feed_path=_normalize_path(os.getenv("MOLTBOOK_FEED_PATH", "/api/v1/feed"), "/api/v1/feed"), moltbook_post_path=_normalize_path(os.getenv("MOLTBOOK_POST_PATH", "/api/v1/posts"), "/api/v1/posts"), moltbook_reply_path_template=_normalize_path( os.getenv("MOLTBOOK_REPLY_PATH_TEMPLATE", "/api/v1/posts/{postId}/comments"), "/api/v1/posts/{postId}/comments", ), moltbook_upvote_path_template=_normalize_path( os.getenv("MOLTBOOK_UPVOTE_PATH_TEMPLATE", "/api/v1/posts/{postId}/upvote"), "/api/v1/posts/{postId}/upvote", ), moltbook_me_activity_path=_normalize_path(os.getenv("MOLTBOOK_ME_ACTIVITY_PATH", "/api/v1/agents/me"), "/api/v1/agents/me"), enforce_https=enforce_https, block_unknown_network=_to_bool(os.getenv("BLOCK_UNKNOWN_NETWORK"), True), ) class Logger: def __init__(self, max_lines: int = 300): self._max_lines = max_lines self._lines: List[str] = [] self._lock = threading.Lock() def _push(self, level: str, message: str, meta: Optional[Dict[str, Any]] = None) -> None: payload = { "time": datetime.now(timezone.utc).isoformat(), "level": level, "message": message, } if meta: payload["meta"] = meta line = json.dumps(payload, ensure_ascii=True) with self._lock: self._lines.append(line) self._lines = self._lines[-self._max_lines :] print(line, flush=True) def info(self, message: str, meta: Optional[Dict[str, Any]] = None) -> None: self._push("info", message, meta) def warn(self, message: str, meta: Optional[Dict[str, Any]] = None) -> None: self._push("warn", message, meta) def error(self, message: str, meta: Optional[Dict[str, Any]] = None) -> None: self._push("error", message, meta) def snapshot(self) -> str: with self._lock: return "\n".join(self._lines[-120:]) _ORIGINAL_REQUEST = None _REQUEST_GUARD_LOCK = threading.Lock() def install_requests_network_guard(config: Config) -> None: global _ORIGINAL_REQUEST with _REQUEST_GUARD_LOCK: if _ORIGINAL_REQUEST is not None: return _ORIGINAL_REQUEST = requests.sessions.Session.request def guarded_request(session: requests.Session, method: str, url: str, *args: Any, **kwargs: Any) -> requests.Response: parsed = urlparse(url) host = parsed.netloc.lower() if config.enforce_https and parsed.scheme != "https": raise RuntimeError(f"Blocked non-https request: {url}") if config.block_unknown_network and host not in config.allowed_hosts: raise RuntimeError(f"Blocked host outside allowlist: {host}") return _ORIGINAL_REQUEST(session, method, url, *args, **kwargs) requests.sessions.Session.request = guarded_request class GuardedHttp: def __init__(self, config: Config): self.config = config self.session = requests.Session() def request( self, method: str, url: str, headers: Dict[str, str], json_body: Optional[Dict[str, Any]] = None, ) -> requests.Response: parsed = urlparse(url) host = parsed.netloc.lower() if self.config.enforce_https and parsed.scheme != "https": raise RuntimeError(f"Blocked non-https request: {url}") if self.config.block_unknown_network and host not in self.config.allowed_hosts: raise RuntimeError(f"Blocked host outside allowlist: {host}") try: response = self.session.request( method=method, url=url, headers=headers, json=json_body, timeout=self.config.request_timeout_s, ) except requests.RequestException as exc: raise RuntimeError(f"Network request failed for {method} {url}: {exc}") from exc return response class MoltbookClient: def __init__(self, config: Config, http: GuardedHttp): self.cfg = config self.http = http def _url(self, path: str) -> str: return f"{self.cfg.moltbook_base_url}{path}" def _headers(self) -> Dict[str, str]: headers = {"content-type": "application/json"} if self.cfg.moltbook_token: headers["authorization"] = f"Bearer {self.cfg.moltbook_token}" return headers def _request(self, method: str, path: str, body: Optional[Dict[str, Any]] = None, query: Optional[Dict[str, Any]] = None) -> Any: url = self._url(path) if query: filtered_query = {k: v for k, v in query.items() if v is not None} if filtered_query: sep = "&" if "?" in url else "?" url += sep + urlencode(filtered_query) resp = self.http.request(method=method, url=url, headers=self._headers(), json_body=body) try: data = resp.json() if resp.text else {} except ValueError: data = {"raw": resp.text} if not resp.ok: raise RuntimeError(f"Moltbook API error {resp.status_code}: {data}") return data @staticmethod def _normalize_post(raw: Dict[str, Any]) -> Optional[Dict[str, Any]]: post_id = str(raw.get("id") or raw.get("post_id") or "").strip() text = str(raw.get("text") or raw.get("content") or "").strip() if not post_id or not text: return None replies_raw = raw.get("replies") if isinstance(raw.get("replies"), list) else raw.get("comments") replies = [] if isinstance(replies_raw, list): for item in replies_raw: if isinstance(item, dict): replies.append( { "id": str(item.get("id") or ""), "authorHandle": str(item.get("author_handle") or item.get("author", {}).get("handle") or "unknown"), "text": str(item.get("text") or item.get("content") or ""), } ) author = raw.get("author") if isinstance(raw.get("author"), dict) else {} return { "id": post_id, "authorHandle": str(raw.get("author_handle") or author.get("handle") or "unknown"), "text": text, "viewerHasUpvoted": bool(raw.get("viewer_has_upvoted") or raw.get("viewerHasUpvoted") or False), "replies": replies, } def read_feed(self, limit: int) -> List[Dict[str, Any]]: data = self._request("GET", self.cfg.moltbook_feed_path, query={"limit": limit}) items = data.get("items") if isinstance(data, dict) else data if isinstance(data, dict) and isinstance(data.get("feed"), list): items = data["feed"] if not isinstance(items, list): items = [] out: List[Dict[str, Any]] = [] for raw in items: if isinstance(raw, dict): norm = self._normalize_post(raw) if norm: out.append(norm) return out def get_activity(self, limit: int) -> List[Dict[str, Any]]: data = self._request("GET", self.cfg.moltbook_me_activity_path, query={"limit": limit}) if isinstance(data, dict) and isinstance(data.get("items"), list): return data["items"] return data if isinstance(data, list) else [] def post(self, content: str) -> None: self._request("POST", self.cfg.moltbook_post_path, body={"content": content}) def reply(self, post_id: str, content: str) -> None: path = self.cfg.moltbook_reply_path_template.replace("{postId}", post_id) self._request("POST", path, body={"content": content}) def upvote(self, post_id: str) -> None: path = self.cfg.moltbook_upvote_path_template.replace("{postId}", post_id) self._request("POST", path, body={}) class HfDecisionClient: def __init__(self, config: Config): self.cfg = config self.client = InferenceClient(token=self.cfg.hf_token, timeout=self.cfg.request_timeout_s) @staticmethod def _extract_content(response: Any) -> str: if isinstance(response, dict): content = ( response.get("choices", [{}])[0] .get("message", {}) .get("content", "") ) else: choices = getattr(response, "choices", None) or [] message = choices[0].message if choices else None content = getattr(message, "content", "") if isinstance(content, list): content = "".join(str(part.get("text", "")) for part in content if isinstance(part, dict)) return str(content or "").strip() def decide(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]: response = self.client.chat_completion( model=self.cfg.hf_model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], temperature=self.cfg.hf_temperature, max_tokens=self.cfg.hf_max_tokens, response_format={"type": "json_object"}, ) content = self._extract_content(response) if not isinstance(content, str) or not content.strip(): raise RuntimeError("HF response missing message content") try: parsed = json.loads(content) except json.JSONDecodeError as exc: raise RuntimeError(f"HF content is not valid JSON: {exc}") from exc if not isinstance(parsed, dict): raise RuntimeError("HF decision payload must be a JSON object") return parsed def chat(self, system_prompt: str, history: List[Any], user_message: str) -> str: messages: List[Dict[str, str]] = [{"role": "system", "content": system_prompt}] for item in history or []: if isinstance(item, dict): role = str(item.get("role") or "").strip().lower() content = str(item.get("content") or "").strip() if role in {"user", "assistant"} and content: messages.append({"role": role, "content": content}) continue if isinstance(item, (list, tuple)) and len(item) == 2: user_text = str(item[0] or "").strip() bot_text = str(item[1] or "").strip() if user_text: messages.append({"role": "user", "content": user_text}) if bot_text: messages.append({"role": "assistant", "content": bot_text}) messages.append({"role": "user", "content": user_message}) response = self.client.chat_completion( model=self.cfg.hf_model, messages=messages, temperature=self.cfg.hf_temperature, max_tokens=min(self.cfg.hf_max_tokens, 350), ) content = self._extract_content(response) if not content: raise RuntimeError("HF response missing message content") return content class EphemeralMemory: def __init__(self, agent_handle: str, max_notes: int): self.agent_handle = agent_handle self.max_notes = max_notes self.notes: List[str] = [] self.replied_post_ids: Set[str] = set() self.upvoted_post_ids: Set[str] = set() def rebuild(self, feed: List[Dict[str, Any]], activity: List[Dict[str, Any]]) -> None: self.replied_post_ids = set() self.upvoted_post_ids = set() for post in feed: if post.get("viewerHasUpvoted"): self.upvoted_post_ids.add(post["id"]) for reply in post.get("replies", []): if reply.get("authorHandle") == self.agent_handle: self.replied_post_ids.add(post["id"]) for event in activity: if not isinstance(event, dict): continue post_id = str(event.get("post_id") or event.get("postId") or "").strip() typ = str(event.get("type") or "").lower() if not post_id: continue if "reply" in typ or "comment" in typ: self.replied_post_ids.add(post_id) if "upvote" in typ or "like" in typ: self.upvoted_post_ids.add(post_id) def add_note(self, note: str) -> None: clean = " ".join(str(note).split()).strip() if not clean: return self.notes.append(f"{datetime.now(timezone.utc).isoformat()}: {clean[:160]}") self.notes = self.notes[-self.max_notes :] def to_context(self) -> Dict[str, Any]: return { "agentHandle": self.agent_handle, "notes": self.notes[-6:], "repliedPostIds": list(self.replied_post_ids), "upvotedPostIds": list(self.upvoted_post_ids), } class AutonomousAgent: def __init__(self, config: Config, logger: Logger): self.cfg = config self.logger = logger self.http = GuardedHttp(config) self.moltbook = MoltbookClient(config, self.http) self.llm = HfDecisionClient(config) self.memory = EphemeralMemory(agent_handle=os.getenv("AGENT_HANDLE", "clawdbot"), max_notes=config.memory_notes_limit) self.running = False self.thread: Optional[threading.Thread] = None self.last_status = "idle" self._state_lock = threading.Lock() self._stop_event = threading.Event() def _build_prompts(self, feed: List[Dict[str, Any]]) -> Dict[str, str]: system = ( "You are an autonomous Moltbook-only social bot. " "Return strict JSON only with schema: " '{"post": null|{"content": string}, "replies": [{"postId": string, "content": string}], "upvotes": [string], "memory_note": string}. ' "Limits: max 1 post, max 1 reply, max 2 upvotes, each content <= 220 chars." ) compact = [] for item in feed[: self.cfg.max_feed_items]: compact.append( { "id": item["id"], "author": f"@{item.get('authorHandle', 'unknown')}", "text": " ".join(item.get("text", "").split())[:280], "viewerHasUpvoted": bool(item.get("viewerHasUpvoted", False)), "hasAgentReply": item["id"] in self.memory.replied_post_ids, } ) user = json.dumps( { "task": "Choose Moltbook actions for this heartbeat", "memory": self.memory.to_context(), "feed": compact, "constraints": { "maxRepliesPerCycle": self.cfg.max_replies_per_cycle, "maxUpvotesPerCycle": self.cfg.max_upvotes_per_cycle, }, }, ensure_ascii=True, ) return {"system": system, "user": user} @staticmethod def _clip(text: Any, limit: int = 220) -> str: out = " ".join(str(text or "").split()).strip() if len(out) <= limit: return out return out[: limit - 3] + "..." def _sanitize_decision(self, decision: Dict[str, Any], feed_ids: Set[str]) -> Dict[str, Any]: replies = [] for raw in decision.get("replies", []) if isinstance(decision.get("replies"), list) else []: if not isinstance(raw, dict): continue post_id = str(raw.get("postId") or "").strip() content = self._clip(raw.get("content")) if not post_id or not content or post_id not in feed_ids: continue if post_id in self.memory.replied_post_ids: continue replies.append({"postId": post_id, "content": content}) if len(replies) >= self.cfg.max_replies_per_cycle: break upvotes = [] seen = set() for raw in decision.get("upvotes", []) if isinstance(decision.get("upvotes"), list) else []: post_id = str(raw).strip() if not post_id or post_id in seen or post_id not in feed_ids: continue if post_id in self.memory.upvoted_post_ids: continue seen.add(post_id) upvotes.append(post_id) if len(upvotes) >= self.cfg.max_upvotes_per_cycle: break post = None raw_post = decision.get("post") if isinstance(raw_post, dict): content = self._clip(raw_post.get("content")) if content: post = {"content": content} # Prefer reply over self-post to reduce spam. if replies: post = None note = self._clip(decision.get("memory_note") or decision.get("memoryNote") or "heartbeat complete", 160) return {"post": post, "replies": replies, "upvotes": upvotes, "memoryNote": note} def run_cycle(self) -> None: started = time.time() self._set_last_status("running") feed = self.moltbook.read_feed(self.cfg.max_feed_items) try: activity = self.moltbook.get_activity(self.cfg.max_feed_items) except Exception as exc: self.logger.warn("Unable to read activity, fallback to feed-only memory", {"error": str(exc)}) activity = [] self.memory.rebuild(feed, activity) prompts = self._build_prompts(feed) decision_raw = self.llm.decide(prompts["system"], prompts["user"]) decision = self._sanitize_decision(decision_raw, {p["id"] for p in feed}) summary = {"dryRun": self.cfg.dry_run, "upvotes": 0, "replies": 0, "posts": 0} if self.cfg.dry_run: self.logger.info("DRY_RUN enabled; no writes sent", {"decision": decision}) else: for post_id in decision["upvotes"]: self.moltbook.upvote(post_id) self.memory.upvoted_post_ids.add(post_id) summary["upvotes"] += 1 for reply in decision["replies"]: self.moltbook.reply(reply["postId"], reply["content"]) self.memory.replied_post_ids.add(reply["postId"]) summary["replies"] += 1 if decision["post"]: self.moltbook.post(decision["post"]["content"]) summary["posts"] += 1 self.memory.add_note(decision["memoryNote"]) elapsed_ms = int((time.time() - started) * 1000) self.logger.info("Heartbeat cycle complete", {"elapsedMs": elapsed_ms, "feedItems": len(feed), "summary": summary}) self._set_last_status("idle") def _set_last_status(self, value: str) -> None: with self._state_lock: self.last_status = value def status_snapshot(self) -> Dict[str, Any]: with self._state_lock: return { "running": self.running, "last_status": self.last_status, "dry_run": self.cfg.dry_run, } def _loop(self) -> None: self.logger.info("Agent loop started", {"dryRun": self.cfg.dry_run}) while not self._stop_event.is_set(): try: self.run_cycle() except Exception as exc: self._set_last_status("error") self.logger.error("Heartbeat cycle failed", {"error": str(exc)}) sleep_for = self.cfg.heartbeat_interval_s + random.randint(0, self.cfg.heartbeat_jitter_s) if self._stop_event.wait(timeout=sleep_for): break with self._state_lock: self.running = False self.logger.info("Agent loop stopped") def start(self) -> str: with self._state_lock: if self.running: return "already running" self.running = True self.last_status = "idle" self._stop_event.clear() self.thread = threading.Thread(target=self._loop, daemon=True) self.thread.start() return "started" def stop(self) -> str: with self._state_lock: if not self.running: return "already stopped" self._stop_event.set() return "stopping" logger = Logger() startup_error = None agent: Optional[AutonomousAgent] = None chat_handler: Optional[ChatHandler] = None try: config = load_config() install_requests_network_guard(config) agent = AutonomousAgent(config, logger) chat_handler = ChatHandler(agent.llm) logger.info("Configuration loaded", {"allowedHosts": list(config.allowed_hosts), "dryRun": config.dry_run}) except Exception as exc: # noqa: BLE001 startup_error = str(exc) chat_handler = ChatHandler(llm=None, startup_error=startup_error) logger.error("Startup configuration failed", {"error": startup_error}) controller = UIController( logger=logger, chat_handler=chat_handler, agent=agent, startup_error=startup_error, ) demo = create_demo(controller) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))