| from typing import TypedDict, Annotated, Sequence |
| import operator |
| import re |
| from langgraph.graph import StateGraph, END |
| from .ai_tools import Calculator, DocRetriever, WebSearcher |
|
|
| class AgentState(TypedDict): |
| input: str |
| context: Annotated[Sequence[str], operator.add] |
| last_tool: str |
| output: str |
|
|
| class GaiaGraph: |
| def __init__(self, model, tokenizer, tools): |
| self.model = model |
| self.tokenizer = tokenizer |
| self.tools = tools |
| self.tool_map = {tool.name: tool for tool in tools} |
| self.graph = self._build_graph() |
| |
| def _build_graph(self): |
| graph = StateGraph(AgentState) |
| |
| graph.add_node("agent", self._agent_node) |
| graph.add_node("tool", self._tool_node) |
| graph.set_entry_point("agent") |
| |
| graph.add_edge("agent", "tool") |
| graph.add_conditional_edges( |
| "tool", |
| self._route_action, |
| {"continue": "agent", "end": END} |
| ) |
| |
| return graph.compile() |
| |
| def _agent_node(self, state: AgentState) -> dict: |
| tool_list = "\n".join([f"- {t.name}: {t.description}" for t in self.tools]) |
| prompt = f"""<|system|> |
| You're an expert problem solver. Use these tools when needed: |
| {tool_list} |
| |
| Respond ONLY in this format: |
| Thought: <your reasoning> |
| Action: <tool_name or 'FINISH'> |
| Action Input: <input for tool> |
| </s> |
| <|user|> |
| {state['input']} |
| Context: {state['context']} |
| </s> |
| <|assistant|> |
| """ |
| |
| response = self.model( |
| prompt, |
| max_new_tokens=200, |
| do_sample=True, |
| temperature=0.2, |
| pad_token_id=self.tokenizer.eos_token_id |
| )[0]['generated_text'] |
| |
| |
| action_match = re.search(r"Action: (\w+)", response) |
| action_input_match = re.search(r"Action Input: (.+?)\n", response, re.DOTALL) |
| |
| if action_match and action_input_match: |
| tool_name = action_match.group(1) |
| tool_input = action_input_match.group(1).strip() |
| return { |
| "last_tool": tool_name, |
| "tool_input": tool_input, |
| "thought": response |
| } |
| else: |
| return {"last_tool": "FINISH", "output": response} |
| |
| def _tool_node(self, state: AgentState) -> dict: |
| if state["last_tool"] == "FINISH": |
| return {"output": state.get("output", "No output generated")} |
| |
| tool = self.tool_map.get(state["last_tool"]) |
| if not tool: |
| return {"context": f"Error: Unknown tool {state['last_tool']}"} |
| |
| result = tool.run(state["tool_input"]) |
| return {"context": f"Tool {tool.name} returned: {result}"} |
| |
| def _route_action(self, state: AgentState) -> str: |
| return "end" if state["last_tool"] == "FINISH" else "continue" |
| |
| def run(self, input: str) -> str: |
| state = {"input": input, "context": [], "last_tool": "", "output": ""} |
| for step in self.graph.stream(state): |
| for node, value in step.items(): |
| if node == "__end__": |
| return value["output"] |
| return "Execution completed without output" |