| |
| import uuid |
| import logging |
| import time |
| import asyncio |
| from datetime import datetime |
| from typing import List, Dict, Optional |
| from concurrent.futures import ThreadPoolExecutor |
| import sys |
| import os |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| current_dir = os.path.dirname(os.path.abspath(__file__)) |
| parent_dir = os.path.dirname(current_dir) |
| sys.path.insert(0, parent_dir) |
| sys.path.insert(0, current_dir) |
|
|
| try: |
| from safety_threshold_matrix import should_trigger_user_choice |
| from safety_user_choice import create_safety_choice_prompt, process_safety_choice |
| from safety_choice_orchestrator import SafetyChoiceOrchestrator |
| SAFETY_CHOICE_AVAILABLE = True |
| logger.info("Safety choice modules loaded successfully") |
| except ImportError as e: |
| logger.warning(f"Safety choice modules not available: {e}") |
| SAFETY_CHOICE_AVAILABLE = False |
|
|
| class MVPOrchestrator: |
| def __init__(self, llm_router, context_manager, agents): |
| self.llm_router = llm_router |
| self.context_manager = context_manager |
| self.agents = agents |
| self.execution_trace = [] |
| |
| self._topic_cache = {} |
| self._topic_cache_max_size = 100 |
| |
| |
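| # Score thresholds per safety category; scores above these values flag a response for safety handling |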
| self.safety_thresholds = { |
| "toxicity_or_harmful_language": 0.3, |
| "potential_biases_or_stereotypes": 0.05, |
| "privacy_or_security_concerns": 0.2, |
| "controversial_or_sensitive_topics": 0.3 |
| } |
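| # Revision retry budget; the timeout is presumably in seconds |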
| self.max_revision_attempts = 2 |
| self.revision_timeout = 30 |
| |
| |
| self.awaiting_safety_response = {} |
| self._pending_choices = {} |
| |
| |
| self._current_user_id = {} |
| |
| |
| self._context_cache = {} |
| |
| |
| self.recent_queries = [] |
| self.max_recent_queries = 50 |
| |
| |
| self.agent_call_count = 0 |
| self.agent_call_history = [] |
| self.max_agent_history = 50 |
| self.response_metrics_history = [] |
| self.metrics_history_max_size = 100 |
| |
| |
| self.context_classifier = None |
| self._classifier_initialized = False |
| |
| logger.info("MVPOrchestrator initialized with safety revision thresholds") |
| |
| def set_user_id(self, session_id: str, user_id: str): |
| """Set user_id with loop prevention""" |
| |
| old_user_id = self._current_user_id.get(session_id) |
| |
| if old_user_id != user_id: |
| self._current_user_id[session_id] = user_id |
| logger.info(f"Set user_id={user_id} for session {session_id} (was: {old_user_id})") |
| |
| |
| cache_key = f"context_{session_id}" |
| if cache_key in self._context_cache: |
| del self._context_cache[cache_key] |
| logger.info(f"Cleared context cache for session {session_id} due to user change") |
| else: |
| self._current_user_id[session_id] = user_id |
| |
| def _get_user_id_for_session(self, session_id: str) -> str: |
| """Get user_id without triggering context loops""" |
| |
| if hasattr(self, '_current_user_id') and session_id in self._current_user_id: |
| return self._current_user_id[session_id] |
| |
| |
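| # Fall back to a default test identity when no user_id has been registered for this session |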
| return "Test_Any" |
| |
| async def _get_or_create_context(self, session_id: str, user_input: str, user_id: str) -> dict: |
| """Get context with loop prevention and caching""" |
| |
| cache_key = f"context_{session_id}" |
| current_time = time.time() |
| |
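| # Reuse a context built within the last ~5 seconds to avoid re-entrant context rebuilds |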
| if hasattr(self, '_context_cache'): |
| cached = self._context_cache.get(cache_key) |
| if cached and (current_time - cached['timestamp']) < 5: |
| logger.info(f"Using cached context for session {session_id}") |
| return cached['context'] |
| |
| |
| context = await self.context_manager.manage_context(session_id, user_input, user_id=user_id) |
| |
| |
| if not hasattr(self, '_context_cache'): |
| self._context_cache = {} |
| |
| self._context_cache[cache_key] = { |
| 'context': context, |
| 'timestamp': current_time |
| } |
| |
| |
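| # Simple size cap: once the cache exceeds 100 entries, keep only the 50 newest ones |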
| if len(self._context_cache) > 100: |
| |
| sorted_items = sorted(self._context_cache.items(), key=lambda x: x[1]['timestamp']) |
| self._context_cache = dict(sorted_items[-50:]) |
| |
| return context |
| |
| async def process_request(self, session_id: str, user_input: str) -> dict: |
| """ |
| Main orchestration flow with loop prevention |
| """ |
| logger.info(f"Processing request for session {session_id}") |
| logger.info(f"User input: {user_input[:100]}") |
| |
| |
| user_input_upper = user_input.strip().upper() |
| is_binary_response = user_input_upper in ['YES', 'NO', 'APPLY', 'KEEP', 'Y', 'N'] |
| |
| |
| if is_binary_response and self.awaiting_safety_response.get(session_id, False): |
| logger.info(f"Binary safety response detected ({user_input_upper}) - bypassing recursive safety check") |
| |
| |
| self.awaiting_safety_response[session_id] = False |
| |
| |
| if hasattr(self, '_pending_choices'): |
| self._pending_choices.pop(session_id, None) |
| |
| |
| return { |
| 'is_safety_response': True, |
| 'response': user_input_upper, |
| 'requires_user_choice': False, |
| 'skip_safety_check': True, |
| 'final_response': f"Choice '{user_input_upper}' has been applied.", |
| 'bypass_reason': 'binary_safety_response' |
| } |
| |
| |
| self.execution_trace = [] |
| start_time = time.time() |
| |
| |
| reasoning_chain = { |
| "chain_of_thought": {}, |
| "alternative_paths": [], |
| "uncertainty_areas": [], |
| "evidence_sources": [], |
| "confidence_calibration": {} |
| } |
| |
| try: |
| |
| |
| similar_response = self.check_query_similarity(user_input, threshold=0.95) |
| if similar_response: |
| logger.info(f"Similar/duplicate query detected, using cached response") |
| |
| similar_response = self.track_response_metrics(start_time, similar_response) |
| return similar_response |
| |
| |
| interaction_id = self._generate_interaction_id(session_id) |
| logger.info(f"Generated interaction ID: {interaction_id}") |
| |
| |
| logger.info("Step 2: Managing context with loop prevention...") |
| |
| |
| user_id = self._get_user_id_for_session(session_id) |
| |
| |
| base_context = await self._get_or_create_context(session_id, user_input, user_id) |
| |
| |
| context_mode = 'fresh' |
| try: |
| if hasattr(self.context_manager, 'get_context_mode'): |
| context_mode = self.context_manager.get_context_mode(session_id) |
| except Exception as e: |
| logger.warning(f"Error getting context mode: {e}, using default 'fresh'") |
| |
| |
| relevance_classification = None |
| if context_mode == 'relevant': |
| try: |
| logger.info("Relevant context mode: Classifying and summarizing relevant sessions...") |
| |
| |
| if not self._classifier_initialized: |
| try: |
| from src.context_relevance_classifier import ContextRelevanceClassifier |
| self.context_classifier = ContextRelevanceClassifier(self.llm_router) |
| self._classifier_initialized = True |
| logger.info("Context relevance classifier initialized") |
| except ImportError as e: |
| logger.warning(f"Context relevance classifier not available: {e}") |
| self._classifier_initialized = True |
| |
| |
| if self.context_classifier: |
| all_session_contexts = [] |
| try: |
| if hasattr(self.context_manager, 'get_all_user_sessions'): |
| all_session_contexts = await self.context_manager.get_all_user_sessions(user_id) |
| else: |
| |
| all_session_contexts = await self._get_all_user_sessions(user_id) |
| except Exception as e: |
| logger.error(f"Error fetching user sessions: {e}", exc_info=True) |
| all_session_contexts = [] |
| |
| if all_session_contexts: |
| |
| relevance_classification = await self.context_classifier.classify_and_summarize_relevant_contexts( |
| current_input=user_input, |
| session_contexts=all_session_contexts, |
| user_id=user_id |
| ) |
| |
| logger.info( |
| f"Relevance classification complete: " |
| f"{len(relevance_classification.get('relevant_summaries', []))} sessions summarized, " |
| f"topic: '{relevance_classification.get('topic', 'unknown')}', " |
| f"time: {relevance_classification.get('processing_time', 0):.2f}s" |
| ) |
| else: |
| logger.info("No session contexts available for relevance classification") |
| else: |
| logger.debug("Context classifier not available, skipping relevance classification") |
| |
| except Exception as e: |
| logger.error(f"Error in relevance classification: {e}", exc_info=True) |
| |
| relevance_classification = None |
| |
| |
| try: |
| context = self.context_manager._optimize_context( |
| base_context, |
| relevance_classification=relevance_classification |
| ) |
| except Exception as e: |
| logger.error(f"Error optimizing context: {e}", exc_info=True) |
| |
| context = base_context |
| |
| interaction_contexts_count = len(context.get('interaction_contexts', [])) |
| logger.info(f"Context retrieved: {interaction_contexts_count} interaction contexts, mode: {context_mode}") |
| |
| |
| user_context = context.get('user_context', '') |
| has_user_context = bool(user_context) |
| |
| |
| main_topic = await self._extract_main_topic(user_input, context) |
| topic_continuity = await self._analyze_topic_continuity(context, user_input) |
| query_keywords = await self._extract_keywords(user_input) |
| |
| reasoning_chain["chain_of_thought"]["step_1"] = { |
| "hypothesis": f"User is asking about: '{main_topic}'", |
| "evidence": [ |
| f"Previous interaction contexts: {interaction_contexts_count}", |
| f"User context available: {has_user_context}", |
| f"Session duration: {self._calculate_session_duration(context)}", |
| f"Topic continuity: {topic_continuity}", |
| f"Query keywords: {query_keywords}" |
| ], |
| "confidence": 0.85, |
| "reasoning": f"Context analysis shows user is focused on {main_topic} with {interaction_contexts_count} previous interaction contexts and {'existing' if has_user_context else 'new'} user context" |
| } |
| |
| |
| |
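| # Parallel fan-out is the default; set self._parallel_processing_enabled = False to force the sequential path |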
| use_parallel = getattr(self, '_parallel_processing_enabled', True) |
| |
| if use_parallel: |
| logger.info("Step 3: Processing intent, skills, and safety in parallel...") |
| parallel_results = await self.process_request_parallel(session_id, user_input, context) |
| intent_result = parallel_results.get('intent', {}) |
| skills_result = parallel_results.get('skills', {}) |
| |
| else: |
| |
| logger.info("Step 3: Recognizing intent...") |
| self.execution_trace.append({ |
| "step": "intent_recognition", |
| "agent": "intent_recognition", |
| "status": "executing" |
| }) |
| intent_result = await self.agents['intent_recognition'].execute( |
| user_input=user_input, |
| context=context |
| ) |
| self.execution_trace[-1].update({ |
| "status": "completed", |
| "result": {"primary_intent": intent_result.get('primary_intent', 'unknown')} |
| }) |
| logger.info(f"Intent detected: {intent_result.get('primary_intent', 'unknown')}") |
| |
| |
| logger.info("Step 3.5: Identifying relevant skills...") |
| self.execution_trace.append({ |
| "step": "skills_identification", |
| "agent": "skills_identification", |
| "status": "executing" |
| }) |
| skills_result = await self.agents['skills_identification'].execute( |
| user_input=user_input, |
| context=context |
| ) |
| self.execution_trace[-1].update({ |
| "status": "completed", |
| "result": {"skills_count": len(skills_result.get('identified_skills', []))} |
| }) |
| logger.info(f"Skills identified: {len(skills_result.get('identified_skills', []))} skills") |
| |
| |
| reasoning_chain["chain_of_thought"]["step_2_5"] = { |
| "hypothesis": f"User input relates to {len(skills_result.get('identified_skills', []))} expert skills", |
| "evidence": [ |
| f"Market analysis: {skills_result.get('market_analysis', {}).get('overall_analysis', 'N/A')}", |
| f"Skill classification: {skills_result.get('skill_classification', {}).get('classification_reasoning', 'N/A')}", |
| f"High-probability skills: {[s.get('skill', '') for s in skills_result.get('identified_skills', [])[:3]]}", |
| f"Confidence score: {skills_result.get('confidence_score', 0.5)}" |
| ], |
| "confidence": skills_result.get('confidence_score', 0.5), |
| "reasoning": f"Skills identification completed for topic '{main_topic}' with {len(skills_result.get('identified_skills', []))} relevant skills" |
| } |
| |
| |
| reasoning_chain["chain_of_thought"]["step_2"] = { |
| "hypothesis": f"User intent is '{intent_result.get('primary_intent', 'unknown')}' for topic '{main_topic}'", |
| "evidence": [ |
| f"Pattern analysis: {self._extract_pattern_evidence(user_input)}", |
| f"Confidence scores: {intent_result.get('confidence_scores', {})}", |
| f"Secondary intents: {intent_result.get('secondary_intents', [])}", |
| f"Query complexity: {self._assess_query_complexity(user_input)}" |
| ], |
| "confidence": intent_result.get('confidence_scores', {}).get(intent_result.get('primary_intent', 'unknown'), 0.7), |
| "reasoning": f"Intent '{intent_result.get('primary_intent', 'unknown')}' detected for {main_topic} based on linguistic patterns and context" |
| } |
| |
| |
| logger.info("Step 4: Creating execution plan...") |
| execution_plan = await self._create_execution_plan(intent_result, context) |
| |
| |
| reasoning_chain["chain_of_thought"]["step_3"] = { |
| "hypothesis": f"Optimal approach for '{intent_result.get('primary_intent', 'unknown')}' intent on '{main_topic}'", |
| "evidence": [ |
| f"Intent complexity: {self._assess_intent_complexity(intent_result)}", |
| f"Required agents: {execution_plan.get('agents_to_execute', [])}", |
| f"Execution strategy: {execution_plan.get('execution_order', 'sequential')}", |
| f"Response scope: {self._determine_response_scope(user_input)}" |
| ], |
| "confidence": 0.80, |
| "reasoning": f"Agent selection optimized for {intent_result.get('primary_intent', 'unknown')} intent regarding {main_topic}" |
| } |
| |
| |
| logger.info("Step 5: Executing agents...") |
| agent_results = await self._execute_agents(execution_plan, user_input, context) |
| logger.info(f"Agent execution complete: {len(agent_results)} results") |
| |
| |
| logger.info("Step 6: Synthesizing response...") |
| self.execution_trace.append({ |
| "step": "response_synthesis", |
| "agent": "response_synthesis", |
| "status": "executing" |
| }) |
| final_response = await self.agents['response_synthesis'].execute( |
| agent_outputs=agent_results, |
| user_input=user_input, |
| context=context, |
| skills_result=skills_result |
| ) |
| self.execution_trace[-1].update({ |
| "status": "completed", |
| "result": {"synthesis_method": final_response.get('synthesis_method', 'unknown')} |
| }) |
| |
| |
| reasoning_chain["chain_of_thought"]["step_4"] = { |
| "hypothesis": f"Response synthesis for '{main_topic}' using '{final_response.get('synthesis_method', 'unknown')}' method", |
| "evidence": [ |
| f"Synthesis quality: {final_response.get('coherence_score', 0.7)}", |
| f"Source integration: {len(final_response.get('source_references', []))} sources", |
| f"Response length: {len(str(final_response.get('final_response', '')))} characters", |
| f"Content relevance: {self._assess_content_relevance(user_input, final_response)}" |
| ], |
| "confidence": final_response.get('coherence_score', 0.7), |
| "reasoning": f"Multi-source synthesis for {main_topic} using {final_response.get('synthesis_method', 'unknown')} approach" |
| } |
| |
| |
| logger.info("Step 7: Safety check...") |
| self.execution_trace.append({ |
| "step": "safety_check", |
| "agent": "safety_check", |
| "status": "executing" |
| }) |
| safety_checked = await self.agents['safety_check'].execute( |
| response=final_response, |
| context=context |
| ) |
| self.execution_trace[-1].update({ |
| "status": "completed", |
| "result": {"warnings": safety_checked.get('warnings', [])} |
| }) |
| |
| |
| |
| intent_class = intent_result.get('primary_intent', 'casual_conversation') |
| response_content = final_response.get('final_response', '') or str(final_response.get('response', '')) |
| original_content = response_content  # keep the unmodified synthesis output so it can be stored separately from any appended advisory |
| |
| |
| if SAFETY_CHOICE_AVAILABLE: |
| safety_analysis = safety_checked.get('safety_analysis', {}) |
| |
| |
| if should_trigger_user_choice(safety_analysis, intent_class): |
| logger.info(f"Safety concerns detected for intent '{intent_class}' - appending warnings to response") |
| |
| |
| from safety_threshold_matrix import format_safety_concerns |
| concerns_text = format_safety_concerns(safety_analysis, intent_class) |
| |
| if concerns_text: |
| |
| warning_section = f""" |
| |
| --- |
| |
| ## ⚠️ Safety Advisory |
| |
| This response has been flagged for potential safety concerns: |
| |
| {concerns_text} |
| |
| **Please review this content carefully and consider:** |
| - The potential impact on yourself and others |
| - Whether this content aligns with your intended use |
| - If additional verification or expert consultation is needed |
| |
| *This advisory is provided for transparency and user awareness. The response has not been modified.* |
| """ |
| |
| response_content = response_content + warning_section |
| |
| |
| final_response['final_response'] = response_content |
| if 'response' in final_response: |
| final_response['response'] = response_content |
| |
| |
| |
| safety_checked['safety_checked_response'] = response_content |
| safety_checked['original_response'] = original_content |
| |
| logger.info("Safety warnings appended to response - no user choice prompted (feature paused)") |
| |
| |
| reasoning_chain["chain_of_thought"]["step_5"] = { |
| "hypothesis": f"Safety validation for response about '{main_topic}'", |
| "evidence": [ |
| f"Safety score: {safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)}", |
| f"Warnings generated: {len(safety_checked.get('warnings', []))}", |
| f"Analysis method: {safety_checked.get('safety_analysis', {}).get('analysis_method', 'unknown')}", |
| f"Content appropriateness: {self._assess_content_appropriateness(user_input, safety_checked)}" |
| ], |
| "confidence": safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8), |
| "reasoning": f"Safety analysis for {main_topic} content with non-blocking warning system" |
| } |
| |
| |
| |
| if 'final_response' in final_response: |
| final_response['final_response'] = response_content |
| if 'response' in final_response: |
| final_response['response'] = response_content |
| |
| |
| reasoning_chain["alternative_paths"] = self._generate_alternative_paths(intent_result, user_input, main_topic) |
| reasoning_chain["uncertainty_areas"] = self._identify_uncertainty_areas(intent_result, final_response, safety_checked) |
| reasoning_chain["evidence_sources"] = self._extract_evidence_sources(intent_result, final_response, context) |
| reasoning_chain["confidence_calibration"] = self._calibrate_confidence_scores(reasoning_chain) |
| |
| processing_time = time.time() - start_time |
| |
| |
| |
| merged_response = { |
| 'final_response': response_content, |
| 'response': response_content, |
| 'safety_checked_response': response_content, |
| 'original_response': original_content, |
| 'warnings': safety_checked.get('warnings', []) |
| } |
| |
| |
| result = self._format_final_output(merged_response, interaction_id, { |
| 'intent': intent_result.get('primary_intent', 'unknown'), |
| 'execution_plan': execution_plan, |
| 'processing_steps': [ |
| 'Context management', |
| 'Intent recognition', |
| 'Skills identification', |
| 'Execution planning', |
| 'Agent execution', |
| 'Response synthesis', |
| 'Safety check' |
| ], |
| 'processing_time': processing_time, |
| 'agents_used': list(self.agents.keys()), |
| 'intent_result': intent_result, |
| 'skills_result': skills_result, |
| 'synthesis_result': final_response, |
| 'safety_result': safety_checked, |
| 'reasoning_chain': reasoning_chain |
| }) |
| |
| |
| response_text = str(result.get('response', '')) |
| user_id = self._get_user_id_for_session(session_id) |
| if response_text: |
| self.context_manager._update_context(context, user_input, response_text, user_id=user_id) |
| |
| |
| interaction_id = result.get('interaction_id', f"{session_id}_{int(time.time())}") |
| try: |
| await self.context_manager.generate_interaction_context( |
| interaction_id=interaction_id, |
| session_id=session_id, |
| user_input=user_input, |
| system_response=response_text, |
| user_id=user_id |
| ) |
| |
| |
| |
| |
| try: |
| await self.context_manager.generate_session_context(session_id, user_id) |
| |
| except Exception as e: |
| logger.error(f"Error generating session context: {e}", exc_info=True) |
| |
| |
| if hasattr(self, '_context_cache'): |
| orchestrator_cache_key = f"context_{session_id}" |
| if orchestrator_cache_key in self._context_cache: |
| del self._context_cache[orchestrator_cache_key] |
| logger.debug(f"Orchestrator cache cleared for session {session_id} to refresh with updated contexts") |
| except Exception as e: |
| logger.error(f"Error generating interaction context: {e}", exc_info=True) |
| |
| |
| result = self.track_response_metrics(start_time, result) |
| |
| |
| if 'performance' not in result: |
| result['performance'] = { |
| "processing_time": round((time.time() - start_time) * 1000, 2), |
| "tokens_used": 0, |
| "agents_used": 0, |
| "confidence_score": 0, |
| "agent_contributions": [], |
| "safety_score": 80, |
| "latency_seconds": round(time.time() - start_time, 3), |
| "timestamp": datetime.now().isoformat() |
| } |
| |
| |
| self.recent_queries.append({ |
| 'query': user_input, |
| 'response': result, |
| 'timestamp': time.time() |
| }) |
| |
| if len(self.recent_queries) > self.max_recent_queries: |
| self.recent_queries = self.recent_queries[-self.max_recent_queries:] |
| |
| logger.info(f"Request processing complete. Response length: {len(response_text)}") |
| return result |
| |
| except Exception as e: |
| logger.error(f"Error in process_request: {e}", exc_info=True) |
| processing_time = time.time() - start_time |
| return { |
| "response": f"Error processing request: {str(e)}", |
| "error": str(e), |
| "interaction_id": str(uuid.uuid4())[:8], |
| "agent_trace": [], |
| "timestamp": datetime.now().isoformat(), |
| "metadata": { |
| "agents_used": [], |
| "processing_time": processing_time, |
| "token_count": 0, |
| "warnings": [] |
| } |
| } |
| |
| def _generate_interaction_id(self, session_id: str) -> str: |
| """ |
| Generate unique interaction identifier |
| """ |
| unique_id = str(uuid.uuid4())[:8] |
| return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}" |
| |
| async def _get_all_user_sessions(self, user_id: str) -> List[Dict]: |
| """ |
| Fetch all session contexts for relevance classification |
| Fallback method if context_manager doesn't have it |
| |
| Args: |
| user_id: User identifier |
| |
| Returns: |
| List of session context dictionaries |
| """ |
| try: |
| |
| if hasattr(self.context_manager, 'get_all_user_sessions'): |
| return await self.context_manager.get_all_user_sessions(user_id) |
| |
| |
| import sqlite3 |
| db_path = getattr(self.context_manager, 'db_path', 'sessions.db') |
| |
| conn = sqlite3.connect(db_path) |
| cursor = conn.cursor() |
| |
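| # Note: in SQLite, the ORDER BY/LIMIT inside the GROUP_CONCAT subquery below does not cap how many |
| # interactions get concatenated; the aggregate still spans all interactions for the session. |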
| cursor.execute(""" |
| SELECT DISTINCT |
| sc.session_id, |
| sc.session_summary, |
| sc.created_at, |
| (SELECT GROUP_CONCAT(ic.interaction_summary, ' ||| ') |
| FROM interaction_contexts ic |
| WHERE ic.session_id = sc.session_id |
| ORDER BY ic.created_at DESC |
| LIMIT 10) as recent_interactions |
| FROM session_contexts sc |
| JOIN sessions s ON sc.session_id = s.session_id |
| WHERE s.user_id = ? |
| ORDER BY sc.created_at DESC |
| LIMIT 50 |
| """, (user_id,)) |
| |
| sessions = [] |
| for row in cursor.fetchall(): |
| session_id, session_summary, created_at, interactions_str = row |
| |
| interaction_list = [] |
| if interactions_str: |
| for summary in interactions_str.split(' ||| '): |
| if summary.strip(): |
| interaction_list.append({ |
| 'summary': summary.strip(), |
| 'timestamp': created_at |
| }) |
| |
| sessions.append({ |
| 'session_id': session_id, |
| 'summary': session_summary or '', |
| 'created_at': created_at, |
| 'interaction_contexts': interaction_list |
| }) |
| |
| conn.close() |
| return sessions |
| |
| except Exception as e: |
| logger.error(f"Error fetching user sessions: {e}", exc_info=True) |
| return [] |
| |
| async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict: |
| """ |
| Create execution plan based on intent recognition |
| Maps intent types to specific execution tasks |
| """ |
| primary_intent = intent_result.get('primary_intent', 'casual_conversation') |
| secondary_intents = intent_result.get('secondary_intents', []) |
| confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.7) |
| |
| |
| intent_task_mapping = { |
| "information_request": { |
| "tasks": ["information_gathering", "content_research"], |
| "execution_order": "sequential", |
| "priority": "high" |
| }, |
| "task_execution": { |
| "tasks": ["task_planning", "execution_strategy"], |
| "execution_order": "sequential", |
| "priority": "high" |
| }, |
| "creative_generation": { |
| "tasks": ["creative_brainstorming", "content_ideation"], |
| "execution_order": "parallel", |
| "priority": "normal" |
| }, |
| "analysis_research": { |
| "tasks": ["research_analysis", "data_collection", "pattern_identification"], |
| "execution_order": "sequential", |
| "priority": "high" |
| }, |
| "troubleshooting": { |
| "tasks": ["problem_analysis", "solution_research"], |
| "execution_order": "sequential", |
| "priority": "high" |
| }, |
| "education_learning": { |
| "tasks": ["curriculum_planning", "educational_content"], |
| "execution_order": "sequential", |
| "priority": "normal" |
| }, |
| "technical_support": { |
| "tasks": ["technical_research", "guidance_generation"], |
| "execution_order": "sequential", |
| "priority": "high" |
| }, |
| "casual_conversation": { |
| "tasks": ["context_enrichment"], |
| "execution_order": "parallel", |
| "priority": "low" |
| } |
| } |
| |
| |
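| # Unrecognized intents fall back to a generic research plan |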
| plan = intent_task_mapping.get(primary_intent, { |
| "tasks": ["general_research"], |
| "execution_order": "parallel", |
| "priority": "normal" |
| }) |
| |
| |
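| # Fold in secondary-intent tasks only when the primary classification is reasonably confident |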
| if confidence > 0.7 and secondary_intents: |
| for secondary_intent in secondary_intents[:2]: |
| secondary_plan = intent_task_mapping.get(secondary_intent) |
| if secondary_plan: |
| |
| existing_tasks = set(plan["tasks"]) |
| for task in secondary_plan["tasks"]: |
| if task not in existing_tasks: |
| plan["tasks"].append(task) |
| existing_tasks.add(task) |
| |
| logger.info(f"Execution plan created for intent '{primary_intent}': {len(plan['tasks'])} tasks, order={plan['execution_order']}") |
| |
| return { |
| "agents_to_execute": plan["tasks"], |
| "execution_order": plan["execution_order"], |
| "priority": plan["priority"], |
| "primary_intent": primary_intent, |
| "secondary_intents": secondary_intents |
| } |
| |
| async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict: |
| """ |
| Execute agents in parallel or sequential order based on plan |
| Actually executes task-specific LLM calls based on intent |
| """ |
| tasks = execution_plan.get("agents_to_execute", []) |
| execution_order = execution_plan.get("execution_order", "parallel") |
| primary_intent = execution_plan.get("primary_intent", "casual_conversation") |
| |
| if not tasks: |
| logger.warning("No tasks to execute in execution plan") |
| return {} |
| |
| logger.info(f"Executing {len(tasks)} tasks in {execution_order} order for intent '{primary_intent}'") |
| |
| results = {} |
| |
| |
| context_summary = self._build_context_summary(context) |
| |
| |
| task_prompts = self._build_task_prompts(user_input, context_summary, primary_intent) |
| |
| if execution_order == "parallel": |
| |
| task_coroutines = [] |
| for task in tasks: |
| if task in task_prompts: |
| coro = self._execute_single_task(task, task_prompts[task]) |
| task_coroutines.append((task, coro)) |
| else: |
| logger.warning(f"No prompt template for task: {task}") |
| |
| |
| if task_coroutines: |
| task_results = await asyncio.gather( |
| *[coro for _, coro in task_coroutines], |
| return_exceptions=True |
| ) |
| |
| |
| for (task, _), result in zip(task_coroutines, task_results): |
| if isinstance(result, Exception): |
| logger.error(f"Task {task} failed: {result}") |
| results[task] = {"error": str(result), "status": "failed"} |
| else: |
| results[task] = result |
| logger.info(f"Task {task} completed: {len(str(result))} chars") |
| else: |
| |
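| # Sequential execution: each task receives the accumulated results of the tasks that ran before it |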
| previous_results = {} |
| for task in tasks: |
| if task in task_prompts: |
| |
| enhanced_prompt = task_prompts[task] |
| if previous_results: |
| enhanced_prompt += f"\n\nPrevious task results: {str(previous_results)}" |
| |
| try: |
| result = await self._execute_single_task(task, enhanced_prompt) |
| results[task] = result |
| previous_results[task] = result |
| logger.info(f"Task {task} completed: {len(str(result))} chars") |
| except Exception as e: |
| logger.error(f"Task {task} failed: {e}") |
| results[task] = {"error": str(e), "status": "failed"} |
| previous_results[task] = results[task] |
| else: |
| logger.warning(f"No prompt template for task: {task}") |
| |
| logger.info(f"Agent execution complete: {len(results)} results collected") |
| return results |
| |
| def _build_context_summary(self, context: dict) -> str: |
| """Build a concise summary of context for task execution (all from cache)""" |
| summary_parts = [] |
| |
| |
| session_context = context.get('session_context', {}) |
| session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
| if session_summary: |
| summary_parts.append(f"Session summary: {session_summary[:1500]}") |
| |
| |
| interaction_contexts = context.get('interaction_contexts', []) |
| if interaction_contexts: |
| recent_summaries = [ic.get('summary', '') for ic in interaction_contexts[-3:]] |
| if recent_summaries: |
| summary_parts.append(f"Recent conversation topics: {', '.join(recent_summaries)}") |
| |
| |
| user_context = context.get('user_context', '') |
| if user_context: |
| summary_parts.append(f"User background: {user_context[:200]}") |
| |
| return " | ".join(summary_parts) if summary_parts else "No prior context" |
| |
| async def process_agents_parallel(self, request: Dict) -> List: |
| """ |
| Step 1: Optimize Agent Chain - Process multiple agents in parallel |
| |
| Args: |
| request: Dictionary containing request data with 'user_input' and 'context' |
| |
| Returns: |
| List of agent results in order [intent_result, skills_result] |
| """ |
| user_input = request.get('user_input', '') |
| context = request.get('context', {}) |
| |
| |
| self.agent_call_count += 2 |
| |
| tasks = [ |
| self.agents['intent_recognition'].execute( |
| user_input=user_input, |
| context=context |
| ), |
| self.agents['skills_identification'].execute( |
| user_input=user_input, |
| context=context |
| ), |
| ] |
| |
| try: |
| results = await asyncio.gather(*tasks, return_exceptions=True) |
| |
| processed_results = [] |
| for idx, result in enumerate(results): |
| if isinstance(result, Exception): |
| logger.error(f"Agent task {idx} failed: {result}") |
| processed_results.append({}) |
| else: |
| processed_results.append(result) |
| return processed_results |
| except Exception as e: |
| logger.error(f"Error in parallel agent processing: {e}", exc_info=True) |
| return [{}, {}] |
| |
| async def process_request_parallel(self, session_id: str, user_input: str, context: Dict) -> Dict: |
| """Process intent, skills, and safety in parallel with enhanced tracking""" |
| |
| |
| agents_called = [] |
| |
| |
| try: |
| intent_task = self.agents['intent_recognition'].execute( |
| user_input=user_input, |
| context=context |
| ) |
| agents_called.append('Intent') |
| |
| skills_task = self.agents['skills_identification'].execute( |
| user_input=user_input, |
| context=context |
| ) |
| agents_called.append('Skills') |
| |
| |
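| # Pre-check safety on the raw user input; the synthesized response is checked again later in the flow |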
| safety_task = self.agents['safety_check'].execute( |
| response=user_input, |
| context=context |
| ) |
| agents_called.append('Safety') |
| |
| |
| self.agent_call_count += len(agents_called) |
| |
| |
| if len(self.agent_call_history) >= self.max_agent_history: |
| # Trim before appending so the history never grows beyond max_agent_history entries |
| self.agent_call_history = self.agent_call_history[-(self.max_agent_history - 1):] |
| self.agent_call_history.append({ |
| 'agents': agents_called, |
| 'timestamp': time.time() |
| }) |
| |
| |
| results = await asyncio.gather( |
| intent_task, |
| skills_task, |
| safety_task, |
| return_exceptions=True |
| ) |
| |
| |
| intent_result = results[0] if not isinstance(results[0], Exception) else {} |
| skills_result = results[1] if not isinstance(results[1], Exception) else {} |
| safety_result = results[2] if not isinstance(results[2], Exception) else {} |
| |
| |
| if isinstance(results[0], Exception): |
| logger.error(f"Intent recognition error: {results[0]}") |
| if isinstance(results[1], Exception): |
| logger.error(f"Skills identification error: {results[1]}") |
| if isinstance(results[2], Exception): |
| logger.error(f"Safety check error: {results[2]}") |
| |
| return { |
| 'intent': intent_result, |
| 'skills': skills_result, |
| 'safety_precheck': safety_result, |
| 'agents_called': agents_called |
| } |
| |
| except Exception as e: |
| logger.error(f"Error in parallel processing: {e}", exc_info=True) |
| |
| return { |
| 'intent': await self.agents['intent_recognition'].execute(user_input=user_input, context=context), |
| 'skills': await self.agents['skills_identification'].execute(user_input=user_input, context=context), |
| 'safety_precheck': {} |
| } |
| |
| def _build_enhanced_context(self, session_id: str, prior_interactions: List[Dict]) -> Dict: |
| """Build enhanced context with memory accumulation""" |
| |
| |
| context = { |
| 'session_memory': [], |
| 'user_preferences': {}, |
| 'interaction_patterns': {}, |
| 'skills_used': set() |
| } |
| |
| |
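| # Weight interactions by recency (assumes prior_interactions is ordered most-recent-first) |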
| for idx, interaction in enumerate(prior_interactions): |
| weight = 1.0 / (idx + 1) |
| |
| |
| if 'skills' in interaction: |
| for skill in interaction['skills']: |
| if isinstance(skill, dict): |
| context['skills_used'].add(skill.get('name', skill.get('skill', ''))) |
| elif isinstance(skill, str): |
| context['skills_used'].add(skill) |
| |
| |
| if 'intent' in interaction: |
| intent = interaction['intent'] |
| if intent not in context['interaction_patterns']: |
| context['interaction_patterns'][intent] = 0 |
| context['interaction_patterns'][intent] += weight |
| |
| |
| if idx < 3: |
| context['session_memory'].append({ |
| 'summary': interaction.get('summary', ''), |
| 'timestamp': interaction.get('timestamp'), |
| 'relevance': weight |
| }) |
| |
| |
| context['skills_used'] = list(context['skills_used']) |
| |
| return context |
| |
| def _build_task_prompts(self, user_input: str, context_summary: str, primary_intent: str) -> dict: |
| """Build task-specific prompts for execution""" |
| |
| base_context = f"User Query: {user_input}\nContext: {context_summary}" |
| |
| prompts = { |
| "information_gathering": f""" |
| {base_context} |
| |
| Task: Gather comprehensive, accurate information relevant to the user's query. |
| Focus on facts, definitions, explanations, and verified information. |
| Structure the information clearly and cite key points. |
| """, |
| |
| "content_research": f""" |
| {base_context} |
| |
| Task: Research and compile detailed content about the topic. |
| Include multiple perspectives, current information, and relevant examples. |
| Organize findings logically with clear sections. |
| """, |
| |
| "task_planning": f""" |
| {base_context} |
| |
| Task: Create a detailed execution plan for the requested task. |
| Break down into clear steps, identify requirements, and outline expected outcomes. |
| Consider potential challenges and solutions. |
| """, |
| |
| "execution_strategy": f""" |
| {base_context} |
| |
| Task: Develop a strategic approach for task execution. |
| Define methodology, best practices, and implementation considerations. |
| Provide actionable guidance with clear priorities. |
| """, |
| |
| "creative_brainstorming": f""" |
| {base_context} |
| |
| Task: Generate creative ideas and approaches for content creation. |
| Explore different angles, styles, and formats. |
| Provide diverse creative options with implementation suggestions. |
| """, |
| |
| "content_ideation": f""" |
| {base_context} |
| |
| Task: Develop content concepts and detailed ideation. |
| Create outlines, themes, and structural frameworks. |
| Suggest variations and refinement paths. |
| """, |
| |
| "research_analysis": f""" |
| {base_context} |
| |
| Task: Conduct thorough research analysis on the topic. |
| Identify key findings, trends, patterns, and insights. |
| Analyze different perspectives and methodologies. |
| """, |
| |
| "data_collection": f""" |
| {base_context} |
| |
| Task: Collect and organize relevant data points and evidence. |
| Gather statistics, examples, case studies, and supporting information. |
| Structure data for easy analysis and reference. |
| """, |
| |
| "pattern_identification": f""" |
| {base_context} |
| |
| Task: Identify patterns, correlations, and significant relationships. |
| Analyze trends, cause-effect relationships, and underlying structures. |
| Provide insights based on pattern recognition. |
| """, |
| |
| "problem_analysis": f""" |
| {base_context} |
| |
| Task: Analyze the problem in detail. |
| Identify root causes, contributing factors, and constraints. |
| Break down the problem into components for systematic resolution. |
| """, |
| |
| "solution_research": f""" |
| {base_context} |
| |
| Task: Research and evaluate potential solutions. |
| Compare approaches, assess pros/cons, and recommend best practices. |
| Consider implementation feasibility and effectiveness. |
| """, |
| |
| "curriculum_planning": f""" |
| {base_context} |
| |
| Task: Design educational curriculum and learning path. |
| Structure content progressively, define learning objectives, and suggest resources. |
| Create a comprehensive learning framework. |
| """, |
| |
| "educational_content": f""" |
| {base_context} |
| |
| Task: Generate educational content with clear explanations. |
| Use teaching methods, examples, analogies, and progressive complexity. |
| Make content accessible and engaging for learning. |
| """, |
| |
| "technical_research": f""" |
| {base_context} |
| |
| Task: Research technical aspects and solutions. |
| Gather technical documentation, best practices, and implementation details. |
| Structure technical information clearly with practical guidance. |
| """, |
| |
| "guidance_generation": f""" |
| {base_context} |
| |
| Task: Generate step-by-step guidance and instructions. |
| Create clear, actionable steps with explanations and troubleshooting tips. |
| Ensure guidance is comprehensive and easy to follow. |
| """, |
| |
| "context_enrichment": f""" |
| {base_context} |
| |
| Task: Enrich the conversation with relevant context and insights. |
| Add helpful background information, connections to previous topics, and engaging details. |
| Enhance understanding and engagement. |
| """, |
| |
| "general_research": f""" |
| {base_context} |
| |
| Task: Conduct general research and information gathering. |
| Compile relevant information, insights, and useful details about the topic. |
| Organize findings for clear presentation. |
| """ |
| } |
| |
| return prompts |
| |
| async def _execute_single_task(self, task_name: str, prompt: str) -> dict: |
| """Execute a single task using the LLM router""" |
| try: |
| logger.debug(f"Executing task: {task_name}") |
| logger.debug(f"Task prompt length: {len(prompt)}") |
| |
| |
| result = await self.llm_router.route_inference( |
| task_type="general_reasoning", |
| prompt=prompt, |
| max_tokens=2000, |
| temperature=0.7 |
| ) |
| |
| if result: |
| return { |
| "task": task_name, |
| "status": "completed", |
| "content": result, |
| "content_length": len(str(result)) |
| } |
| else: |
| logger.warning(f"Task {task_name} returned empty result") |
| return { |
| "task": task_name, |
| "status": "empty", |
| "content": "", |
| "content_length": 0 |
| } |
| |
| except Exception as e: |
| logger.error(f"Error executing task {task_name}: {e}", exc_info=True) |
| return { |
| "task": task_name, |
| "status": "error", |
| "error": str(e), |
| "content": "" |
| } |
| |
| def _format_final_output(self, response: dict, interaction_id: str, additional_metadata: dict = None) -> dict: |
| """ |
| Format final output with tracing and metadata |
| """ |
| |
| response_text = ( |
| response.get("final_response") or |
| response.get("safety_checked_response") or |
| response.get("original_response") or |
| response.get("response") or |
| str(response.get("result", "")) |
| ) |
| |
| if not response_text: |
| response_text = "I apologize, but I'm having trouble generating a response right now. Please try again." |
| |
| |
| warnings = [] |
| if "warnings" in response: |
| warnings = response["warnings"] if isinstance(response["warnings"], list) else [] |
| |
| |
| metadata = { |
| "agents_used": response.get("agents_used", []), |
| "processing_time": response.get("processing_time", 0), |
| "token_count": response.get("token_count", 0), |
| "warnings": warnings |
| } |
| |
| |
| if additional_metadata: |
| metadata.update(additional_metadata) |
| |
| return { |
| "interaction_id": interaction_id, |
| "response": response_text, |
| "final_response": response_text, |
| "confidence_score": response.get("confidence_score", 0.7), |
| "agent_trace": self.execution_trace if self.execution_trace else [ |
| {"step": "complete", "agent": "orchestrator", "status": "completed"} |
| ], |
| "timestamp": datetime.now().isoformat(), |
| "metadata": metadata |
| } |
| |
| async def handle_user_safety_decision(self, choice_id: str, user_decision: bool, session_id: str = None) -> dict: |
| """ |
| Handle user's safety decision and complete processing |
| |
| Args: |
| choice_id: The choice identifier from the prompt |
| user_decision: True for revision, False for original with warnings |
| session_id: Session identifier |
| |
| Returns: |
| dict: Final response based on user choice |
| """ |
| try: |
| |
| if session_id: |
| self.awaiting_safety_response[session_id] = False |
| |
| if not SAFETY_CHOICE_AVAILABLE: |
| logger.warning("Safety choice modules not available") |
| return {'error': 'Safety choice system not available'} |
| |
| choice_result = process_safety_choice(choice_id, user_decision) |
| |
| if 'error' in choice_result: |
| logger.error(f"Error processing safety choice: {choice_result['error']}") |
| return choice_result |
| |
| if choice_result['action'] == 'proceed_with_revision': |
| logger.info("User chose revision - applying safety revisions") |
| |
| safety_issues = choice_result['safety_analysis'].get('detected_issues', []) |
| safety_scores = choice_result['safety_analysis'].get('safety_scores', {}) |
| |
| # Ensure the variable is always defined, even when neither score set is available |
| exceeded_categories = [] |
| if not safety_scores: |
| confidence_scores = choice_result['safety_analysis'].get('confidence_scores', {}) |
| if confidence_scores: |
| # Compare against the configured thresholds instead of duplicating the magic numbers |
| if confidence_scores.get('toxicity', 0) > self.safety_thresholds['toxicity_or_harmful_language']: |
| exceeded_categories.append('toxicity_or_harmful_language') |
| if confidence_scores.get('bias', 0) > self.safety_thresholds['potential_biases_or_stereotypes']: |
| exceeded_categories.append('potential_biases_or_stereotypes') |
| if confidence_scores.get('privacy', 0) > self.safety_thresholds['privacy_or_security_concerns']: |
| exceeded_categories.append('privacy_or_security_concerns') |
| else: |
| exceeded_categories = [k for k, v in safety_scores.items() if isinstance(v, (int, float)) and v > self.safety_thresholds.get(k, 0.3)] |
| |
| revision_prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. |
| |
| Original Response: {choice_result['original_response']} |
| |
| Safety Issues Detected: {', '.join(exceeded_categories) if exceeded_categories else 'General safety concerns'} |
| Specific Warnings: {'; '.join(safety_issues) if safety_issues else 'General safety concerns detected'} |
| |
| Please revise the response to address these concerns while maintaining helpfulness and accuracy. |
| """ |
| |
| revised_result = await self.agents['response_synthesis'].execute( |
| agent_outputs={}, |
| user_input=revision_prompt, |
| context={} |
| ) |
| |
| revised_response = revised_result.get('final_response', choice_result['original_response']) |
| |
| return { |
| 'response': revised_response, |
| 'final_response': revised_response, |
| 'safety_analysis': choice_result['safety_analysis'], |
| 'user_choice': 'revision', |
| 'revision_applied': True, |
| 'interaction_id': str(uuid.uuid4())[:8], |
| 'timestamp': datetime.now().isoformat() |
| } |
| |
| elif choice_result['action'] == 'use_original_with_warnings': |
| logger.info("User chose original response with safety warnings") |
| |
| return { |
| 'response': choice_result['response_content'], |
| 'final_response': choice_result['response_content'], |
| 'safety_analysis': choice_result['safety_analysis'], |
| 'user_choice': 'original_with_warnings', |
| 'revision_applied': False, |
| 'interaction_id': str(uuid.uuid4())[:8], |
| 'timestamp': datetime.now().isoformat() |
| } |
| |
| else: |
| logger.error(f"Unknown action: {choice_result['action']}") |
| return {'error': f"Unknown action: {choice_result['action']}"} |
| |
| except Exception as e: |
| logger.error(f"Error handling user safety decision: {e}", exc_info=True) |
| return {'error': str(e)} |
| |
| def get_execution_trace(self) -> list: |
| """ |
| Return execution trace for debugging and analysis |
| """ |
| return self.execution_trace |
| |
| def clear_execution_trace(self): |
| """ |
| Clear the execution trace |
| """ |
| self.execution_trace = [] |
| |
| def _calculate_session_duration(self, context: dict) -> str: |
| """Calculate session duration for reasoning context""" |
| interaction_contexts = context.get('interaction_contexts', []) |
| if not interaction_contexts: |
| return "New session" |
| |
| |
| interaction_count = len(interaction_contexts) |
| if interaction_count < 5: |
| return "Short session (< 5 interactions)" |
| elif interaction_count < 20: |
| return "Medium session (5-20 interactions)" |
| else: |
| return "Long session (> 20 interactions)" |
| |
| async def _analyze_topic_continuity(self, context: dict, user_input: str) -> str: |
| """Analyze topic continuity using LLM zero-shot classification (uses session context and interaction contexts from cache)""" |
| try: |
| |
| session_context = context.get('session_context', {}) |
| session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
| |
| interaction_contexts = context.get('interaction_contexts', []) |
| if not interaction_contexts and not session_summary: |
| return "No previous context" |
| |
| |
| recent_interactions_summary = "\n".join([ |
| f"- {ic.get('summary', '')}" |
| for ic in interaction_contexts[:3] |
| if ic.get('summary') |
| ]) |
| |
| |
| if self.llm_router: |
| prompt = f"""Determine if the current query continues the previous conversation topic or introduces a new topic. |
| |
| Session Summary: {session_summary[:300] if session_summary else 'No session summary available'} |
| |
| Recent Interactions: |
| {recent_interactions_summary if recent_interactions_summary else 'No recent interactions'} |
| |
| Current Query: "{user_input}" |
| |
| Analyze whether the current query: |
| 1. Continues the same topic from previous interactions |
| 2. Introduces a new topic |
| |
| Respond with EXACTLY one of these formats: |
| - "Continuing [topic name] discussion" if same topic |
| - "New topic: [topic name]" if different topic |
| |
| Keep topic name to 2-5 words. Example responses: |
| - "Continuing machine learning discussion" |
| - "New topic: financial analysis" |
| - "Continuing software development discussion" |
| """ |
| |
| continuity_result = await self.llm_router.route_inference( |
| task_type="general_reasoning", |
| prompt=prompt, |
| max_tokens=50, |
| temperature=0.3 |
| ) |
| |
| if continuity_result and isinstance(continuity_result, str) and continuity_result.strip(): |
| result = continuity_result.strip() |
| |
| if "Continuing" in result or "New topic:" in result: |
| logger.debug(f"Topic continuity analysis: {result}") |
| return result |
| |
| |
| if not session_summary and not recent_interactions_summary: |
| return "No previous context" |
| return "Topic continuity analysis unavailable" |
| |
| except Exception as e: |
| logger.error(f"Error in LLM-based topic continuity analysis: {e}", exc_info=True) |
| |
| return "Topic continuity analysis failed" |
| |
| def _extract_pattern_evidence(self, user_input: str) -> str: |
| """Extract pattern evidence for intent reasoning""" |
| input_lower = user_input.lower() |
| |
| |
| if any(word in input_lower for word in ['what', 'how', 'why', 'when', 'where', 'which']): |
| return "Question pattern detected" |
| |
| |
| if any(word in input_lower for word in ['please', 'can you', 'could you', 'help me']): |
| return "Request pattern detected" |
| |
| |
| if any(word in input_lower for word in ['explain', 'describe', 'tell me about']): |
| return "Explanation pattern detected" |
| |
| |
| if any(word in input_lower for word in ['analyze', 'compare', 'evaluate', 'assess']): |
| return "Analysis pattern detected" |
| |
| return "General conversational pattern" |
| |
| def _assess_intent_complexity(self, intent_result: dict) -> str: |
| """Assess intent complexity for reasoning""" |
| primary_intent = intent_result.get('primary_intent', 'unknown') |
| confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) |
| secondary_intents = intent_result.get('secondary_intents', []) |
| |
| if confidence > 0.8 and len(secondary_intents) == 0: |
| return "Simple, clear intent" |
| elif confidence > 0.7 and len(secondary_intents) <= 1: |
| return "Moderate complexity" |
| else: |
| return "Complex, multi-faceted intent" |
| |
| def _generate_alternative_paths(self, intent_result: dict, user_input: str, main_topic: str) -> list: |
| """Generate alternative reasoning paths based on actual content""" |
| primary_intent = intent_result.get('primary_intent', 'unknown') |
| secondary_intents = intent_result.get('secondary_intents', []) |
| |
| alternative_paths = [] |
| |
| |
| for secondary_intent in secondary_intents: |
| alternative_paths.append({ |
| "path": f"Alternative intent: {secondary_intent} for {main_topic}", |
| "reasoning": f"Could interpret as {secondary_intent} based on linguistic patterns in the query about {main_topic}", |
| "confidence": intent_result.get('confidence_scores', {}).get(secondary_intent, 0.3), |
| "rejected_reason": f"Primary intent '{primary_intent}' has higher confidence for {main_topic} topic" |
| }) |
| |
| |
| if 'curriculum' in user_input.lower() or 'course' in user_input.lower(): |
| alternative_paths.append({ |
| "path": "Structured educational framework approach", |
| "reasoning": f"Could provide a more structured educational framework for {main_topic}", |
| "confidence": 0.6, |
| "rejected_reason": f"Current approach better matches user's specific request for {main_topic}" |
| }) |
| |
| if 'detailed' in user_input.lower() or 'comprehensive' in user_input.lower(): |
| alternative_paths.append({ |
| "path": "High-level overview approach", |
| "reasoning": f"Could provide a high-level overview instead of detailed content for {main_topic}", |
| "confidence": 0.4, |
| "rejected_reason": f"User specifically requested detailed information about {main_topic}" |
| }) |
| |
| return alternative_paths |
| |
| def _identify_uncertainty_areas(self, intent_result: dict, final_response: dict, safety_checked: dict) -> list: |
| """Identify areas of uncertainty in the reasoning based on actual content""" |
| uncertainty_areas = [] |
| |
| |
| primary_intent = intent_result.get('primary_intent', 'unknown') |
| confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5) |
| if confidence < 0.8: |
| uncertainty_areas.append({ |
| "aspect": f"Intent classification ({primary_intent}) for user's specific request", |
| "confidence": confidence, |
| "mitigation": "Provided multiple interpretation options and context-aware analysis" |
| }) |
| |
| |
| coherence_score = final_response.get('coherence_score', 0.7) |
| if coherence_score < 0.8: |
| uncertainty_areas.append({ |
| "aspect": "Response coherence and structure for the specific topic", |
| "confidence": coherence_score, |
| "mitigation": "Applied quality enhancement techniques and content relevance checks" |
| }) |
| |
| |
| safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) |
| if safety_score < 0.9: |
| uncertainty_areas.append({ |
| "aspect": "Content safety and bias assessment for educational content", |
| "confidence": safety_score, |
| "mitigation": "Generated advisory warnings for user awareness and content appropriateness" |
| }) |
| |
| |
| response_text = str(final_response.get('final_response', '')) |
| if len(response_text) < 100: |
| uncertainty_areas.append({ |
| "aspect": "Response completeness for user's detailed request", |
| "confidence": 0.6, |
| "mitigation": "Enhanced response generation with topic-specific content" |
| }) |
| |
| return uncertainty_areas |
| |
| def _extract_evidence_sources(self, intent_result: dict, final_response: dict, context: dict) -> list: |
| """Extract evidence sources for reasoning based on actual content""" |
| evidence_sources = [] |
| |
| |
| evidence_sources.append({ |
| "type": "linguistic_analysis", |
| "source": "Pattern matching and NLP analysis", |
| "relevance": 0.9, |
| "description": f"Intent classification based on linguistic patterns for '{intent_result.get('primary_intent', 'unknown')}' intent" |
| }) |
| |
| |
| interactions = context.get('interaction_contexts') or context.get('interactions', []) |
| if interactions: |
| evidence_sources.append({ |
| "type": "conversation_history", |
| "source": f"Previous {len(interactions)} interactions", |
| "relevance": 0.7, |
| "description": f"Conversation context and topic continuity analysis" |
| }) |
| |
| |
| synthesis_method = final_response.get('synthesis_method', 'unknown') |
| evidence_sources.append({ |
| "type": "synthesis_method", |
| "source": f"{synthesis_method} approach", |
| "relevance": 0.8, |
| "description": f"Response generated using {synthesis_method} methodology with quality optimization" |
| }) |
| |
| |
| response_text = str(final_response.get('final_response', '')) |
| if len(response_text) > 1000: |
| evidence_sources.append({ |
| "type": "content_analysis", |
| "source": "Comprehensive content generation", |
| "relevance": 0.85, |
| "description": "Detailed response generation based on user's specific requirements" |
| }) |
| |
| return evidence_sources |
| |
| def _calibrate_confidence_scores(self, reasoning_chain: dict) -> dict: |
| """Calibrate confidence scores across the reasoning chain""" |
| chain_of_thought = reasoning_chain.get('chain_of_thought', {}) |
| |
| |
| step_confidences = [] |
| for step_data in chain_of_thought.values(): |
| if isinstance(step_data, dict) and 'confidence' in step_data: |
| step_confidences.append(step_data['confidence']) |
| |
| overall_confidence = sum(step_confidences) / len(step_confidences) if step_confidences else 0.7 |
| |
| return { |
| "overall_confidence": overall_confidence, |
| "step_count": len(chain_of_thought), |
| "confidence_distribution": { |
| "high_confidence": len([c for c in step_confidences if c > 0.8]), |
| "medium_confidence": len([c for c in step_confidences if 0.6 <= c <= 0.8]), |
| "low_confidence": len([c for c in step_confidences if c < 0.6]) |
| }, |
| "calibration_method": "Weighted average of step confidences" |
| } |
| |
| async def _extract_main_topic(self, user_input: str, context: dict = None) -> str: |
| """Extract the main topic using LLM zero-shot classification with caching""" |
| try: |
| |
| import hashlib |
| cache_key = hashlib.md5(user_input.encode()).hexdigest() |
| if cache_key in self._topic_cache: |
| logger.debug(f"Topic cache hit for: {user_input[:50]}...") |
| return self._topic_cache[cache_key] |
| |
| |
| if self.llm_router: |
| |
| context_info = "" |
| if context: |
| session_context = context.get('session_context', {}) |
| session_summary = session_context.get('summary', '') if isinstance(session_context, dict) else "" |
| interaction_count = len(context.get('interaction_contexts', [])) |
| |
| if session_summary: |
| context_info = f"\n\nSession context: {session_summary[:200]}" |
| if interaction_count > 0: |
| context_info += f"\nPrevious interactions in session: {interaction_count}" |
| |
| prompt = f"""Classify the main topic of this query in 2-5 words. Be specific and concise. |
| |
| Query: "{user_input}"{context_info} |
| |
| Respond with ONLY the topic name (e.g., "Machine Learning", "Healthcare Analytics", "Financial Modeling", "Software Development", "Educational Curriculum"). |
| |
| Do not include explanations, just the topic name. Maximum 5 words.""" |
| |
| topic_result = await self.llm_router.route_inference( |
| task_type="classification", |
| prompt=prompt, |
| max_tokens=20, |
| temperature=0.3 |
| ) |
| |
| if topic_result and isinstance(topic_result, str) and topic_result.strip(): |
| topic = topic_result.strip() |
| |
| |
| topic = topic.split('\n')[0].strip() |
| words = topic.split()[:5] |
| topic = " ".join(words) |
| |
| |
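| # Evict the oldest cache entry (insertion order) once the cache reaches its size limit |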
| if len(self._topic_cache) >= self._topic_cache_max_size: |
| |
| oldest_key = next(iter(self._topic_cache)) |
| del self._topic_cache[oldest_key] |
| |
| self._topic_cache[cache_key] = topic |
| logger.debug(f"Topic extracted: {topic}") |
| return topic |
| |
| |
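| # Fallback when the router is unavailable or returns nothing: use the first few words of the input |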
| words = user_input.split()[:4] |
| fallback_topic = " ".join(words) if words else "General inquiry" |
| logger.warning(f"Using fallback topic extraction: {fallback_topic}") |
| return fallback_topic |
| |
| except Exception as e: |
| logger.error(f"Error in LLM-based topic extraction: {e}", exc_info=True) |
| |
| words = user_input.split()[:4] |
| return " ".join(words) if words else "General inquiry" |
| |
| async def _extract_keywords(self, user_input: str) -> str: |
| """Extract key terms using LLM or simple extraction""" |
| try: |
| |
| |
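| # Lightweight keyword extraction: regex tokenization plus a small stop-word list |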
| import re |
| |
| stop_words = {'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him', 'his', 'how', 'its', 'may', 'new', 'now', 'old', 'see', 'two', 'way', 'who', 'boy', 'did', 'she', 'use', 'many', 'some', 'time', 'very', 'when', 'come', 'here', 'just', 'like', 'long', 'make', 'over', 'such', 'take', 'than', 'them', 'well', 'were'} |
| |
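| # Keep up to five alphabetic words (3+ characters) that are not stop words |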
| words = re.findall(r'\b[a-zA-Z]{3,}\b', user_input.lower()) |
| keywords = [w for w in words if w not in stop_words][:5] |
| |
| return ", ".join(keywords) if keywords else "General terms" |
| |
| except Exception as e: |
| logger.error(f"Error in keyword extraction: {e}", exc_info=True) |
| return "General terms" |
| |
| def _assess_query_complexity(self, user_input: str) -> str: |
| """Assess the complexity of the user query""" |
| word_count = len(user_input.split()) |
| question_count = user_input.count('?') |
| |
| if word_count > 50 and question_count > 2: |
| return "Highly complex multi-part query" |
| elif word_count > 30 and question_count > 1: |
| return "Moderately complex query" |
| elif word_count > 15: |
| return "Standard complexity query" |
| else: |
| return "Simple query" |
| |
| def _determine_response_scope(self, user_input: str) -> str: |
| """Determine the scope of response needed""" |
| input_lower = user_input.lower() |
| |
| if any(word in input_lower for word in ['detailed', 'comprehensive', 'complete', 'full']): |
| return "Comprehensive detailed response" |
| elif any(word in input_lower for word in ['brief', 'short', 'summary', 'overview']): |
| return "Brief summary response" |
| elif any(word in input_lower for word in ['step by step', 'tutorial', 'guide', 'how to']): |
| return "Step-by-step instructional response" |
| else: |
| return "Standard informative response" |
| |
| def _assess_content_relevance(self, user_input: str, final_response: dict) -> str: |
| """Assess how relevant the response content is to the user input""" |
| response_text = str(final_response.get('final_response', '')) |
| |
| |
| input_words = set(user_input.lower().split()) |
| response_words = set(response_text.lower().split()) |
| |
| # Guard against empty input to avoid division by zero |
| if not input_words: |
| return "Low relevance to user query" |
| |
| overlap_ratio = len(input_words.intersection(response_words)) / len(input_words) |
| |
| if overlap_ratio > 0.3: |
| return "High relevance to user query" |
| elif overlap_ratio > 0.15: |
| return "Moderate relevance to user query" |
| else: |
| return "Low relevance to user query" |
| |
| def _assess_content_appropriateness(self, user_input: str, safety_checked: dict) -> str: |
| """Assess content appropriateness for the topic""" |
| warnings = safety_checked.get('warnings', []) |
| safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8) |
| |
| if safety_score > 0.9 and len(warnings) == 0: |
| return "Highly appropriate content" |
| elif safety_score > 0.8 and len(warnings) <= 1: |
| return "Appropriate content with minor notes" |
| else: |
| return "Content requires review" |
| |
| def check_safety_thresholds(self, safety_result: dict) -> tuple: |
| """Check if safety scores exceed thresholds""" |
| safety_analysis = safety_result.get('safety_analysis', {}) |
| |
| |
| safety_scores = safety_analysis.get('safety_scores', {}) |
| if not safety_scores: |
| |
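| # No explicit safety scores: derive them from the agent's confidence scores instead |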
| confidence_scores = safety_analysis.get('confidence_scores', {}) |
| safety_scores = { |
| "toxicity_or_harmful_language": confidence_scores.get('toxicity', 0.0), |
| "potential_biases_or_stereotypes": confidence_scores.get('bias', 0.0), |
| "privacy_or_security_concerns": confidence_scores.get('privacy', 0.0), |
| "controversial_or_sensitive_topics": 1.0 - confidence_scores.get('safety', 1.0) |
| } |
| |
| exceeded_categories = [] |
| |
| for category, score in safety_scores.items(): |
| threshold = self.safety_thresholds.get(category, 1.0) |
| if isinstance(score, (int, float)) and score > threshold: |
| exceeded_categories.append(category) |
| logger.info(f"Safety threshold exceeded: {category} = {score} > {threshold}") |
| |
| return len(exceeded_categories) > 0, exceeded_categories |
| |
| def generate_revision_prompt(self, original_response: str, exceeded_categories: list, |
| safety_warnings: list) -> str: |
| """Generate revision prompt for synthesis agent""" |
| |
| category_instructions = { |
| "toxicity_or_harmful_language": "Remove any language that could be harmful or offensive", |
| "potential_biases_or_stereotypes": "Ensure response provides balanced alternatives and avoids stereotypes. Include diverse perspectives and specific qualifications needed for each career path", |
| "privacy_or_security_concerns": "Remove any content that could compromise privacy or security", |
| "controversial_or_sensitive_topics": "Present balanced viewpoints and acknowledge different perspectives" |
| } |
| |
| revision_instructions = [] |
| for category in exceeded_categories: |
| if category in category_instructions: |
| revision_instructions.append(category_instructions[category]) |
| |
| prompt = f"""REVISION REQUIRED: The following response has safety concerns that need addressing. |
| |
| Original Response: |
| {original_response} |
| |
| Safety Issues Detected: |
| {', '.join(exceeded_categories)} |
| |
| Specific Warnings: |
| {'; '.join(safety_warnings) if safety_warnings else 'General safety concerns detected'} |
| |
| REVISION INSTRUCTIONS: |
| {' | '.join(revision_instructions)} |
| |
| Please revise the response to address these concerns while maintaining helpfulness and accuracy. Ensure the response: |
| 1. Addresses the user's original question completely |
| 2. Provides specific, actionable alternatives with clear qualifications needed |
| 3. Avoids generalizations and stereotypes about career transitions |
| 4. Includes necessary skills, education, and experience requirements |
| 5. Maintains a balanced, inclusive perspective that acknowledges different paths |
| |
| Revised Response:""" |
| |
| return prompt |
| |
| async def process_request_with_revision(self, session_id: str, user_input: str) -> dict: |
| """Enhanced process_request with safety revision loop and timeout protection""" |
| try: |
| return await asyncio.wait_for( |
| self._process_request_with_revision_internal(session_id, user_input), |
| timeout=self.revision_timeout |
| ) |
| except asyncio.TimeoutError: |
| logger.error(f"Safety revision timed out after {self.revision_timeout}s") |
| |
| return { |
| 'final_response': 'Request processing took longer than expected. Please try again.', |
| 'response': 'Request processing took longer than expected. Please try again.', |
| 'revision_attempts': 0, |
| 'timeout_error': True, |
| 'safety_revision_applied': False |
| } |
| |
| async def _process_request_with_revision_internal(self, session_id: str, user_input: str) -> dict: |
| """Internal revision loop with comprehensive error handling""" |
| |
| revision_attempt = 0 |
| current_response = None |
| final_result = None |
| exceeded_categories = [] |
| safety_warnings = [] |
| |
| while revision_attempt <= self.max_revision_attempts: |
| try: |
| |
| processing_input = user_input |
| if revision_attempt > 0: |
| processing_input = self.generate_revision_prompt( |
| current_response, |
| exceeded_categories, |
| safety_warnings |
| ) |
| logger.info(f"Revision attempt {revision_attempt}: regenerating response with safety improvements") |
| |
| |
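| # Re-run the full pipeline, either on the original input or on the generated revision prompt |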
| result = await self.process_request(session_id, processing_input) |
| |
| |
| current_response = result.get('final_response') or result.get('response', '') |
| |
| if not current_response: |
| |
| metadata = result.get('metadata', {}) |
| current_response = metadata.get('synthesis_result', {}).get('final_response', '') |
| |
| if not current_response: |
| logger.warning("Could not extract response text for safety check") |
| return result |
| |
| |
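| # Run the safety agent over the extracted response text |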
| safety_checked = await self.agents['safety_check'].execute( |
| response=current_response, |
| context=result.get('context', {}) |
| ) |
| |
| |
| needs_revision, exceeded_categories = self.check_safety_thresholds(safety_checked) |
| safety_warnings = safety_checked.get('warnings', []) |
| |
| if not needs_revision: |
| |
| logger.info(f"Safety check passed on attempt {revision_attempt + 1}") |
| result['safety_result'] = safety_checked |
| result['revision_attempts'] = revision_attempt |
| result['safety_revision_applied'] = revision_attempt > 0 |
| |
| |
| if 'metadata' not in result: |
| result['metadata'] = {} |
| result['metadata']['safety_result'] = safety_checked |
| result['metadata']['revision_attempts'] = revision_attempt |
| |
| return result |
| |
| if revision_attempt >= self.max_revision_attempts: |
| |
| logger.warning(f"Max revision attempts reached. Categories still exceeded: {exceeded_categories}") |
| |
| input_complexity = self._assess_input_complexity(user_input) |
| |
| |
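| # For sufficiently complex inputs (complexity_score > 25), attempt one automatic re-prompt before falling back to user-facing warnings |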
| if input_complexity["is_complex"] and input_complexity["complexity_score"] > 25: |
| logger.info("Complex input detected - attempting intelligent re-prompt") |
| try: |
| |
| improved_prompt = self._generate_improved_prompt(user_input, exceeded_categories) |
| |
| |
| improved_result = await self.process_request(session_id, improved_prompt) |
| improved_response = improved_result.get('final_response', '') |
| |
| |
| final_safety_check = await self.agents['safety_check'].execute( |
| response=improved_response, |
| context=improved_result.get('context', {}) |
| ) |
| |
| improved_needs_revision, improved_exceeded = self.check_safety_thresholds(final_safety_check) |
| |
| if not improved_needs_revision: |
| |
| logger.info("Intelligent re-prompt resolved safety concerns") |
| improved_result['safety_result'] = final_safety_check |
| improved_result['revision_attempts'] = revision_attempt + 1 |
| improved_result['intelligent_reprompt_success'] = True |
| if 'metadata' not in improved_result: |
| improved_result['metadata'] = {} |
| improved_result['metadata']['safety_result'] = final_safety_check |
| improved_result['metadata']['revision_attempts'] = revision_attempt + 1 |
| improved_result['metadata']['intelligent_reprompt_success'] = True |
| return improved_result |
| else: |
| |
| logger.info("Intelligent re-prompt did not fully resolve concerns") |
| current_response = improved_response |
| safety_checked = final_safety_check |
| exceeded_categories = improved_exceeded |
| |
| except Exception as e: |
| logger.warning(f"Intelligent re-prompt failed: {e}", exc_info=True) |
| |
| |
| |
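| # Otherwise append a user-facing warning summary and guidance to the original response |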
| warning_summary = self._generate_warning_summary(exceeded_categories, safety_checked.get('warnings', [])) |
| user_guidance = self._generate_user_guidance(exceeded_categories, user_input) |
| |
| |
| original_response = result.get('final_response', '') |
| enhanced_response = f"{original_response}\n\n{warning_summary}\n\n{user_guidance}" |
| |
| result['final_response'] = enhanced_response |
| result['response'] = enhanced_response |
| result['safety_result'] = safety_checked |
| result['revision_attempts'] = revision_attempt |
| result['safety_exceeded'] = exceeded_categories |
| result['safety_revision_applied'] = revision_attempt > 0 |
| result['warning_summary_added'] = True |
| result['input_complexity'] = input_complexity |
| |
| |
| if 'metadata' not in result: |
| result['metadata'] = {} |
| result['metadata']['safety_result'] = safety_checked |
| result['metadata']['revision_attempts'] = revision_attempt |
| result['metadata']['safety_exceeded'] = exceeded_categories |
| result['metadata']['input_complexity'] = input_complexity |
| |
| return result |
| |
| |
| final_result = result |
| revision_attempt += 1 |
| logger.info(f"Generating revision attempt {revision_attempt} for: {exceeded_categories}") |
| |
| except Exception as e: |
| logger.error(f"Error in safety revision attempt {revision_attempt}: {e}", exc_info=True) |
| if final_result: |
| final_result['revision_error'] = str(e) |
| if 'metadata' not in final_result: |
| final_result['metadata'] = {} |
| final_result['metadata']['revision_error'] = str(e) |
| return final_result |
| |
| return { |
| 'response': 'Error in processing with safety revision', |
| 'final_response': 'Error in processing with safety revision', |
| 'revision_attempts': revision_attempt, |
| 'revision_error': str(e), |
| 'error': str(e) |
| } |
| |
| |
| return final_result or { |
| 'response': 'Error in safety revision processing', |
| 'final_response': 'Error in safety revision processing', |
| 'revision_attempts': revision_attempt, |
| 'safety_revision_applied': False |
| } |
| |
| def _generate_warning_summary(self, exceeded_categories: list, safety_warnings: list) -> str: |
| """Generate user-friendly warning summary""" |
| category_explanations = { |
| "potential_biases_or_stereotypes": "may contain assumptions about career transitions that don't account for individual circumstances", |
| "toxicity_or_harmful_language": "contains language that could be harmful or inappropriate", |
| "privacy_or_security_concerns": "includes content that could raise privacy or security considerations", |
| "controversial_or_sensitive_topics": "touches on topics that may benefit from additional perspective" |
| } |
| |
| if not exceeded_categories: |
| return "" |
| |
| warning_text = "**Note**: This response " + ", ".join([ |
| category_explanations.get(cat, f"has concerns related to {cat}") |
| for cat in exceeded_categories |
| ]) + "." |
| |
| return warning_text |
| |
| def _generate_user_guidance(self, exceeded_categories: list, original_user_input: str) -> str: |
| """Generate proactive user guidance with UX-friendly options for complex prompts""" |
| if not exceeded_categories: |
| return "" |
| |
| input_complexity = self._assess_input_complexity(original_user_input) |
| |
| guidance_templates = { |
| "potential_biases_or_stereotypes": { |
| "issue": "avoid assumptions about career paths", |
| "simple_suggestion": "ask for advice tailored to specific qualifications or industry interests", |
| "complex_refinement": "add details like your specific skills, target industry, or education level" |
| }, |
| "toxicity_or_harmful_language": { |
| "issue": "ensure respectful communication", |
| "simple_suggestion": "rephrase using more neutral language", |
| "complex_refinement": "adjust the tone while keeping your detailed context" |
| }, |
| "privacy_or_security_concerns": { |
| "issue": "protect sensitive information", |
| "simple_suggestion": "ask for general guidance instead", |
| "complex_refinement": "remove specific personal details while keeping the scenario structure" |
| }, |
| "controversial_or_sensitive_topics": { |
| "issue": "get balanced perspectives", |
| "simple_suggestion": "ask for multiple viewpoints or balanced analysis", |
| "complex_refinement": "specify you'd like pros/cons or different perspectives included" |
| } |
| } |
| |
| primary_category = exceeded_categories[0] |
| guidance = guidance_templates.get(primary_category, { |
| "issue": "improve response quality", |
| "simple_suggestion": "try rephrasing with more specific details", |
| "complex_refinement": "add clarifying details to your existing question" |
| }) |
| |
| |
| |
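| # Placeholder topic label; the async _extract_main_topic helper is not awaited from this synchronous method |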
| topic = "Error recovery context" |
| |
| |
| if input_complexity["is_complex"]: |
| return f"""**Want a better response?** To {guidance['issue']} in responses about {topic}, you could {guidance['complex_refinement']} rather than rewriting your detailed question. Or simply ask again as-is and I'll focus on providing more balanced information.""" |
| else: |
| return f"""**Want a better response?** To {guidance['issue']} in future responses about {topic}, you could {guidance['simple_suggestion']}. Feel free to ask again with any adjustments!""" |
| |
| def _assess_input_complexity(self, user_input: str) -> dict: |
| """Assess input complexity to determine appropriate UX guidance""" |
| word_count = len(user_input.split()) |
| sentence_count = user_input.count('.') + user_input.count('!') + user_input.count('?') |
| has_context = any(phrase in user_input.lower() for phrase in [ |
| 'i am currently', 'my situation', 'my background', 'i have been', |
| 'my experience', 'i work', 'my company', 'specific to my' |
| ]) |
| has_constraints = any(phrase in user_input.lower() for phrase in [ |
| 'must', 'need to', 'required', 'limited by', 'constraint', 'budget', |
| 'timeline', 'deadline', 'specific requirements' |
| ]) |
| |
| is_complex = ( |
| word_count > 30 or |
| sentence_count > 2 or |
| has_context or |
| has_constraints |
| ) |
| |
| return { |
| "is_complex": is_complex, |
| "word_count": word_count, |
| "has_personal_context": has_context, |
| "has_constraints": has_constraints, |
| "complexity_score": word_count * 0.1 + sentence_count * 5 + (has_context * 10) + (has_constraints * 10) |
| } |
| |
| def _generate_improved_prompt(self, original_input: str, exceeded_categories: list) -> str: |
| """Generate improved prompt for complex inputs to resolve safety concerns automatically""" |
| |
| improvements = [] |
| |
| if "potential_biases_or_stereotypes" in exceeded_categories: |
| improvements.append("Please provide specific qualifications, skills, and requirements for each option") |
| improvements.append("Include diverse pathways and acknowledge individual circumstances vary") |
| |
| if "toxicity_or_harmful_language" in exceeded_categories: |
| improvements.append("Use respectful, professional language throughout") |
| |
| if "privacy_or_security_concerns" in exceeded_categories: |
| improvements.append("Focus on general guidance without personal specifics") |
| |
| if "controversial_or_sensitive_topics" in exceeded_categories: |
| improvements.append("Present balanced perspectives and multiple viewpoints") |
| |
| improvement_instructions = ". ".join(improvements) |
| |
| improved_prompt = f"""{original_input} |
| |
| Additional guidance for response: {improvement_instructions}. Ensure all advice is specific, actionable, and acknowledges different backgrounds and circumstances.""" |
| |
| return improved_prompt |
| |
| def check_query_similarity(self, new_query: str, threshold: float = 0.85) -> Optional[Dict]: |
| """ |
| Check whether a new query closely matches any recently cached query. |
| |
| Uses simple word-overlap (Jaccard) similarity; this could later be |
| replaced with embedding-based semantic similarity. |
| |
| Args: |
| new_query: The new query to check |
| threshold: Similarity threshold (default 0.85) |
| |
| Returns: |
| Cached response dict if similar query found, None otherwise |
| """ |
| if not self.recent_queries: |
| return None |
| |
| new_query_lower = new_query.lower().strip() |
| |
| for cached_query_data in reversed(self.recent_queries): |
| cached_query = cached_query_data.get('query', '') |
| if not cached_query: |
| continue |
| |
| cached_query_lower = cached_query.lower().strip() |
| |
| |
| similarity = self._calculate_similarity(new_query_lower, cached_query_lower) |
| |
| if similarity > threshold: |
| logger.info(f"Similar query detected (similarity: {similarity:.2f}): '{new_query[:50]}...' similar to '{cached_query[:50]}...'") |
| return cached_query_data.get('response') |
| |
| return None |
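| |
| # Usage sketch (hypothetical caller; assumes an existing MVPOrchestrator instance named `orchestrator`): |
| #     cached = orchestrator.check_query_similarity("explain gradient descent", threshold=0.85) |
| #     if cached is not None: |
| #         ...  # reuse the cached response instead of re-running the full pipeline |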
| |
| def _calculate_similarity(self, query1: str, query2: str) -> float: |
| """ |
| Calculate similarity between two queries using Jaccard similarity on words. |
| Can be enhanced with embeddings for semantic similarity. |
| """ |
| if not query1 or not query2: |
| return 0.0 |
| |
| |
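| # Tokenize both queries into word sets for the Jaccard calculation |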
| words1 = set(query1.split()) |
| words2 = set(query2.split()) |
| |
| if not words1 or not words2: |
| return 0.0 |
| |
| |
| intersection = len(words1.intersection(words2)) |
| union = len(words1.union(words2)) |
| |
| if union == 0: |
| return 0.0 |
| |
| jaccard = intersection / union |
| |
| |
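| # Boost the score when one query fully contains the other |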
| if query1 in query2 or query2 in query1: |
| jaccard = max(jaccard, 0.9) |
| |
| return jaccard |
| |
| def track_response_metrics(self, start_time: float, response: Dict) -> Dict: |
| """ |
| Track performance metrics for a completed request and attach them to the |
| response dictionary (for API consumption and internal history). |
| |
| Args: |
| start_time: Start time from time.time() |
| response: Response dictionary containing response data |
| |
| Returns: |
| Dict with performance metrics added to response |
| """ |
| try: |
| latency = time.time() - start_time |
| |
| |
| response_text = ( |
| response.get('response') or |
| response.get('final_response') or |
| response.get('synthesized_response') or |
| str(response.get('result', '')) |
| ) |
| |
| |
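| # Rough token estimate: ~1.3 tokens per word or one token per ~4 characters, whichever is larger |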
| def estimate_tokens(text: str) -> int: |
| """Estimate tokens more accurately""" |
| if not text: |
| return 0 |
| |
| |
| words = len(text.split()) |
| chars = len(text) |
| |
| token_estimate = max(words * 1.3, chars / 4) |
| return int(token_estimate) |
| |
| token_count = estimate_tokens(response_text) |
| |
| |
| safety_score = 0.8 |
| confidence_score = 0.8 |
| |
| if 'metadata' in response: |
| synthesis_result = response['metadata'].get('synthesis_result', {}) |
| safety_result = response['metadata'].get('safety_result', {}) |
| intent_result = response.get('intent', {}) or response.get('metadata', {}).get('intent_result', {}) |
| |
| if safety_result: |
| safety_analysis = safety_result.get('safety_analysis', {}) |
| safety_score = safety_analysis.get('overall_safety_score', 0.8) |
| |
| |
| if intent_result and 'confidence_scores' in intent_result: |
| primary_intent = intent_result.get('primary_intent', '') |
| if primary_intent: |
| conf_scores = intent_result['confidence_scores'] |
| confidence_score = conf_scores.get(primary_intent, 0.8) |
| |
| |
| agent_contributions = [] |
| total_agents = 0 |
| |
| |
| agents_used = [] |
| metadata = response.get('metadata', {}) |
| |
| if metadata.get('intent_result') or response.get('intent'): |
| agents_used.append('Intent') |
| if metadata.get('synthesis_result') or response.get('synthesized_response'): |
| agents_used.append('Synthesis') |
| if metadata.get('safety_result') or response.get('safety_precheck'): |
| agents_used.append('Safety') |
| if metadata.get('skills_result') or response.get('skills'): |
| agents_used.append('Skills') |
| |
| |
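| # No hints in the payload: infer which agents ran from the recorded call count |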
| if not agents_used and self.agent_call_count > 0: |
| |
| if self.agent_call_count >= 3: |
| agents_used = ['Intent', 'Skills', 'Safety'] |
| elif self.agent_call_count >= 2: |
| agents_used = ['Intent', 'Synthesis'] |
| else: |
| agents_used = ['Synthesis'] |
| |
| total_agents = len(agents_used) if agents_used else self.agent_call_count |
| |
| |
| if total_agents > 0 and agents_used: |
| base_percentage = 100 / total_agents |
| for agent in agents_used: |
| |
| if agent == 'Synthesis': |
| percentage = min(50, base_percentage * 1.5) |
| elif agent == 'Intent': |
| percentage = min(30, base_percentage * 1.2) |
| else: |
| percentage = base_percentage |
| |
| agent_contributions.append({ |
| "agent": agent, |
| "percentage": round(percentage, 1) |
| }) |
| |
| |
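| # Re-normalize contribution percentages so they sum to roughly 100 |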
| if agent_contributions: |
| total_pct = sum(c['percentage'] for c in agent_contributions) |
| if total_pct > 0 and abs(total_pct - 100) > 0.1: |
| for contrib in agent_contributions: |
| contrib['percentage'] = round(contrib['percentage'] * 100 / total_pct, 1) |
| |
| |
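| # processing_time is reported in milliseconds; latency_seconds keeps the raw value in seconds |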
| performance_metrics = { |
| "processing_time": round(latency * 1000, 2), |
| "tokens_used": token_count, |
| "agents_used": total_agents, |
| "confidence_score": round(confidence_score * 100, 1), |
| "agent_contributions": agent_contributions, |
| "safety_score": round(safety_score * 100, 1), |
| "latency_seconds": round(latency, 3), |
| "timestamp": datetime.now().isoformat() |
| } |
| |
| |
| metrics_history = { |
| 'latency': latency, |
| 'token_count': token_count, |
| 'agent_calls': self.agent_call_count, |
| 'safety_score': safety_score, |
| 'timestamp': datetime.now().isoformat() |
| } |
| |
| self.response_metrics_history.append(metrics_history) |
| if len(self.response_metrics_history) > self.metrics_history_max_size: |
| self.response_metrics_history = self.response_metrics_history[-self.metrics_history_max_size:] |
| |
| |
| if 'performance' not in response: |
| response['performance'] = {} |
| |
| response['performance'].update(performance_metrics) |
| |
| |
| if 'metadata' not in response: |
| response['metadata'] = {} |
| |
| response['metadata']['performance_metrics'] = performance_metrics |
| response['metadata']['processing_time'] = latency |
| response['metadata']['token_count'] = token_count |
| response['metadata']['agents_used'] = agents_used |
| |
| |
| logger.info(f"Response Metrics - Latency: {latency:.3f}s, Tokens: {token_count}, " |
| f"Agent Calls: {self.agent_call_count}, Safety Score: {safety_score:.2f}, " |
| f"Agents Used: {total_agents}") |
| logger.debug(f"Performance metrics: {performance_metrics}") |
| |
| |
| self.agent_call_count = 0 |
| |
| return response |
| |
| except Exception as e: |
| logger.error(f"Error tracking response metrics: {e}", exc_info=True) |
| |
| if 'performance' not in response: |
| response['performance'] = { |
| "processing_time": round((time.time() - start_time) * 1000, 2), |
| "tokens_used": 0, |
| "agents_used": 0, |
| "confidence_score": 0, |
| "agent_contributions": [], |
| "safety_score": 80, |
| "error": str(e) |
| } |
| return response |
| |
| def get_performance_summary(self) -> Dict: |
| """ |
| Get summary of recent performance metrics. |
| Useful for monitoring and debugging. |
| |
| Returns: |
| Dict with performance statistics |
| """ |
| if not self.response_metrics_history: |
| return { |
| "total_requests": 0, |
| "average_latency": 0, |
| "average_tokens": 0, |
| "average_agents": 0 |
| } |
| |
| recent = self.response_metrics_history[-20:] |
| |
| return { |
| "total_requests": len(self.response_metrics_history), |
| "recent_requests": len(recent), |
| "average_latency": round(sum(m['latency'] for m in recent) / len(recent), 3) if recent else 0, |
| "average_tokens": round(sum(m['token_count'] for m in recent) / len(recent), 1) if recent else 0, |
| "average_agents": round(sum(m.get('agent_calls', 0) for m in recent) / len(recent), 1) if recent else 0, |
| "last_10_metrics": recent[-10:] if len(recent) > 10 else recent |
| } |
|
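| # Monitoring sketch (hypothetical; assumes an existing MVPOrchestrator instance named `orchestrator`): |
| #     summary = orchestrator.get_performance_summary() |
| #     logger.info(f"Avg latency {summary['average_latency']}s over {summary['recent_requests']} recent requests") |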
|