import datetime
import importlib
import importlib.util
import io
import json
import os
import re
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
from google import genai
from google.genai import types
from google.genai.types import Tool, GoogleSearch, FunctionDeclaration
from PIL import Image
# Add current directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from base_extension import BaseExtension
# Markdown banner rendered at the top of the Gradio UI.
DESCRIPTION = """
# Wuhp Agents
**Powered by Gemini 2.5 Flash + Google Search Grounding + Agent Extensions**
"""
# Base system prompt; per-session extension context is appended to this
# by ExtensionManager.build_system_prompt().
BASE_SYSTEM_PROMPT = """
You are Wuhp Agent, a versatile AI assistant with extensible capabilities.
Your core abilities include conversation, web search, and image understanding.
When users enable extensions, you gain additional tools and capabilities.
Always use the available tools when they would be helpful to the user.
Be proactive about suggesting when an extension might be useful.
"""
def log(msg: str):
    """Print *msg* to stdout with an HH:MM:SS timestamp prefix, unbuffered."""
    timestamp = datetime.datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] {msg}", flush=True)
def get_mime_type(file_path: str) -> str:
    """Determine MIME type from file extension"""
    # Extension table grouped by media class; lookups are case-insensitive.
    known_types = {
        # images
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.heic': 'image/heic',
        '.heif': 'image/heif',
        # documents
        '.pdf': 'application/pdf',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.md': 'text/markdown',
        # videos
        '.mp4': 'video/mp4',
        '.mpeg': 'video/mpeg',
        '.mov': 'video/mov',
        '.avi': 'video/avi',
        '.flv': 'video/x-flv',
        '.mpg': 'video/mpg',
        '.webm': 'video/webm',
        '.wmv': 'video/wmv',
        '.3gpp': 'video/3gpp',
    }
    suffix = Path(file_path).suffix.lower()
    return known_types.get(suffix, 'application/octet-stream')
def process_uploaded_file(client: genai.Client, file_path: str) -> types.Part:
    """Process an uploaded file and return a Part (or uploaded File) for Gemini.

    Files over 20MB — and all videos regardless of size — are routed through
    the File API (inline parts have a request-size limit); everything else is
    embedded inline as raw bytes with the detected MIME type.
    """
    mime_type = get_mime_type(file_path)
    path = Path(file_path)
    file_size = path.stat().st_size
    log(f"š Processing file: {path.name} ({mime_type}, {file_size/1024:.1f}KB)")
    # For files > 20MB or videos, use File API
    if file_size > 20 * 1024 * 1024 or mime_type.startswith('video/'):
        log(f"š¤ Uploading large file via File API...")
        uploaded_file = client.files.upload(file=file_path)
        # NOTE: original log line was garbled across two lines; rejoined here.
        log(f"ā File uploaded: {uploaded_file.name}")
        return uploaded_file
    # Small non-video files: read bytes and embed inline.
    with path.open('rb') as f:
        file_bytes = f.read()
    log(f"ā File loaded inline")
    return types.Part.from_bytes(data=file_bytes, mime_type=mime_type)
class ExtensionManager:
    """Enhanced extension manager with capability-based discovery"""
    def __init__(self):
        # Registry of loaded extensions, keyed by each extension's `name`.
        self.extensions: Dict[str, BaseExtension] = {}
        # Discover and instantiate extensions from the extensions/ folder now.
        self.load_extensions()
def load_extensions(self):
"""Dynamically load all extensions from extensions/ folder"""
extensions_dir = Path("extensions")
if not extensions_dir.exists():
log("ā ļø Extensions directory not found, creating it...")
extensions_dir.mkdir()
return
log(f"š Scanning for extensions in {extensions_dir.absolute()}")
for file in extensions_dir.glob("*.py"):
if file.name.startswith("_"):
continue
try:
module_name = file.stem
spec = importlib.util.spec_from_file_location(module_name, file)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (isinstance(attr, type) and
issubclass(attr, BaseExtension) and
attr != BaseExtension):
ext = attr()
self.extensions[ext.name] = ext
log(f"ā
Loaded extension: {ext.display_name} ({ext.name})")
break
except Exception as e:
log(f"ā Failed to load {file.name}: {e}")
traceback.print_exc()
log(f"š Total extensions loaded: {len(self.extensions)}")
self._analyze_extension_capabilities()
    def _analyze_extension_capabilities(self):
        """Analyze and log extension capabilities.

        Purely diagnostic: prints each loaded extension's declared
        capabilities and any provider->consumer data-flow chains detected
        across ALL loaded extensions. No state is modified.
        """
        log("š Analyzing extension capabilities...")
        for ext in self.extensions.values():
            caps = ext.get_capabilities()
            log(f" š¦ {ext.name}:")
            if caps.get('provides_data'):
                log(f" ā Provides: {', '.join(caps['provides_data'])}")
            if caps.get('consumes_data'):
                log(f" ā Consumes: {', '.join(caps['consumes_data'])}")
            if caps.get('creates_output'):
                log(f" ā Creates: {', '.join(caps['creates_output'])}")
        # Show data flow possibilities (considers every loaded extension,
        # not just the ones currently enabled for a session).
        chains = self.get_data_flow_chain(list(self.extensions.keys()))
        if chains:
            log(" š Data flow chains detected:")
            for chain in chains:
                consumers_str = ', '.join(chain['consumers'])
                log(f" {chain['provider']} ā {chain['data_type']} ā [{consumers_str}]")
def get_extension(self, name: str) -> Optional[BaseExtension]:
return self.extensions.get(name)
def get_all_extensions(self) -> List[BaseExtension]:
return list(self.extensions.values())
def get_enabled_extensions(self, enabled_list: List[str]) -> List[BaseExtension]:
"""Get list of enabled extension objects"""
return [ext for name, ext in self.extensions.items() if name in enabled_list]
def detect_capabilities_by_keywords(self, query: str) -> List[str]:
"""Detect which extensions might be relevant based on query keywords"""
query_lower = query.lower()
relevant_extensions = []
for ext in self.get_all_extensions():
keywords = ext.get_capabilities().get('keywords', [])
if any(keyword in query_lower for keyword in keywords):
relevant_extensions.append(ext.name)
return relevant_extensions
def find_extensions_by_capability(self, capability_type: str,
capability_value: str) -> List[BaseExtension]:
"""Find extensions by capability"""
matching = []
for ext in self.get_all_extensions():
caps = ext.get_capabilities().get(capability_type, [])
if capability_value in caps:
matching.append(ext)
return matching
def get_data_flow_chain(self, enabled_extensions: List[str]) -> List[Dict]:
"""Analyze enabled extensions and build data flow chain"""
enabled_exts = self.get_enabled_extensions(enabled_extensions)
chains = []
for provider in enabled_exts:
provided_data = provider.get_capabilities().get('provides_data', [])
for data_type in provided_data:
consumers = [
ext for ext in enabled_exts
if ext.can_consume(data_type) and ext.name != provider.name
]
if consumers:
chains.append({
'provider': provider.name,
'data_type': data_type,
'consumers': [c.name for c in consumers]
})
return chains
    def build_system_prompt(self, user_id: str, enabled_list: List[str]) -> str:
        """Build enhanced system prompt with capability awareness.

        Starts from BASE_SYSTEM_PROMPT, appends each enabled extension's
        display name + system context (initializing its per-user state
        first), then — when more than one extension is enabled — appends
        cross-extension integration guidance.
        """
        prompt = BASE_SYSTEM_PROMPT
        enabled_exts = self.get_enabled_extensions(enabled_list)
        if enabled_exts:
            prompt += "\n\n# ENABLED EXTENSIONS\nYou currently have these extensions enabled:\n\n"
            for ext in enabled_exts:
                # Ensure per-user state exists before we try to summarize it.
                ext.initialize_state(user_id)
                prompt += f"## {ext.display_name}\n{ext.get_system_context()}\n\n"
                state = ext.get_state(user_id)
                # get_state_summary is optional on extensions; guard with hasattr.
                if state and hasattr(ext, 'get_state_summary'):
                    state_summary = ext.get_state_summary(user_id)
                    if state_summary:
                        prompt += f"**Current State:** {state_summary}\n\n"
        if len(enabled_exts) > 1:
            prompt += self._build_capability_guidance(enabled_list)
        return prompt
    def _build_capability_guidance(self, enabled_list: List[str]) -> str:
        """Build intelligent guidance based on actual capabilities.

        Produces the "INTELLIGENT INTEGRATION SYSTEM" prompt section: the
        detected provider->consumer data flows plus an execution rule that
        pushes the model to run suggested tools without asking permission.
        """
        guidance = "\n\n# INTELLIGENT INTEGRATION SYSTEM\n"
        guidance += "Extensions can automatically work together based on their capabilities:\n\n"
        # Detect what combinations are possible among the enabled extensions.
        chains = self.get_data_flow_chain(enabled_list)
        if chains:
            guidance += "**Available Data Flows:**\n"
            for chain in chains:
                provider_ext = self.get_extension(chain['provider'])
                consumers_display = []
                for consumer_name in chain['consumers']:
                    consumer_ext = self.get_extension(consumer_name)
                    if consumer_ext:
                        consumers_display.append(consumer_ext.display_name)
                guidance += f"- {provider_ext.display_name} provides `{chain['data_type']}` ā "
                guidance += f"{', '.join(consumers_display)} can use it\n"
        guidance += "\n**Key Principles:**\n"
        guidance += "1. **Sequential Operations**: Complete one extension's task, then use results in another\n"
        guidance += "2. **Data Flow**: Extensions automatically suggest next steps based on their output\n"
        guidance += "3. **No Hardcoding**: System detects compatible extensions dynamically\n\n"
        # CRITICAL: enforcement rule — without it the model tends to narrate
        # suggested follow-ups instead of executing them.
        guidance += "**šØ CRITICAL EXECUTION RULE:**\n"
        guidance += "When you see suggestions like 'š” Extension suggests: reason ā Call tool_name':\n"
        guidance += "- You MUST immediately execute the suggested tool\n"
        guidance += "- Do NOT ask the user for permission or clarification\n"
        guidance += "- The data is already formatted and ready to use\n"
        guidance += "- Just call the suggested tool NOW\n"
        guidance += "- Failing to execute suggestions means failing the user's request!\n\n"
        return guidance
def get_all_tools(self, enabled_list: List[str]) -> List[types.Tool]:
"""Get all tools from enabled extensions"""
tools = []
for ext_name in enabled_list:
ext = self.get_extension(ext_name)
if ext:
tools.extend(ext.get_tools())
return tools
def get_search_tool(self) -> types.Tool:
"""Get Google Search tool separately"""
return types.Tool(google_search=types.GoogleSearch())
def handle_function_calls(self, user_id: str, enabled_list: List[str],
function_calls: List) -> List[Tuple[str, Any]]:
"""Process function calls and return (function_name, result) tuples"""
results = []
for fc in function_calls:
function_name = fc.name
args = fc.args if hasattr(fc, 'args') else {}
log(f"š§ Executing: {function_name}({json.dumps(args, indent=2)[:100]}...)")
handled = False
for ext_name in enabled_list:
ext = self.get_extension(ext_name)
if ext:
for tool in ext.get_tools():
if hasattr(tool, 'function_declarations'):
for func_decl in tool.function_declarations:
if func_decl.name == function_name:
try:
result = ext.handle_tool_call(user_id, function_name, args)
results.append((function_name, result))
log(f"ā
{function_name} completed: {str(result)[:100]}...")
except Exception as e:
log(f"ā Error in {function_name}: {e}")
results.append((function_name, {"error": str(e)}))
handled = True
break
if handled:
break
if handled:
break
if not handled:
log(f"ā ļø Unknown function: {function_name}")
results.append((function_name, {"error": f"Unknown function {function_name}"}))
return results
def check_proactive_messages(self, user_id: str, enabled_list: List[str]) -> List[str]:
"""Check all extensions for proactive messages"""
messages = []
for ext_name in enabled_list:
ext = self.get_extension(ext_name)
if ext:
msg = ext.get_proactive_message(user_id)
if msg:
messages.append(msg)
return messages
class ImprovedOrchestrator:
    """Enhanced orchestrator with capability-based reasoning"""
    def __init__(self, client, chat, extension_manager, user_id, enabled_extensions):
        self.client = client
        # Main multi-turn chat used for the tool-execution agent.
        self.chat = chat
        self.extension_manager = extension_manager
        self.user_id = user_id
        self.enabled_extensions = enabled_extensions
        # Dedicated chat so search turns don't pollute the tool chat history.
        self.search_chat = client.chats.create(model="gemini-2.5-flash")
        self.execution_history = []
def analyze_query(self, query: str, file_parts: List = None) -> Dict[str, Any]:
"""Analyze query using capability detection"""
log("š§ Analyzing query with capability detection...")
# Detect relevant extensions by keywords
relevant_exts = self.extension_manager.detect_capabilities_by_keywords(query)
log(f" š Relevant extensions by keywords: {', '.join(relevant_exts)}")
query_lower = query.lower()
# Detect explicit requests
explicit_extensions = []
if any(word in query_lower for word in ['deep research', 'conduct research', 'research report']):
explicit_extensions.append('deep_research')
if any(word in query_lower for word in ['chart', 'graph', 'visualize', 'plot']):
explicit_extensions.append('visualization')
# Combine keyword detection with explicit requests
all_relevant = list(set(relevant_exts + explicit_extensions))
return {
"needs_search": "maybe",
"relevant_extensions": all_relevant,
"explicit_extensions": explicit_extensions,
"multi_step": len(all_relevant) > 1,
"uses_context": any(word in query_lower for word in ['the', 'my', 'that', 'previous']),
"complexity": "complex" if len(all_relevant) > 2 else "moderate",
"reasoning": f"Detected {len(all_relevant)} relevant extensions"
}
def execute_with_planning(self, query: str, file_parts: List = None,
reasoning_budget: int = -1) -> Tuple[str, List, List, str, Optional[str]]:
"""Execute query with capability-based planning"""
analysis = self.analyze_query(query, file_parts)
# Search phase
search_results = ""
search_citations = None
if analysis.get('needs_search') in ['yes', 'maybe']:
log("š Executing search phase...")
search_results, search_citations = self.call_search_agent(query, file_parts)
# Tool execution phase
log("š ļø Executing tool phase with capability awareness...")
tool_results = []
generated_images = []
thoughts = ""
system_prompt = self.extension_manager.build_system_prompt(
self.user_id, self.enabled_extensions
)
# Add capability-aware guidance
if analysis.get('explicit_extensions'):
system_prompt += self._build_explicit_extension_prompt(analysis['explicit_extensions'])
max_rounds = 5 if analysis['complexity'] == 'complex' else 3
current_round = 0
prompt = query
if search_results:
prompt = f"[Web Search Results]\n{search_results}\n\n[User Query]\n{query}"
# ========== UPDATED WHILE LOOP WITH FIXED AUTO-EXECUTION ==========
while current_round < max_rounds:
current_round += 1
log(f"š Tool execution round {current_round}/{max_rounds}")
function_calls, text_response, round_thoughts = self.call_tool_agent(
prompt, reasoning_budget, file_parts, system_prompt
)
thoughts += round_thoughts
if not function_calls:
log(f"ā
No more tools needed after round {current_round}")
if text_response:
return text_response, tool_results, generated_images, thoughts, search_citations
break
# Execute the function calls
results = self.extension_manager.handle_function_calls(
self.user_id, self.enabled_extensions, function_calls
)
for (tool_name, result) in results:
tool_results.append((tool_name, result))
# Extract images
if isinstance(result, dict) and 'image_base64' in result:
img_data = {
'base64': result['image_base64'],
'title': result.get('message', 'Generated visualization'),
'filepath': result.get('filepath', '')
}
generated_images.append(img_data)
log(f"š Captured visualization: {result.get('message', 'Chart')}")
# ========== CRITICAL FIX: AUTO-EXECUTION WITH ACTUAL DATA ==========
# Check for auto-executable suggestions from completed tools
if current_round < max_rounds:
log(f"š Checking for auto-executable suggestions from completed tools...")
available_exts = self.extension_manager.get_enabled_extensions(self.enabled_extensions)
executable_suggestions = []
for tool_name, result in results:
if not isinstance(result, dict):
continue
# Find extension that executed this tool
source_ext = next((ext for ext in available_exts
if ext.get_tool_by_name(tool_name)), None)
if source_ext:
# Ask extension for suggestions
suggestion = source_ext.get_suggested_next_action(result, available_exts)
if suggestion and suggestion.get('data_ready'):
log(f"ā
Valid suggestion: {suggestion['tool']} from {source_ext.name}")
# CRITICAL: Log actual data being passed
suggested_data = suggestion.get('data', {})
for key, value in suggested_data.items():
if isinstance(value, list):
log(f" ā {key}: list with {len(value)} items")
elif isinstance(value, dict):
log(f" ā {key}: dict with {len(value)} keys")
else:
log(f" ā {key}: {type(value).__name__}")
executable_suggestions.append(suggestion)
# If we have executable suggestions with ready data, execute them NOW
if executable_suggestions:
log(f"ā” Auto-executing {len(executable_suggestions)} suggested tool(s)")
# Build a prompt that FORCES execution with COMPLETE DATA
auto_exec_prompt = "šØ MANDATORY TOOL EXECUTION - NO TEXT RESPONSE ALLOWED šØ\n\n"
auto_exec_prompt += "SYSTEM DIRECTIVE: You must call the specified tool(s) immediately.\n"
auto_exec_prompt += "All required data has been prepared and formatted for you.\n\n"
# Add ALL relevant context from completed tools INCLUDING ACTUAL DATA
auto_exec_prompt += "=== AVAILABLE DATA ===\n"
for tool_name, result in results:
if isinstance(result, dict) and result.get('success'):
auto_exec_prompt += f"\n{tool_name} completed successfully:\n"
# CRITICAL: Include ACTUAL DATA VALUES
if 'ticker' in result:
auto_exec_prompt += f" - Ticker: {result['ticker']}\n"
if 'dates' in result and result['dates']:
dates = result['dates']
auto_exec_prompt += f" - Date range: {dates[0]} to {dates[-1]} ({len(dates)} data points)\n"
auto_exec_prompt += f" - Sample dates: {dates[:3]}\n"
if 'close_prices' in result:
prices = result['close_prices']
auto_exec_prompt += f" - Price range: ${min(prices):.2f} to ${max(prices):.2f}\n"
auto_exec_prompt += f" - Sample prices: {prices[:3]}\n"
if 'data_points' in result:
auto_exec_prompt += f" - Total data points: {result['data_points']}\n"
# For charts, show what data is available
if 'dates' in result and 'close_prices' in result:
auto_exec_prompt += f" - READY FOR CHART: {len(result['dates'])} date/price pairs\n"
auto_exec_prompt += "\n=== REQUIRED ACTIONS ===\n"
for idx, suggestion in enumerate(executable_suggestions, 1):
auto_exec_prompt += f"\n{idx}. CALL: {suggestion['tool']}\n"
auto_exec_prompt += f" REASON: {suggestion['reason']}\n"
auto_exec_prompt += f" PARAMETERS (use these EXACT values):\n"
# ========== UPDATED DATA FORMATTING - HANDLES DICT-OF-DICTS ==========
suggested_data = suggestion.get('data', {})
for key, value in suggested_data.items():
if isinstance(value, list):
# Check if it's a list of dicts
if len(value) > 0 and isinstance(value[0], dict):
auto_exec_prompt += f" - {key}: [ # List of {len(value)} dict(s)\n"
for item_idx, item in enumerate(value):
auto_exec_prompt += f" {{\n"
for sub_key, sub_value in item.items():
if isinstance(sub_value, list):
if len(sub_value) <= 10:
auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n"
else:
auto_exec_prompt += f" '{sub_key}': [ # {len(sub_value)} items total\n"
auto_exec_prompt += f" # First 3: {json.dumps(sub_value[:3])}\n"
auto_exec_prompt += f" # ... ({len(sub_value) - 6} more items)\n"
auto_exec_prompt += f" # Last 3: {json.dumps(sub_value[-3:])}\n"
auto_exec_prompt += f" ],\n"
else:
auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n"
auto_exec_prompt += f" }}\n"
auto_exec_prompt += f" ]\n"
else:
# Simple list of primitives
if len(value) <= 5:
auto_exec_prompt += f" - {key}: {json.dumps(value)}\n"
else:
auto_exec_prompt += f" - {key}: [ # {len(value)} items\n"
auto_exec_prompt += f" First 3: {json.dumps(value[:3])}\n"
auto_exec_prompt += f" ... ({len(value) - 6} more)\n"
auto_exec_prompt += f" Last 3: {json.dumps(value[-3:])}\n"
auto_exec_prompt += f" ]\n"
elif isinstance(value, dict):
# Check if it's a dict of dicts (like visualization's 'data' parameter)
dict_values = list(value.values())
if dict_values and isinstance(dict_values[0], dict):
# This is like: {'Series Name': {'x_values': [...], 'y_values': [...]}}
auto_exec_prompt += f" - {key}: {{ # Dict with {len(value)} series\n"
for series_name, series_data in value.items():
auto_exec_prompt += f" '{series_name}': {{\n"
for sub_key, sub_value in series_data.items():
if isinstance(sub_value, list):
if len(sub_value) <= 10:
auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n"
else:
auto_exec_prompt += f" '{sub_key}': [ # {len(sub_value)} items\n"
auto_exec_prompt += f" # First 3: {json.dumps(sub_value[:3])}\n"
auto_exec_prompt += f" # ... ({len(sub_value) - 6} more items)\n"
auto_exec_prompt += f" # Last 3: {json.dumps(sub_value[-3:])}\n"
auto_exec_prompt += f" ],\n"
else:
auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n"
auto_exec_prompt += f" }},\n"
auto_exec_prompt += f" }}\n"
else:
# Simple dict
auto_exec_prompt += f" - {key}: {{\n"
for sub_key, sub_value in value.items():
if isinstance(sub_value, list):
auto_exec_prompt += f" '{sub_key}': [list with {len(sub_value)} items],\n"
else:
auto_exec_prompt += f" '{sub_key}': {json.dumps(sub_value)},\n"
auto_exec_prompt += f" }}\n"
else:
auto_exec_prompt += f" - {key}: {json.dumps(value)}\n"
# ========== END UPDATED DATA FORMATTING ==========
auto_exec_prompt += "\n=== EXECUTION RULES ===\n"
auto_exec_prompt += "1. You MUST call the tool(s) listed above\n"
auto_exec_prompt += "2. Use EXACTLY the parameters provided above\n"
auto_exec_prompt += "3. The data arrays are READY - construct them from the info above\n"
auto_exec_prompt += "4. Do NOT write any text response\n"
auto_exec_prompt += "5. Do NOT ask for clarification\n"
auto_exec_prompt += "6. Do NOT explain what you're doing\n"
auto_exec_prompt += "7. JUST CALL THE TOOL NOW\n\n"
auto_exec_prompt += "EXECUTE IMMEDIATELY >>>>\n"
log(f" š Auto-exec prompt length: {len(auto_exec_prompt)} chars")
# Execute auto-suggestions with NO reasoning (instant execution)
auto_calls, _, _ = self.call_tool_agent(
auto_exec_prompt, 0, None, system_prompt
)
if auto_calls:
log(f" ā Executing {len(auto_calls)} auto-suggested call(s)")
auto_results = self.extension_manager.handle_function_calls(
self.user_id, self.enabled_extensions, auto_calls
)
for (auto_tool_name, auto_result) in auto_results:
tool_results.append((auto_tool_name, auto_result))
# Extract images from auto-executed tools
if isinstance(auto_result, dict) and 'image_base64' in auto_result:
img_data = {
'base64': auto_result['image_base64'],
'title': auto_result.get('message', 'Auto-generated visualization'),
'filepath': auto_result.get('filepath', '')
}
generated_images.append(img_data)
log(f"š Captured auto-generated visualization: {auto_result.get('message', 'Chart')}")
else:
log(f" ā ļø Auto-execution failed - no calls generated")
# ========== END CRITICAL FIX ==========
# Build next round prompt (only if we're continuing and didn't just auto-execute)
if current_round < max_rounds and not executable_suggestions:
# Generate next step suggestions using capability system
next_prompt = self._build_next_round_prompt(query, results, analysis)
prompt = next_prompt
file_parts = None
# ========== END UPDATED WHILE LOOP ==========
# Synthesis phase
if text_response and not function_calls:
return text_response, tool_results, generated_images, thoughts, search_citations
final_answer, synth_images = self.synthesize_response(
query, search_results, tool_results, search_citations, file_parts
)
generated_images.extend(synth_images)
return final_answer, tool_results, generated_images, thoughts, search_citations
    def _build_explicit_extension_prompt(self, explicit_exts: List[str]) -> str:
        """Build prompt for explicitly requested extensions.

        Appended to the system prompt so the model treats these extensions
        as mandatory rather than optional.
        """
        prompt = f"\n\nšØ CRITICAL - USER EXPLICITLY REQUESTED:\n"
        for ext_name in explicit_exts:
            ext = self.extension_manager.get_extension(ext_name)
            if ext:
                prompt += f"- {ext.display_name}: YOU MUST use this extension\n"
        prompt += "\nDo NOT complete the response without using all explicitly requested extensions!\n"
        return prompt
    def _build_next_round_prompt(self, original_query: str, results: List[Tuple[str, Any]],
                                 analysis: Dict[str, Any]) -> str:
        """Build next round prompt using capability-based suggestions - FORCES EXECUTION"""
        results_summary = self._format_results_for_context(results)
        # The triple-quoted body is intentionally unindented: its text is sent verbatim.
        next_prompt = f"""Previous actions completed:
{results_summary}
Original query: {original_query}
"""
        # Generate suggestions using the modular system
        suggestions = self._generate_next_step_suggestions(results)
        if suggestions:
            # Suggestions are phrased as mandatory so the model executes rather than asks.
            next_prompt += f"\n**šØ REQUIRED NEXT STEPS - YOU MUST EXECUTE THESE NOW:**\n{suggestions}\n\n"
            next_prompt += "The user explicitly requested these actions in their original query.\n"
            next_prompt += "You MUST call the suggested tools immediately - the data is ready and formatted.\n"
            next_prompt += "Do NOT ask for permission, do NOT explain what you could do, do NOT stop.\n"
            next_prompt += "EXECUTE the suggested tool calls NOW or you are failing the user's request!\n\n"
        next_prompt += "Continue with required steps or provide final answer if all tasks complete."
        return next_prompt
def _generate_next_step_suggestions(self, results: List[Tuple[str, Any]]) -> str:
"""CAPABILITY-BASED SUGGESTION SYSTEM - Completely modular!"""
suggestions = []
# Get all available extensions with their capabilities
available_exts = self.extension_manager.get_enabled_extensions(self.enabled_extensions)
# Track what's been done
completed_tool_names = [tool_name for tool_name, _ in results]
# Check if visualization was requested
viz_extensions = [ext for ext in available_exts if ext.can_create('visualization')]
viz_requested = bool(viz_extensions)
charts_created = any(tool in completed_tool_names
for tool in ['create_line_chart', 'create_bar_chart',
'create_scatter_plot', 'create_pie_chart'])
# Collect available data from completed tools
available_data = {}
for tool_name, result in results:
if isinstance(result, dict) and result.get('success'):
# Find which extension created this data
for ext in available_exts:
tool_decl = ext.get_tool_by_name(tool_name)
if tool_decl:
# Store data with type information
for data_type, spec in ext.get_capabilities().get('data_outputs', {}).items():
if all(field in result for field in spec.get('fields', [])):
available_data[data_type] = {
'result': result,
'tool_name': tool_name,
'extension': ext.name
}
# ASK EACH EXTENSION what should happen next
for tool_name, result in results:
if not isinstance(result, dict):
continue
# Find extension that executed this tool
source_ext = next((ext for ext in available_exts
if ext.get_tool_by_name(tool_name)), None)
if source_ext:
# Ask extension for suggestions
suggestion = source_ext.get_suggested_next_action(result, available_exts)
if suggestion:
target_ext_name = suggestion['extension']
suggested_tool = suggestion['tool']
reason = suggestion['reason']
suggested_data = suggestion.get('data', {})
# CRITICAL: Format as EXECUTABLE instruction with provided data
suggestions.append(
f"š” {source_ext.display_name} completed successfully!\n"
f" šÆ NEXT ACTION REQUIRED: {reason}\n"
f" ā” YOU MUST NOW CALL: {suggested_tool}() with the provided data\n"
f" š¦ Data is formatted and ready - execute immediately!\n"
f" š Suggested parameters: {json.dumps(suggested_data, indent=2)[:200]}..."
)
# CRITICAL: Check for unfulfilled visualization requests
if viz_requested and not charts_created:
visualizable_data = []
for data_type, data_info in available_data.items():
for viz_ext in viz_extensions:
if viz_ext.can_consume(data_info['result'].get('format', data_type)):
visualizable_data.append({
'data_type': data_type,
'data': data_info,
'viz_ext': viz_ext
})
if visualizable_data:
suggestions.insert(0,
f"šØ CRITICAL ALERT: User explicitly requested charts/graphs but NONE have been created!\n"
f" š Available data ready for immediate visualization:\n" +
"\n".join([
f" ā
{item['data_type']} from {item['data']['tool_name']} "
f"ā READY for {item['viz_ext'].name} tools"
for item in visualizable_data
]) +
f"\n\n ā” You MUST create visualizations NOW using the chart tools!\n"
f" š The user wants to SEE the data visually, not just read about it!\n"
f" š« Do NOT ask permission - just CREATE the charts immediately!"
)
return "\n\n".join(suggestions) if suggestions else ""
def _format_results_for_context(self, results: List[Tuple[str, Any]]) -> str:
"""Format tool results for context"""
formatted = []
for tool_name, result in results:
if isinstance(result, dict):
clean_result = dict(result)
clean_result.pop('image_base64', None)
formatted.append(f"- {tool_name}: {json.dumps(clean_result, indent=2)[:300]}")
else:
formatted.append(f"- {tool_name}: {str(result)[:200]}")
return "\n".join(formatted)
    def call_search_agent(self, query: str, file_parts: List = None) -> Tuple[str, Optional[str]]:
        """Call search agent.

        Streams a Google-Search-grounded response on the dedicated search
        chat and returns (accumulated_text, citations_markdown_or_None).
        Errors are swallowed: failures return ("", None).
        """
        log("š Calling Search Agent...")
        grounding_tool = types.Tool(google_search=types.GoogleSearch())
        config = types.GenerateContentConfig(
            system_instruction="You are a search specialist. Use Google Search to find relevant information.",
            tools=[grounding_tool],
            temperature=0.7,
            max_output_tokens=2048
        )
        try:
            content_parts = []
            if file_parts:
                content_parts.extend(file_parts)
            content_parts.append(query)
            result_text = ""
            # Grounding metadata arrives on the final streamed chunk, so keep it.
            last_chunk = None
            stream = self.search_chat.send_message_stream(content_parts, config=config)
            for chunk in stream:
                last_chunk = chunk
                # Defensive hasattr chain: streamed chunks may omit any level.
                if hasattr(chunk, 'candidates') and chunk.candidates:
                    candidate = chunk.candidates[0]
                    if hasattr(candidate, 'content') and candidate.content:
                        if hasattr(candidate.content, 'parts') and candidate.content.parts:
                            for part in candidate.content.parts:
                                if hasattr(part, 'text') and part.text:
                                    result_text += part.text
            citations = None
            if last_chunk and hasattr(last_chunk, 'candidates') and last_chunk.candidates:
                citations = insert_citations_from_grounding(last_chunk.candidates)
            return result_text, citations
        except Exception as e:
            log(f"ā ļø Search Agent error: {e}")
            return "", None
    def call_tool_agent(self, prompt: str, reasoning_budget: int,
                        file_parts: List = None, system_prompt: str = None) -> Tuple[List, str, str]:
        """Call tool execution agent.

        Sends *prompt* (plus optional file parts) on the main chat with all
        enabled extensions' tools attached. Returns
        (function_calls, text_response, thoughts); on error, ([], "", "").
        """
        tools = self.extension_manager.get_all_tools(self.enabled_extensions)
        if not system_prompt:
            system_prompt = self.extension_manager.build_system_prompt(
                self.user_id, self.enabled_extensions
            )
        config = types.GenerateContentConfig(
            system_instruction=system_prompt,
            tools=tools,
            temperature=0.7,
            max_output_tokens=4096,
            # thinking_budget: -1 lets the model choose; 0 disables thinking.
            thinking_config=types.ThinkingConfig(
                include_thoughts=True,
                thinking_budget=reasoning_budget,
            )
        )
        try:
            content_parts = []
            if file_parts:
                content_parts.extend(file_parts)
            content_parts.append(prompt)
            response = self.chat.send_message(content_parts, config=config)
            function_calls = []
            text_response = ""
            thoughts = ""
            if response.candidates and response.candidates[0].content:
                for part in response.candidates[0].content.parts:
                    if hasattr(part, 'function_call') and part.function_call:
                        function_calls.append(part.function_call)
                    if getattr(part, "text", None):
                        # Parts flagged `thought` are reasoning, not answer text.
                        if getattr(part, "thought", False):
                            thoughts += part.text
                        else:
                            text_response += part.text
            return function_calls, text_response, thoughts
        except Exception as e:
            log(f"ā ļø Tool Agent error: {e}")
            return [], "", ""
    def synthesize_response(self, query: str, search_results: str,
                            tool_results: List, search_citations: Optional[str],
                            file_parts: List = None) -> Tuple[str, List]:
        """Synthesize final response.

        Builds a synthesis prompt from the query, search results and tool
        results (images stripped, deep-research reports deferred), sends it
        to a fresh model call, then appends any deep-research report and its
        sources verbatim. Returns (final_text, extracted_images).
        """
        log("⨠Synthesizing final response...")
        synthesis_prompt = f"[Original Query]\n{query}\n\n"
        if search_results:
            synthesis_prompt += f"[Web Search Results]\n{search_results}\n\n"
        generated_images = []
        deep_research_report = None
        deep_research_sources = None
        has_deep_research = False
        if tool_results:
            synthesis_prompt += "[Tool Execution Results]\n"
            for tool_name, result in tool_results:
                if isinstance(result, dict) and 'image_base64' in result:
                    # Collect images for the UI; keep the prompt text-only.
                    generated_images.append({
                        'base64': result['image_base64'],
                        'title': result.get('message', 'Generated visualization'),
                        'filepath': result.get('filepath', '')
                    })
                    result_clean = dict(result)
                    result_clean.pop('image_base64', None)
                    synthesis_prompt += f"- {tool_name}: {result_clean.get('message', 'Chart created')}\n"
                elif tool_name == 'conduct_deep_research' and isinstance(result, dict):
                    # The full report is appended after synthesis, not summarized by it.
                    if result.get('success') and result.get('report'):
                        has_deep_research = True
                        deep_research_report = result.get('report')
                        deep_research_sources = result.get('sources', [])
                        synthesis_prompt += f"- {tool_name}: Research complete\n"
                else:
                    result_str = str(result)[:500]
                    synthesis_prompt += f"- {tool_name}: {result_str}\n"
        synthesis_prompt += "\n\nProvide a comprehensive answer incorporating all information above."
        config = types.GenerateContentConfig(
            system_instruction="You are a synthesis specialist. Combine information into coherent responses.",
            temperature=0.7,
            max_output_tokens=2048
        )
        try:
            content_parts = [types.Part(text=synthesis_prompt)]
            # One-shot call (not the chat) so synthesis doesn't pollute history.
            response = self.client.models.generate_content(
                model="gemini-2.5-flash",
                contents=[types.Content(role="user", parts=content_parts)],
                config=config
            )
            result_text = ""
            if response.candidates and response.candidates[0].content:
                for part in response.candidates[0].content.parts:
                    if getattr(part, "text", None):
                        result_text += part.text
            if has_deep_research and deep_research_report:
                result_text += "\n\n---\n\n## š Complete Research Report\n\n"
                result_text += deep_research_report
                if deep_research_sources:
                    result_text += "\n\n### š Research Sources\n\n"
                    # Cap the visible source list at 30 entries.
                    for idx, source in enumerate(deep_research_sources[:30], 1):
                        if isinstance(source, dict):
                            title = source.get('title', 'Source')
                            url = source.get('url', '#')
                            result_text += f"{idx}. [{title}]({url})\n"
                    if len(deep_research_sources) > 30:
                        result_text += f"\n*...and {len(deep_research_sources) - 30} more sources*\n"
            return result_text, generated_images
        except Exception as e:
            log(f"ā ļø Synthesis error: {e}")
            return "I encountered an error synthesizing the response.", []
def reasoning_budget(level: str) -> int:
    """Translate a human-readable reasoning level into a thinking-token budget.

    Returns 0 for "none", 256 for "concise", 2048 for "strong", and -1
    (dynamic / model-decided) for "dynamic" or any unrecognized label.
    A falsy ``level`` defaults to dynamic.
    """
    normalized = (level or "Dynamic").lower()
    if normalized == "none":
        return 0
    if normalized == "concise":
        return 256
    if normalized == "strong":
        return 2048
    # "dynamic" and anything unexpected both fall back to the dynamic budget.
    return -1
def insert_citations_from_grounding(candidates):
    """Extract citations from grounding metadata.

    Walks the first candidate's grounding chunks and builds a markdown
    "Sources" footer. Titled links are de-duplicated by title; a chunk whose
    title repeats (or is missing) but still carries a URI is emitted as a
    numbered "[Source n]" link. Returns None when nothing citable is found
    or on any extraction error.
    """
    try:
        if not candidates:
            return None
        grounding = getattr(candidates[0], "grounding_metadata", None)
        if not grounding:
            return None
        chunks = getattr(grounding, "grounding_chunks", None) or []
        if not chunks:
            return None
        links = []
        titles_seen = set()
        for position, item in enumerate(chunks, start=1):
            web = getattr(item, "web", None)
            if not web:
                continue
            uri = getattr(web, "uri", None)
            title = getattr(web, "title", None)
            if uri and title and title not in titles_seen:
                titles_seen.add(title)
                links.append(f"[{title}]({uri})")
            elif uri:
                # No title, or a duplicate title: keep the link anyway,
                # labeled by its 1-based chunk position.
                links.append(f"[Source {position}]({uri})")
        if not links:
            return None
        return "\n\nš **Sources:** " + " ⢠".join(links)
    except Exception as e:
        log(f"ā ļø Citation extraction failed: {e}")
        return None
# Module-wide extension registry/dispatcher; ExtensionManager is defined
# earlier in this file (outside this view) — presumably it loads and proxies
# the agent extensions. Shared by all chat handlers below.
EXTENSION_MANAGER = ExtensionManager()
# Per-API-key session cache: api_key -> {"client": genai.Client, "chat": chat}.
# Lets each key keep its own multi-turn conversation state across requests.
CHAT_SESSIONS: Dict[str, Dict[str, Any]] = {}
def get_or_create_session(api_key: str):
if not api_key:
return None, None
if api_key in CHAT_SESSIONS:
return (CHAT_SESSIONS[api_key]["client"],
CHAT_SESSIONS[api_key]["chat"])
try:
client = genai.Client(api_key=api_key)
chat = client.chats.create(model="gemini-2.5-flash")
CHAT_SESSIONS[api_key] = {"client": client, "chat": chat}
log("ā
Created new Gemini session with multi-turn chat.")
return client, chat
except Exception as e:
log(f"ā Error creating Gemini client: {e}")
return None, None
def chat_with_gemini(api_key, chat_history_msgs, multimodal_input, show_thoughts,
reasoning_level, enabled_extensions):
log("=== chat_with_gemini CALLED ===")
if not api_key:
chat_history_msgs = chat_history_msgs or []
chat_history_msgs.append({
"role": "assistant",
"content": "š Please enter your Gemini API key first."
})
yield chat_history_msgs
return
client, chat = get_or_create_session(api_key)
if not client:
chat_history_msgs.append({
"role": "assistant",
"content": "ā ļø Could not create Gemini session."
})
yield chat_history_msgs
return
user_text = (multimodal_input or {}).get("text", "") or ""
uploaded_files = (multimodal_input or {}).get("files", []) or []
if chat_history_msgs is None:
chat_history_msgs = []
file_parts = []
if uploaded_files:
log(f"š Processing {len(uploaded_files)} uploaded file(s)...")
for file_path in uploaded_files:
try:
file_part = process_uploaded_file(client, file_path)
file_parts.append(file_part)
except Exception as e:
log(f"ā Error processing file {file_path}: {e}")
chat_history_msgs.append({"role": "user", "content": user_text})
yield chat_history_msgs
assistant_base_index = len(chat_history_msgs)
if show_thoughts:
thought_index = assistant_base_index
chat_history_msgs.append({"role": "assistant", "content": "š Thinking..."})
answer_index = thought_index + 1
chat_history_msgs.append({"role": "assistant", "content": "š¤ Processing..."})
else:
thought_index = None
answer_index = assistant_base_index
chat_history_msgs.append({"role": "assistant", "content": "š¤ Processing..."})
yield chat_history_msgs
try:
proactive_msgs = EXTENSION_MANAGER.check_proactive_messages(api_key, enabled_extensions)
if proactive_msgs:
for msg in proactive_msgs:
chat_history_msgs.insert(answer_index, {"role": "assistant", "content": msg})
answer_index += 1
yield chat_history_msgs
if enabled_extensions:
log("š Using improved orchestrator with capability-based system")
orchestrator = ImprovedOrchestrator(
client, chat, EXTENSION_MANAGER, api_key, enabled_extensions
)
budget = reasoning_budget(reasoning_level)
final_answer, tool_results, generated_images, thoughts, search_citations = \
orchestrator.execute_with_planning(user_text, file_parts, budget)
log(f"šØ Orchestrator returned {len(generated_images)} images")
if thoughts and show_thoughts:
chat_history_msgs[thought_index]["content"] = (
f"š Agent Thinking
"
f"