File size: 12,510 Bytes
8816dfd 5dd4236 8816dfd 5dd4236 8816dfd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
"""
HiveGPT Agent ReAct Graph Module
This module implements the ReAct workflow for the HiveGPT Agent system.
It orchestrates agent reasoning, human approval, tool execution, and response refinement
using LangGraph for workflow management and memory support.
Key Features:
- Human-in-the-loop approval for tool execution
- MCP tool integration
- Memory-enabled state management
- Modular node functions for extensibility
Author: HiveNetCode
License: Private
"""
from typing import Sequence, Dict, Any
from langchain_core.tools import BaseTool
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
import logging
from typing_extensions import TypedDict
from typing import Dict, Any, Sequence, List
from langchain_core.messages import BaseMessage
from langchain_core.tools import BaseTool
from langchain_openai.chat_models import ChatOpenAI
from ComputeAgent.graph.state import AgentState
from ComputeAgent.nodes.ReAct import (
agent_reasoning_node,
human_approval_node,
auto_approval_node,
tool_execution_node,
generate_node,
tool_rejection_exit_node,
should_continue_to_approval,
should_continue_after_approval,
should_continue_after_execution
)
# Module-level logger for the ReAct workflow (name is free-form, not __name__).
logger = logging.getLogger("ReAct Workflow")
# Global registries (to avoid serialization issues with checkpointer)
# Nodes access tools and LLM from here instead of storing them in state.
# Keys are id(workflow_instance) -- see ReactWorkflow.__init__.
# NOTE(review): nothing in this module ever removes entries, so each
# ReactWorkflow instance created keeps its tools/LLM alive for the process
# lifetime -- confirm this is acceptable for the expected number of workflows.
_TOOLS_REGISTRY: Dict[int, Sequence[BaseTool]] = {}
_LLM_REGISTRY: Dict[int, Any] = {}
# State class for ReAct workflow
class ReactState(AgentState):
    """
    ReactState extends AgentState to support ReAct workflow fields.

    No fields are added here yet; the class exists so the graph has a
    dedicated state type that can grow independently of AgentState.
    (The previous docstring said "extends HiveGPTMemoryState", but the
    visible base class is AgentState.)
    """
    pass
# Main workflow class for ReAct
class ReactWorkflow:
    """
    Orchestrates the ReAct workflow:
    1. Agent reasoning and tool selection
    2. Human approval for tool execution
    3. Tool execution (special handling for researcher tool)
    4. Response generation (refinement step removed; generate_node is final)

    Features:
    - MCP tool integration
    - Human-in-the-loop approval for all tool calls
    - Special handling for researcher tool (bypasses refinement, uses generate_node)
    - Memory management with conversation summaries and recent message context
    - Proper state management following AgenticRAG pattern
    """

    def __init__(self, llm, tools: Sequence[BaseTool], checkpointer=None):
        """
        Initialize ReAct workflow with an LLM, tools, and an optional checkpointer.

        Args:
            llm: Main LLM for reasoning (will be bound with tools)
            tools: Sequence of MCP tools for execution
            checkpointer: Optional memory checkpointer for conversation memory.
                Defaults to None (no persistent memory).
        """
        self.llm = llm.bind_tools(tools)
        self.tools = tools
        # BUG FIX: ainvoke() reads self.checkpointer, but this attribute was
        # never assigned, so every invocation raised AttributeError. It is now
        # an optional constructor argument; None preserves the old intended
        # no-memory behavior and keeps existing call sites working.
        self.checkpointer = checkpointer
        # Register tools and LLM in the module-level registries keyed by
        # id(self) so they are never serialized into checkpointed state;
        # nodes look them up via the workflow_id stored in state.
        self.workflow_id = id(self)
        _TOOLS_REGISTRY[self.workflow_id] = tools
        _LLM_REGISTRY[self.workflow_id] = self.llm
        logger.info(f"✅ Registered {len(tools)} tools and LLM in global registry (ID: {self.workflow_id})")
        self.graph = self._create_graph()

    def _initialize_react_state(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Initialize or update state with workflow_id and seed messages.

        The workflow_id is used to retrieve both tools and LLM from the global
        registry, avoiding serialization issues with the checkpointer.

        Args:
            state: Current state (may come from a parent graph)

        Returns:
            Updated copy of the state with workflow_id set and "messages"
            seeded from "query" when the message list is empty.
        """
        updated_state = state.copy()
        # Store workflow ID for registry lookup (both tools and LLM)
        if not updated_state.get("workflow_id"):
            updated_state["workflow_id"] = self.workflow_id
            logger.info(f"✅ Workflow ID set in state: {self.workflow_id}")
        # Initialize messages if empty (when coming from parent graph)
        if not updated_state.get("messages"):
            query = updated_state.get("query", "")
            if query:
                updated_state["messages"] = [HumanMessage(content=query)]
                logger.info("💬 Initialized messages with query for ReACT workflow")
            else:
                updated_state["messages"] = []
                logger.warning("⚠️ No query found to initialize messages")
        return updated_state

    def _create_graph(self):
        """
        Creates and configures the ReAct workflow graph.

        Returns:
            The compiled graph (note: workflow.compile() returns a compiled
            graph, not a StateGraph, hence no return annotation here).
        """
        workflow = StateGraph(ReactState)
        # Initialization node sets up workflow_id / seed messages
        workflow.add_node("initialize_react", self._initialize_react_state)
        # Core nodes -- there is no refinement node; "generate" always
        # produces the final response
        workflow.add_node("agent_reasoning", agent_reasoning_node)
        workflow.add_node("human_approval", human_approval_node)
        workflow.add_node("auto_approval", auto_approval_node)
        workflow.add_node("tool_execution", tool_execution_node)
        workflow.add_node("generate", generate_node)
        workflow.add_node("tool_rejection_exit", tool_rejection_exit_node)
        # Entry: initialization, then straight into reasoning
        workflow.set_entry_point("initialize_react")
        workflow.add_edge("initialize_react", "agent_reasoning")
        # Reasoning either requests approval (human or auto) or finishes
        workflow.add_conditional_edges(
            "agent_reasoning",
            should_continue_to_approval,
            {
                "human_approval": "human_approval",
                "auto_approval": "auto_approval",
                "generate": "generate",  # no refinement step; generate is final
            },
        )
        # Human and auto approval share identical routing logic
        approval_routes = {
            "tool_execution": "tool_execution",
            "tool_rejection_exit": "tool_rejection_exit",
            "agent_reasoning": "agent_reasoning",  # for re-reasoning
        }
        workflow.add_conditional_edges(
            "human_approval", should_continue_after_approval, dict(approval_routes)
        )
        workflow.add_conditional_edges(
            "auto_approval", should_continue_after_approval, dict(approval_routes)
        )
        # After execution: loop back to reasoning or produce the final answer
        workflow.add_conditional_edges(
            "tool_execution",
            should_continue_after_execution,
            {
                "agent_reasoning": "agent_reasoning",
                "generate": "generate",  # always generate, never refinement
            },
        )
        # Terminal edges (response formatting is done in generate_node).
        # A duplicated "generate" -> END edge from the original was removed.
        workflow.add_edge("generate", END)
        workflow.add_edge("tool_rejection_exit", END)
        # Compile with the memory checkpointer if one was provided
        if self.checkpointer is not None:
            return workflow.compile(checkpointer=self.checkpointer)
        return workflow.compile()

    def get_compiled_graph(self):
        """Return the compiled graph for embedding in a parent graph."""
        return self.graph

    async def ainvoke(self, query: str, user_id: str = "default_user", session_id: str = "default_session") -> Dict[str, Any]:
        """
        Execute the ReAct workflow with a given query and memory context (async).

        Args:
            query: The user's question/request
            user_id: User identifier for memory management
            session_id: Session identifier for memory management

        Returns:
            Final state with response and execution details
        """
        initial_state = {
            # Memory fields
            "user_id": user_id,
            "session_id": session_id,
            "summary": "",  # Will be loaded from memory if available
            # Core fields
            "query": query,
            "response": "",
            "messages": [HumanMessage(content=query)],
            # Tool-related state
            # NOTE(review): tools/llm are also placed in state here even though
            # the registry exists to avoid serializing them -- confirm whether
            # any node still reads them from state before removing these keys.
            "tools": self.tools,
            "pending_tool_calls": [],
            "approved_tool_calls": [],
            "rejected_tool_calls": [],
            "tool_results": [],
            # LLM instances
            "llm": self.llm,
            # Flow control
            "current_step": "initialized",
            "skip_refinement": False,
            "researcher_executed": False,
            # Retrieved data (for researcher integration)
            "retrieved_documents": [],
            "search_results": "",
            "web_search": "No",
            # Final response formatting
            "final_response_dict": {},
        }
        # Configure thread for memory if a checkpointer is available
        config = None
        if self.checkpointer:
            from helpers.memory import get_memory_manager
            memory_manager = get_memory_manager()
            thread_id = f"{user_id}:{session_id}"
            config = {"configurable": {"thread_id": thread_id}}
            # Add user message to memory
            await memory_manager.add_user_message(user_id, session_id, query)
        logger.info(f"🚀 Starting ReAct workflow for user {user_id}, session {session_id}")
        if config:
            result = await self.graph.ainvoke(initial_state, config)
        else:
            result = await self.graph.ainvoke(initial_state)
        # Persist the AI response to memory when memory is enabled
        if self.checkpointer and result.get("response"):
            from helpers.memory import get_memory_manager
            memory_manager = get_memory_manager()
            await memory_manager.add_ai_response(user_id, session_id, result["response"])
        logger.info("✅ ReAct workflow completed successfully")
        return result

    def invoke(self, query: str, user_id: str = "default_user", session_id: str = "default_session") -> Dict[str, Any]:
        """
        Synchronous wrapper for the async workflow with memory support.

        Args:
            query: The user's question/request
            user_id: User identifier for memory management
            session_id: Session identifier for memory management

        Returns:
            Final state with response and execution details
        """
        import asyncio
        try:
            # Deprecated get_event_loop() replaced with get_running_loop(),
            # which raises RuntimeError when no loop is running in this thread.
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: safe to create one for this call
            return asyncio.run(self.ainvoke(query, user_id, session_id))
        # A loop is already running (e.g. notebook / async server); run the
        # coroutine on a fresh loop in a worker thread to avoid deadlocking it.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(asyncio.run, self.ainvoke(query, user_id, session_id))
            return future.result()

    def draw_graph(self, output_file_path: str = "react_workflow_graph.png"):
        """
        Generate and save a visual representation of the ReAct workflow graph.

        Args:
            output_file_path: Path where to save the graph PNG file
        """
        try:
            self.graph.get_graph().draw_mermaid_png(output_file_path=output_file_path)
            logger.info(f"✅ ReAct graph visualization saved to: {output_file_path}")
        except Exception as e:
            # Best-effort: visualization failure must not break the workflow
            logger.error(f"❌ Failed to generate ReAct graph visualization: {e}")
            print(f"Error generating ReAct graph: {e}")
# Legacy ReactAgent class for backward compatibility.
# Plain alias (not a subclass): isinstance checks and id() see ReactWorkflow.
ReactAgent = ReactWorkflow
|