import gradio as gr
from google import genai
from google.genai import types
from google.genai.types import Tool, GoogleSearch, FunctionDeclaration
from PIL import Image
import io
import traceback
import datetime
import re
import importlib
# FIX: `import importlib` alone does not guarantee `importlib.util` is an
# accessible attribute; the extension loader below calls
# importlib.util.spec_from_file_location, so import the submodule explicitly.
import importlib.util
import os
import sys
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import json

# Add current directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from base_extension import BaseExtension

DESCRIPTION = """
# Wuhp Agents
**Powered by Gemini 2.5 Flash + Google Search Grounding + Agent Extensions**
"""

BASE_SYSTEM_PROMPT = """
You are Wuhp Agent, a versatile AI assistant with extensible capabilities.
Your core abilities include conversation, web search, and image understanding.
When users enable extensions, you gain additional tools and capabilities.
Always use the available tools when they would be helpful to the user.
Be proactive about suggesting when an extension might be useful.
"""


def log(msg: str):
    """Print a timestamped (HH:MM:SS) log line, flushed immediately."""
    now = datetime.datetime.now().strftime("%H:%M:%S")
    print(f"[{now}] {msg}", flush=True)


def get_mime_type(file_path: str) -> str:
    """Determine MIME type from file extension.

    Returns 'application/octet-stream' for unrecognized extensions.
    Extension matching is case-insensitive.
    """
    ext = Path(file_path).suffix.lower()
    # FIX: several video entries were not valid registered MIME types
    # (video/mov, video/avi, video/mpg, video/wmv) and would be rejected
    # by strict consumers; replaced with the standard registered names.
    mime_types = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.heic': 'image/heic',
        '.heif': 'image/heif',
        '.pdf': 'application/pdf',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.md': 'text/markdown',
        '.mp4': 'video/mp4',
        '.mpeg': 'video/mpeg',
        '.mov': 'video/quicktime',   # was 'video/mov' (invalid)
        '.avi': 'video/x-msvideo',   # was 'video/avi' (invalid)
        '.flv': 'video/x-flv',
        '.mpg': 'video/mpeg',        # was 'video/mpg' (invalid)
        '.webm': 'video/webm',
        '.wmv': 'video/x-ms-wmv',    # was 'video/wmv' (invalid)
        '.3gpp': 'video/3gpp',
    }
    return mime_types.get(ext, 'application/octet-stream')


def process_uploaded_file(client: genai.Client, file_path: str) -> types.Part:
    """Process an uploaded file and return a Part object.

    Files larger than 20MB, and all videos, are uploaded through the
    Gemini File API; everything else is embedded inline as bytes.
    """
    mime_type = get_mime_type(file_path)
    file_size = Path(file_path).stat().st_size
    log(f"šŸ“Ž Processing file: {Path(file_path).name} ({mime_type}, {file_size/1024:.1f}KB)")
    # For files > 20MB or videos, use File API
    if file_size > 20 * 1024 * 1024 or mime_type.startswith('video/'):
        log(f"šŸ“¤ Uploading large file via File API...")
        uploaded_file = client.files.upload(file=file_path)
        log(f"āœ… File uploaded: {uploaded_file.name}")
        return uploaded_file
    else:
        with open(file_path, 'rb') as f:
            file_bytes = f.read()
        log(f"āœ… File loaded inline")
        return types.Part.from_bytes(data=file_bytes, mime_type=mime_type)
class ExtensionManager:
    """Enhanced extension manager with capability-based discovery"""

    def __init__(self):
        # name -> extension instance; populated by load_extensions()
        self.extensions: Dict[str, BaseExtension] = {}
        self.load_extensions()

    def load_extensions(self):
        """Dynamically load all extensions from extensions/ folder"""
        # FIX: import the submodule explicitly — `import importlib` alone does
        # not guarantee `importlib.util` is available as an attribute, which
        # makes spec_from_file_location raise AttributeError on some runs.
        import importlib.util
        extensions_dir = Path("extensions")
        if not extensions_dir.exists():
            log("āš ļø Extensions directory not found, creating it...")
            extensions_dir.mkdir()
            return
        log(f"šŸ” Scanning for extensions in {extensions_dir.absolute()}")
        for file in extensions_dir.glob("*.py"):
            # Skip private/dunder modules such as __init__.py
            if file.name.startswith("_"):
                continue
            try:
                module_name = file.stem
                spec = importlib.util.spec_from_file_location(module_name, file)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                # Register the first BaseExtension subclass found in the module
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    if (isinstance(attr, type) and issubclass(attr, BaseExtension)
                            and attr != BaseExtension):
                        ext = attr()
                        self.extensions[ext.name] = ext
                        log(f"āœ… Loaded extension: {ext.display_name} ({ext.name})")
                        break
            except Exception as e:
                # Best-effort loading: a broken extension must not kill startup
                log(f"āŒ Failed to load {file.name}: {e}")
                traceback.print_exc()
        log(f"šŸ“Š Total extensions loaded: {len(self.extensions)}")
        self._analyze_extension_capabilities()

    def _analyze_extension_capabilities(self):
        """Analyze and log extension capabilities"""
        log("šŸ”— Analyzing extension capabilities...")
        for ext in self.extensions.values():
            caps = ext.get_capabilities()
            log(f" šŸ“¦ {ext.name}:")
            if caps.get('provides_data'):
                log(f" → Provides: {', '.join(caps['provides_data'])}")
            if caps.get('consumes_data'):
                log(f" → Consumes: {', '.join(caps['consumes_data'])}")
            if caps.get('creates_output'):
                log(f" → Creates: {', '.join(caps['creates_output'])}")
        # Show data flow possibilities
        chains = self.get_data_flow_chain(list(self.extensions.keys()))
        if chains:
            log(" šŸ”„ Data flow chains detected:")
            for chain in chains:
                consumers_str = ', '.join(chain['consumers'])
                log(f" {chain['provider']} → {chain['data_type']} → [{consumers_str}]")

    def get_extension(self, name: str) -> Optional[BaseExtension]:
        """Look up an extension by its internal name; None if unknown."""
        return self.extensions.get(name)

    def get_all_extensions(self) -> List[BaseExtension]:
        """Return every loaded extension instance."""
        return list(self.extensions.values())

    def get_enabled_extensions(self, enabled_list: List[str]) -> List[BaseExtension]:
        """Get list of enabled extension objects"""
        return [ext for name, ext in self.extensions.items() if name in enabled_list]

    def detect_capabilities_by_keywords(self, query: str) -> List[str]:
        """Detect which extensions might be relevant based on query keywords"""
        query_lower = query.lower()
        relevant_extensions = []
        for ext in self.get_all_extensions():
            keywords = ext.get_capabilities().get('keywords', [])
            if any(keyword in query_lower for keyword in keywords):
                relevant_extensions.append(ext.name)
        return relevant_extensions

    def find_extensions_by_capability(self, capability_type: str, capability_value: str) -> List[BaseExtension]:
        """Find extensions by capability"""
        matching = []
        for ext in self.get_all_extensions():
            caps = ext.get_capabilities().get(capability_type, [])
            if capability_value in caps:
                matching.append(ext)
        return matching

    def get_data_flow_chain(self, enabled_extensions: List[str]) -> List[Dict]:
        """Analyze enabled extensions and build data flow chain"""
        enabled_exts = self.get_enabled_extensions(enabled_extensions)
        chains = []
        for provider in enabled_exts:
            provided_data = provider.get_capabilities().get('provides_data', [])
            for data_type in provided_data:
                # Every other enabled extension that can consume this data type
                consumers = [
                    ext for ext in enabled_exts
                    if ext.can_consume(data_type) and ext.name != provider.name
                ]
                if consumers:
                    chains.append({
                        'provider': provider.name,
                        'data_type': data_type,
                        'consumers': [c.name for c in consumers]
                    })
        return chains

    def build_system_prompt(self, user_id: str, enabled_list: List[str]) -> str:
        """Build enhanced system prompt with capability awareness"""
        prompt = BASE_SYSTEM_PROMPT
        enabled_exts = self.get_enabled_extensions(enabled_list)
        if enabled_exts:
            prompt += "\n\n# ENABLED EXTENSIONS\nYou currently have these extensions enabled:\n\n"
            for ext in enabled_exts:
                ext.initialize_state(user_id)
                prompt += f"## {ext.display_name}\n{ext.get_system_context()}\n\n"
                state = ext.get_state(user_id)
                if state and hasattr(ext, 'get_state_summary'):
                    state_summary = ext.get_state_summary(user_id)
                    if state_summary:
                        prompt += f"**Current State:** {state_summary}\n\n"
            # Cross-extension guidance only matters with 2+ extensions enabled
            if len(enabled_exts) > 1:
                prompt += self._build_capability_guidance(enabled_list)
        return prompt

    def _build_capability_guidance(self, enabled_list: List[str]) -> str:
        """Build intelligent guidance based on actual capabilities"""
        guidance = "\n\n# INTELLIGENT INTEGRATION SYSTEM\n"
        guidance += "Extensions can automatically work together based on their capabilities:\n\n"
        # Detect what combinations are possible
        chains = self.get_data_flow_chain(enabled_list)
        if chains:
            guidance += "**Available Data Flows:**\n"
            for chain in chains:
                provider_ext = self.get_extension(chain['provider'])
                consumers_display = []
                for consumer_name in chain['consumers']:
                    consumer_ext = self.get_extension(consumer_name)
                    if consumer_ext:
                        consumers_display.append(consumer_ext.display_name)
                guidance += f"- {provider_ext.display_name} provides `{chain['data_type']}` → "
                guidance += f"{', '.join(consumers_display)} can use it\n"
        guidance += "\n**Key Principles:**\n"
        guidance += "1. **Sequential Operations**: Complete one extension's task, then use results in another\n"
        guidance += "2. **Data Flow**: Extensions automatically suggest next steps based on their output\n"
        guidance += "3. **No Hardcoding**: System detects compatible extensions dynamically\n\n"
        # CRITICAL: Add enforcement rule
        guidance += "**🚨 CRITICAL EXECUTION RULE:**\n"
        guidance += "When you see suggestions like 'šŸ’” Extension suggests: reason → Call tool_name':\n"
        guidance += "- You MUST immediately execute the suggested tool\n"
        guidance += "- Do NOT ask the user for permission or clarification\n"
        guidance += "- The data is already formatted and ready to use\n"
        guidance += "- Just call the suggested tool NOW\n"
        guidance += "- Failing to execute suggestions means failing the user's request!\n\n"
        return guidance

    def get_all_tools(self, enabled_list: List[str]) -> List[types.Tool]:
        """Get all tools from enabled extensions"""
        tools = []
        for ext_name in enabled_list:
            ext = self.get_extension(ext_name)
            if ext:
                tools.extend(ext.get_tools())
        return tools

    def get_search_tool(self) -> types.Tool:
        """Get Google Search tool separately"""
        return types.Tool(google_search=types.GoogleSearch())

    def handle_function_calls(self, user_id: str, enabled_list: List[str],
                              function_calls: List) -> List[Tuple[str, Any]]:
        """Process function calls and return (function_name, result) tuples"""
        results = []
        for fc in function_calls:
            function_name = fc.name
            args = fc.args if hasattr(fc, 'args') else {}
            log(f"šŸ”§ Executing: {function_name}({json.dumps(args, indent=2)[:100]}...)")
            handled = False
            # Find which enabled extension declares this function, then dispatch
            for ext_name in enabled_list:
                ext = self.get_extension(ext_name)
                if ext:
                    for tool in ext.get_tools():
                        if hasattr(tool, 'function_declarations'):
                            for func_decl in tool.function_declarations:
                                if func_decl.name == function_name:
                                    try:
                                        result = ext.handle_tool_call(user_id, function_name, args)
                                        results.append((function_name, result))
                                        log(f"āœ… {function_name} completed: {str(result)[:100]}...")
                                    except Exception as e:
                                        # Errors are reported back as results, not raised
                                        log(f"āŒ Error in {function_name}: {e}")
                                        results.append((function_name, {"error": str(e)}))
                                    handled = True
                                    break
                        if handled:
                            break
                if handled:
                    break
            if not handled:
                log(f"āš ļø Unknown function: {function_name}")
                results.append((function_name, {"error": f"Unknown function {function_name}"}))
        return results

    def check_proactive_messages(self, user_id: str, enabled_list: List[str]) -> List[str]:
        """Check all extensions for proactive messages"""
        messages = []
        for ext_name in enabled_list:
            ext = self.get_extension(ext_name)
            if ext:
                msg = ext.get_proactive_message(user_id)
                if msg:
                    messages.append(msg)
        return messages


class ImprovedOrchestrator:
    """Enhanced orchestrator with capability-based reasoning"""

    def __init__(self, client, chat, extension_manager, user_id, enabled_extensions):
        self.client = client
        self.chat = chat
        self.extension_manager = extension_manager
        self.user_id = user_id
        self.enabled_extensions = enabled_extensions
        # Dedicated chat session for the search agent so grounded search
        # turns do not pollute the main tool-execution conversation.
        self.search_chat = client.chats.create(model="gemini-2.5-flash")
        self.execution_history = []

    def analyze_query(self, query: str, file_parts: List = None) -> Dict[str, Any]:
        """Analyze query using capability detection"""
        log("🧠 Analyzing query with capability detection...")
        # Detect relevant extensions by keywords
        relevant_exts = self.extension_manager.detect_capabilities_by_keywords(query)
        log(f" šŸ“‹ Relevant extensions by keywords: {', '.join(relevant_exts)}")
        query_lower = query.lower()
        # Detect explicit requests
        explicit_extensions = []
        if any(word in query_lower for word in ['deep research', 'conduct research', 'research report']):
            explicit_extensions.append('deep_research')
        if any(word in query_lower for word in ['chart', 'graph', 'visualize', 'plot']):
            explicit_extensions.append('visualization')
        # Combine keyword detection with explicit requests
        all_relevant = list(set(relevant_exts + explicit_extensions))
        return {
            "needs_search": "maybe",
            "relevant_extensions": all_relevant,
            "explicit_extensions": explicit_extensions,
            "multi_step": len(all_relevant) > 1,
            "uses_context": any(word in query_lower for word in ['the', 'my', 'that', 'previous']),
            "complexity": "complex" if len(all_relevant) > 2 else "moderate",
            "reasoning": f"Detected {len(all_relevant)} relevant extensions"
        }
"complexity": "complex" if len(all_relevant) > 2 else "moderate", "reasoning": f"Detected {len(all_relevant)} relevant extensions" } def execute_with_planning(self, query: str, file_parts: List = None, reasoning_budget: int = -1) -> Tuple[str, List, List, str, Optional[str]]: """Execute query with capability-based planning""" analysis = self.analyze_query(query, file_parts) # Search phase search_results = "" search_citations = None if analysis.get('needs_search') in ['yes', 'maybe']: log("šŸ” Executing search phase...") search_results, search_citations = self.call_search_agent(query, file_parts) # Tool execution phase log("šŸ› ļø Executing tool phase with capability awareness...") tool_results = [] generated_images = [] thoughts = "" system_prompt = self.extension_manager.build_system_prompt( self.user_id, self.enabled_extensions ) # Add capability-aware guidance if analysis.get('explicit_extensions'): system_prompt += self._build_explicit_extension_prompt(analysis['explicit_extensions']) max_rounds = 5 if analysis['complexity'] == 'complex' else 3 current_round = 0 prompt = query if search_results: prompt = f"[Web Search Results]\n{search_results}\n\n[User Query]\n{query}" # ========== UPDATED WHILE LOOP WITH FIXED AUTO-EXECUTION ========== while current_round < max_rounds: current_round += 1 log(f"šŸ”„ Tool execution round {current_round}/{max_rounds}") function_calls, text_response, round_thoughts = self.call_tool_agent( prompt, reasoning_budget, file_parts, system_prompt ) thoughts += round_thoughts if not function_calls: log(f"āœ… No more tools needed after round {current_round}") if text_response: return text_response, tool_results, generated_images, thoughts, search_citations break # Execute the function calls results = self.extension_manager.handle_function_calls( self.user_id, self.enabled_extensions, function_calls ) for (tool_name, result) in results: tool_results.append((tool_name, result)) # Extract images if isinstance(result, dict) and 
'image_base64' in result: img_data = { 'base64': result['image_base64'], 'title': result.get('message', 'Generated visualization'), 'filepath': result.get('filepath', '') } generated_images.append(img_data) log(f"šŸ“Š Captured visualization: {result.get('message', 'Chart')}") # ========== CRITICAL FIX: AUTO-EXECUTION WITH ACTUAL DATA ========== # Check for auto-executable suggestions from completed tools if current_round < max_rounds: log(f"šŸ” Checking for auto-executable suggestions from completed tools...") available_exts = self.extension_manager.get_enabled_extensions(self.enabled_extensions) executable_suggestions = [] for tool_name, result in results: if not isinstance(result, dict): continue # Find extension that executed this tool source_ext = next((ext for ext in available_exts if ext.get_tool_by_name(tool_name)), None) if source_ext: # Ask extension for suggestions suggestion = source_ext.get_suggested_next_action(result, available_exts) if suggestion and suggestion.get('data_ready'): log(f"āœ… Valid suggestion: {suggestion['tool']} from {source_ext.name}") # CRITICAL: Log actual data being passed suggested_data = suggestion.get('data', {}) for key, value in suggested_data.items(): if isinstance(value, list): log(f" → {key}: list with {len(value)} items") elif isinstance(value, dict): log(f" → {key}: dict with {len(value)} keys") else: log(f" → {key}: {type(value).__name__}") executable_suggestions.append(suggestion) # If we have executable suggestions with ready data, execute them NOW if executable_suggestions: log(f"⚔ Auto-executing {len(executable_suggestions)} suggested tool(s)") # Build a prompt that FORCES execution with COMPLETE DATA auto_exec_prompt = "🚨 MANDATORY TOOL EXECUTION - NO TEXT RESPONSE ALLOWED 🚨\n\n" auto_exec_prompt += "SYSTEM DIRECTIVE: You must call the specified tool(s) immediately.\n" auto_exec_prompt += "All required data has been prepared and formatted for you.\n\n" # Add ALL relevant context from completed tools INCLUDING 
ACTUAL DATA auto_exec_prompt += "=== AVAILABLE DATA ===\n" for tool_name, result in results: if isinstance(result, dict) and result.get('success'): auto_exec_prompt += f"\n{tool_name} completed successfully:\n" # CRITICAL: Include ACTUAL DATA VALUES if 'ticker' in result: auto_exec_prompt += f" - Ticker: {result['ticker']}\n" if 'dates' in result and result['dates']: dates = result['dates'] auto_exec_prompt += f" - Date range: {dates[0]} to {dates[-1]} ({len(dates)} data points)\n" auto_exec_prompt += f" - Sample dates: {dates[:3]}\n" if 'close_prices' in result: prices = result['close_prices'] auto_exec_prompt += f" - Price range: ${min(prices):.2f} to ${max(prices):.2f}\n" auto_exec_prompt += f" - Sample prices: {prices[:3]}\n" if 'data_points' in result: auto_exec_prompt += f" - Total data points: {result['data_points']}\n" # For charts, show what data is available if 'dates' in result and 'close_prices' in result: auto_exec_prompt += f" - READY FOR CHART: {len(result['dates'])} date/price pairs\n" auto_exec_prompt += "\n=== REQUIRED ACTIONS ===\n" for idx, suggestion in enumerate(executable_suggestions, 1): auto_exec_prompt += f"\n{idx}. 
CALL: {suggestion['tool']}\n" auto_exec_prompt += f" REASON: {suggestion['reason']}\n" auto_exec_prompt += f" PARAMETERS (use these EXACT values):\n" # ========== UPDATED DATA FORMATTING - HANDLES DICT-OF-DICTS ========== suggested_data = suggestion.get('data', {}) for key, value in suggested_data.items(): if isinstance(value, list): # Check if it's a list of dicts if len(value) > 0 and isinstance(value[0], dict): auto_exec_prompt += f" - {key}: [ # List of {len(value)} dict(s)\n" for item_idx, item in enumerate(value): auto_exec_prompt += f" {{\n" for sub_key, sub_value in item.items(): if isinstance(sub_value, list): if len(sub_value) <= 10: auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n" else: auto_exec_prompt += f" '{sub_key}': [ # {len(sub_value)} items total\n" auto_exec_prompt += f" # First 3: {json.dumps(sub_value[:3])}\n" auto_exec_prompt += f" # ... ({len(sub_value) - 6} more items)\n" auto_exec_prompt += f" # Last 3: {json.dumps(sub_value[-3:])}\n" auto_exec_prompt += f" ],\n" else: auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n" auto_exec_prompt += f" }}\n" auto_exec_prompt += f" ]\n" else: # Simple list of primitives if len(value) <= 5: auto_exec_prompt += f" - {key}: {json.dumps(value)}\n" else: auto_exec_prompt += f" - {key}: [ # {len(value)} items\n" auto_exec_prompt += f" First 3: {json.dumps(value[:3])}\n" auto_exec_prompt += f" ... 
({len(value) - 6} more)\n" auto_exec_prompt += f" Last 3: {json.dumps(value[-3:])}\n" auto_exec_prompt += f" ]\n" elif isinstance(value, dict): # Check if it's a dict of dicts (like visualization's 'data' parameter) dict_values = list(value.values()) if dict_values and isinstance(dict_values[0], dict): # This is like: {'Series Name': {'x_values': [...], 'y_values': [...]}} auto_exec_prompt += f" - {key}: {{ # Dict with {len(value)} series\n" for series_name, series_data in value.items(): auto_exec_prompt += f" '{series_name}': {{\n" for sub_key, sub_value in series_data.items(): if isinstance(sub_value, list): if len(sub_value) <= 10: auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n" else: auto_exec_prompt += f" '{sub_key}': [ # {len(sub_value)} items\n" auto_exec_prompt += f" # First 3: {json.dumps(sub_value[:3])}\n" auto_exec_prompt += f" # ... ({len(sub_value) - 6} more items)\n" auto_exec_prompt += f" # Last 3: {json.dumps(sub_value[-3:])}\n" auto_exec_prompt += f" ],\n" else: auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n" auto_exec_prompt += f" }},\n" auto_exec_prompt += f" }}\n" else: # Simple dict auto_exec_prompt += f" - {key}: {{\n" for sub_key, sub_value in value.items(): if isinstance(sub_value, list): auto_exec_prompt += f" '{sub_key}': [list with {len(sub_value)} items],\n" else: auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n" auto_exec_prompt += f" }}\n" else: auto_exec_prompt += f" - {key}: {json.dumps(value)}\n" # ========== END UPDATED DATA FORMATTING ========== auto_exec_prompt += "\n=== EXECUTION RULES ===\n" auto_exec_prompt += "1. You MUST call the tool(s) listed above\n" auto_exec_prompt += "2. Use EXACTLY the parameters provided above\n" auto_exec_prompt += "3. The data arrays are READY - construct them from the info above\n" auto_exec_prompt += "4. Do NOT write any text response\n" auto_exec_prompt += "5. Do NOT ask for clarification\n" auto_exec_prompt += "6. 
Do NOT explain what you're doing\n" auto_exec_prompt += "7. JUST CALL THE TOOL NOW\n\n" auto_exec_prompt += "EXECUTE IMMEDIATELY >>>>\n" log(f" šŸ“‹ Auto-exec prompt length: {len(auto_exec_prompt)} chars") # Execute auto-suggestions with NO reasoning (instant execution) auto_calls, _, _ = self.call_tool_agent( auto_exec_prompt, 0, None, system_prompt ) if auto_calls: log(f" → Executing {len(auto_calls)} auto-suggested call(s)") auto_results = self.extension_manager.handle_function_calls( self.user_id, self.enabled_extensions, auto_calls ) for (auto_tool_name, auto_result) in auto_results: tool_results.append((auto_tool_name, auto_result)) # Extract images from auto-executed tools if isinstance(auto_result, dict) and 'image_base64' in auto_result: img_data = { 'base64': auto_result['image_base64'], 'title': auto_result.get('message', 'Auto-generated visualization'), 'filepath': auto_result.get('filepath', '') } generated_images.append(img_data) log(f"šŸ“Š Captured auto-generated visualization: {auto_result.get('message', 'Chart')}") else: log(f" āš ļø Auto-execution failed - no calls generated") # ========== END CRITICAL FIX ========== # Build next round prompt (only if we're continuing and didn't just auto-execute) if current_round < max_rounds and not executable_suggestions: # Generate next step suggestions using capability system next_prompt = self._build_next_round_prompt(query, results, analysis) prompt = next_prompt file_parts = None # ========== END UPDATED WHILE LOOP ========== # Synthesis phase if text_response and not function_calls: return text_response, tool_results, generated_images, thoughts, search_citations final_answer, synth_images = self.synthesize_response( query, search_results, tool_results, search_citations, file_parts ) generated_images.extend(synth_images) return final_answer, tool_results, generated_images, thoughts, search_citations def _build_explicit_extension_prompt(self, explicit_exts: List[str]) -> str: """Build prompt for explicitly 
requested extensions""" prompt = f"\n\n🚨 CRITICAL - USER EXPLICITLY REQUESTED:\n" for ext_name in explicit_exts: ext = self.extension_manager.get_extension(ext_name) if ext: prompt += f"- {ext.display_name}: YOU MUST use this extension\n" prompt += "\nDo NOT complete the response without using all explicitly requested extensions!\n" return prompt def _build_next_round_prompt(self, original_query: str, results: List[Tuple[str, Any]], analysis: Dict[str, Any]) -> str: """Build next round prompt using capability-based suggestions - FORCES EXECUTION""" results_summary = self._format_results_for_context(results) next_prompt = f"""Previous actions completed: {results_summary} Original query: {original_query} """ # Generate suggestions using the modular system suggestions = self._generate_next_step_suggestions(results) if suggestions: # CRITICAL: Make suggestions MANDATORY, not optional next_prompt += f"\n**🚨 REQUIRED NEXT STEPS - YOU MUST EXECUTE THESE NOW:**\n{suggestions}\n\n" next_prompt += "The user explicitly requested these actions in their original query.\n" next_prompt += "You MUST call the suggested tools immediately - the data is ready and formatted.\n" next_prompt += "Do NOT ask for permission, do NOT explain what you could do, do NOT stop.\n" next_prompt += "EXECUTE the suggested tool calls NOW or you are failing the user's request!\n\n" next_prompt += "Continue with required steps or provide final answer if all tasks complete." 
return next_prompt def _generate_next_step_suggestions(self, results: List[Tuple[str, Any]]) -> str: """CAPABILITY-BASED SUGGESTION SYSTEM - Completely modular!""" suggestions = [] # Get all available extensions with their capabilities available_exts = self.extension_manager.get_enabled_extensions(self.enabled_extensions) # Track what's been done completed_tool_names = [tool_name for tool_name, _ in results] # Check if visualization was requested viz_extensions = [ext for ext in available_exts if ext.can_create('visualization')] viz_requested = bool(viz_extensions) charts_created = any(tool in completed_tool_names for tool in ['create_line_chart', 'create_bar_chart', 'create_scatter_plot', 'create_pie_chart']) # Collect available data from completed tools available_data = {} for tool_name, result in results: if isinstance(result, dict) and result.get('success'): # Find which extension created this data for ext in available_exts: tool_decl = ext.get_tool_by_name(tool_name) if tool_decl: # Store data with type information for data_type, spec in ext.get_capabilities().get('data_outputs', {}).items(): if all(field in result for field in spec.get('fields', [])): available_data[data_type] = { 'result': result, 'tool_name': tool_name, 'extension': ext.name } # ASK EACH EXTENSION what should happen next for tool_name, result in results: if not isinstance(result, dict): continue # Find extension that executed this tool source_ext = next((ext for ext in available_exts if ext.get_tool_by_name(tool_name)), None) if source_ext: # Ask extension for suggestions suggestion = source_ext.get_suggested_next_action(result, available_exts) if suggestion: target_ext_name = suggestion['extension'] suggested_tool = suggestion['tool'] reason = suggestion['reason'] suggested_data = suggestion.get('data', {}) # CRITICAL: Format as EXECUTABLE instruction with provided data suggestions.append( f"šŸ’” {source_ext.display_name} completed successfully!\n" f" šŸŽÆ NEXT ACTION REQUIRED: {reason}\n" 
f" ⚔ YOU MUST NOW CALL: {suggested_tool}() with the provided data\n" f" šŸ“¦ Data is formatted and ready - execute immediately!\n" f" šŸ“‹ Suggested parameters: {json.dumps(suggested_data, indent=2)[:200]}..." ) # CRITICAL: Check for unfulfilled visualization requests if viz_requested and not charts_created: visualizable_data = [] for data_type, data_info in available_data.items(): for viz_ext in viz_extensions: if viz_ext.can_consume(data_info['result'].get('format', data_type)): visualizable_data.append({ 'data_type': data_type, 'data': data_info, 'viz_ext': viz_ext }) if visualizable_data: suggestions.insert(0, f"🚨 CRITICAL ALERT: User explicitly requested charts/graphs but NONE have been created!\n" f" šŸ“Š Available data ready for immediate visualization:\n" + "\n".join([ f" āœ… {item['data_type']} from {item['data']['tool_name']} " f"→ READY for {item['viz_ext'].name} tools" for item in visualizable_data ]) + f"\n\n ⚔ You MUST create visualizations NOW using the chart tools!\n" f" šŸ“ˆ The user wants to SEE the data visually, not just read about it!\n" f" 🚫 Do NOT ask permission - just CREATE the charts immediately!" ) return "\n\n".join(suggestions) if suggestions else "" def _format_results_for_context(self, results: List[Tuple[str, Any]]) -> str: """Format tool results for context""" formatted = [] for tool_name, result in results: if isinstance(result, dict): clean_result = dict(result) clean_result.pop('image_base64', None) formatted.append(f"- {tool_name}: {json.dumps(clean_result, indent=2)[:300]}") else: formatted.append(f"- {tool_name}: {str(result)[:200]}") return "\n".join(formatted) def call_search_agent(self, query: str, file_parts: List = None) -> Tuple[str, Optional[str]]: """Call search agent""" log("šŸ” Calling Search Agent...") grounding_tool = types.Tool(google_search=types.GoogleSearch()) config = types.GenerateContentConfig( system_instruction="You are a search specialist. 
Use Google Search to find relevant information.", tools=[grounding_tool], temperature=0.7, max_output_tokens=2048 ) try: content_parts = [] if file_parts: content_parts.extend(file_parts) content_parts.append(query) result_text = "" last_chunk = None stream = self.search_chat.send_message_stream(content_parts, config=config) for chunk in stream: last_chunk = chunk if hasattr(chunk, 'candidates') and chunk.candidates: candidate = chunk.candidates[0] if hasattr(candidate, 'content') and candidate.content: if hasattr(candidate.content, 'parts') and candidate.content.parts: for part in candidate.content.parts: if hasattr(part, 'text') and part.text: result_text += part.text citations = None if last_chunk and hasattr(last_chunk, 'candidates') and last_chunk.candidates: citations = insert_citations_from_grounding(last_chunk.candidates) return result_text, citations except Exception as e: log(f"āš ļø Search Agent error: {e}") return "", None def call_tool_agent(self, prompt: str, reasoning_budget: int, file_parts: List = None, system_prompt: str = None) -> Tuple[List, str, str]: """Call tool execution agent""" tools = self.extension_manager.get_all_tools(self.enabled_extensions) if not system_prompt: system_prompt = self.extension_manager.build_system_prompt( self.user_id, self.enabled_extensions ) config = types.GenerateContentConfig( system_instruction=system_prompt, tools=tools, temperature=0.7, max_output_tokens=4096, thinking_config=types.ThinkingConfig( include_thoughts=True, thinking_budget=reasoning_budget, ) ) try: content_parts = [] if file_parts: content_parts.extend(file_parts) content_parts.append(prompt) response = self.chat.send_message(content_parts, config=config) function_calls = [] text_response = "" thoughts = "" if response.candidates and response.candidates[0].content: for part in response.candidates[0].content.parts: if hasattr(part, 'function_call') and part.function_call: function_calls.append(part.function_call) if getattr(part, "text", None): 
if getattr(part, "thought", False): thoughts += part.text else: text_response += part.text return function_calls, text_response, thoughts except Exception as e: log(f"āš ļø Tool Agent error: {e}") return [], "", "" def synthesize_response(self, query: str, search_results: str, tool_results: List, search_citations: Optional[str], file_parts: List = None) -> Tuple[str, List]: """Synthesize final response""" log("✨ Synthesizing final response...") synthesis_prompt = f"[Original Query]\n{query}\n\n" if search_results: synthesis_prompt += f"[Web Search Results]\n{search_results}\n\n" generated_images = [] deep_research_report = None deep_research_sources = None has_deep_research = False if tool_results: synthesis_prompt += "[Tool Execution Results]\n" for tool_name, result in tool_results: if isinstance(result, dict) and 'image_base64' in result: generated_images.append({ 'base64': result['image_base64'], 'title': result.get('message', 'Generated visualization'), 'filepath': result.get('filepath', '') }) result_clean = dict(result) result_clean.pop('image_base64', None) synthesis_prompt += f"- {tool_name}: {result_clean.get('message', 'Chart created')}\n" elif tool_name == 'conduct_deep_research' and isinstance(result, dict): if result.get('success') and result.get('report'): has_deep_research = True deep_research_report = result.get('report') deep_research_sources = result.get('sources', []) synthesis_prompt += f"- {tool_name}: Research complete\n" else: result_str = str(result)[:500] synthesis_prompt += f"- {tool_name}: {result_str}\n" synthesis_prompt += "\n\nProvide a comprehensive answer incorporating all information above." config = types.GenerateContentConfig( system_instruction="You are a synthesis specialist. 
def reasoning_budget(level: str) -> int:
    """Translate a UI reasoning-level label into a Gemini thinking budget.

    Accepts any casing; None and unrecognized labels fall back to the
    dynamic budget (-1).
    """
    normalized = (level or "Dynamic").lower()
    budgets = {"none": 0, "concise": 256, "strong": 2048, "dynamic": -1}
    return budgets.get(normalized, -1)


def insert_citations_from_grounding(candidates):
    """Extract citations from grounding metadata.

    Builds a markdown "Sources" footer from the first candidate's grounding
    chunks, de-duplicating by title. Returns None when nothing usable is found.
    """
    try:
        if not candidates:
            return None
        grounding = getattr(candidates[0], "grounding_metadata", None)
        if not grounding:
            return None
        chunks = getattr(grounding, "grounding_chunks", None) or []
        if not chunks:
            return None
        citations = []
        seen_titles = set()
        for idx, chunk in enumerate(chunks):
            web = getattr(chunk, "web", None)
            if not web:
                continue
            uri = getattr(web, "uri", None)
            title = getattr(web, "title", None)
            if uri and title and title not in seen_titles:
                seen_titles.add(title)
                citations.append(f"[{title}]({uri})")
            elif uri:
                # Untitled (or duplicate-title) sources get a numbered label
                citations.append(f"[Source {idx+1}]({uri})")
        if citations:
            return "\n\nšŸ“š **Sources:** " + " • ".join(citations)
        return None
    except Exception as e:
        log(f"āš ļø Citation extraction failed: {e}")
        return None
citations.append(f"[Source {idx+1}]({uri})") if citations: return "\n\nšŸ“š **Sources:** " + " • ".join(citations) return None except Exception as e: log(f"āš ļø Citation extraction failed: {e}") return None EXTENSION_MANAGER = ExtensionManager() CHAT_SESSIONS: Dict[str, Dict[str, Any]] = {} def get_or_create_session(api_key: str): if not api_key: return None, None if api_key in CHAT_SESSIONS: return (CHAT_SESSIONS[api_key]["client"], CHAT_SESSIONS[api_key]["chat"]) try: client = genai.Client(api_key=api_key) chat = client.chats.create(model="gemini-2.5-flash") CHAT_SESSIONS[api_key] = {"client": client, "chat": chat} log("āœ… Created new Gemini session with multi-turn chat.") return client, chat except Exception as e: log(f"āŒ Error creating Gemini client: {e}") return None, None def chat_with_gemini(api_key, chat_history_msgs, multimodal_input, show_thoughts, reasoning_level, enabled_extensions): log("=== chat_with_gemini CALLED ===") if not api_key: chat_history_msgs = chat_history_msgs or [] chat_history_msgs.append({ "role": "assistant", "content": "šŸ”‘ Please enter your Gemini API key first." }) yield chat_history_msgs return client, chat = get_or_create_session(api_key) if not client: chat_history_msgs.append({ "role": "assistant", "content": "āš ļø Could not create Gemini session." 
        })
        yield chat_history_msgs
        return
    # Unpack the multimodal payload; both keys may be absent or None.
    user_text = (multimodal_input or {}).get("text", "") or ""
    uploaded_files = (multimodal_input or {}).get("files", []) or []
    if chat_history_msgs is None:
        chat_history_msgs = []
    file_parts = []
    if uploaded_files:
        log(f"šŸ“Ž Processing {len(uploaded_files)} uploaded file(s)...")
        for file_path in uploaded_files:
            try:
                file_part = process_uploaded_file(client, file_path)
                file_parts.append(file_part)
            except Exception as e:
                # A failed file is skipped; the turn continues without it.
                log(f"āŒ Error processing file {file_path}: {e}")
    # Echo the user's message, then reserve assistant placeholder slots so
    # later yields can update them in place by index.
    chat_history_msgs.append({"role": "user", "content": user_text})
    yield chat_history_msgs
    assistant_base_index = len(chat_history_msgs)
    if show_thoughts:
        thought_index = assistant_base_index          # thinking bubble
        chat_history_msgs.append({"role": "assistant", "content": "šŸ’­ Thinking..."})
        answer_index = thought_index + 1              # answer bubble
        chat_history_msgs.append({"role": "assistant", "content": "šŸ¤” Processing..."})
    else:
        thought_index = None
        answer_index = assistant_base_index
        chat_history_msgs.append({"role": "assistant", "content": "šŸ¤” Processing..."})
    yield chat_history_msgs
    try:
        # Extensions may have queued proactive messages; insert them before
        # the answer placeholder and shift its index accordingly.
        proactive_msgs = EXTENSION_MANAGER.check_proactive_messages(api_key, enabled_extensions)
        if proactive_msgs:
            for msg in proactive_msgs:
                chat_history_msgs.insert(answer_index, {"role": "assistant", "content": msg})
                answer_index += 1
            yield chat_history_msgs
        if enabled_extensions:
            # --- Orchestrated path: planning + tools + synthesis ---
            log("šŸŽ­ Using improved orchestrator with capability-based system")
            orchestrator = ImprovedOrchestrator(
                client, chat, EXTENSION_MANAGER, api_key, enabled_extensions
            )
            budget = reasoning_budget(reasoning_level)
            final_answer, tool_results, generated_images, thoughts, search_citations = \
                orchestrator.execute_with_planning(user_text, file_parts, budget)
            log(f"šŸŽØ Orchestrator returned {len(generated_images)} images")
            if thoughts and show_thoughts:
                # NOTE(review): the markup inside these f-strings is garbled
                # in this copy (HTML appears to have been stripped during
                # extraction); restore the original template before relying
                # on this rendering.
                chat_history_msgs[thought_index]["content"] = (
                    f"
" f"šŸ’­ Agent Thinking" f"
" f"{thoughts.strip()}
" f"
"
                )
                yield chat_history_msgs
            final_content = (
                f"
šŸ¤– Response" f"
" f"{final_answer.strip()}
"
            )
            if generated_images:
                for idx, img_data in enumerate(generated_images, 1):
                    log(f" šŸ“Š Adding image {idx}: {img_data.get('title', 'Untitled')}")
                    final_content += f"\n\n
"
                    final_content += f"šŸ“Š {img_data.get('title', 'Chart')}
"
                    base64_data = img_data.get('base64', '')
                    if base64_data:
                        # NOTE(review): empty f-string — an inline base64
                        # <img> tag appears to have been lost from this copy.
                        final_content += f""
                    if img_data.get('filepath'):
                        final_content += f"
Saved to: {img_data['filepath']}"
                    final_content += "
"
            if search_citations:
                final_content += "\n\n" + search_citations
            chat_history_msgs[answer_index]["content"] = final_content
            yield chat_history_msgs
        else:
            # --- Plain streaming path with Google Search grounding ---
            log("šŸ“ŗ Using simple streaming mode")
            parts = []
            if file_parts:
                parts.extend(file_parts)
            parts.append(user_text)
            budget = reasoning_budget(reasoning_level)
            grounding_tool = types.Tool(google_search=types.GoogleSearch())
            config = types.GenerateContentConfig(
                system_instruction=BASE_SYSTEM_PROMPT,
                tools=[grounding_tool],
                temperature=0.7,
                top_p=0.9,
                max_output_tokens=8192,
                thinking_config=types.ThinkingConfig(
                    include_thoughts=True,
                    thinking_budget=budget,  # -1 = dynamic
                )
            )
            stream = chat.send_message_stream(parts, config=config)
            answer = ""
            thoughts = ""
            last_chunk = None  # kept for grounding metadata after the loop
            if show_thoughts:
                # Repurpose the first placeholder as the thinking bubble and
                # append a fresh bubble for the streamed answer.
                thought_index = answer_index
                chat_history_msgs[answer_index]["content"] = "šŸ’­ Thinking..."
                answer_index = len(chat_history_msgs)
                chat_history_msgs.append({"role": "assistant", "content": ""})
                yield chat_history_msgs
            for chunk in stream:
                last_chunk = chunk
                if not getattr(chunk, "candidates", None):
                    continue
                candidate = chunk.candidates[0]
                if getattr(candidate, "content", None):
                    for part in candidate.content.parts:
                        if not getattr(part, "text", None):
                            continue
                        if getattr(part, "thought", False):
                            thoughts += part.text
                            if show_thoughts:
                                # NOTE(review): garbled f-string markup (see
                                # the note in the orchestrator branch).
                                chat_history_msgs[thought_index]["content"] = (
                                    f"
" f"šŸ’­ Agent Thinking" f"
" f"{thoughts.strip()}
" f"
"
                                )
                                yield chat_history_msgs
                        else:
                            answer += part.text
                            chat_history_msgs[answer_index]["content"] = (
                                f"
šŸ¤– Response" f"
" f"{answer.strip()}
"
                            )
                            yield chat_history_msgs
            if last_chunk:
                # Append a markdown "Sources" footer from grounding metadata.
                citations = insert_citations_from_grounding(last_chunk.candidates)
                if citations:
                    chat_history_msgs[answer_index]["content"] += "\n\n" + citations
                    yield chat_history_msgs
        log("āœ… Response complete.")
        return
    except Exception as e:
        log(f"āŒ Error: {e}")
        traceback.print_exc()
        # Surface the error in the answer bubble rather than crashing the UI.
        chat_history_msgs[answer_index]["content"] = f"āš ļø Error: {e}"
        yield chat_history_msgs
        return


def build_extension_ui():
    """Build one checkbox per discovered extension inside an accordion.

    Returns a list of (extension_name, gr.Checkbox) pairs.
    NOTE(review): the empty case returns a (gr.Markdown, []) tuple instead of
    a list of pairs — callers that iterate pairs would misbehave; confirm.
    """
    extensions = EXTENSION_MANAGER.get_all_extensions()
    if not extensions:
        return gr.Markdown("No extensions available"), []
    checkboxes = []
    with gr.Accordion("šŸ”Œ Agent Extensions", open=True):
        gr.Markdown("Enable extensions to give the agent additional capabilities:")
        gr.Markdown("✨ **Intelligent Integration:** Extensions automatically discover and work with each other based on their capabilities")
        for ext in extensions:
            cb = gr.Checkbox(
                label=f"{ext.icon} {ext.display_name}",
                info=ext.description,
                value=False
            )
            checkboxes.append((ext.name, cb))
    return checkboxes


# --- Gradio application layout ---
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="purple", secondary_hue="blue"),
    title="Wuhp Agents",
    fill_width=True
) as demo:
    # NOTE(review): empty HTML block — custom CSS/markup appears to have
    # been lost from this copy.
    gr.HTML(""" """)
    with gr.Row():
        with gr.Column(scale=1, min_width=320):
            gr.Markdown("## āš™ļø Settings & Controls")
            api_key = gr.Textbox(
                label="šŸ”‘ Gemini API Key",
                placeholder="Paste your Gemini API key here...",
                type="password",
            )
            reasoning_level = gr.Radio(
                ["None", "Concise", "Strong", "Dynamic"],
                label="🧠 Reasoning Level",
                value="Dynamic",
                info="Controls the model's thinking depth.",
            )
            show_thoughts = gr.Checkbox(
                label="šŸ’­ Show Thinking",
                value=True,
                info="Display reasoning process before answers.",
            )
            extension_checkboxes = build_extension_ui()
        with gr.Column(scale=4):
            with gr.Group(elem_classes="chat-panel"):
                chatbot = gr.Chatbot(
                    label="šŸ¤– Chat with Wuhp Agent",
                    height=650,
                    show_copy_button=True,
                    type="messages",
                    avatar_images=(None, "https://i.imgur.com/Q2EMk2N.png"),
                )
                multimodal_msg = \
                    gr.MultimodalTextbox(
                        file_types=[
                            "image", "video", "audio", ".pdf", ".txt", ".md",
                            ".html", ".xml", ".doc", ".docx", ".csv", ".json"
                        ],
                        placeholder="Ask anything, upload files, or enable extensions for intelligent multi-tool capabilities...",
                        label="Your Message",
                        elem_classes="message-input",
                        autofocus=True
                    )

    # Holds the list of currently-enabled extension names for the timer tick.
    enabled_extensions_state = gr.State([])

    def clear_box():
        """Reset the multimodal input after a message is sent."""
        return {"text": "", "files": []}

    def handle_chat(api_key_input, chat_history_msgs, multimodal_dict,
                    thinking_flag, reasoning_lvl, *extension_states):
        """Collect checkbox states into a name list and delegate to chat_with_gemini."""
        enabled = []
        for (ext_name, _), is_enabled in zip(extension_checkboxes, extension_states):
            if is_enabled:
                enabled.append(ext_name)
        log(f"Enabled extensions: {enabled}")
        yield from chat_with_gemini(
            api_key_input, chat_history_msgs, multimodal_dict,
            thinking_flag, reasoning_lvl, enabled
        )

    def check_timers(api_key_input, chat_history, enabled_exts):
        """Background function to check for completed timers and proactive messages"""
        if not api_key_input or not enabled_exts:
            return chat_history
        proactive_msgs = EXTENSION_MANAGER.check_proactive_messages(api_key_input, enabled_exts)
        if proactive_msgs:
            if chat_history is None:
                chat_history = []
            for msg in proactive_msgs:
                chat_history.append({"role": "assistant", "content": msg})
                log(f"šŸ“¬ Proactive message sent from extension")
        return chat_history

    checkbox_components = [cb for _, cb in extension_checkboxes]
    # Submit wiring: run the chat generator, then clear the input box.
    multimodal_msg.submit(
        fn=handle_chat,
        inputs=[api_key, chatbot, multimodal_msg, show_thoughts, reasoning_level] + checkbox_components,
        outputs=[chatbot],
        queue=True,
    ).then(fn=clear_box, outputs=[multimodal_msg])
    # Poll every 10 seconds for proactive extension messages.
    timer_check = gr.Timer(value=10, active=True)

    def update_enabled_state(*extension_states):
        """Mirror checkbox states into enabled_extensions_state.

        NOTE(review): duplicates the collection logic in handle_chat.
        """
        enabled = []
        for (ext_name, _), is_enabled in zip(extension_checkboxes, extension_states):
            if is_enabled:
                enabled.append(ext_name)
        return enabled

    for _, cb in extension_checkboxes:
        cb.change(
            fn=update_enabled_state,
            inputs=checkbox_components,
            outputs=[enabled_extensions_state]
        )
    timer_check.tick(
        fn=check_timers,
        inputs=[api_key, chatbot, enabled_extensions_state],
        outputs=[chatbot]
    )


if __name__ == "__main__":
    log(f"===== Wuhp Agents started at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
    demo.launch()