Spaces:
Running
Running
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| API Integration Debugging Environment Implementation. | |
| A real-world environment where an AI agent diagnoses and fixes broken | |
| API integrations by reading error logs, inspecting configurations, | |
| and submitting corrected configurations. | |
| """ | |
| import copy | |
| from typing import Any, Dict, List, Optional, Set | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import ApiDebugAction, ApiDebugObservation | |
| from ..scenarios import Issue, Scenario, get_all_task_ids, get_scenario | |
| except ImportError: | |
| from models import ApiDebugAction, ApiDebugObservation | |
| from scenarios import Issue, Scenario, get_all_task_ids, get_scenario | |
class ApiDebugEnvironment(Environment):
    """
    API Integration Debugging Environment.

    An agent must diagnose and fix broken API integrations by:
      1. Inspecting error logs to identify issues
      2. Inspecting service configurations
      3. Testing endpoints to observe failures
      4. Submitting configuration fixes

    Supports 3 difficulty levels (easy, medium, hard) with different
    numbers of issues and complexity.
    """

    # Each session can hold its own instance; all episode state lives on
    # the instance, so concurrent sessions do not share mutable state.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    def __init__(self, task_id: str = "easy"):
        """
        Initialize the environment.

        Args:
            task_id: One of 'easy', 'medium', 'hard'
        """
        self._task_id = task_id
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # Scenario data is loaded lazily by reset(); until then the
        # environment is uninitialized and step() auto-resets.
        self._scenario: Optional[Scenario] = None
        # Mutable working copy of per-service configs (filled on reset).
        self._current_configs: Dict[str, Dict[str, Any]] = {}
        # Episode progress bookkeeping: issue ids discovered / repaired,
        # plus which "logs:<svc>" / "config:<svc>" targets were inspected.
        self._issues_found: Set[str] = set()
        self._issues_fixed: Set[str] = set()
        self._inspected_targets: Set[str] = set()
        self._done = False
        self._last_action_result = ""
        self._cumulative_reward = 0.0
| def reset(self, task_id: Optional[str] = None) -> ApiDebugObservation: | |
| """ | |
| Reset the environment, optionally with a new task. | |
| Args: | |
| task_id: Override the task difficulty. One of 'easy', 'medium', 'hard'. | |
| Returns: | |
| Initial observation with task description and available targets. | |
| """ | |
| if task_id is not None: | |
| self._task_id = task_id | |
| self._state = State(episode_id=str(uuid4()), step_count=0) | |
| self._scenario = get_scenario(self._task_id) | |
| self._current_configs = copy.deepcopy(self._scenario.configs) | |
| self._issues_found = set() | |
| self._issues_fixed = set() | |
| self._inspected_targets = set() | |
| self._done = False | |
| self._last_action_result = "" | |
| self._cumulative_reward = 0.0 | |
| return ApiDebugObservation( | |
| task_id=self._task_id, | |
| task_description=self._scenario.description, | |
| logs=[], | |
| config_snapshot={}, | |
| api_response=None, | |
| hints=self._get_hints(), | |
| remaining_steps=self._scenario.max_steps, | |
| issues_found=0, | |
| issues_fixed=0, | |
| issues_total=len(self._scenario.issues), | |
| action_result="Environment reset. Use 'inspect_logs' or 'inspect_config' to start debugging.", | |
| available_targets=self._scenario.services, | |
| done=False, | |
| reward=0.0, | |
| ) | |
| def step(self, action: ApiDebugAction) -> ApiDebugObservation: # type: ignore[override] | |
| """ | |
| Execute one debugging step. | |
| Args: | |
| action: ApiDebugAction with action_type, target, and optional fix_payload | |
| Returns: | |
| ApiDebugObservation with results of the action | |
| """ | |
| if self._scenario is None: | |
| # Auto-reset if not initialized | |
| self.reset() | |
| assert self._scenario is not None # for type checker | |
| self._state.step_count += 1 | |
| reward = 0.0 | |
| logs: List[str] = [] | |
| config_snapshot: Dict[str, Any] = {} | |
| api_response: Optional[Dict[str, Any]] = None | |
| # Validate target | |
| if action.target not in self._scenario.services: | |
| self._last_action_result = ( | |
| f"Invalid target '{action.target}'. " | |
| f"Valid targets: {self._scenario.services}" | |
| ) | |
| reward = -0.05 | |
| elif action.action_type == "inspect_logs": | |
| logs, reward = self._handle_inspect_logs(action.target) | |
| elif action.action_type == "inspect_config": | |
| config_snapshot, reward = self._handle_inspect_config(action.target) | |
| elif action.action_type == "inspect_endpoint": | |
| api_response, reward = self._handle_inspect_endpoint(action.target) | |
| elif action.action_type == "submit_fix": | |
| reward = self._handle_submit_fix(action.target, action.fix_payload or {}) | |
| else: | |
| self._last_action_result = ( | |
| f"Invalid action_type '{action.action_type}'. " | |
| "Valid types: inspect_logs, inspect_config, inspect_endpoint, submit_fix" | |
| ) | |
| reward = -0.05 | |
| self._cumulative_reward += reward | |
| # Check episode termination | |
| remaining = self._scenario.max_steps - self._state.step_count | |
| all_fixed = len(self._issues_fixed) == len(self._scenario.issues) | |
| if all_fixed: | |
| self._done = True | |
| reward += 0.2 # completion bonus | |
| self._cumulative_reward += 0.2 | |
| self._last_action_result += " π All issues fixed! Episode complete." | |
| if remaining <= 0 and not self._done: | |
| self._done = True | |
| self._last_action_result += " β° Out of steps. Episode ended." | |
| return ApiDebugObservation( | |
| task_id=self._task_id, | |
| task_description=self._scenario.description, | |
| logs=logs, | |
| config_snapshot=config_snapshot, | |
| api_response=api_response, | |
| hints=self._get_hints(), | |
| remaining_steps=max(0, remaining), | |
| issues_found=len(self._issues_found), | |
| issues_fixed=len(self._issues_fixed), | |
| issues_total=len(self._scenario.issues), | |
| action_result=self._last_action_result, | |
| available_targets=self._scenario.services, | |
| done=self._done, | |
| reward=reward, | |
| metadata={ | |
| "cumulative_reward": self._cumulative_reward, | |
| "step": self._state.step_count, | |
| "issues_found_ids": list(self._issues_found), | |
| "issues_fixed_ids": list(self._issues_fixed), | |
| }, | |
| ) | |
    def state(self) -> State:
        """Get current environment state (episode id and step count)."""
        return self._state
    # ─── Action Handlers ──────────────────────────────────────────────────
| def _handle_inspect_logs(self, target: str) -> tuple: | |
| """Return logs for a service and reward for relevant inspection.""" | |
| assert self._scenario is not None | |
| logs = self._scenario.logs.get(target, []) | |
| self._inspected_targets.add(f"logs:{target}") | |
| # Check if any unfound issues have log hints in these logs | |
| found_new = False | |
| for issue in self._scenario.issues: | |
| if issue.issue_id not in self._issues_found: | |
| for log_line in logs: | |
| if issue.log_hint in log_line: | |
| self._issues_found.add(issue.issue_id) | |
| found_new = True | |
| if found_new: | |
| reward = 0.15 | |
| self._last_action_result = f"Inspected logs for '{target}'. Found relevant error patterns!" | |
| elif logs: | |
| reward = 0.05 | |
| self._last_action_result = f"Inspected logs for '{target}'. {len(logs)} log entries found." | |
| else: | |
| reward = 0.0 | |
| self._last_action_result = f"No logs available for '{target}'." | |
| return logs, reward | |
| def _handle_inspect_config(self, target: str) -> tuple: | |
| """Return current config for a service.""" | |
| assert self._scenario is not None | |
| config = self._current_configs.get(target, {}) | |
| self._inspected_targets.add(f"config:{target}") | |
| # Small reward for inspecting a service that has issues | |
| has_issues = any(i.service == target for i in self._scenario.issues if i.issue_id not in self._issues_fixed) | |
| reward = 0.05 if has_issues else 0.02 | |
| self._last_action_result = f"Inspected config for '{target}'. Configuration retrieved." | |
| return config, reward | |
| def _handle_inspect_endpoint(self, target: str) -> tuple: | |
| """Simulate testing an endpoint and return the response.""" | |
| assert self._scenario is not None | |
| # Find unfixed issues for this service | |
| unfixed = [ | |
| i for i in self._scenario.issues | |
| if i.service == target and i.issue_id not in self._issues_fixed | |
| ] | |
| if unfixed: | |
| # Simulate a failure based on the first unfixed issue | |
| issue = unfixed[0] | |
| api_response = { | |
| "status": "error", | |
| "status_code": 401 if "auth" in issue.issue_id else 500, | |
| "error": issue.description, | |
| "hint": f"Check the {issue.fix_key} configuration", | |
| } | |
| reward = 0.05 | |
| self._last_action_result = f"Tested endpoint on '{target}'. Got error response." | |
| else: | |
| api_response = { | |
| "status": "success", | |
| "status_code": 200, | |
| "message": f"{target} is working correctly.", | |
| } | |
| reward = 0.02 | |
| self._last_action_result = f"Tested endpoint on '{target}'. Service responding OK." | |
| return api_response, reward | |
| def _handle_submit_fix(self, target: str, fix_payload: Dict[str, Any]) -> float: | |
| """Process a fix submission and score it.""" | |
| assert self._scenario is not None | |
| if not fix_payload: | |
| self._last_action_result = "Fix rejected: fix_payload cannot be empty." | |
| return -0.1 | |
| # Find issues for this target service | |
| target_issues = [ | |
| i for i in self._scenario.issues | |
| if i.service == target and i.issue_id not in self._issues_fixed | |
| ] | |
| if not target_issues: | |
| self._last_action_result = f"No unfixed issues found for '{target}'." | |
| return -0.05 | |
| reward = 0.0 | |
| fixed_any = False | |
| for issue in target_issues: | |
| if self._check_fix(issue, fix_payload): | |
| self._issues_fixed.add(issue.issue_id) | |
| self._issues_found.add(issue.issue_id) # finding + fixing counts | |
| self._apply_fix(target, fix_payload) | |
| reward += 0.25 | |
| fixed_any = True | |
| if fixed_any: | |
| fixed_count = sum(1 for i in target_issues if i.issue_id in self._issues_fixed) | |
| self._last_action_result = ( | |
| f"Fix accepted for '{target}'! " | |
| f"Fixed {fixed_count} issue(s). " | |
| f"Total fixed: {len(self._issues_fixed)}/{len(self._scenario.issues)}" | |
| ) | |
| else: | |
| self._last_action_result = ( | |
| f"Fix rejected for '{target}'. The payload doesn't address any known issues. " | |
| "Try inspecting logs and config to identify the correct fix." | |
| ) | |
| reward = -0.1 | |
| return reward | |
    # ─── Helper Methods ───────────────────────────────────────────────────
| def _check_fix(self, issue: Issue, fix_payload: Dict[str, Any]) -> bool: | |
| """ | |
| Check if a fix payload correctly addresses an issue. | |
| Uses fuzzy matching β the fix is accepted if: | |
| 1. The fix_key is present in the payload, OR | |
| 2. Any expected_fix key is present in the payload with a reasonable value | |
| """ | |
| # Direct key match | |
| if issue.fix_key in fix_payload: | |
| return True | |
| # Check nested key (e.g., "headers.Authorization" -> check payload for "Authorization") | |
| if "." in issue.fix_key: | |
| parts = issue.fix_key.split(".") | |
| leaf_key = parts[-1] | |
| if leaf_key in fix_payload: | |
| return True | |
| # Check expected fix keys | |
| for key in issue.expected_fix: | |
| if key in fix_payload: | |
| return True | |
| if "." in key: | |
| leaf = key.split(".")[-1] | |
| if leaf in fix_payload: | |
| return True | |
| return False | |
| def _apply_fix(self, target: str, fix_payload: Dict[str, Any]) -> None: | |
| """Apply a fix to the current configuration.""" | |
| if target not in self._current_configs: | |
| return | |
| config = self._current_configs[target] | |
| for key, value in fix_payload.items(): | |
| if "." in key: | |
| # Nested key: e.g., "headers.Authorization" | |
| parts = key.split(".") | |
| obj = config | |
| for part in parts[:-1]: | |
| if part not in obj: | |
| obj[part] = {} | |
| obj = obj[part] | |
| obj[parts[-1]] = value | |
| else: | |
| config[key] = value | |
| def _get_hints(self) -> List[str]: | |
| """Return progressive hints based on step count.""" | |
| if self._scenario is None: | |
| return [] | |
| hints = [] | |
| step = self._state.step_count | |
| total_issues = len(self._scenario.issues) | |
| unfixed = total_issues - len(self._issues_fixed) | |
| if step == 0: | |
| hints.append("Start by inspecting error logs for each service to find clues.") | |
| hints.append(f"There are {total_issues} issues to find and fix.") | |
| elif step > 0 and len(self._issues_found) == 0: | |
| hints.append("Try 'inspect_logs' on different services to find error patterns.") | |
| elif len(self._issues_found) > 0 and len(self._issues_fixed) == 0: | |
| hints.append("You've found issues! Use 'inspect_config' to see current settings, then 'submit_fix'.") | |
| elif unfixed > 0: | |
| hints.append(f"{unfixed} issue(s) remaining. Check services you haven't inspected yet.") | |
| # Late-game hints | |
| if self._scenario.max_steps - step <= 5 and unfixed > 0: | |
| # Give more specific hints when running low on steps | |
| for issue in self._scenario.issues: | |
| if issue.issue_id not in self._issues_fixed: | |
| hints.append(f"Hint: Check '{issue.service}' β look for '{issue.fix_key}' in the config.") | |
| return hints | |
    # ─── Grading ──────────────────────────────────────────────────────────
| def grade(self) -> float: | |
| """ | |
| Grade the agent's performance on the current episode. | |
| Score = (issues_fixed / issues_total) * efficiency_bonus + exploration_bonus | |
| Efficiency bonus = 1.0 + (remaining_steps / max_steps * 0.3) | |
| Exploration bonus = small credit for inspecting services (max 0.05) | |
| Returns: | |
| Score strictly between 0 and 1 (exclusive): in range (0.001, 0.999) | |
| """ | |
| if self._scenario is None: | |
| return 0.001 | |
| total = len(self._scenario.issues) | |
| if total == 0: | |
| return 0.999 | |
| fix_ratio = len(self._issues_fixed) / total | |
| remaining = max(0, self._scenario.max_steps - self._state.step_count) | |
| efficiency_bonus = 1.0 + (remaining / self._scenario.max_steps * 0.3) | |
| # Small partial credit for exploration even if no fixes submitted | |
| exploration_bonus = min(0.05, len(self._inspected_targets) * 0.005) | |
| score = fix_ratio * efficiency_bonus + exploration_bonus | |
| # Clamp strictly to (0.001, 0.999) β NEVER exactly 0.0 or 1.0 | |
| return max(0.001, min(0.999, round(score, 4))) | |
| def get_task_info(self) -> Dict[str, Any]: | |
| """Return information about the current task.""" | |
| if self._scenario is None: | |
| return {"error": "Environment not initialized. Call reset() first."} | |
| return { | |
| "task_id": self._task_id, | |
| "difficulty": self._scenario.difficulty, | |
| "description": self._scenario.description, | |
| "max_steps": self._scenario.max_steps, | |
| "issues_total": len(self._scenario.issues), | |
| "services": self._scenario.services, | |
| "action_schema": { | |
| "action_type": { | |
| "type": "string", | |
| "enum": ["inspect_logs", "inspect_config", "inspect_endpoint", "submit_fix"], | |
| "description": "The type of debugging action to take", | |
| }, | |
| "target": { | |
| "type": "string", | |
| "enum": self._scenario.services, | |
| "description": "The service to act on", | |
| }, | |
| "fix_payload": { | |
| "type": "object", | |
| "description": "Configuration fix (required for submit_fix action)", | |
| "required": False, | |
| }, | |
| }, | |
| } | |