| | from fastapi import FastAPI, Request |
| | from fastapi.staticfiles import StaticFiles |
| | from typing import Dict, List |
| | from uuid import uuid4 |
| |
|
| | from .domain import MeetingSchedule |
| | from .converters import MeetingScheduleModel, schedule_to_model, model_to_schedule |
| | from .demo_data import generate_demo_data |
| | from .solver import solver_manager, solution_manager |
| | from .score_analysis import ConstraintAnalysisDTO, MatchAnalysisDTO |
| |
|
| | app = FastAPI(docs_url="/q/swagger-ui") |
| |
|
| | |
| | data_sets: Dict[str, MeetingSchedule] = {} |
| |
|
| |
|
| | @app.get("/demo-data") |
| | async def list_demo_data() -> List[str]: |
| | """List available demo data sets.""" |
| | return ["SMALL", "MEDIUM", "LARGE"] |
| |
|
| |
|
| | @app.get("/demo-data/{dataset_id}") |
| | async def get_demo_data(dataset_id: str) -> MeetingScheduleModel: |
| | """Get a demo data set by ID.""" |
| | domain_schedule = generate_demo_data() |
| | return schedule_to_model(domain_schedule) |
| |
|
| |
|
| | @app.get("/schedules") |
| | async def list_schedules() -> List[str]: |
| | """List all job IDs of submitted schedules.""" |
| | return list(data_sets.keys()) |
| |
|
| |
|
| | @app.get("/schedules/{schedule_id}") |
| | async def get_schedule(schedule_id: str) -> MeetingScheduleModel: |
| | """Get the solution and score for a given job ID.""" |
| | if schedule_id not in data_sets: |
| | raise ValueError(f"No schedule found with ID {schedule_id}") |
| |
|
| | schedule = data_sets[schedule_id] |
| | solver_status = solver_manager.get_solver_status(schedule_id) |
| | schedule.solver_status = solver_status |
| |
|
| | return schedule_to_model(schedule) |
| |
|
| |
|
| | @app.get("/schedules/{problem_id}/status") |
| | async def get_status(problem_id: str) -> Dict: |
| | """Get the schedule status and score for a given job ID.""" |
| | if problem_id not in data_sets: |
| | raise ValueError(f"No schedule found with ID {problem_id}") |
| |
|
| | schedule = data_sets[problem_id] |
| | solver_status = solver_manager.get_solver_status(problem_id) |
| |
|
| | return { |
| | "score": { |
| | "hardScore": schedule.score.hard_score if schedule.score else 0, |
| | "mediumScore": schedule.score.medium_score if schedule.score else 0, |
| | "softScore": schedule.score.soft_score if schedule.score else 0, |
| | }, |
| | "solverStatus": solver_status.name, |
| | } |
| |
|
| |
|
| | @app.delete("/schedules/{problem_id}") |
| | async def terminate_solving(problem_id: str) -> MeetingScheduleModel: |
| | """Terminate solving for a given job ID.""" |
| | if problem_id not in data_sets: |
| | raise ValueError(f"No schedule found with ID {problem_id}") |
| |
|
| | try: |
| | solver_manager.terminate_early(problem_id) |
| | except Exception as e: |
| | print(f"Warning: terminate_early failed for {problem_id}: {e}") |
| |
|
| | return await get_schedule(problem_id) |
| |
|
| |
|
| | def update_schedule(problem_id: str, schedule: MeetingSchedule) -> None: |
| | """Update the schedule in the data sets.""" |
| | global data_sets |
| | data_sets[problem_id] = schedule |
| |
|
| |
|
| | @app.post("/schedules") |
| | async def solve_schedule(request: Request) -> str: |
| | json_data = await request.json() |
| | job_id = str(uuid4()) |
| |
|
| | |
| | schedule_model = MeetingScheduleModel.model_validate(json_data) |
| |
|
| | |
| | domain_schedule = model_to_schedule(schedule_model) |
| |
|
| | data_sets[job_id] = domain_schedule |
| | solver_manager.solve_and_listen( |
| | job_id, domain_schedule, lambda solution: update_schedule(job_id, solution) |
| | ) |
| | return job_id |
| |
|
| |
|
| | @app.put("/schedules/analyze") |
| | async def analyze_schedule(request: Request) -> Dict: |
| | """Submit a schedule to analyze its score.""" |
| | json_data = await request.json() |
| |
|
| | |
| | schedule_model = MeetingScheduleModel.model_validate(json_data) |
| |
|
| | |
| | domain_schedule = model_to_schedule(schedule_model) |
| |
|
| | analysis = solution_manager.analyze(domain_schedule) |
| |
|
| | def serialize_justification(justification): |
| | """Convert justification facts to serializable dicts.""" |
| | if justification is None: |
| | return None |
| | facts = [] |
| | for fact in getattr(justification, 'facts', []): |
| | fact_dict = {'id': getattr(fact, 'id', None)} |
| | |
| | if hasattr(fact, 'meeting'): |
| | fact_dict['type'] = 'assignment' |
| | fact_dict['meeting'] = getattr(fact.meeting, 'id', None) if fact.meeting else None |
| | |
| | elif hasattr(fact, 'person') and hasattr(fact, 'meeting_id'): |
| | fact_dict['type'] = 'attendance' |
| | fact_dict['personId'] = getattr(fact.person, 'id', None) if fact.person else None |
| | fact_dict['meetingId'] = fact.meeting_id |
| | facts.append(fact_dict) |
| | return {'facts': facts} |
| |
|
| | |
| | constraints = [] |
| | for constraint in analysis.constraint_analyses: |
| | matches = [ |
| | MatchAnalysisDTO( |
| | name=match.constraint_ref.constraint_name, |
| | score=match.score, |
| | justification=serialize_justification(match.justification), |
| | ) |
| | for match in constraint.matches |
| | ] |
| |
|
| | constraint_dto = ConstraintAnalysisDTO( |
| | name=constraint.constraint_name, |
| | weight=constraint.weight, |
| | score=constraint.score, |
| | matches=matches, |
| | ) |
| | constraints.append(constraint_dto) |
| |
|
| | return {"constraints": [constraint.model_dump() for constraint in constraints]} |
| |
|
| |
|
| | |
| | app.mount("/", StaticFiles(directory="static", html=True), name="static") |
| |
|