""" Enhanced Gradio Interface for ComputeAgent with Tool Approval Support This interface supports BOTH capacity approval and tool approval with full modification capabilities. Features: - Capacity approval (existing) - Tool approval (NEW) - Tool argument modification (NEW) - Re-reasoning requests (NEW) - Batch tool operations (NEW) Author: ComputeAgent Team """ # This allows importing modules from the top-level project directory import os import sys sys.path.append("/home/hivenet") import asyncio import gradio as gr import httpx import logging from typing import Optional, Dict, Any, List, Tuple from datetime import datetime import json import base64 from pathlib import Path logging.basicConfig(level=logging.INFO) logger = logging.getLogger("ComputeAgent-UI") # FastAPI configuration - will be set when creating the interface API_BASE_URL = "http://localhost:8000" API_TIMEOUT = 300.0 # GPU/Location configuration LOCATION_GPU_MAP = { "France": ["RTX 4090"], "UAE-1": ["RTX 4090"], "Texas": ["RTX 5090"], "UAE-2": ["RTX 5090"] } # Load and encode logo def get_logo_base64(filename): """Load a logo and convert to base64 for embedding in HTML.""" try: logo_path = Path(__file__).parent / filename with open(logo_path, "rb") as f: logo_bytes = f.read() return base64.b64encode(logo_bytes).decode() except Exception as e: logger.warning(f"Could not load logo {filename}: {e}") return None HIVENET_LOGO_BASE64 = get_logo_base64("hivenet.jpg") COMPUTEAGENT_LOGO_BASE64 = get_logo_base64("ComputeAgent.png") class ComputeAgentClient: """Client for interacting with ComputeAgent FastAPI backend.""" def __init__(self, base_url: str): self.base_url = base_url self.client = httpx.AsyncClient(timeout=API_TIMEOUT) async def send_query( self, query: str, user_id: str = "demo_user", session_id: str = "demo_session" ) -> Dict[str, Any]: """Send query to FastAPI backend.""" try: response = await self.client.post( f"{self.base_url}/api/compute/query", json={ "query": query, "user_id": user_id, "session_id": session_id } ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"❌ Error sending query: {e}") return {"success": False, "error": str(e)} async def continue_execution( self, thread_id: str, user_input: Dict[str, Any] ) -> Dict[str, Any]: """Continue execution after interrupt.""" try: response = await self.client.post( f"{self.base_url}/api/compute/continue/{thread_id}", json=user_input ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"❌ Error continuing: {e}") return {"success": False, "error": str(e)} async def approve_tools( self, thread_id: str, decision: Dict[str, Any] ) -> Dict[str, Any]: """Approve/reject/modify tools.""" try: response = await self.client.post( f"{self.base_url}/api/compute/approve-tools", json={ "thread_id": thread_id, **decision } ) response.raise_for_status() return response.json() except Exception as e: logger.error(f"❌ Error with tool approval: {e}") return {"success": False, "error": str(e)} class ComputeAgentInterface: """Enhanced Gradio interface with tool approval.""" def __init__(self, api_base_url: str): self.client = ComputeAgentClient(api_base_url) self.current_thread_id = None self.current_interrupt_data = None self.approval_type = None # "capacity" or "tool" self.selected_tools = set() # For tool selection self.tool_modifications = {} # For tool argument mods self.stats = {"total": 0, "successful": 0} logger.info(f"🚀 ComputeAgent UI initialized with tool approval support (API: {api_base_url})") def update_gpu_options(self, location: str): """Update GPU dropdown based on location.""" gpus = LOCATION_GPU_MAP.get(location, []) return gr.update(choices=gpus, value=gpus[0] if gpus else None) def get_stats_display(self) -> str: """Format stats display.""" success_rate = (self.stats["successful"] / max(1, self.stats["total"])) * 100 return f"""**📊 Session Statistics** Total Requests: {self.stats["total"]} Success Rate: {success_rate:.1f}%""" async def process_query( self, message: str, history: List, user_id: str, session_id: str ): """Process query through FastAPI.""" if not message.strip(): yield ( history, "", gr.update(visible=False), # capacity_approval_panel gr.update(visible=False), # capacity_param_panel gr.update(visible=False), # tool_approval_panel gr.update(visible=False), # tool_list_panel self.get_stats_display() ) return user_id = user_id.strip() or "demo_user" session_id = session_id.strip() or f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Add user message history.append([message, "🤖 **Processing...**"]) yield ( history, "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) try: # Send to API result = await self.client.send_query(message, user_id, session_id) if not result.get("success"): error_msg = f"❌ **Error:** {result.get('error', 'Unknown error')}" history[-1][1] = error_msg yield ( history, "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) return # Check if waiting for approval (interrupt) if result.get("state") == "waiting_for_input": self.current_thread_id = result.get("thread_id") self.current_interrupt_data = result.get("interrupt_data", {}) # Determine approval type if "tool_calls" in self.current_interrupt_data: # Tool approval self.approval_type = "tool" formatted_response = self._format_tool_approval(self.current_interrupt_data) history[-1][1] = formatted_response yield ( history, "", gr.update(visible=False), # capacity panels hidden gr.update(visible=False), gr.update(visible=True), # tool approval visible gr.update(visible=True), # tool list visible self.get_stats_display() ) else: # Capacity approval self.approval_type = "capacity" formatted_response = self.current_interrupt_data.get( "formatted_response", self._format_basic_capacity(self.current_interrupt_data) ) history[-1][1] = formatted_response yield ( history, "", gr.update(visible=True), # capacity approval visible gr.update(visible=False), gr.update(visible=False), # tool panels hidden gr.update(visible=False), self.get_stats_display() ) return # Normal completion response_text = result.get("response", "Request completed") history[-1][1] = response_text self.stats["total"] += 1 self.stats["successful"] += 1 yield ( history, "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) except Exception as e: logger.error(f"❌ Error: {e}", exc_info=True) history[-1][1] = f"❌ **Error:** {str(e)}" yield ( history, "", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) def _format_tool_approval(self, interrupt_data: Dict[str, Any]) -> str: """Format tool approval request for display.""" tool_calls = interrupt_data.get("tool_calls", []) query = interrupt_data.get("query", "") if not tool_calls: return "âš ī¸ No tools proposed" tools_list = [] for i, tool in enumerate(tool_calls): tool_name = tool.get("name", "unknown") tool_args = json.dumps(tool.get("args", {}), indent=2) tool_desc = tool.get("description", "No description") tools_list.append(f""" **Tool {i+1}: {tool_name}** - Description: {tool_desc} - Arguments: ```json {tool_args} ``` """) tools_text = "\n".join(tools_list) return f"""# 🔧 **Tool Approval Required** **Query:** {query} **Proposed Tools ({len(tool_calls)}):** {tools_text} âš ī¸ **Please review and approve, modify, or request re-reasoning.** """ def _format_basic_capacity(self, interrupt_data: Dict[str, Any]) -> str: """Basic capacity formatting if formatted_response not available.""" model_name = interrupt_data.get("model_name", "Unknown") memory = interrupt_data.get("estimated_gpu_memory", 0) gpu_reqs = interrupt_data.get("gpu_requirements", {}) gpu_lines = [f" â€ĸ **{gpu}:** {count} GPU{'s' if count > 1 else ''}" for gpu, count in gpu_reqs.items()] gpu_text = "\n".join(gpu_lines) if gpu_lines else " â€ĸ No requirements" return f"""# 📊 **Capacity Estimation** **Model:** `{model_name}` **Estimated GPU Memory:** **{memory:.2f} GB** **GPU Requirements:** {gpu_text} âš ī¸ **Please review and approve or modify the configuration.** """ def build_tool_checkboxes(self): """Build checkbox UI for tool selection.""" if not self.current_interrupt_data or "tool_calls" not in self.current_interrupt_data: return [] tool_calls = self.current_interrupt_data.get("tool_calls", []) # Return list of tool names with indices return [ f"[{i}] {tool.get('name', 'unknown')}: {json.dumps(tool.get('args', {}))}" for i, tool in enumerate(tool_calls) ] # ======================================================================== # CAPACITY APPROVAL HANDLERS # ======================================================================== async def approve_capacity(self, history: List, user_id: str, session_id: str): """Handle capacity approval.""" if not self.current_thread_id or self.approval_type != "capacity": history.append([None, "âš ī¸ No pending capacity approval"]) yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) return history.append(["✅ **Approved Capacity**", "🚀 **Continuing deployment...**"]) yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) try: approval_input = { "capacity_approved": True, "custom_config": {}, "needs_re_estimation": False } result = await self.client.continue_execution( self.current_thread_id, approval_input ) # Check if there's another interrupt (e.g., tool approval after capacity approval) if result.get("state") == "waiting_for_input": self.current_interrupt_data = result.get("interrupt_data", {}) # Determine approval type if "tool_calls" in self.current_interrupt_data: # Tool approval needed self.approval_type = "tool" formatted_response = self._format_tool_approval(self.current_interrupt_data) history[-1][1] = formatted_response yield ( history, gr.update(visible=False), # capacity panels hidden gr.update(visible=False), gr.update(visible=True), # tool approval visible gr.update(visible=True), # tool list visible self.get_stats_display() ) else: # Another capacity approval (re-estimation) self.approval_type = "capacity" formatted_response = self.current_interrupt_data.get( "formatted_response", self._format_basic_capacity(self.current_interrupt_data) ) history[-1][1] = formatted_response yield ( history, gr.update(visible=True), # capacity approval visible gr.update(visible=False), gr.update(visible=False), # tool panels hidden gr.update(visible=False), self.get_stats_display() ) return # Normal completion if result.get("success"): response = result.get("response", "Deployment completed") history[-1][1] = f"✅ **{response}**" self.stats["total"] += 1 self.stats["successful"] += 1 else: history[-1][1] = f"❌ **Error:** {result.get('error', 'Unknown error')}" self._clear_approval_state() yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) except Exception as e: logger.error(f"❌ Approval error: {e}") history[-1][1] = f"❌ **Error:** {str(e)}" yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) async def reject_capacity(self, history: List, user_id: str, session_id: str): """Handle capacity rejection.""" if not self.current_thread_id or self.approval_type != "capacity": return self._no_approval_response(history) history.append(["❌ **Rejected Capacity**", "Deployment cancelled"]) rejection_input = { "capacity_approved": False, "custom_config": {}, "needs_re_estimation": False } await self.client.continue_execution(self.current_thread_id, rejection_input) self._clear_approval_state() return ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) async def apply_capacity_modifications( self, history: List, user_id: str, session_id: str, max_model_len: int, max_num_seqs: int, max_batched_tokens: int, kv_cache_dtype: str, gpu_util: float, location: str, gpu_type: str ): """Apply capacity modifications and re-estimate.""" if not self.current_thread_id or self.approval_type != "capacity": history.append([None, "âš ī¸ No pending capacity approval"]) yield self._all_hidden_response(history) return history.append(["🔧 **Re-estimating with new parameters...**", "âŗ **Please wait...**"]) yield self._all_hidden_response(history) try: custom_config = { "GPU_type": gpu_type, "location": location, "max_model_len": int(max_model_len), "max_num_seqs": int(max_num_seqs), "max_num_batched_tokens": int(max_batched_tokens), "kv_cache_dtype": kv_cache_dtype, "gpu_memory_utilization": float(gpu_util) } re_estimate_input = { "capacity_approved": None, "custom_config": custom_config, "needs_re_estimation": True } result = await self.client.continue_execution( self.current_thread_id, re_estimate_input ) # Check if still waiting for input (could be capacity or tool approval) if result.get("state") == "waiting_for_input": self.current_interrupt_data = result.get("interrupt_data", {}) # Determine approval type if "tool_calls" in self.current_interrupt_data: # Tool approval needed after re-estimation self.approval_type = "tool" formatted_response = self._format_tool_approval(self.current_interrupt_data) history[-1][1] = formatted_response yield ( history, gr.update(visible=False), # capacity panels hidden gr.update(visible=False), gr.update(visible=True), # tool approval visible gr.update(visible=True), # tool list visible self.get_stats_display() ) else: # Another capacity approval (re-estimation result) self.approval_type = "capacity" formatted_response = self.current_interrupt_data.get( "formatted_response", self._format_basic_capacity(self.current_interrupt_data) ) history[-1][1] = formatted_response yield ( history, gr.update(visible=True), # Show capacity approval gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) else: # Completed without further interrupts response = result.get("response", "Re-estimation completed") history[-1][1] = f"✅ **{response}**" self._clear_approval_state() yield self._all_hidden_response(history) except Exception as e: logger.error(f"❌ Re-estimation error: {e}") history[-1][1] = f"❌ **Error:** {str(e)}" yield self._all_hidden_response(history) # ======================================================================== # TOOL APPROVAL HANDLERS # ======================================================================== async def approve_all_tools(self, history: List, user_id: str, session_id: str): """Approve all tools.""" if not self.current_thread_id or self.approval_type != "tool": history.append([None, "âš ī¸ No pending tool approval"]) yield self._all_hidden_response(history) return history.append(["✅ **Approved All Tools**", "⚡ **Executing tools...**"]) yield self._all_hidden_response(history) try: result = await self.client.approve_tools( self.current_thread_id, {"action": "approve_all"} ) # Check if there's another interrupt (agent proposing more tools) if result.get("state") == "waiting_for_input": self.current_interrupt_data = result.get("interrupt_data", {}) # Determine approval type if "tool_calls" in self.current_interrupt_data: # More tools proposed self.approval_type = "tool" formatted_response = self._format_tool_approval(self.current_interrupt_data) history[-1][1] = formatted_response yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), # tool approval visible gr.update(visible=True), # tool list visible self.get_stats_display() ) else: # Unexpected capacity approval self.approval_type = "capacity" formatted_response = self.current_interrupt_data.get( "formatted_response", self._format_basic_capacity(self.current_interrupt_data) ) history[-1][1] = formatted_response yield ( history, gr.update(visible=True), # capacity approval visible gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) return # Normal completion if result.get("success"): response = result.get("response", "Tools executed successfully") history[-1][1] = f"✅ **{response}**" self.stats["total"] += 1 self.stats["successful"] += 1 else: history[-1][1] = f"❌ **Error:** {result.get('error', 'Unknown error')}" self._clear_approval_state() yield self._all_hidden_response(history) except Exception as e: logger.error(f"❌ Tool approval error: {e}") history[-1][1] = f"❌ **Error:** {str(e)}" yield self._all_hidden_response(history) async def reject_all_tools(self, history: List, user_id: str, session_id: str): """Reject all tools.""" if not self.current_thread_id or self.approval_type != "tool": return self._no_approval_response(history) history.append(["❌ **Rejected All Tools**", "Generating response without tools..."]) try: result = await self.client.approve_tools( self.current_thread_id, {"action": "reject_all"} ) if result.get("success"): response = result.get("response", "Completed without tools") history[-1][1] = f"✅ **{response}**" else: history[-1][1] = f"❌ **Error:** {result.get('error', 'Unknown error')}" self._clear_approval_state() except Exception as e: logger.error(f"❌ Tool rejection error: {e}") history[-1][1] = f"❌ **Error:** {str(e)}" return self._all_hidden_response(history) async def approve_selected_tools( self, history: List, user_id: str, session_id: str, selected_indices: str ): """Approve selected tools by indices.""" if not self.current_thread_id or self.approval_type != "tool": history.append([None, "âš ī¸ No pending tool approval"]) yield self._all_hidden_response(history) return # Parse indices (convert from 1-based to 0-based) try: # User enters 1-based indices (1,2,3), convert to 0-based (0,1,2) indices = [int(i.strip()) - 1 for i in selected_indices.split(",") if i.strip()] # Validate indices are non-negative if any(idx < 0 for idx in indices): history.append([None, "❌ Tool indices must be positive numbers (starting from 1). Example: 1,2,3"]) yield self._all_hidden_response(history) return except: history.append([None, "❌ Invalid indices format. Use: 1,2,3 (starting from 1)"]) yield self._all_hidden_response(history) return history.append([ f"✅ **Approved Tools: {indices}**", "⚡ **Executing selected tools...**" ]) yield self._all_hidden_response(history) try: result = await self.client.approve_tools( self.current_thread_id, { "action": "approve_selected", "tool_indices": indices } ) # Check if there's another interrupt if result.get("state") == "waiting_for_input": self.current_interrupt_data = result.get("interrupt_data", {}) if "tool_calls" in self.current_interrupt_data: self.approval_type = "tool" formatted_response = self._format_tool_approval(self.current_interrupt_data) history[-1][1] = formatted_response yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), self.get_stats_display() ) else: self.approval_type = "capacity" formatted_response = self.current_interrupt_data.get( "formatted_response", self._format_basic_capacity(self.current_interrupt_data) ) history[-1][1] = formatted_response yield ( history, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) return # Normal completion if result.get("success"): response = result.get("response", "Selected tools executed") history[-1][1] = f"✅ **{response}**" self.stats["total"] += 1 self.stats["successful"] += 1 else: history[-1][1] = f"❌ **Error:** {result.get('error', 'Unknown error')}" self._clear_approval_state() yield self._all_hidden_response(history) except Exception as e: logger.error(f"❌ Tool approval error: {e}") history[-1][1] = f"❌ **Error:** {str(e)}" yield self._all_hidden_response(history) async def request_re_reasoning( self, history: List, user_id: str, session_id: str, feedback: str ): """Request agent re-reasoning with feedback.""" if not self.current_thread_id or self.approval_type != "tool": history.append([None, "âš ī¸ No pending tool approval"]) yield self._all_hidden_response(history) return if not feedback.strip(): history.append([None, "❌ Please provide feedback for re-reasoning"]) yield self._all_hidden_response(history) return history.append([ f"🔄 **Re-reasoning Request:** {feedback}", "🤔 **Agent reconsidering approach...**" ]) yield self._all_hidden_response(history) try: result = await self.client.approve_tools( self.current_thread_id, { "action": "request_re_reasoning", "feedback": feedback } ) # Should get new tool proposals if result.get("state") == "waiting_for_input": self.current_interrupt_data = result.get("interrupt_data", {}) formatted_response = self._format_tool_approval(self.current_interrupt_data) history[-1][1] = formatted_response yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), # tool approval visible gr.update(visible=True), # tool list visible self.get_stats_display() ) else: response = result.get("response", "Re-reasoning completed") history[-1][1] = f"✅ **{response}**" self._clear_approval_state() yield self._all_hidden_response(history) except Exception as e: logger.error(f"❌ Re-reasoning error: {e}") history[-1][1] = f"❌ **Error:** {str(e)}" yield self._all_hidden_response(history) async def modify_tool_args( self, history: List, user_id: str, session_id: str, tool_index: int, new_args_json: str ): """Modify tool arguments and approve.""" if not self.current_thread_id or self.approval_type != "tool": history.append([None, "âš ī¸ No pending tool approval"]) yield self._all_hidden_response(history) return # Parse new arguments try: new_args = json.loads(new_args_json) except: history.append([None, "❌ Invalid JSON format for arguments"]) yield self._all_hidden_response(history) return history.append([ f"🔧 **Modified Tool {tool_index}**", "⚡ **Executing with new arguments...**" ]) yield self._all_hidden_response(history) # Convert from 1-based to 0-based index for backend backend_index = tool_index - 1 if backend_index < 0: history.append([None, "❌ Tool index must be positive (starting from 1)"]) yield self._all_hidden_response(history) return try: result = await self.client.approve_tools( self.current_thread_id, { "action": "modify_and_approve", "modifications": [ { "tool_index": backend_index, "new_args": new_args, "approve": True } ] } ) # Check if there's another interrupt if result.get("state") == "waiting_for_input": self.current_interrupt_data = result.get("interrupt_data", {}) if "tool_calls" in self.current_interrupt_data: self.approval_type = "tool" formatted_response = self._format_tool_approval(self.current_interrupt_data) history[-1][1] = formatted_response yield ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), self.get_stats_display() ) else: self.approval_type = "capacity" formatted_response = self.current_interrupt_data.get( "formatted_response", self._format_basic_capacity(self.current_interrupt_data) ) history[-1][1] = formatted_response yield ( history, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) return # Normal completion if result.get("success"): response = result.get("response", "Tool executed with modifications") history[-1][1] = f"✅ **{response}**" self.stats["total"] += 1 self.stats["successful"] += 1 else: history[-1][1] = f"❌ **Error:** {result.get('error', 'Unknown error')}" self._clear_approval_state() yield self._all_hidden_response(history) except Exception as e: logger.error(f"❌ Modification error: {e}") history[-1][1] = f"❌ **Error:** {str(e)}" yield self._all_hidden_response(history) # ======================================================================== # HELPER METHODS # ======================================================================== def _clear_approval_state(self): """Clear all approval state.""" self.current_thread_id = None self.current_interrupt_data = None self.approval_type = None self.selected_tools = set() self.tool_modifications = {} def _all_hidden_response(self, history): """Return response with all panels hidden.""" return ( history, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) def _no_approval_response(self, history): """Return response for no pending approval.""" history.append([None, "âš ī¸ No pending approval"]) return self._all_hidden_response(history) def show_capacity_modify_dialog(self): """Show capacity parameter modification dialog.""" if not self.current_interrupt_data: return ( gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), 2048, 256, 2048, "auto", 0.9, "France", "RTX 4090" ) model_info = self.current_interrupt_data.get("model_info", {}) return ( gr.update(visible=False), # Hide capacity approval gr.update(visible=True), # Show capacity param gr.update(visible=False), # Hide tool approval gr.update(visible=False), # Hide tool list model_info.get("max_model_len", 2048), model_info.get("max_num_seqs", 256), model_info.get("max_num_batched_tokens", 2048), model_info.get("kv_cache_dtype", "auto"), model_info.get("gpu_memory_utilization", 0.9), model_info.get("location", "France"), model_info.get("GPU_type", "RTX 4090") ) def cancel_capacity_modify(self): """Cancel capacity modification.""" return ( gr.update(visible=True), # Show capacity approval gr.update(visible=False), # Hide capacity param gr.update(visible=False), gr.update(visible=False) ) def clear_chat(self, user_id: str, session_id: str): """Clear chat history.""" self._clear_approval_state() return ( [], # Empty history "", # Clear input gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), self.get_stats_display() ) def new_session(self, user_id: str): """Generate new session ID.""" new_session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" self._clear_approval_state() return new_session_id # Create Gradio theme matching HiveNet brand colors def create_theme(): return gr.themes.Soft( primary_hue="orange", secondary_hue="stone", neutral_hue="slate", font=gr.themes.GoogleFont("Inter") ).set( body_background_fill="#1a1a1a", body_background_fill_dark="#0d0d0d", button_primary_background_fill="#d97706", button_primary_background_fill_hover="#ea580c", button_primary_text_color="#ffffff", block_background_fill="#262626", block_border_color="#404040", input_background_fill="#1f1f1f", slider_color="#d97706", ) def create_gradio_demo(api_base_url: str = "http://localhost:8000"): """ Create and return the Gradio demo interface. Args: api_base_url: Base URL for the FastAPI backend Returns: Gradio Blocks demo """ # Initialize interface with API base URL agent_interface = ComputeAgentInterface(api_base_url) # Create interface with gr.Blocks( title="ComputeAgent - Enhanced with Tool Approval", theme=create_theme(), css=""" .gradio-container { max-width: 100% !important; } .header-box { background: linear-gradient(135deg, #d97706 0%, #ea580c 50%, #dc2626 100%); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; position: relative; overflow: hidden; } .header-box::before { content: ''; position: absolute; top: 0; left: 0; right: 0; bottom: 0; background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 600 600'%3E%3Cfilter id='noise'%3E%3CfeTurbulence type='fractalNoise' baseFrequency='0.9' numOctaves='3' /%3E%3C/filter%3E%3Crect width='100%25' height='100%25' filter='url(%23noise)' opacity='0.05' /%3E%3C/svg%3E"); pointer-events: none; } .tool-box { background: rgba(41, 37, 36, 0.5); border: 2px solid #57534e; border-radius: 8px; padding: 15px; margin: 10px 0; } /* Make chatbot fill available height dynamically */ .chatbot { height: calc(100vh - 750px) !important; max-height: calc(100vh - 750px) !important; } /* Make input text white */ textarea, input[type="text"], input[type="number"], .input-field { color: white !important; } /* Make labels white */ label { color: white !important; } /* Make number input values white */ input[type="number"] { color: white !important; } /* Make dropdown values white - comprehensive */ select, option { color: white !important; background-color: #1f1f1f !important; } /* Gradio-specific dropdown styling */ .dropdown, .dropdown-content, .dropdown-item { color: white !important; } /* Make sure all input elements show white text */ input, select, textarea { color: white !important; } """ ) as demo: # Header hivenet_logo_html = f'HiveNet Logo' if HIVENET_LOGO_BASE64 else '' computeagent_logo_html = f'ComputeAgent Logo' if COMPUTEAGENT_LOGO_BASE64 else '' gr.HTML(f"""
{computeagent_logo_html}

ComputeAgent

Hivenet AI-Powered Deployment using MCP of Compute by Hivenet

{hivenet_logo_html}
""") with gr.Row(): with gr.Column(scale=11): # Chat interface chatbot = gr.Chatbot( label="Agent Conversation", height=900, show_copy_button=True, elem_classes=["chatbot"] ) with gr.Row(): msg = gr.Textbox( placeholder="Deploy meta-llama/Llama-3.1-70B or ask: What are the latest AI developments?", scale=5, show_label=False ) send_btn = gr.Button("🚀 Send", variant="primary", scale=1) # ================================================================ # CAPACITY APPROVAL PANEL # ================================================================ with gr.Row(visible=False) as capacity_approval_panel: capacity_approve_btn = gr.Button("✅ Approve", variant="primary", scale=1) capacity_modify_btn = gr.Button("🔧 Modify", variant="secondary", scale=1) capacity_reject_btn = gr.Button("❌ Reject", variant="stop", scale=1) # Capacity parameter modification panel with gr.Column(visible=False) as capacity_param_panel: gr.Markdown('

🔧 Capacity Parameter Optimization

') with gr.Row(): with gr.Column(): max_model_len = gr.Number( label="Context Length", value=2048, minimum=1 ) max_num_seqs = gr.Number( label="Max Sequences", value=256, minimum=1 ) with gr.Column(): max_batched_tokens = gr.Number( label="Batch Size", value=2048, minimum=1 ) kv_cache_dtype = gr.Dropdown( choices=["auto", "float32", "float16", "bfloat16", "fp8"], value="auto", label="KV Cache Type" ) with gr.Column(): gpu_util = gr.Slider( minimum=0.1, maximum=1.0, value=0.9, step=0.05, label="GPU Utilization" ) location = gr.Dropdown( choices=list(LOCATION_GPU_MAP.keys()), value="France", label="Location" ) gpu_type = gr.Dropdown( choices=LOCATION_GPU_MAP["France"], value="RTX 4090", label="GPU Type" ) with gr.Row(): capacity_apply_btn = gr.Button("🔄 Re-estimate", variant="primary", scale=2) capacity_cancel_btn = gr.Button("â†Šī¸ Back", variant="secondary", scale=1) # ================================================================ # TOOL APPROVAL PANEL # ================================================================ with gr.Row(visible=False) as tool_approval_panel: tool_approve_all_btn = gr.Button("✅ Approve All", variant="primary", scale=1) tool_reject_all_btn = gr.Button("❌ Reject All", variant="stop", scale=1) with gr.Column(visible=False) as tool_list_panel: gr.Markdown("### 🔧 Tool Actions") with gr.Tab("Selective Approval"): tool_indices_input = gr.Textbox( label="Tool Indices to Approve (comma-separated)", placeholder="1,2,3", info="Enter indices of tools to approve (e.g., '1,3' to approve Tool 1 and Tool 3)" ) tool_approve_selected_btn = gr.Button("✅ Approve Selected", variant="primary") with gr.Tab("Modify Arguments"): tool_index_input = gr.Number( label="Tool Index", value=1, minimum=1, precision=0, info="Enter tool number (e.g., 1 for Tool 1)" ) tool_args_input = gr.TextArea( label="New Arguments (JSON)", placeholder='{"query": "modified search query"}', lines=5 ) tool_modify_btn = gr.Button("🔧 Modify & Approve", variant="secondary") with gr.Tab("Re-Reasoning"): feedback_input = gr.TextArea( label="Feedback for Agent", placeholder="Please search for academic papers instead of news articles...", lines=4 ) re_reasoning_btn = gr.Button("🔄 Request Re-Reasoning", variant="secondary") # Sidebar with gr.Column(scale=1): gr.Markdown('

Control Panel

') with gr.Group(): gr.Markdown('

User Session

') user_id = gr.Textbox( label="User ID", value="demo_user" ) session_id = gr.Textbox( label="Session ID", value=f"session_{datetime.now().strftime('%m%d_%H%M')}" ) with gr.Group(): stats_display = gr.Markdown('

Statistics

No requests yet

') with gr.Group(): gr.Markdown('

Management

') clear_btn = gr.Button("Clear History", variant="secondary") new_session_btn = gr.Button("New Session", variant="secondary") gr.Markdown("""

Examples

Model Deployment:

Tool Usage:

""") # Update GPU options when location changes location.change( fn=agent_interface.update_gpu_options, inputs=[location], outputs=[gpu_type] ) # ======================================================================== # EVENT HANDLERS # ======================================================================== # Query submission msg.submit( agent_interface.process_query, inputs=[msg, chatbot, user_id, session_id], outputs=[chatbot, msg, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) send_btn.click( agent_interface.process_query, inputs=[msg, chatbot, user_id, session_id], outputs=[chatbot, msg, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) # Capacity approval handlers capacity_approve_btn.click( agent_interface.approve_capacity, inputs=[chatbot, user_id, session_id], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) capacity_reject_btn.click( agent_interface.reject_capacity, inputs=[chatbot, user_id, session_id], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) capacity_modify_btn.click( agent_interface.show_capacity_modify_dialog, outputs=[capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, max_model_len, max_num_seqs, max_batched_tokens, kv_cache_dtype, gpu_util, location, gpu_type] ) capacity_cancel_btn.click( agent_interface.cancel_capacity_modify, outputs=[capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel] ) capacity_apply_btn.click( agent_interface.apply_capacity_modifications, inputs=[chatbot, user_id, session_id, max_model_len, max_num_seqs, max_batched_tokens, kv_cache_dtype, gpu_util, location, gpu_type], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) # Tool approval handlers tool_approve_all_btn.click( agent_interface.approve_all_tools, inputs=[chatbot, user_id, session_id], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) tool_reject_all_btn.click( agent_interface.reject_all_tools, inputs=[chatbot, user_id, session_id], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) tool_approve_selected_btn.click( agent_interface.approve_selected_tools, inputs=[chatbot, user_id, session_id, tool_indices_input], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) tool_modify_btn.click( agent_interface.modify_tool_args, inputs=[chatbot, user_id, session_id, tool_index_input, tool_args_input], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) re_reasoning_btn.click( agent_interface.request_re_reasoning, inputs=[chatbot, user_id, session_id, feedback_input], outputs=[chatbot, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) # Management handlers clear_btn.click( agent_interface.clear_chat, inputs=[user_id, session_id], outputs=[chatbot, msg, capacity_approval_panel, capacity_param_panel, tool_approval_panel, tool_list_panel, stats_display] ) new_session_btn.click( agent_interface.new_session, inputs=[user_id], outputs=[session_id] ) return demo