# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
API Integration Debugging Environment Implementation.
A real-world environment where an AI agent diagnoses and fixes broken
API integrations by reading error logs, inspecting configurations,
and submitting corrected configurations.
"""
import copy
from typing import Any, Dict, List, Optional, Set
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import ApiDebugAction, ApiDebugObservation
from ..scenarios import Issue, Scenario, get_all_task_ids, get_scenario
except ImportError:
from models import ApiDebugAction, ApiDebugObservation
from scenarios import Issue, Scenario, get_all_task_ids, get_scenario
class ApiDebugEnvironment(Environment):
    """
    API Integration Debugging Environment.

    The agent diagnoses and repairs broken API integrations by:
      1. Inspecting error logs to surface clues,
      2. Inspecting service configurations,
      3. Probing endpoints to observe failures,
      4. Submitting configuration fixes.

    Three difficulty levels ('easy', 'medium', 'hard') vary the number
    of issues and the overall complexity.
    """

    # Each instance keeps all episode state on itself, so sessions can
    # run concurrently.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, task_id: str = "easy"):
        """
        Create the environment.

        Args:
            task_id: One of 'easy', 'medium', 'hard'.
        """
        self._task_id = task_id
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # The scenario (issues, configs, logs) is loaded by reset().
        self._scenario: Optional[Scenario] = None
        self._current_configs: Dict[str, Dict[str, Any]] = {}
        # Per-episode progress bookkeeping.
        self._issues_found: Set[str] = set()
        self._issues_fixed: Set[str] = set()
        self._inspected_targets: Set[str] = set()
        self._done = False
        self._last_action_result = ""
        self._cumulative_reward = 0.0
def reset(self, task_id: Optional[str] = None) -> ApiDebugObservation:
"""
Reset the environment, optionally with a new task.
Args:
task_id: Override the task difficulty. One of 'easy', 'medium', 'hard'.
Returns:
Initial observation with task description and available targets.
"""
if task_id is not None:
self._task_id = task_id
self._state = State(episode_id=str(uuid4()), step_count=0)
self._scenario = get_scenario(self._task_id)
self._current_configs = copy.deepcopy(self._scenario.configs)
self._issues_found = set()
self._issues_fixed = set()
self._inspected_targets = set()
self._done = False
self._last_action_result = ""
self._cumulative_reward = 0.0
return ApiDebugObservation(
task_id=self._task_id,
task_description=self._scenario.description,
logs=[],
config_snapshot={},
api_response=None,
hints=self._get_hints(),
remaining_steps=self._scenario.max_steps,
issues_found=0,
issues_fixed=0,
issues_total=len(self._scenario.issues),
action_result="Environment reset. Use 'inspect_logs' or 'inspect_config' to start debugging.",
available_targets=self._scenario.services,
done=False,
reward=0.0,
)
def step(self, action: ApiDebugAction) -> ApiDebugObservation: # type: ignore[override]
"""
Execute one debugging step.
Args:
action: ApiDebugAction with action_type, target, and optional fix_payload
Returns:
ApiDebugObservation with results of the action
"""
if self._scenario is None:
# Auto-reset if not initialized
self.reset()
assert self._scenario is not None # for type checker
self._state.step_count += 1
reward = 0.0
logs: List[str] = []
config_snapshot: Dict[str, Any] = {}
api_response: Optional[Dict[str, Any]] = None
# Validate target
if action.target not in self._scenario.services:
self._last_action_result = (
f"Invalid target '{action.target}'. "
f"Valid targets: {self._scenario.services}"
)
reward = -0.05
elif action.action_type == "inspect_logs":
logs, reward = self._handle_inspect_logs(action.target)
elif action.action_type == "inspect_config":
config_snapshot, reward = self._handle_inspect_config(action.target)
elif action.action_type == "inspect_endpoint":
api_response, reward = self._handle_inspect_endpoint(action.target)
elif action.action_type == "submit_fix":
reward = self._handle_submit_fix(action.target, action.fix_payload or {})
else:
self._last_action_result = (
f"Invalid action_type '{action.action_type}'. "
"Valid types: inspect_logs, inspect_config, inspect_endpoint, submit_fix"
)
reward = -0.05
self._cumulative_reward += reward
# Check episode termination
remaining = self._scenario.max_steps - self._state.step_count
all_fixed = len(self._issues_fixed) == len(self._scenario.issues)
if all_fixed:
self._done = True
reward += 0.2 # completion bonus
self._cumulative_reward += 0.2
self._last_action_result += " π All issues fixed! Episode complete."
if remaining <= 0 and not self._done:
self._done = True
self._last_action_result += " β° Out of steps. Episode ended."
return ApiDebugObservation(
task_id=self._task_id,
task_description=self._scenario.description,
logs=logs,
config_snapshot=config_snapshot,
api_response=api_response,
hints=self._get_hints(),
remaining_steps=max(0, remaining),
issues_found=len(self._issues_found),
issues_fixed=len(self._issues_fixed),
issues_total=len(self._scenario.issues),
action_result=self._last_action_result,
available_targets=self._scenario.services,
done=self._done,
reward=reward,
metadata={
"cumulative_reward": self._cumulative_reward,
"step": self._state.step_count,
"issues_found_ids": list(self._issues_found),
"issues_fixed_ids": list(self._issues_fixed),
},
)
    @property
    def state(self) -> State:
        """Get current environment state."""
        return self._state

    # ─── Action Handlers ──────────────────────────────────────────────────
def _handle_inspect_logs(self, target: str) -> tuple:
"""Return logs for a service and reward for relevant inspection."""
assert self._scenario is not None
logs = self._scenario.logs.get(target, [])
self._inspected_targets.add(f"logs:{target}")
# Check if any unfound issues have log hints in these logs
found_new = False
for issue in self._scenario.issues:
if issue.issue_id not in self._issues_found:
for log_line in logs:
if issue.log_hint in log_line:
self._issues_found.add(issue.issue_id)
found_new = True
if found_new:
reward = 0.15
self._last_action_result = f"Inspected logs for '{target}'. Found relevant error patterns!"
elif logs:
reward = 0.05
self._last_action_result = f"Inspected logs for '{target}'. {len(logs)} log entries found."
else:
reward = 0.0
self._last_action_result = f"No logs available for '{target}'."
return logs, reward
def _handle_inspect_config(self, target: str) -> tuple:
"""Return current config for a service."""
assert self._scenario is not None
config = self._current_configs.get(target, {})
self._inspected_targets.add(f"config:{target}")
# Small reward for inspecting a service that has issues
has_issues = any(i.service == target for i in self._scenario.issues if i.issue_id not in self._issues_fixed)
reward = 0.05 if has_issues else 0.02
self._last_action_result = f"Inspected config for '{target}'. Configuration retrieved."
return config, reward
def _handle_inspect_endpoint(self, target: str) -> tuple:
"""Simulate testing an endpoint and return the response."""
assert self._scenario is not None
# Find unfixed issues for this service
unfixed = [
i for i in self._scenario.issues
if i.service == target and i.issue_id not in self._issues_fixed
]
if unfixed:
# Simulate a failure based on the first unfixed issue
issue = unfixed[0]
api_response = {
"status": "error",
"status_code": 401 if "auth" in issue.issue_id else 500,
"error": issue.description,
"hint": f"Check the {issue.fix_key} configuration",
}
reward = 0.05
self._last_action_result = f"Tested endpoint on '{target}'. Got error response."
else:
api_response = {
"status": "success",
"status_code": 200,
"message": f"{target} is working correctly.",
}
reward = 0.02
self._last_action_result = f"Tested endpoint on '{target}'. Service responding OK."
return api_response, reward
def _handle_submit_fix(self, target: str, fix_payload: Dict[str, Any]) -> float:
"""Process a fix submission and score it."""
assert self._scenario is not None
if not fix_payload:
self._last_action_result = "Fix rejected: fix_payload cannot be empty."
return -0.1
# Find issues for this target service
target_issues = [
i for i in self._scenario.issues
if i.service == target and i.issue_id not in self._issues_fixed
]
if not target_issues:
self._last_action_result = f"No unfixed issues found for '{target}'."
return -0.05
reward = 0.0
fixed_any = False
for issue in target_issues:
if self._check_fix(issue, fix_payload):
self._issues_fixed.add(issue.issue_id)
self._issues_found.add(issue.issue_id) # finding + fixing counts
self._apply_fix(target, fix_payload)
reward += 0.25
fixed_any = True
if fixed_any:
fixed_count = sum(1 for i in target_issues if i.issue_id in self._issues_fixed)
self._last_action_result = (
f"Fix accepted for '{target}'! "
f"Fixed {fixed_count} issue(s). "
f"Total fixed: {len(self._issues_fixed)}/{len(self._scenario.issues)}"
)
else:
self._last_action_result = (
f"Fix rejected for '{target}'. The payload doesn't address any known issues. "
"Try inspecting logs and config to identify the correct fix."
)
reward = -0.1
return reward
# βββ Helper Methods βββββββββββββββββββββββββββββββββββββββββββββββββββ
def _check_fix(self, issue: Issue, fix_payload: Dict[str, Any]) -> bool:
"""
Check if a fix payload correctly addresses an issue.
Uses fuzzy matching β the fix is accepted if:
1. The fix_key is present in the payload, OR
2. Any expected_fix key is present in the payload with a reasonable value
"""
# Direct key match
if issue.fix_key in fix_payload:
return True
# Check nested key (e.g., "headers.Authorization" -> check payload for "Authorization")
if "." in issue.fix_key:
parts = issue.fix_key.split(".")
leaf_key = parts[-1]
if leaf_key in fix_payload:
return True
# Check expected fix keys
for key in issue.expected_fix:
if key in fix_payload:
return True
if "." in key:
leaf = key.split(".")[-1]
if leaf in fix_payload:
return True
return False
def _apply_fix(self, target: str, fix_payload: Dict[str, Any]) -> None:
"""Apply a fix to the current configuration."""
if target not in self._current_configs:
return
config = self._current_configs[target]
for key, value in fix_payload.items():
if "." in key:
# Nested key: e.g., "headers.Authorization"
parts = key.split(".")
obj = config
for part in parts[:-1]:
if part not in obj:
obj[part] = {}
obj = obj[part]
obj[parts[-1]] = value
else:
config[key] = value
def _get_hints(self) -> List[str]:
"""Return progressive hints based on step count."""
if self._scenario is None:
return []
hints = []
step = self._state.step_count
total_issues = len(self._scenario.issues)
unfixed = total_issues - len(self._issues_fixed)
if step == 0:
hints.append("Start by inspecting error logs for each service to find clues.")
hints.append(f"There are {total_issues} issues to find and fix.")
elif step > 0 and len(self._issues_found) == 0:
hints.append("Try 'inspect_logs' on different services to find error patterns.")
elif len(self._issues_found) > 0 and len(self._issues_fixed) == 0:
hints.append("You've found issues! Use 'inspect_config' to see current settings, then 'submit_fix'.")
elif unfixed > 0:
hints.append(f"{unfixed} issue(s) remaining. Check services you haven't inspected yet.")
# Late-game hints
if self._scenario.max_steps - step <= 5 and unfixed > 0:
# Give more specific hints when running low on steps
for issue in self._scenario.issues:
if issue.issue_id not in self._issues_fixed:
hints.append(f"Hint: Check '{issue.service}' β look for '{issue.fix_key}' in the config.")
return hints
# βββ Grading ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def grade(self) -> float:
"""
Grade the agent's performance on the current episode.
Score = (issues_fixed / issues_total) * efficiency_bonus + exploration_bonus
Efficiency bonus = 1.0 + (remaining_steps / max_steps * 0.3)
Exploration bonus = small credit for inspecting services (max 0.05)
Returns:
Score strictly between 0 and 1 (exclusive): in range (0.001, 0.999)
"""
if self._scenario is None:
return 0.001
total = len(self._scenario.issues)
if total == 0:
return 0.999
fix_ratio = len(self._issues_fixed) / total
remaining = max(0, self._scenario.max_steps - self._state.step_count)
efficiency_bonus = 1.0 + (remaining / self._scenario.max_steps * 0.3)
# Small partial credit for exploration even if no fixes submitted
exploration_bonus = min(0.05, len(self._inspected_targets) * 0.005)
score = fix_ratio * efficiency_bonus + exploration_bonus
# Clamp strictly to (0.001, 0.999) β NEVER exactly 0.0 or 1.0
return max(0.001, min(0.999, round(score, 4)))
def get_task_info(self) -> Dict[str, Any]:
"""Return information about the current task."""
if self._scenario is None:
return {"error": "Environment not initialized. Call reset() first."}
return {
"task_id": self._task_id,
"difficulty": self._scenario.difficulty,
"description": self._scenario.description,
"max_steps": self._scenario.max_steps,
"issues_total": len(self._scenario.issues),
"services": self._scenario.services,
"action_schema": {
"action_type": {
"type": "string",
"enum": ["inspect_logs", "inspect_config", "inspect_endpoint", "submit_fix"],
"description": "The type of debugging action to take",
},
"target": {
"type": "string",
"enum": self._scenario.services,
"description": "The service to act on",
},
"fix_payload": {
"type": "object",
"description": "Configuration fix (required for submit_fix action)",
"required": False,
},
},
}
|