"""
HiveGPT Agent ReAct Graph Module
This module implements the ReAct workflow for the HiveGPT Agent system.
It orchestrates agent reasoning, human approval, tool execution, and final
response generation using LangGraph for workflow management and memory support.
Key Features:
- Human-in-the-loop approval for tool execution
- MCP tool integration
- Memory-enabled state management
- Modular node functions for extensibility
Author: HiveNetCode
License: Private
"""
import logging
from typing import Any, Dict, Sequence

from langchain_core.messages import HumanMessage
from langchain_core.tools import BaseTool
from langgraph.graph import StateGraph, END

from ComputeAgent.graph.state import AgentState
from ComputeAgent.nodes.ReAct import (
agent_reasoning_node,
human_approval_node,
auto_approval_node,
tool_execution_node,
generate_node,
tool_rejection_exit_node,
should_continue_to_approval,
should_continue_after_approval,
should_continue_after_execution
)
logger = logging.getLogger("ReAct Workflow")
# Global registries (to avoid serialization issues with checkpointer)
# Nodes access tools and LLM from here instead of storing them in state
_TOOLS_REGISTRY = {}
_LLM_REGISTRY = {}
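
# Minimal accessor sketch (assumption: the node functions imported from
# ComputeAgent.nodes.ReAct resolve tools and the LLM through the workflow_id
# stored in state; these hypothetical helpers only illustrate that lookup
# pattern and are not part of the node API).
def get_registered_tools(workflow_id: int):
    """Return the tools registered for a given workflow ID, or None."""
    return _TOOLS_REGISTRY.get(workflow_id)


def get_registered_llm(workflow_id: int):
    """Return the LLM registered for a given workflow ID, or None."""
    return _LLM_REGISTRY.get(workflow_id)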
# State class for ReAct workflow
class ReactState(AgentState):
"""
    ReactState extends AgentState to support ReAct workflow fields.
"""
pass
# Main workflow class for ReAct
class ReactWorkflow:
"""
Orchestrates the ReAct workflow:
1. Agent reasoning and tool selection
2. Human approval for tool execution
3. Tool execution (special handling for researcher tool)
4. Response refinement (skipped for researcher tool)
Features:
- MCP tool integration
- Human-in-the-loop approval for all tool calls
- Special handling for researcher tool (bypasses refinement, uses generate_node)
- Memory management with conversation summaries and recent message context
- Proper state management following AgenticRAG pattern
"""
    def __init__(self, llm, tools: Sequence[BaseTool], checkpointer=None):
        """
        Initialize the ReAct workflow with an LLM, tools, and an optional memory checkpointer.
        Args:
            llm: Main LLM for reasoning (will be bound with tools)
            tools: Sequence of MCP tools for execution
            checkpointer: Optional memory checkpointer for conversation memory
        """
        self.llm = llm.bind_tools(tools)
        self.tools = tools
        self.checkpointer = checkpointer
# Register tools and LLM in global registry to avoid serialization issues
# Nodes will access them from the registry instead of state
self.workflow_id = id(self)
_TOOLS_REGISTRY[self.workflow_id] = tools
_LLM_REGISTRY[self.workflow_id] = self.llm
logger.info(f"βœ… Registered {len(tools)} tools and LLM in global registry (ID: {self.workflow_id})")
self.graph = self._create_graph()
def _initialize_react_state(self, state: Dict[str, Any]) -> Dict[str, Any]:
"""
Initialize or update state with workflow_id.
The workflow_id is used to retrieve both tools and LLM from the global registry,
avoiding serialization issues with the checkpointer.
Args:
state: Current state (may be from parent graph)
Returns:
Updated state with workflow_id
"""
updated_state = state.copy()
# Store workflow ID for registry lookup (both tools and LLM)
if not updated_state.get("workflow_id"):
updated_state["workflow_id"] = self.workflow_id
logger.info(f"βœ… Workflow ID set in state: {self.workflow_id}")
# Initialize messages if empty (when coming from parent graph)
if not updated_state.get("messages"):
query = updated_state.get("query", "")
if query:
updated_state["messages"] = [HumanMessage(content=query)]
logger.info(f"πŸ’¬ Initialized messages with query for ReACT workflow")
else:
updated_state["messages"] = []
logger.warning(f"⚠️ No query found to initialize messages")
return updated_state
def _create_graph(self) -> StateGraph:
"""
Creates and configures the ReAct workflow graph with memory support.
Returns:
Compiled StateGraph for ReAct workflow
"""
workflow = StateGraph(ReactState)
# Add initialization node to set up LLM and tools
workflow.add_node("initialize_react", self._initialize_react_state)
# Add nodes - REMOVED refinement node, always use generate for final response
workflow.add_node("agent_reasoning", agent_reasoning_node)
workflow.add_node("human_approval", human_approval_node)
workflow.add_node("auto_approval", auto_approval_node)
workflow.add_node("tool_execution", tool_execution_node)
workflow.add_node("generate", generate_node)
workflow.add_node("tool_rejection_exit", tool_rejection_exit_node)
# Set entry point - start with initialization
workflow.set_entry_point("initialize_react")
# Connect initialization to agent reasoning
workflow.add_edge("initialize_react", "agent_reasoning")
# Add conditional edges from agent reasoning
workflow.add_conditional_edges(
"agent_reasoning",
should_continue_to_approval,
{
"human_approval": "human_approval",
"auto_approval": "auto_approval",
"generate": "generate", # Changed from refinement to generate
}
)
# Add conditional edges from human approval
workflow.add_conditional_edges(
"human_approval",
should_continue_after_approval,
{
"tool_execution": "tool_execution",
"tool_rejection_exit": "tool_rejection_exit",
"agent_reasoning": "agent_reasoning", # For re-reasoning
}
)
# Add conditional edges from auto approval (for consistency with human approval)
workflow.add_conditional_edges(
"auto_approval",
should_continue_after_approval,
{
"tool_execution": "tool_execution",
"tool_rejection_exit": "tool_rejection_exit",
"agent_reasoning": "agent_reasoning", # For re-reasoning
}
)
# Add conditional edges from tool execution
workflow.add_conditional_edges(
"tool_execution",
should_continue_after_execution,
{
"agent_reasoning": "agent_reasoning",
"generate": "generate", # Always generate, never refinement
}
)
# Generate goes directly to END (response formatting is done in generate_node)
workflow.add_edge("generate", END)
# Tool rejection exit goes to END
workflow.add_edge("tool_rejection_exit", END)
        # Compile with the memory checkpointer if one was provided
        return workflow.compile(checkpointer=self.checkpointer)
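    # Illustrative only: the routing predicates used above are imported from
    # ComputeAgent.nodes.ReAct. A conditional-edge function of this shape is
    # what add_conditional_edges expects (it returns one of the mapping keys).
    # This hypothetical sketch is NOT the real should_continue_to_approval.
    @staticmethod
    def _example_router(state: Dict[str, Any]) -> str:
        if state.get("pending_tool_calls"):
            # Route to approval when the agent proposed tool calls
            return "human_approval"
        return "generate"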
def get_compiled_graph(self):
"""Return the compiled graph for embedding in parent graph"""
return self.graph
async def ainvoke(self, query: str, user_id: str = "default_user", session_id: str = "default_session") -> Dict[str, Any]:
"""
Execute the ReAct workflow with a given query and memory context (async version).
Args:
query: The user's question/request
user_id: User identifier for memory management
session_id: Session identifier for memory management
Returns:
Final state with response and execution details
"""
initial_state = {
# Memory fields
"user_id": user_id,
"session_id": session_id,
"summary": "", # Will be loaded from memory if available
# Core fields
"query": query,
"response": "",
"messages": [HumanMessage(content=query)],
            # Tool-related state. The tools and LLM themselves are resolved
            # from the global registry via workflow_id, so they never enter
            # the (checkpointed) state
            "workflow_id": self.workflow_id,
            "pending_tool_calls": [],
            "approved_tool_calls": [],
            "rejected_tool_calls": [],
            "tool_results": [],
# Flow control
"current_step": "initialized",
"skip_refinement": False,
"researcher_executed": False,
# Retrieved data (for researcher integration)
"retrieved_documents": [],
"search_results": "",
"web_search": "No",
# Final response formatting
"final_response_dict": {}
}
# Configure thread for memory if checkpointer is available
config = None
if self.checkpointer:
from helpers.memory import get_memory_manager
memory_manager = get_memory_manager()
thread_id = f"{user_id}:{session_id}"
config = {"configurable": {"thread_id": thread_id}}
# Add user message to memory
await memory_manager.add_user_message(user_id, session_id, query)
logger.info(f"πŸš€ Starting ReAct workflow for user {user_id}, session {session_id}")
if config:
result = await self.graph.ainvoke(initial_state, config)
else:
result = await self.graph.ainvoke(initial_state)
# Add AI response to memory if checkpointer is available
if self.checkpointer and result.get("response"):
from helpers.memory import get_memory_manager
memory_manager = get_memory_manager()
await memory_manager.add_ai_response(user_id, session_id, result["response"])
logger.info("βœ… ReAct workflow completed successfully")
return result
def invoke(self, query: str, user_id: str = "default_user", session_id: str = "default_session") -> Dict[str, Any]:
"""
Synchronous wrapper for async workflow with memory support.
Args:
query: The user's question/request
user_id: User identifier for memory management
session_id: Session identifier for memory management
Returns:
Final state with response and execution details
"""
        import asyncio
        try:
            # Detect whether an event loop is already running in this thread
            asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop: safe to create one
            return asyncio.run(self.ainvoke(query, user_id, session_id))
        # A loop is already running; run the coroutine in a separate thread
        # with its own event loop to avoid "loop already running" errors
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(asyncio.run, self.ainvoke(query, user_id, session_id))
            return future.result()
def draw_graph(self, output_file_path: str = "react_workflow_graph.png"):
"""
Generate and save a visual representation of the ReAct workflow graph.
Args:
            output_file_path: Path where the rendered PNG will be saved
"""
try:
self.graph.get_graph().draw_mermaid_png(output_file_path=output_file_path)
logger.info(f"βœ… ReAct graph visualization saved to: {output_file_path}")
except Exception as e:
logger.error(f"❌ Failed to generate ReAct graph visualization: {e}")
print(f"Error generating ReAct graph: {e}")
# Legacy alias kept for backward compatibility
ReactAgent = ReactWorkflow
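

# Usage sketch (illustrative only): wiring the workflow to a chat model and a
# tool list. The model name and the empty `my_tools` list are assumptions; any
# LangChain chat model supporting bind_tools and any sequence of BaseTool
# instances should work the same way.
if __name__ == "__main__":
    from langchain_openai.chat_models import ChatOpenAI

    my_tools: list = []  # hypothetical: populate with MCP/BaseTool instances
    workflow = ReactWorkflow(llm=ChatOpenAI(model="gpt-4o"), tools=my_tools)
    result = workflow.invoke(
        "What is the status of the cluster?",
        user_id="demo_user",
        session_id="demo_session",
    )
    print(result.get("response", ""))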