|
|
|
|
|
""" |
|
|
HiveGPT Agent ReAct Graph Module |
|
|
|
|
|
This module implements the ReAct workflow for the HiveGPT Agent system. |
|
|
It orchestrates agent reasoning, human approval, tool execution, and response refinement |
|
|
using LangGraph for workflow management and memory support. |
|
|
|
|
|
Key Features: |
|
|
- Human-in-the-loop approval for tool execution |
|
|
- MCP tool integration |
|
|
- Memory-enabled state management |
|
|
- Modular node functions for extensibility |
|
|
|
|
|
Author: HiveNetCode |
|
|
License: Private |
|
|
""" |
|
|
|
|
|
from typing import Sequence, Dict, Any |
|
|
from langchain_core.tools import BaseTool |
|
|
from langchain_core.messages import HumanMessage |
|
|
from langgraph.graph import StateGraph, END |
|
|
import logging |
|
|
from typing_extensions import TypedDict |
|
|
from typing import Dict, Any, Sequence, List |
|
|
from langchain_core.messages import BaseMessage |
|
|
from langchain_core.tools import BaseTool |
|
|
from langchain_openai.chat_models import ChatOpenAI |
|
|
from ComputeAgent.graph.state import AgentState |
|
|
|
|
|
from ComputeAgent.nodes.ReAct import ( |
|
|
agent_reasoning_node, |
|
|
human_approval_node, |
|
|
auto_approval_node, |
|
|
tool_execution_node, |
|
|
generate_node, |
|
|
tool_rejection_exit_node, |
|
|
should_continue_to_approval, |
|
|
should_continue_after_approval, |
|
|
should_continue_after_execution |
|
|
) |
|
|
# Module-level logger for the ReAct workflow.
logger = logging.getLogger("ReAct Workflow")


# Global registries keyed by a workflow instance's id(). Tools and the
# tool-bound LLM are kept out of the graph state and looked up by the integer
# workflow_id instead, avoiding serialization issues with the checkpointer
# (see ReactWorkflow._initialize_react_state).
_TOOLS_REGISTRY = {}
_LLM_REGISTRY = {}
|
|
|
|
|
|
|
|
|
|
|
class ReactState(AgentState):
    """
    ReactState extends AgentState to support ReAct workflow fields.

    Currently adds no fields of its own; it exists as a dedicated state type
    for the ReAct StateGraph so workflow-specific fields can be added later
    without touching the shared AgentState.
    """
    pass
|
|
|
|
|
|
|
|
|
|
|
class ReactWorkflow:
    """
    Orchestrates the ReAct workflow:
    1. Agent reasoning and tool selection
    2. Human approval for tool execution
    3. Tool execution (special handling for researcher tool)
    4. Response refinement (skipped for researcher tool)

    Features:
    - MCP tool integration
    - Human-in-the-loop approval for all tool calls
    - Special handling for researcher tool (bypasses refinement, uses generate_node)
    - Memory management with conversation summaries and recent message context
    - Proper state management following AgenticRAG pattern
    """

    def __init__(self, llm, tools: Sequence[BaseTool], checkpointer=None):
        """
        Initialize ReAct workflow with an LLM, tools, and optional memory checkpointer.

        Args:
            llm: Main LLM for reasoning (will be bound with tools)
            tools: Sequence of MCP tools for execution
            checkpointer: Optional memory checkpointer for conversation memory.
                Defaults to None (no persistent memory).
        """
        self.llm = llm.bind_tools(tools)
        self.tools = tools
        # BUGFIX: ainvoke() reads self.checkpointer, but it was never assigned,
        # which raised AttributeError on every call. It is now an explicit,
        # backward-compatible constructor parameter defaulting to None.
        self.checkpointer = checkpointer

        # Register tools and the bound LLM in the module-level registries keyed
        # by this instance's id(), so only the integer workflow_id needs to
        # travel through (checkpointed) graph state.
        self.workflow_id = id(self)
        _TOOLS_REGISTRY[self.workflow_id] = tools
        _LLM_REGISTRY[self.workflow_id] = self.llm
        logger.info(
            "Registered %d tools and LLM in global registry (ID: %s)",
            len(tools), self.workflow_id,
        )

        self.graph = self._create_graph()

    def _initialize_react_state(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Initialize or update state with workflow_id.

        The workflow_id is used to retrieve both tools and LLM from the global
        registry, avoiding serialization issues with the checkpointer.

        Args:
            state: Current state (may be from parent graph)

        Returns:
            Updated state with workflow_id set and, if absent, a messages list
            seeded from the "query" field.
        """
        updated_state = state.copy()

        if not updated_state.get("workflow_id"):
            updated_state["workflow_id"] = self.workflow_id
            logger.info("Workflow ID set in state: %s", self.workflow_id)

        # Seed the conversation with the user's query when the parent graph
        # did not already supply messages.
        if not updated_state.get("messages"):
            query = updated_state.get("query", "")
            if query:
                updated_state["messages"] = [HumanMessage(content=query)]
                logger.info("Initialized messages with query for ReACT workflow")
            else:
                updated_state["messages"] = []
                logger.warning("No query found to initialize messages")

        return updated_state

    def _create_graph(self) -> StateGraph:
        """
        Creates and configures the ReAct workflow graph with memory support.

        Returns:
            Compiled StateGraph for the ReAct workflow
        """
        workflow = StateGraph(ReactState)

        # State bootstrap node: injects workflow_id and initial messages.
        workflow.add_node("initialize_react", self._initialize_react_state)

        # Core ReAct nodes.
        workflow.add_node("agent_reasoning", agent_reasoning_node)
        workflow.add_node("human_approval", human_approval_node)
        workflow.add_node("auto_approval", auto_approval_node)
        workflow.add_node("tool_execution", tool_execution_node)
        workflow.add_node("generate", generate_node)
        workflow.add_node("tool_rejection_exit", tool_rejection_exit_node)

        workflow.set_entry_point("initialize_react")
        workflow.add_edge("initialize_react", "agent_reasoning")

        # After reasoning: route to human/auto approval, or straight to
        # generation when no tool call is pending.
        workflow.add_conditional_edges(
            "agent_reasoning",
            should_continue_to_approval,
            {
                "human_approval": "human_approval",
                "auto_approval": "auto_approval",
                "generate": "generate",
            },
        )

        # Both approval flavors share identical post-approval routing.
        for approval_node in ("human_approval", "auto_approval"):
            workflow.add_conditional_edges(
                approval_node,
                should_continue_after_approval,
                {
                    "tool_execution": "tool_execution",
                    "tool_rejection_exit": "tool_rejection_exit",
                    "agent_reasoning": "agent_reasoning",
                },
            )

        # After tool execution: loop back for more reasoning, or finish.
        workflow.add_conditional_edges(
            "tool_execution",
            should_continue_after_execution,
            {
                "agent_reasoning": "agent_reasoning",
                "generate": "generate",
            },
        )

        # Terminal edges. (A duplicate "generate" -> END edge was removed.)
        workflow.add_edge("generate", END)
        workflow.add_edge("tool_rejection_exit", END)

        return workflow.compile()

    def get_compiled_graph(self):
        """Return the compiled graph for embedding in a parent graph."""
        return self.graph

    async def ainvoke(self, query: str, user_id: str = "default_user", session_id: str = "default_session") -> Dict[str, Any]:
        """
        Execute the ReAct workflow with a given query and memory context (async version).

        Args:
            query: The user's question/request
            user_id: User identifier for memory management
            session_id: Session identifier for memory management

        Returns:
            Final state with response and execution details
        """
        initial_state = {
            # Memory / identity fields.
            "user_id": user_id,
            "session_id": session_id,
            "summary": "",

            # Conversation fields.
            "query": query,
            "response": "",
            "messages": [HumanMessage(content=query)],

            # Tool-call bookkeeping.
            "tools": self.tools,
            "pending_tool_calls": [],
            "approved_tool_calls": [],
            "rejected_tool_calls": [],
            "tool_results": [],

            # Tool-bound LLM (also reachable via the global registry).
            "llm": self.llm,

            # Workflow control flags.
            "current_step": "initialized",
            "skip_refinement": False,
            "researcher_executed": False,

            # Retrieval / search fields.
            "retrieved_documents": [],
            "search_results": "",
            "web_search": "No",

            "final_response_dict": {},
        }

        # Only build a thread-scoped config (and record the user message) when
        # a checkpointer is configured.
        config = None
        if self.checkpointer:
            from helpers.memory import get_memory_manager
            memory_manager = get_memory_manager()
            thread_id = f"{user_id}:{session_id}"
            config = {"configurable": {"thread_id": thread_id}}
            await memory_manager.add_user_message(user_id, session_id, query)

        logger.info("Starting ReAct workflow for user %s, session %s", user_id, session_id)

        if config:
            result = await self.graph.ainvoke(initial_state, config)
        else:
            result = await self.graph.ainvoke(initial_state)

        # Persist the AI response into memory when available.
        if self.checkpointer and result.get("response"):
            from helpers.memory import get_memory_manager
            memory_manager = get_memory_manager()
            await memory_manager.add_ai_response(user_id, session_id, result["response"])

        logger.info("ReAct workflow completed successfully")
        return result

    def invoke(self, query: str, user_id: str = "default_user", session_id: str = "default_session") -> Dict[str, Any]:
        """
        Synchronous wrapper for the async workflow with memory support.

        Args:
            query: The user's question/request
            user_id: User identifier for memory management
            session_id: Session identifier for memory management

        Returns:
            Final state with response and execution details
        """
        import asyncio
        try:
            # Modern replacement for the deprecated get_event_loop() probe:
            # raises RuntimeError when no loop is running in this thread.
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: safe to create one for the coroutine.
            return asyncio.run(self.ainvoke(query, user_id, session_id))

        # A loop is already running (e.g. Jupyter): run the coroutine on a
        # fresh loop in a worker thread to avoid "loop already running".
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(asyncio.run, self.ainvoke(query, user_id, session_id))
            return future.result()

    def draw_graph(self, output_file_path: str = "react_workflow_graph.png"):
        """
        Generate and save a visual representation of the ReAct workflow graph.

        Args:
            output_file_path: Path where to save the graph PNG file
        """
        try:
            self.graph.get_graph().draw_mermaid_png(output_file_path=output_file_path)
            logger.info("ReAct graph visualization saved to: %s", output_file_path)
        except Exception as e:
            # Best-effort: visualization failures must not break the workflow.
            logger.error("Failed to generate ReAct graph visualization: %s", e)
            print(f"Error generating ReAct graph: {e}")
|
|
|
|
|
|
|
|
|
|
|
# Alias — presumably kept so callers importing `ReactAgent` keep working;
# NOTE(review): confirm external usage before removing.
ReactAgent = ReactWorkflow
|
|
|