| import os |
| from src.state import AgentState |
| from src.tools.tools import preprocess_files |
| from typing import Optional |
| from langgraph.prebuilt import ToolNode |
|
|
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage |
|
|
| from src.prompts.prompts import ( |
| SYSTEM_PROMPT_PLANNER, |
| SYSTEM_EXECUTOR_PROMPT, |
| COMPLEXITY_ASSESSOR_PROMPT, |
| CRITIC_PROMPT, |
| ) |
|
|
| from src.config import llm_reasoning, TOOLS, planner_llm, llm_with_tools, llm_deterministic, llm_criticist, llm_simple_executor, llm_simple_with_tools |
| from src.schemas import PlannerPlan, ComplexityLevel, CritiqueFeedback, ExecutionReport, ToolExecution |
|
|
| from src.utils.utils import ( |
| format_final_answer, |
| clean_message_history, |
| log_stage, |
| log_key_values, |
| display_plan, |
| format_plan_overview, |
| ) |
|
|
| def _build_planner_prompt(state: AgentState, extra_context: Optional[str] = None) -> str: |
| tool_catalogue = ", ".join(sorted(tool.name for tool in TOOLS)) |
| file_paths = state.get("files", []) |
| file_list = ", ".join(os.path.basename(path) for path in file_paths) if file_paths else "none provided" |
| extra = extra_context.strip() if extra_context else "None" |
| return SYSTEM_PROMPT_PLANNER.format( |
| tool_catalogue=tool_catalogue, |
| file_list=file_list, |
| extra_context=extra, |
| ).strip() |
|
|
| def query_input(state : AgentState) -> AgentState: |
| log_stage("USER QUERY", icon="💡") |
|
|
| files = state.get("files", []) |
| if files: |
| log_stage("FILE PREPARATION", subtitle=f"Processing {len(files)} file(s)", icon="📁") |
| file_info = preprocess_files(files) |
| |
| for file_path, info in file_info.items(): |
| |
| log_key_values( |
| [ |
| ("path", file_path), |
| ("type", info["type"]), |
| ("size", f"{info['size']} bytes"), |
| ("suggested_tool", info["suggested_tool"]), |
| ] |
| ) |
| state["file_contents"] = file_info |
| file_context = "\n\n=== AVAILABLE FILES FOR ANALYSIS ===\n" |
| for file_path, info in file_info.items(): |
| filename = os.path.basename(file_path) |
| file_context += f"File: {filename}\n" |
| file_context += f" - Type: {info['type']}\n" |
| file_context += f" - Size: {info['size']} bytes\n" |
| file_context += f" - Suggested tool: {info['suggested_tool']}\n" |
| if info.get("preview"): |
| file_context += f" - Preview: {info['preview']}\n" |
| file_context += "\n" |
| |
| |
| file_context += "IMPORTANT: Use the suggested tools to analyze these files before processing their data.\n" |
| file_context += "File paths are available in the agent state and can be passed directly to analysis tools.\n" |
| |
| else: |
| log_key_values([("files", "none provided")]) |
| file_context = "" |
| original_query = state.get("query", "") |
| state["query"] = original_query + file_context |
| return state |
|
|
|
|
| def planner(state : AgentState) -> AgentState: |
|
|
| log_stage("PLANNING", icon="🧭") |
| planner_prompt = _build_planner_prompt(state) |
|
|
| sys_stack = [ |
| SystemMessage(content=planner_prompt), |
| HumanMessage(content=state["query"]), |
| ] |
| plan: PlannerPlan = planner_llm.invoke(sys_stack) |
| |
| |
| display_plan(plan) |
| return { |
| "messages": state["messages"] + sys_stack, |
| "plan": plan, |
| "current_step": 0, |
| "reasoning_done": False, |
| } |
|
|
|
|
| def agent(state: AgentState) -> AgentState: |
| |
| |
| current_step = state.get("current_step", 0) |
| reasoning_done = state.get("reasoning_done", False) |
| plan: Optional[PlannerPlan] = state.get("plan") |
| previous_tool_results = state.get("previous_tool_results", {}) |
|
|
| |
|
|
| if not plan or not hasattr(plan, 'steps'): |
| log_stage("PLAN VALIDATION", subtitle="Planner returned no actionable steps", icon="⚠️") |
| warning = AIMessage(content="No valid plan available. <FINAL_ANSWER>") |
| return { |
| "messages": state["messages"] + [warning], |
| "reasoning_done": False, |
| } |
| |
| steps = plan.steps |
| |
| total_steps = len(steps) |
|
|
| if total_steps == 0: |
| log_stage("PLAN VALIDATION", subtitle="Plan indicates direct answer", icon="ℹ️") |
| direct = AIMessage(content="Plan has no steps; respond directly. <FINAL_ANSWER>") |
| return { |
| "messages": state["messages"] + [direct], |
| "reasoning_done": False, |
| } |
|
|
| if current_step >= total_steps: |
| log_stage("PLAN COMPLETE", subtitle="All steps executed", icon="✅") |
| completion = AIMessage(content="All plan steps completed. <FINAL_ANSWER>") |
| return { |
| "messages": state["messages"] + [completion], |
| "reasoning_done": False, |
| } |
|
|
| current_step_info = steps[current_step] |
|
|
| log_stage( |
| "EXECUTION", |
| subtitle=f"Step {current_step + 1}/{total_steps}: {current_step_info.goal}", |
| icon="🤖", |
| ) |
| log_key_values( |
| [ |
| ("step_id", current_step_info.id), |
| ("tool", current_step_info.tool or "none"), |
| ("expected", current_step_info.expected_result), |
| ] |
| ) |
|
|
| plan_overview = format_plan_overview(plan) |
| tool_catalogue = ", ".join(sorted(tool.name for tool in TOOLS)) |
| file_contents = state.get("file_contents", {}) |
| file_list = ", ".join(file_contents.keys()) if file_contents else "none provided" |
|
|
| |
| previous_results_context = "" |
| if previous_tool_results: |
| previous_results_context = f"\n\nPREVIOUS CALCULATION RESULTS:\n" |
| for tool_call_id, result in previous_tool_results.items(): |
| previous_results_context += f"- {tool_call_id}: {result}\n" |
| previous_results_context += "You can reference these results in your calculations.\n" |
|
|
|
|
| system_message = SystemMessage( |
| content=SYSTEM_EXECUTOR_PROMPT.format( |
| plan_summary=plan.summary, |
| plan_overview=plan_overview, |
| current_step_id=current_step_info.id, |
| step_goal=current_step_info.goal, |
| step_tool=current_step_info.tool or "no tool (respond directly)", |
| tool_catalogue=tool_catalogue, |
| file_list=file_list, |
| ).strip() |
| ) |
|
|
|
|
| if not reasoning_done: |
|
|
| log_stage("REASONING", subtitle=f"{current_step_info.id}", icon="🧠") |
| |
|
|
| file_context = "" |
| file_contents = state.get("file_contents", {}) |
| if file_contents: |
| file_context = "\n\nAVAILABLE FILES IN CURRENT SESSION:\n" |
| for filepath, info in file_contents.items(): |
| filename = os.path.basename(filepath) |
| file_context += f"- {filename}: {info['type']} file, suggested tool: {info['suggested_tool']}\n" |
| file_context += f" Path: {filepath}\n" |
|
|
| reasoning_prompt = f""" |
| {SYSTEM_EXECUTOR_PROMPT} |
| |
| CURRENT TASK: You must perform reasoning for step {current_step + 1}. |
| |
| STEP INFO: {current_step_info}\n\n |
| |
| FILE CONTEXT: {file_contents} |
| |
| CRITICAL: You MUST output your reasoning in <REASONING> tags, but DO NOT call any tools yet. |
| Explain what you need to do and why, then end your response. |
| |
| REASONING IS IMPERATIVE BEFORE ANY TOOL CALLS. |
| FOR MORE COMPLEX UNDERSTANDING -> USE RESULTS AND INSIGHTS FROM PREVIOUS STEPS. |
| """ |
|
|
| sys_msg = SystemMessage(content = reasoning_prompt) |
| stack = [sys_msg] + state["messages"] |
|
|
| step = llm_reasoning.invoke(stack) |
| |
| |
|
|
| return { |
| "messages" : state["messages"] + [step], |
| "reasoning_done" : True |
| } |
| |
| else: |
| tool_prompt = f""" |
| Now execute the tool for step {current_step + 1}. |
| |
| You have already done the reasoning. Now call the appropriate tool with the correct parameters. |
| Available file paths: {list(state.get("file_contents", {}).keys())}\n |
| IMPORTANT NOTE: IF YOU DECIDED TO USE safe_code_run, MAKE SURE TO FINISH CALCULATIONS WITH print() or saving to a variable NAMED 'result' so that the output can be captured! |
| AVAILABLE TOOLS: {', '.join([tool.name for tool in TOOLS])} |
| """ |
|
|
| sys_msg = SystemMessage(content=tool_prompt) |
| stack = [sys_msg] + state["messages"] |
| |
| |
| step = llm_with_tools.invoke(stack) |
| print("=== TOOL EXECUTION ===") |
| |
| print(f"Tool calls: {step.tool_calls}") |
| |
| return { |
| "messages": state["messages"] + [step], |
| "current_step": current_step + 1 if step.tool_calls else current_step, |
| "reasoning_done": False |
| } |
| |
| def should_continue(state : AgentState) -> bool: |
| |
| last_message = state["messages"][-1] |
| |
| reasoning_done = state.get("reasoning_done", False) |
| plan = state.get("plan", None) |
| current_step = state.get("current_step", 0) |
|
|
| print(f"=== SHOULD_CONTINUE DEBUG ===") |
| print(f"Current step: {current_step}") |
| print(f"Plan steps: {len(plan.steps) if plan else 0}") |
| print(f"Reasoning done: {reasoning_done}") |
| print(f"Last message type: {type(last_message).__name__}") |
|
|
| |
| if hasattr(last_message, "tool_calls") and last_message.tool_calls: |
| return "tools" |
| |
| |
| if hasattr(last_message, "content") and "<FINAL_ANSWER>" in last_message.content: |
| return "final_answer" |
| |
| |
| if not reasoning_done and hasattr(last_message, 'content') and "<REASONING>" in last_message.content: |
| |
| return "agent" |
| elif reasoning_done: |
| |
| return "agent" |
| elif not reasoning_done: |
| |
| return "agent" |
| |
| |
| if plan and current_step >= len(plan.steps): |
| return "final_answer" |
| |
| |
| return "agent" |
|
|
| |
| class DebuggingToolNode(ToolNode): |
| def __init__(self, tools): |
| super().__init__(tools) |
| |
| def __call__(self, state): |
| print("=== TOOL EXECUTION STARTED ===") |
| result = super().__call__(state) |
| print("=== TOOL EXECUTION COMPLETED ===") |
| return result |
| |
|
|
| def enhanced_finalizer(state: AgentState) -> AgentState: |
| """Generate comprehensive execution report for critic evaluation.""" |
| print("=== GENERATING EXECUTION REPORT ===") |
| |
| |
| tools_executed = [] |
| data_sources = [] |
| |
| for msg in state["messages"]: |
| if hasattr(msg, 'tool_calls') and msg.tool_calls: |
| for tool_call in msg.tool_calls: |
| tools_executed.append(ToolExecution( |
| tool_name=tool_call['name'], |
| arguments=str(tool_call['args']), |
| call_id=tool_call['id'] |
| )) |
| |
| |
| if hasattr(msg, 'content') and isinstance(msg.content, str): |
| |
| import re |
| urls = re.findall(r'https?://[^\s]+', msg.content) |
| data_sources.extend(urls) |
| |
| |
| plan = state.get("plan") |
| approach_used = "Direct execution" |
| assumptions_made = [] |
| |
| if plan: |
| approach_used = f"{plan.task_type} approach with {len(plan.steps)} steps" |
| assumptions_made = plan.assumptions |
| |
| |
| report_generator_prompt = f""" |
| Generate a comprehensive execution report for the following query processing: |
| |
| ORIGINAL QUERY: {state['query']} |
| |
| EXECUTION CONTEXT: |
| - Complexity Level: {state.get('complexity_assessment', {}).level} |
| - Plan Used: {plan if plan else {}} |
| - Tools Executed: {tools_executed} |
| - Available Files: {list(state.get('file_contents', {}).keys())} |
| |
| CONVERSATION HISTORY: |
| {[msg.content[:200] + "..." if len(msg.content) > 200 else msg.content |
| for msg in state['messages'][-5:]]} # Last 5 messages for context |
| |
| Based on this information, create a structured execution report that includes: |
| 1. Query summary |
| 2. Approach used |
| 3. Key findings from the execution |
| 4. Data sources used |
| 5. Your confidence level in the results |
| 6. Any limitations or caveats |
| 7. The final answer |
| |
| Be thorough but concise. This report will be evaluated by a critic for quality assurance. |
| """ |
| |
| report_llm = llm_deterministic.with_structured_output(ExecutionReport) |
| |
| execution_report = report_llm.invoke([ |
| SystemMessage(content=report_generator_prompt), |
| HumanMessage(content="Generate the execution report.") |
| ]) |
| |
| print(f"Report generated - Confidence: {execution_report.confidence_level}") |
| print(f"Key findings: {len(execution_report.key_findings)}") |
| print(f"Data sources: {len(execution_report.data_sources)}") |
| |
| |
| formatted_answer = format_final_answer(execution_report, state.get('complexity_assessment', {})) |
| |
| print(f"FINAL ANSWER FOR EVALUATOR: {execution_report.final_answer}") |
| |
| return { |
| "execution_report": execution_report, |
| "final_answer": formatted_answer |
| } |
|
|
|
|
| def simple_executor(state: AgentState) -> AgentState: |
| """Handle simple queries directly without planning.""" |
| print("=== SIMPLE EXECUTION ===") |
| |
| |
| simple_prompt = f""" |
| Answer this simple query directly and efficiently: {state['query']} |
| |
| You have access to tools if needed, but try to answer directly when possible. |
| If you need files, they are available at: {list(state.get('file_contents', {}).keys())} |
| |
| Provide a clear, concise answer. |
| """ |
| |
| response = llm_simple_with_tools.invoke([ |
| SystemMessage(content=simple_prompt), |
| HumanMessage(content=state['query']) |
| ]) |
|
|
| print("Response generated for simple query.") |
| |
| return { |
| "messages": state["messages"] + [response], |
| "final_answer": response.content |
| } |
|
|
| def should_use_tools_simple_executor(state: AgentState) -> str: |
| """Decide whether to use tools or answer directly in simple executor.""" |
| last_message = state["messages"][-1] |
| |
| if hasattr(last_message, "tool_calls") and last_message.tool_calls: |
| return "tools" |
| |
| if hasattr(last_message, "content") and "<FINAL_ANSWER>" in last_message.content: |
| return "final_answer" |
| |
| return "final_answer" |
|
|
|
|
| def should_use_planning(state: AgentState) -> str: |
| """Route based on complexity assessment.""" |
| complexity = state["complexity_assessment"] |
| |
| if complexity.level == "simple" and not complexity.needs_planning: |
| return "simple_executor" |
| else: |
| return "planner" |
| |
|
|
| def critic_evaluator(state: AgentState) -> AgentState: |
| """Enhanced critic that evaluates execution reports.""" |
| print("=== ENHANCED ANSWER CRITIQUE ===") |
| |
| report = state.get("execution_report") |
| critic_llm = llm_criticist.with_structured_output(CritiqueFeedback) |
| |
| critique_prompt = CRITIC_PROMPT.format( |
| query=report.query_summary, |
| approach=report.approach_used, |
| tools=report.tools_executed, |
| findings=report.key_findings, |
| sources=report.data_sources, |
| confidence=report.confidence_level, |
| limitations=report.limitations, |
| answer=report.final_answer |
| ) |
| |
| critique = critic_llm.invoke([ |
| SystemMessage(content=critique_prompt), |
| HumanMessage(content="Evaluate this execution report thoroughly.") |
| ]) |
| |
| print(f"Quality Score: {critique.quality_score}/10") |
| print(f"Complete: {critique.is_complete}") |
| print(f"Accurate: {critique.is_accurate}") |
| |
| if critique.errors_found: |
| print(f"Issues found: {critique.errors_found}") |
| |
| if critique.needs_replanning: |
| print(f"Replanning needed: {critique.replan_instructions}") |
| |
| return { |
| "critique_feedback": critique, |
| "iteration_count": state.get("iteration_count", 0) + 1 |
| } |
|
|
|
|
|
|
| def should_replan(state: AgentState) -> str: |
| """Decide whether to accept answer, replan, or stop.""" |
| critique = state.get("critique_feedback") |
| iteration_count = state.get("iteration_count", 0) |
| max_iterations = state.get("max_iterations", 3) |
| activator = state.get("critic_replan", False) |
|
|
| print(f"=== REPLAN DECISION ===") |
| print(f"Iteration: {iteration_count}/{max_iterations}") |
| print(f"Quality score: {critique.quality_score if critique else 'N/A'}") |
| print(f"Needs replanning: {critique.needs_replanning if critique else 'N/A'}") |
|
|
| if not activator: |
| return "end" |
| |
| if not critique: |
| return "end" |
| |
| |
| if iteration_count >= max_iterations: |
| print(f"Max iterations ({max_iterations}) reached. Accepting current answer.") |
| return "end" |
| |
| |
| if critique.quality_score >= 7 or not critique.needs_replanning: |
| print("Quality acceptable, ending execution") |
| return "end" |
| |
| |
| if critique.needs_replanning and iteration_count < max_iterations: |
| print("Replanning due to critic feedback...") |
| return "replan" |
| |
| return "end" |
|
|
| def replanner_old(state: AgentState) -> AgentState: |
| """Create a revised plan based on critic feedback.""" |
| print("=== REPLANNING ===") |
| |
| critique = state["critique_feedback"] |
| previous_plan = state.get("plan") |
| |
| replan_prompt = f""" |
| {SYSTEM_PROMPT_PLANNER} |
| |
| REPLANNING CONTEXT: |
| Original Query: {state['query']} |
| Previous Plan: {previous_plan if previous_plan else {}} |
| |
| CRITIC FEEDBACK: |
| - Quality Score: {critique.quality_score}/10 |
| - Issues Found: {critique.errors_found} |
| - Missing Elements: {critique.missing_elements} |
| - Improvement Suggestions: {critique.suggested_improvements} |
| - Specific Instructions: {critique.replan_instructions} |
| |
| Create a REVISED plan that addresses these issues. Focus on fixing the identified problems. |
| """ |
| |
| revised_plan = planner_llm.invoke([ |
| SystemMessage(content=replan_prompt), |
| HumanMessage(content="Create a revised plan based on the feedback.") |
| ]) |
| |
| print("Plan revised based on critic feedback") |
| |
| |
| current_messages = state.get("messages", []) |
| cleaned_messages = clean_message_history(current_messages) |
| |
| |
| essential_messages = [] |
| for msg in cleaned_messages: |
| if isinstance(msg, (SystemMessage, HumanMessage)): |
| |
| if ("complexity" in msg.content.lower() or |
| "assess" in msg.content.lower() or |
| isinstance(msg, HumanMessage)): |
| essential_messages.append(msg) |
| |
| |
| |
| |
| |
| |
|
|
| return { |
| "plan": revised_plan, |
| "current_step": 0, |
| "reasoning_done": False, |
| "messages": essential_messages, |
| "execution_report": None |
| } |
|
|
| def replanner(state: AgentState) -> AgentState: |
| """Create a revised plan based on critic feedback.""" |
| print("=== REPLANNING ===") |
| |
| critique = state["critique_feedback"] |
| previous_plan = state.get("plan") |
| |
| replan_prompt = f""" |
| {SYSTEM_PROMPT_PLANNER} |
| |
| REPLANNING CONTEXT: |
| Original Query: {state['query']} |
| Previous Plan: {previous_plan if previous_plan else {}} |
| |
| CRITIC FEEDBACK: |
| - Quality Score: {critique.quality_score}/10 |
| - Issues Found: {critique.errors_found} |
| - Missing Elements: {critique.missing_elements} |
| - Improvement Suggestions: {critique.suggested_improvements} |
| - Specific Instructions: {critique.replan_instructions} |
| |
| Create a REVISED plan that addresses these issues. Focus on fixing the identified problems. |
| """ |
| |
| revised_plan = planner_llm.invoke([ |
| SystemMessage(content=replan_prompt), |
| HumanMessage(content="Create a revised plan based on the feedback.") |
| ]) |
| |
| print("Plan revised based on critic feedback") |
| |
| |
| current_messages = state.get("messages", []) |
| state["previous_final_answer"] = state.get("final_answer", "") |
| |
| preserved_messages = [] |
| tool_results = {} |
| |
| for i, msg in enumerate(current_messages): |
| |
| if isinstance(msg, (SystemMessage, HumanMessage)): |
| |
| if (isinstance(msg, HumanMessage) or |
| ("complexity" in msg.content.lower() and "assessor" in msg.content.lower())): |
| preserved_messages.append(msg) |
| |
| |
| elif isinstance(msg, ToolMessage) and msg.content and msg.content.strip(): |
| |
| try: |
| |
| float(msg.content.strip()) |
| preserved_messages.append(msg) |
| tool_results[msg.tool_call_id] = msg.content |
| |
| |
| for j in range(i-1, -1, -1): |
| if (isinstance(current_messages[j], AIMessage) and |
| hasattr(current_messages[j], 'tool_calls') and |
| current_messages[j].tool_calls): |
| for tool_call in current_messages[j].tool_calls: |
| if tool_call['id'] == msg.tool_call_id: |
| if current_messages[j] not in preserved_messages: |
| preserved_messages.insert(-1, current_messages[j]) |
| break |
| break |
| except (ValueError, AttributeError): |
| |
| if len(msg.content.strip()) > 1: |
| preserved_messages.append(msg) |
| |
| print(f"Preserved {len(tool_results)} tool results") |
| |
| |
| |
| if tool_results: |
| context_msg = HumanMessage( |
| content=f"Previous calculation results available: {tool_results}" |
| ) |
| preserved_messages.append(context_msg) |
|
|
| return { |
| "plan": revised_plan, |
| "current_step": 0, |
| "reasoning_done": False, |
| "messages": preserved_messages, |
| "execution_report": None, |
| |
| "previous_tool_results": tool_results |
| } |
|
|
| def complexity_assessor(state: AgentState) -> AgentState: |
| """Assess query complexity and determine if planning is needed.""" |
| print("=== COMPLEXITY ASSESSMENT ===") |
| |
| complexity_llm = llm_deterministic.with_structured_output(ComplexityLevel) |
| |
| assessment_message = [ |
| SystemMessage(content=COMPLEXITY_ASSESSOR_PROMPT.strip()), |
| HumanMessage(content=f"Query: {state['query']}") |
| ] |
| |
| assessment = complexity_llm.invoke(assessment_message) |
| |
| print(f"Complexity: {assessment.level}") |
| print(f"Needs planning: {assessment.needs_planning}") |
| print(f"Reasoning: {assessment.reasoning}") |
| |
| return { |
| "complexity_assessment": assessment, |
| "messages": state["messages"] + assessment_message |
| } |