| """
|
| BOINC Client Integration
|
| Handles distributed computing task submission and monitoring
|
| """
|
|
|
import json
import os
import shutil
import time
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import requests
import yaml
|
|
|
| @dataclass
|
| class WorkUnit:
|
| """Represents a BOINC work unit"""
|
| id: str
|
| name: str
|
| workunit_type: str
|
| input_file: str
|
| status: str
|
| created_at: str
|
| completed_at: Optional[str] = None
|
| result_file: Optional[str] = None
|
| error: Optional[str] = None
|
|
|
| class BOINCClient:
|
| """BOINC client for distributed computing integration"""
|
|
|
| def __init__(self, config_path: str = "config.yml"):
|
| with open(config_path, 'r') as f:
|
| self.config = yaml.safe_load(f)['boinc']
|
|
|
| self.project_url = self.config['project_url']
|
| self.work_dir = Path(self.config['work_dir'])
|
| self.work_dir.mkdir(parents=True, exist_ok=True)
|
|
|
| self.tasks_file = self.work_dir / "tasks.json"
|
| self.tasks = self._load_tasks()
|
|
|
| def _load_tasks(self) -> Dict[str, WorkUnit]:
|
| """Load existing tasks from disk"""
|
| if self.tasks_file.exists():
|
| with open(self.tasks_file, 'r') as f:
|
| data = json.load(f)
|
| return {k: WorkUnit(**v) for k, v in data.items()}
|
| return {}
|
|
|
| def _save_tasks(self):
|
| """Save tasks to disk"""
|
| with open(self.tasks_file, 'w') as f:
|
| data = {k: asdict(v) for k, v in self.tasks.items()}
|
| json.dump(data, f, indent=2)
|
|
|
| def submit_task(
|
| self,
|
| workunit_type: str,
|
| input_file: str,
|
| name: Optional[str] = None
|
| ) -> str:
|
| """
|
| Submit a new work unit to BOINC
|
|
|
| Args:
|
| workunit_type: Type of analysis (variant_calling, blast_search, etc.)
|
| input_file: Path to input data file
|
| name: Optional custom name for the work unit
|
|
|
| Returns:
|
| Work unit ID
|
| """
|
| task_id = f"wu_{int(time.time() * 1000)}"
|
|
|
| if name is None:
|
| name = f"{workunit_type}_{task_id}"
|
|
|
|
|
| work_unit = WorkUnit(
|
| id=task_id,
|
| name=name,
|
| workunit_type=workunit_type,
|
| input_file=input_file,
|
| status="pending",
|
| created_at=datetime.now().isoformat()
|
| )
|
|
|
|
|
|
|
| self._simulate_submission(work_unit)
|
|
|
| self.tasks[task_id] = work_unit
|
| self._save_tasks()
|
|
|
| return task_id
|
|
|
| def _simulate_submission(self, work_unit: WorkUnit):
|
| """
|
| Simulate BOINC submission (for development/demo purposes)
|
| In production, replace with actual BOINC API calls
|
| """
|
|
|
| task_dir = self.work_dir / work_unit.id
|
| task_dir.mkdir(exist_ok=True)
|
|
|
|
|
| input_path = Path(work_unit.input_file)
|
| if input_path.exists():
|
| import shutil
|
| shutil.copy(input_path, task_dir / input_path.name)
|
|
|
|
|
| metadata = {
|
| "task_id": work_unit.id,
|
| "type": work_unit.workunit_type,
|
| "input": work_unit.input_file,
|
| "submitted": work_unit.created_at
|
| }
|
|
|
| with open(task_dir / "metadata.json", 'w') as f:
|
| json.dump(metadata, f, indent=2)
|
|
|
| def get_task_status(self, task_id: str) -> Optional[WorkUnit]:
|
| """Get status of a specific task"""
|
| return self.tasks.get(task_id)
|
|
|
| def list_tasks(
|
| self,
|
| status: Optional[str] = None,
|
| workunit_type: Optional[str] = None
|
| ) -> List[WorkUnit]:
|
| """
|
| List all tasks with optional filtering
|
|
|
| Args:
|
| status: Filter by status (pending, running, completed, failed)
|
| workunit_type: Filter by work unit type
|
| """
|
| tasks = list(self.tasks.values())
|
|
|
| if status:
|
| tasks = [t for t in tasks if t.status == status]
|
|
|
| if workunit_type:
|
| tasks = [t for t in tasks if t.workunit_type == workunit_type]
|
|
|
| return sorted(tasks, key=lambda t: t.created_at, reverse=True)
|
|
|
| def update_task_status(self, task_id: str, status: str, **kwargs):
|
| """Update task status and additional fields"""
|
| if task_id in self.tasks:
|
| self.tasks[task_id].status = status
|
|
|
| for key, value in kwargs.items():
|
| if hasattr(self.tasks[task_id], key):
|
| setattr(self.tasks[task_id], key, value)
|
|
|
| if status == "completed":
|
| self.tasks[task_id].completed_at = datetime.now().isoformat()
|
|
|
| self._save_tasks()
|
|
|
| def cancel_task(self, task_id: str) -> bool:
|
| """Cancel a pending or running task"""
|
| if task_id in self.tasks:
|
| task = self.tasks[task_id]
|
| if task.status in ["pending", "running"]:
|
| task.status = "cancelled"
|
| self._save_tasks()
|
| return True
|
| return False
|
|
|
| def get_results(self, task_id: str) -> Optional[Path]:
|
| """Get results file for a completed task"""
|
| if task_id in self.tasks:
|
| task = self.tasks[task_id]
|
| if task.status == "completed" and task.result_file:
|
| result_path = Path(task.result_file)
|
| if result_path.exists():
|
| return result_path
|
| return None
|
|
|
| def get_statistics(self) -> Dict:
|
| """Get overall statistics about BOINC tasks"""
|
| total = len(self.tasks)
|
| by_status = {}
|
| by_type = {}
|
|
|
| for task in self.tasks.values():
|
| by_status[task.status] = by_status.get(task.status, 0) + 1
|
| by_type[task.workunit_type] = by_type.get(task.workunit_type, 0) + 1
|
|
|
| completed = [t for t in self.tasks.values() if t.completed_at]
|
|
|
| if completed:
|
| avg_time = sum([
|
| (datetime.fromisoformat(t.completed_at) -
|
| datetime.fromisoformat(t.created_at)).total_seconds()
|
| for t in completed
|
| ]) / len(completed)
|
| else:
|
| avg_time = 0
|
|
|
| return {
|
| "total_tasks": total,
|
| "by_status": by_status,
|
| "by_type": by_type,
|
| "completed_tasks": len(completed),
|
| "average_completion_time_seconds": avg_time
|
| }
|
|
|
|
|
class BOINCTaskManager:
    """High-level task manager for common workflows.

    Thin convenience wrappers around BOINCClient.submit_task that derive a
    work-unit name from the input file's stem.
    """

    def __init__(self):
        self.client = BOINCClient()

    def _submit_named(self, wu_type: str, prefix: str, path: str) -> str:
        """Submit *path* as a *wu_type* work unit named ``<prefix>_<stem>``."""
        return self.client.submit_task(
            workunit_type=wu_type,
            input_file=path,
            name=f"{prefix}_{Path(path).stem}",
        )

    def submit_variant_calling(self, fastq_file: str) -> str:
        """Submit variant calling task"""
        return self._submit_named("variant_calling", "variant_calling", fastq_file)

    def submit_blast_search(self, sequence_file: str) -> str:
        """Submit BLAST search task"""
        return self._submit_named("blast_search", "blast", sequence_file)

    def submit_alignment(self, fastq_file: str) -> str:
        """Submit sequence alignment task"""
        return self._submit_named("alignment", "alignment", fastq_file)

    def submit_annotation(self, vcf_file: str) -> str:
        """Submit variant annotation task"""
        return self._submit_named("annotation", "annotation", vcf_file)

    def batch_submit(
        self,
        workunit_type: str,
        input_files: List[str]
    ) -> List[str]:
        """Submit multiple tasks at once"""
        return [
            self.client.submit_task(workunit_type, input_file)
            for input_file in input_files
        ]
|
|
|