| """
|
| Docking@HOME Server - Complete AutoDock Integration
|
|
|
| This module provides the backend server that executes AutoDock docking simulations,
|
| manages jobs, and coordinates with the GUI.
|
|
|
| Authors: OpenPeer AI, Riemann Computing Inc., Bleunomics, Andrew Magdy Kamal
|
| """
|
|
|
| import os
|
| import sys
|
| import json
|
| import uuid
|
| import asyncio
|
| import subprocess
|
| import tempfile
|
| import shutil
|
| from pathlib import Path
|
| from typing import Dict, List, Optional, Tuple
|
| from datetime import datetime
|
| import logging
|
|
|
|
|
# One-time root-logger configuration at import time: INFO level with a
# timestamped "name - level - message" format. Every module logger
# (including `logger` below) inherits these handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
class AutoDockExecutor:
    """
    Executes AutoDock docking simulations with GPU acceleration support.

    When no AutoDock executable can be located (or a native run fails),
    requests are served by a pseudo-random simulated docking run so the
    surrounding pipeline can still be developed and tested.
    """

    def __init__(self, autodock_path: Optional[str] = None, use_gpu: bool = True):
        """
        Initialize AutoDock executor.

        Args:
            autodock_path: Path to AutoDock executable (autodock4 or
                autodock_gpu). When None, PATH and the project build
                directory are searched.
            use_gpu: Whether to use GPU acceleration (only effective when a
                GPU-capable executable is found).
        """
        self.use_gpu = use_gpu
        self.autodock_path = autodock_path or self._find_autodock()

        if not self.autodock_path:
            logger.warning("AutoDock not found in PATH. Will use simulation mode.")

    def _find_autodock(self) -> Optional[str]:
        """Find an AutoDock executable on PATH or in the project build dir."""
        # GPU build takes precedence when several variants are installed.
        for exe in ('autodock_gpu', 'autodock4', 'autodock'):
            if shutil.which(exe):
                logger.info(f"Found AutoDock executable: {exe}")
                return exe

        # Source-checkout layout: <repo>/build relative to this module.
        build_dir = Path(__file__).parent.parent.parent / "build"
        if build_dir.exists():
            for exe in ('autodock_gpu', 'autodock4'):
                exe_path = build_dir / exe
                if exe_path.exists():
                    logger.info(f"Found AutoDock in build directory: {exe_path}")
                    return str(exe_path)

        return None

    async def run_docking(
        self,
        ligand_file: str,
        receptor_file: str,
        output_dir: str,
        num_runs: int = 100,
        exhaustiveness: int = 8,
        grid_center: Optional[Tuple[float, float, float]] = None,
        grid_size: Optional[Tuple[int, int, int]] = None,
        progress_callback=None
    ) -> Dict:
        """
        Run AutoDock docking simulation.

        Args:
            ligand_file: Path to ligand PDBQT file
            receptor_file: Path to receptor PDBQT file
            output_dir: Directory for output files
            num_runs: Number of docking runs
            exhaustiveness: Search exhaustiveness
            grid_center: Grid center coordinates (x, y, z)
            grid_size: Grid box size (x, y, z)
            progress_callback: Optional async callable receiving a 0-100 float

        Returns:
            Dictionary with docking results (poses, best_energy, clusters).
            Falls back to simulated results when AutoDock is missing or the
            native run fails.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        dpf_file = output_path / "docking.dpf"
        # The docking log (.dlg) is what AutoDock writes via -l and what
        # _parse_dlg_file reads back. (A .glg file is an autogrid log; there
        # is no autogrid step here.)
        dlg_file = output_path / "docking.dlg"

        if not self.autodock_path:
            logger.info("Running in simulation mode (AutoDock not installed)")
            return await self._simulate_docking(
                ligand_file, receptor_file, output_dir, num_runs, progress_callback
            )

        try:
            self._create_dpf(
                dpf_file, ligand_file, receptor_file, dlg_file,
                num_runs, exhaustiveness, grid_center, grid_size
            )

            logger.info(f"Starting AutoDock with {num_runs} runs")
            logger.info(f"Ligand: {ligand_file}")
            logger.info(f"Receptor: {receptor_file}")

            # BUGFIX: -l must name the .dlg that is parsed afterwards. The
            # previous code logged to docking.glg and then parsed a
            # never-written docking.dlg.
            cmd = [self.autodock_path, '-p', str(dpf_file), '-l', str(dlg_file)]

            if self.use_gpu and 'gpu' in self.autodock_path.lower():
                cmd.extend(['--nrun', str(num_runs)])

            # Merge stderr into stdout so the single reader below drains
            # both; an undrained stderr PIPE could otherwise fill up and
            # deadlock the child process.
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT
            )

            async def monitor_progress():
                # Rough heuristic: AutoDock emits several lines per run, so
                # line count is a cheap proxy for progress (capped at 95%
                # until the process actually exits).
                line_count = 0
                async for line in process.stdout:
                    line_count += 1
                    if progress_callback and line_count % 10 == 0:
                        estimated_progress = min(95, (line_count / (num_runs * 5)) * 100)
                        await progress_callback(estimated_progress)

            await asyncio.gather(
                monitor_progress(),
                process.wait()
            )

            if process.returncode != 0:
                # Raising here routes us into the simulation fallback below.
                raise RuntimeError(
                    f"AutoDock exited with code {process.returncode}"
                )

            if progress_callback:
                await progress_callback(100)

            results = self._parse_dlg_file(dlg_file)

            logger.info(f"Docking completed. Best energy: {results.get('best_energy', 'N/A')}")

            return results

        except Exception as e:
            logger.error(f"Error running AutoDock: {e}")
            # Best-effort fallback so callers always receive a result payload.
            return await self._simulate_docking(
                ligand_file, receptor_file, output_dir, num_runs, progress_callback
            )

    def _create_dpf(
        self,
        dpf_file: Path,
        ligand_file: str,
        receptor_file: str,
        output_file: Path,
        num_runs: int,
        exhaustiveness: int,
        grid_center: Optional[Tuple[float, float, float]],
        grid_size: Optional[Tuple[int, int, int]]
    ):
        """Create AutoDock DPF (Docking Parameter File).

        NOTE(review): `exhaustiveness` and `output_file` are accepted for
        API compatibility but are not referenced by the AD4 GA parameter
        set written below — confirm whether they should be.
        """
        # Default search box: 40^3 points centered on the origin.
        if grid_center is None:
            grid_center = (0.0, 0.0, 0.0)
        if grid_size is None:
            grid_size = (40, 40, 40)

        dpf_content = f"""# AutoDock DPF - Generated by Docking@HOME
autodock_parameter_version 4.2

outlev 1
parameter_file AD4_parameters.dat

ligand {ligand_file}
receptor {receptor_file}

npts {grid_size[0]} {grid_size[1]} {grid_size[2]}
gridcenter {grid_center[0]} {grid_center[1]} {grid_center[2]}
spacing 0.375

seed pid time

ga_pop_size 150
ga_num_evals 2500000
ga_num_generations 27000
ga_elitism 1
ga_mutation_rate 0.02
ga_crossover_rate 0.8
ga_window_size 10
ga_cauchy_alpha 0.0
ga_cauchy_beta 1.0
set_ga

ga_run {num_runs}
analysis

"""
        dpf_file.write_text(dpf_content)
        logger.debug(f"Created DPF file: {dpf_file}")

    def _parse_dlg_file(self, dlg_file: Path) -> Dict:
        """Parse an AutoDock DLG output file into a results dict."""
        if not dlg_file.exists():
            logger.warning(f"DLG file not found: {dlg_file}")
            return {"error": "Output file not found"}

        results = {
            "poses": [],
            "best_energy": None,
            "clusters": []
        }

        try:
            with open(dlg_file, 'r') as f:
                content = f.read()

            import re

            # One "Estimated Free Energy of Binding" line per docked pose.
            energy_pattern = r"Estimated Free Energy of Binding\s*=\s*([-\d.]+)"
            energies = re.findall(energy_pattern, content)

            if energies:
                energies = [float(e) for e in energies]
                results["best_energy"] = min(energies)
                results["mean_energy"] = sum(energies) / len(energies)
                results["poses"] = [{"energy": e} for e in energies]

            # Cluster section is optional in the DLG output.
            cluster_pattern = r"CLUSTERING HISTOGRAM"
            if re.search(cluster_pattern, content):
                results["clusters"] = self._parse_clusters(content)

        except Exception as e:
            logger.error(f"Error parsing DLG file: {e}")
            results["error"] = str(e)

        return results

    def _parse_clusters(self, content: str) -> List[Dict]:
        """Parse cluster information from DLG content (top 5 clusters)."""
        clusters = []

        import re
        # Capture the text between a RANKING header and the next blank line.
        cluster_lines = re.findall(
            r"RANKING.*?\n(.*?)\n\n",
            content,
            re.DOTALL
        )

        for i, cluster_text in enumerate(cluster_lines[:5]):
            clusters.append({
                "cluster_id": i + 1,
                "size": len(cluster_text.split('\n')),
                # Energy extraction from the ranking table is not implemented.
                "representative_energy": None
            })

        return clusters

    async def _simulate_docking(
        self,
        ligand_file: str,
        receptor_file: str,
        output_dir: str,
        num_runs: int,
        progress_callback=None
    ) -> Dict:
        """
        Simulate docking when AutoDock is not available.
        For development and testing purposes.
        """
        logger.info("Running simulated docking...")

        import random

        # Guard against num_runs < 1 so the pose indexing and the division
        # below never hit an empty list.
        num_runs = max(1, num_runs)

        poses = []

        for i in range(num_runs):
            # Small sleep keeps the event loop responsive and mimics work.
            await asyncio.sleep(0.01)

            # Plausible binding-energy range for a docking hit (kcal/mol).
            energy = random.uniform(-12.5, -6.5)
            poses.append({
                "run": i + 1,
                "energy": round(energy, 2),
                "rmsd": round(random.uniform(0.5, 5.0), 2)
            })

            if progress_callback and (i + 1) % 5 == 0:
                progress = ((i + 1) / num_runs) * 100
                await progress_callback(progress)

        # Best (most negative) energy first.
        poses.sort(key=lambda x: x["energy"])

        results = {
            "poses": poses,
            "best_energy": poses[0]["energy"],
            "mean_energy": sum(p["energy"] for p in poses) / len(poses),
            "num_runs": num_runs,
            "simulation_mode": True,
            # Fabricated three-way split into clusters of roughly equal size.
            "clusters": [
                {"cluster_id": 1, "size": len(poses) // 3, "best_energy": poses[0]["energy"]},
                {"cluster_id": 2, "size": len(poses) // 3, "best_energy": poses[len(poses)//3]["energy"]},
                {"cluster_id": 3, "size": len(poses) - 2*(len(poses)//3), "best_energy": poses[2*(len(poses)//3)]["energy"]}
            ]
        }

        # Persist results alongside where a real run would put its outputs.
        output_file = Path(output_dir) / "results.json"
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2)

        logger.info(f"Simulated docking completed. Best energy: {results['best_energy']} kcal/mol")

        return results
|
|
|
|
|
class DockingJobManager:
    """
    Manages docking jobs, queue, and execution.

    Jobs are kept in an in-memory dict keyed by a short job id and are
    processed by a fixed pool of asyncio worker tasks consuming from a
    shared queue. Job records are plain dicts mutated in place as the job
    progresses (status, progress, timestamps, results).
    """

    def __init__(self, max_concurrent_jobs: int = 4):
        # job_id -> job record dict (see submit_job for the schema).
        self.jobs: Dict[str, Dict] = {}
        self.job_queue: asyncio.Queue = asyncio.Queue()
        self.max_concurrent_jobs = max_concurrent_jobs
        self.executor = AutoDockExecutor()
        # Worker tasks created by start_workers.
        self.workers: List[asyncio.Task] = []

    async def start_workers(self):
        """Start worker tasks to process jobs from the queue."""
        logger.info(f"Starting {self.max_concurrent_jobs} worker tasks")

        for i in range(self.max_concurrent_jobs):
            worker = asyncio.create_task(self._worker(i))
            self.workers.append(worker)

    async def _worker(self, worker_id: int):
        """Worker loop: pull job ids from the queue until cancelled."""
        logger.info(f"Worker {worker_id} started")

        while True:
            try:
                job_id = await self.job_queue.get()
                logger.info(f"Worker {worker_id} processing job {job_id}")

                await self._process_job(job_id)

                self.job_queue.task_done()

            except asyncio.CancelledError:
                logger.info(f"Worker {worker_id} cancelled")
                break
            except Exception as e:
                # Keep the worker alive: one bad job must not kill the pool.
                logger.error(f"Worker {worker_id} error: {e}")

    async def _process_job(self, job_id: str):
        """Run a single docking job and record its outcome on the job dict."""
        if job_id not in self.jobs:
            logger.error(f"Job {job_id} not found")
            return

        job = self.jobs[job_id]
        job["status"] = "running"
        job["started_at"] = datetime.now().isoformat()

        try:
            async def update_progress(progress: float):
                # Mutates the shared job record so pollers see live progress.
                job["progress"] = progress
                logger.debug(f"Job {job_id} progress: {progress:.1f}%")

            results = await self.executor.run_docking(
                ligand_file=job["ligand_file"],
                receptor_file=job["receptor_file"],
                output_dir=job["output_dir"],
                num_runs=job.get("num_runs", 100),
                progress_callback=update_progress
            )

            job["status"] = "completed"
            job["progress"] = 100.0
            job["results"] = results
            job["completed_at"] = datetime.now().isoformat()

            logger.info(f"Job {job_id} completed successfully")

        except Exception as e:
            logger.error(f"Job {job_id} failed: {e}")
            job["status"] = "failed"
            job["error"] = str(e)
            # BUGFIX: record the finish time on failure too, so clients can
            # always compute a job duration regardless of outcome.
            job["completed_at"] = datetime.now().isoformat()

    async def submit_job(
        self,
        ligand_file: str,
        receptor_file: str,
        num_runs: int = 100,
        use_gpu: bool = True,
        job_name: Optional[str] = None
    ) -> str:
        """
        Submit a new docking job.

        Args:
            ligand_file: Path to ligand PDBQT file
            receptor_file: Path to receptor PDBQT file
            num_runs: Number of docking runs
            use_gpu: Whether the job should request GPU acceleration
            job_name: Optional human-readable name (auto-generated if None)

        Returns:
            job_id: Unique identifier for the job
        """
        # Short id is friendlier in logs/UI; collision risk is negligible
        # at this scale.
        job_id = str(uuid.uuid4())[:8]

        output_dir = str(Path("results") / job_id)
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        job = {
            "job_id": job_id,
            "job_name": job_name or f"Docking_{job_id}",
            "ligand_file": ligand_file,
            "receptor_file": receptor_file,
            "num_runs": num_runs,
            "use_gpu": use_gpu,
            "output_dir": output_dir,
            "status": "pending",
            "progress": 0.0,
            "created_at": datetime.now().isoformat(),
            "results": None
        }

        self.jobs[job_id] = job
        await self.job_queue.put(job_id)

        logger.info(f"Job {job_id} submitted to queue")

        return job_id

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Get job details, or None if the id is unknown."""
        return self.jobs.get(job_id)

    def get_all_jobs(self) -> List[Dict]:
        """Get all jobs (live records; callers should not mutate them)."""
        return list(self.jobs.values())

    def get_stats(self) -> Dict:
        """Get server statistics (job counts by status and queue depth)."""
        # Single pass over the jobs dict instead of one scan per status.
        counts = {"pending": 0, "running": 0, "completed": 0, "failed": 0}
        for job in self.jobs.values():
            status = job["status"]
            if status in counts:
                counts[status] += 1

        return {
            "total_jobs": len(self.jobs),
            "pending": counts["pending"],
            "running": counts["running"],
            "completed": counts["completed"],
            "failed": counts["failed"],
            "queue_size": self.job_queue.qsize(),
            "workers": self.max_concurrent_jobs
        }
|
|
|
|
|
|
|
# Module-level singleton job manager shared by the server; its worker tasks
# are started by initialize_server().
job_manager = DockingJobManager(max_concurrent_jobs=2)
|
|
|
|
|
async def initialize_server():
    """Initialize the docking server.

    Starts the worker tasks of the module-level ``job_manager``. Must be
    awaited from within a running event loop.
    """
    logger.info("Initializing Docking@HOME server...")
    await job_manager.start_workers()
    logger.info("Server ready!")
|
|
|
|
|
if __name__ == "__main__":

    async def _demo() -> None:
        """Smoke test: submit one docking job and poll it to completion."""
        await initialize_server()

        job_id = await job_manager.submit_job(
            ligand_file="test_ligand.pdbqt",
            receptor_file="test_receptor.pdbqt",
            num_runs=50
        )
        print(f"Submitted job: {job_id}")

        # Poll once per second, printing status until the job reaches a
        # terminal state.
        terminal_states = ("completed", "failed")
        job = job_manager.get_job(job_id)
        print(f"Status: {job['status']}, Progress: {job['progress']:.1f}%")
        while job["status"] not in terminal_states:
            await asyncio.sleep(1)
            job = job_manager.get_job(job_id)
            print(f"Status: {job['status']}, Progress: {job['progress']:.1f}%")

        print(f"Final results: {job.get('results')}")

    asyncio.run(_demo())
|
|
|