"""Incident & Error Copilot — Gradio UI backed by MCP servers.

All tools talk to a local MCP gateway exposing logs (Neon-backed), Nebius
summarization, Modal deep analysis, Blaxel sandbox diagnostics, and an
ElevenLabs voice endpoint.
"""

import ast
import base64
import json
import os
import tempfile
from typing import Any, Dict, List

from dotenv import load_dotenv
import gradio as gr
from mcp import ClientSession, types as mcp_types
from mcp.client.streamable_http import streamablehttp_client

# Load local environment (for dev) including optional .env.blaxel with Blaxel creds
load_dotenv()
load_dotenv(".env.blaxel", override=False)

# All tools now talk to a local MCP gateway (uvicorn + mcp_servers.gateway)
_MCP_GATEWAY_BASE_URL = "http://127.0.0.1:8004"

_BL_API_KEY = os.getenv("BL_API_KEY")
_BL_WORKSPACE = os.getenv("BL_WORKSPACE")

_MCP_HEADERS: Dict[str, str] = {}
if _BL_API_KEY:
    # Support both generic Authorization and Blaxel-specific header names (harmless for local)
    _MCP_HEADERS["Authorization"] = f"Bearer {_BL_API_KEY}"
    _MCP_HEADERS["X-Blaxel-Authorization"] = f"Bearer {_BL_API_KEY}"
if _BL_WORKSPACE:
    _MCP_HEADERS["X-Blaxel-Workspace"] = _BL_WORKSPACE

LOGS_SERVER_URL = f"{_MCP_GATEWAY_BASE_URL}/logs/mcp"
VOICE_SERVER_URL = f"{_MCP_GATEWAY_BASE_URL}/voice/mcp"
NEBIUS_SERVER_URL = f"{_MCP_GATEWAY_BASE_URL}/nebius/mcp"
MODAL_SERVER_URL = f"{_MCP_GATEWAY_BASE_URL}/modal/mcp"
BLAXEL_SERVER_URL = f"{_MCP_GATEWAY_BASE_URL}/blaxel/mcp"


def _prepare_spoken_text(markdown: str) -> str:
    """Convert the rich incident markdown into a concise, TTS-friendly summary.

    - Strips markdown syntax like **, ``, and leading list markers.
    - Collapses newlines into sentences.
    """
    # Remove bold/inline code markers
    text = markdown.replace("**", "").replace("`", "")
    # Remove leading list markers like "- "
    lines = []
    for line in text.splitlines():
        stripped = line.lstrip()
        if stripped.startswith("- "):
            stripped = stripped[2:]
        lines.append(stripped)
    text = " ".join(lines)
    # Normalize whitespace
    text = " ".join(text.split())
    # Add a small preface so it sounds more natural
    return f"Here is a short incident recap: {text}".strip()


def _structured_dict(result: Any) -> Dict[str, Any]:
    """Extract a dict payload from an MCP tool result.

    Prefers ``result.structuredContent``; falls back to parsing the first
    text content block as JSON. Returns ``{}`` when nothing usable (or a
    non-dict payload) is present. Shared by several tool callers below to
    avoid copy-pasting this fallback logic.
    """
    data: Dict[str, Any] = result.structuredContent or {}
    if not data and getattr(result, "content", None):
        first_block = result.content[0]
        if isinstance(first_block, mcp_types.TextContent):
            try:
                data = json.loads(first_block.text)
            except Exception:
                data = {}
    if not isinstance(data, dict):
        return {}
    return data


def _unwrap_result(raw: Any) -> Dict[str, Any]:
    """Unwrap FastMCP's json_response envelope.

    FastMCP with ``json_response=True`` wraps tool return values under a
    top-level ``"result"`` key. Returns the inner dict when present,
    otherwise *raw* itself (if a dict) or ``{}``.
    """
    if isinstance(raw, dict) and isinstance(raw.get("result"), dict):
        return raw["result"]
    return raw if isinstance(raw, dict) else {}


async def _summarize_logs_via_mcp(service: str = "recs-api", env: str = "prod") -> Dict[str, Any]:
    """Call the Logs MCP server's ``summarize_logs`` tool and return its dict payload."""
    async with streamablehttp_client(LOGS_SERVER_URL, headers=_MCP_HEADERS) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "summarize_logs",
                arguments={"service": service, "env": env},
            )
            # Prefer structured content but fall back to parsing JSON text
            return _structured_dict(result)


async def _generate_voice_summary_via_mcp(text: str) -> str:
    """Synthesize *text* via the ElevenLabs MCP server.

    Picks the first available voice, requests audio, decodes the returned
    base64 mp3 into a temp file, and returns the file path.

    Raises:
        ValueError: if *text* is blank.
        RuntimeError: if no voice or no audio payload is available.
    """
    if not text.strip():
        raise ValueError("No content available to synthesize.")
    async with streamablehttp_client(VOICE_SERVER_URL, headers=_MCP_HEADERS) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            voices_result = await session.call_tool("list_voices", arguments={})
            # Prefer structuredContent but fall back to parsing unstructured JSON
            data = _structured_dict(voices_result)
            voices_list = data.get("voices") or data.get("data") or []
            voice_id = None
            if isinstance(voices_list, list) and voices_list:
                first_voice = voices_list[0]
                if isinstance(first_voice, dict):
                    voice_id = first_voice.get("voice_id") or first_voice.get("id")
            if not voice_id:
                raise RuntimeError("No ElevenLabs voices available from MCP server.")

            audio_result = await session.call_tool(
                "generate_incident_summary_audio",
                arguments={"text": text, "voice_id": voice_id},
            )
            # Prefer structured JSON but fall back to parsing text blocks
            audio_data = _structured_dict(audio_result)
            audio_b64 = audio_data.get("audio_base64")

            if not audio_b64:
                # Try one more time to extract from text content (JSON or Python literal)
                if getattr(audio_result, "content", None):
                    first_block = audio_result.content[0]
                    if isinstance(first_block, mcp_types.TextContent):
                        raw_text = first_block.text
                        parsed: Dict[str, Any] = {}
                        try:
                            parsed = json.loads(raw_text)
                        except Exception:
                            try:
                                parsed_obj = ast.literal_eval(raw_text)
                                if isinstance(parsed_obj, dict):
                                    parsed = parsed_obj
                            except Exception:
                                parsed = {}
                        if isinstance(parsed, dict):
                            audio_b64 = parsed.get("audio_base64") or audio_b64
                            if audio_b64:
                                audio_data = parsed

            if not audio_b64:
                # Try to surface any error message returned by the MCP tool
                error_details: Dict[str, Any] = {}
                if audio_data:
                    error_details["structured"] = audio_data
                if getattr(audio_result, "content", None):
                    first_block = audio_result.content[0]
                    if isinstance(first_block, mcp_types.TextContent):
                        error_details["text"] = first_block.text
                raise RuntimeError(
                    "No audio_base64 field returned from ElevenLabs MCP server. "
                    f"Details: {error_details}"
                )

            audio_bytes = base64.b64decode(audio_b64)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp:
                tmp.write(audio_bytes)
                path = tmp.name
            return path


async def _get_logs_via_mcp(service: str = "recs-api", env: str = "prod") -> List[Dict[str, Any]]:
    """Fetch raw logs via MCP (from Neon-backed or synthetic store)."""
    async with streamablehttp_client(LOGS_SERVER_URL, headers=_MCP_HEADERS) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "get_logs",
                arguments={"service": service, "env": env},
            )
            raw = result.structuredContent
            # FastMCP wraps json_response results under a "result" key.
            logs: List[Dict[str, Any]] = []
            if isinstance(raw, list):
                logs = raw
            elif isinstance(raw, dict):
                maybe_list = raw.get("result") or raw.get("logs")
                if isinstance(maybe_list, list):
                    logs = maybe_list
            # If structuredContent is empty, fall back to parsing text (if present).
            if not logs and getattr(result, "content", None):
                first_block = result.content[0]
                if isinstance(first_block, mcp_types.TextContent):
                    try:
                        parsed = json.loads(first_block.text)
                        if isinstance(parsed, list):
                            logs = parsed
                        elif isinstance(parsed, dict):
                            maybe_list = parsed.get("result") or parsed.get("logs")
                            if isinstance(maybe_list, list):
                                logs = maybe_list
                    except Exception:
                        logs = []
            return logs


async def _nebius_incident_summary_via_mcp(
    user_message: str,
    logs_summary_text: str,
) -> Dict[str, Any]:
    """Call the Nebius MCP server for a structured incident summary dict."""
    async with streamablehttp_client(NEBIUS_SERVER_URL, headers=_MCP_HEADERS) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "nebius_incident_summary",
                arguments={
                    "user_description": user_message,
                    "logs_summary": logs_summary_text,
                },
            )
            # Same structured-or-text fallback as the other tool callers.
            return _structured_dict(result)


async def _modal_deep_analysis_via_mcp() -> str:
    """Call the Modal MCP server for deep log analysis and pretty-print the result."""
    logs = await _get_logs_via_mcp(service="recs-api", env="prod")
    async with streamablehttp_client(MODAL_SERVER_URL, headers=_MCP_HEADERS) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "deep_log_analysis",
                arguments={"service": "recs-api", "env": "prod", "logs": logs},
            )
            raw = result.structuredContent or {}
            # Unwrap FastMCP's {"result": {...}} envelope to the analysis dict.
            data = _unwrap_result(raw)
            if not isinstance(data, dict):
                return f"Deep analysis (Modal) returned non-dict result: {raw!r}"

            # Build a concise human-readable headline from the analysis payload.
            log_count = data.get("log_count")
            sev_counts = data.get("severity_counts") or {}
            top_region = data.get("top_region")
            latest_error = data.get("latest_error") or {}

            sev_parts = []
            if isinstance(sev_counts, dict):
                for sev, cnt in sev_counts.items():
                    sev_parts.append(f"{sev}={cnt}")
            sev_str = ", ".join(sev_parts) if sev_parts else "no severity distribution available"

            latest_err_msg = latest_error.get("message") if isinstance(latest_error, dict) else None
            latest_err_ts = latest_error.get("timestamp") if isinstance(latest_error, dict) else None

            headline_parts = []
            if isinstance(log_count, int):
                headline_parts.append(f"Analyzed {log_count} logs")
            if sev_parts:
                headline_parts.append(f"severity mix: {sev_str}")
            if top_region:
                headline_parts.append(f"top region: {top_region}")
            headline = "; ".join(headline_parts) if headline_parts else "Deep log analysis summary"
            if latest_err_msg:
                if latest_err_ts:
                    headline += f". Latest error at {latest_err_ts}: {latest_err_msg}"
                else:
                    headline += f". Latest error: {latest_err_msg}"

            pretty = json.dumps(data, indent=2)
            return (
                f"**Deep Analysis (Modal)**\n\n{headline}\n\n"
                f"```json\n{pretty}\n```"
            )


async def _blaxel_run_diagnostic_via_mcp() -> str:
    """Run a simple sandbox diagnostic via Blaxel MCP server."""
    async with streamablehttp_client(BLAXEL_SERVER_URL, headers=_MCP_HEADERS) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "run_simple_diagnostic",
                arguments={
                    # A slightly richer default command that demonstrates the
                    # sandbox is actually running OS-level commands.
                    "command": (
                        "echo '[sandbox] incident diagnostics start' && "
                        "uname -a && echo 'sandbox diagnostics ok'"
                    ),
                },
            )
            raw = result.structuredContent
            # Unwrap FastMCP's {"result": {...}} envelope so we get the
            # actual diagnostics dict from the Blaxel MCP server.
            data = _unwrap_result(raw)

            # If we still don't have structured data, surface any textual error
            # returned by the MCP tool so the user can see what went wrong
            # (e.g. auth issues, quota, etc.).
            if not data and getattr(result, "content", None):
                first_block = result.content[0]
                if isinstance(first_block, mcp_types.TextContent):
                    text = first_block.text.strip()
                    return (
                        "**Sandbox Diagnostics (Blaxel)**\n"
                        "Blaxel MCP did not return structured diagnostics data. "
                        "Raw response:\n\n"
                        f"```text\n{text}\n```"
                    )

            if not isinstance(data, dict):
                return f"Diagnostics (Blaxel) returned non-dict result: {raw!r}"

            stdout = str(data.get("stdout", "")).strip()
            stderr = str(data.get("stderr", "")).strip()
            exit_code = data.get("exit_code")

            parts = ["**Sandbox Diagnostics (Blaxel)**"]
            parts.append(f"Exit code: {exit_code}")
            if stdout:
                parts.append("\n**stdout:**\n")
                parts.append(f"```\n{stdout}\n```")
            if stderr:
                parts.append("\n**stderr:**\n")
                parts.append(f"```\n{stderr}\n```")
            return "\n".join(parts)


async def _auto_triage_incident(history: List[Dict[str, Any]]) -> str:
    """Agentic triage flow that orchestrates logs, Nebius, Modal, and Blaxel.

    It reads the latest user incident description, pulls logs via the Logs
    MCP (Neon-backed), runs Nebius for a structured incident summary, then
    runs Modal deep analysis and Blaxel sandbox diagnostics. The result is a
    markdown report describing the steps taken and the findings.
    """
    # Find the latest user message to triage
    last_user = None
    for msg in reversed(history or []):
        if msg.get("role") == "user":
            last_user = msg
            break
    if not last_user:
        return (
            "**Agent Triage Report**\n\n"
            "No user incident description found yet. "
            "Please describe an incident in the chat first."
        )

    user_message = str(last_user.get("content", ""))
    steps: List[str] = []
    steps.append("1. Read your latest incident description.")

    logs_summary_text = "No logs summary available."
    logs_data: Dict[str, Any] = {}
    nebius_data: Dict[str, Any] = {}

    # Step 2: Pull logs and build a brief summary
    try:
        logs_data = await _summarize_logs_via_mcp(service="recs-api", env="prod")
        logs_summary_text = logs_data.get("summary", logs_summary_text)
        steps.append(
            "2. Pulled recent logs for `recs-api` (prod) from Neon via the Logs MCP server."
        )
    except Exception as exc:  # pragma: no cover - defensive
        logs_summary_text = f"(Error fetching logs summary from MCP: {exc})"
        steps.append("2. Attempted to pull logs from Neon but hit an error.")

    # Step 3: Nebius incident summary
    try:
        nebius_data = await _nebius_incident_summary_via_mcp(
            user_message=user_message,
            logs_summary_text=logs_summary_text,
        )
        severity = nebius_data.get("severity", "Unknown")
        steps.append(
            f"3. Generated a structured incident summary using Nebius (severity: {severity})."
        )
    except Exception as exc:  # pragma: no cover - defensive
        nebius_data = {}
        steps.append(f"3. Nebius incident summarization failed: {exc}.")

    # Step 4: Modal deep analysis
    try:
        modal_section = await _modal_deep_analysis_via_mcp()
        steps.append(
            "4. Ran deep log analysis with Modal over the same Neon-backed logs."
        )
    except Exception as exc:  # pragma: no cover - defensive
        modal_section = f"Deep analysis (Modal) failed: {exc}"
        steps.append("4. Attempted deep log analysis with Modal but hit an error.")

    # Step 5: Blaxel sandbox diagnostics
    try:
        blaxel_section = await _blaxel_run_diagnostic_via_mcp()
        steps.append(
            "5. Executed sandbox diagnostics in a Blaxel VM to validate basic system health."
        )
    except Exception as exc:  # pragma: no cover - defensive
        blaxel_section = f"Sandbox diagnostics (Blaxel) failed: {exc}"
        steps.append("5. Attempted sandbox diagnostics with Blaxel but hit an error.")

    # Format the Nebius summary section
    if nebius_data:
        title = nebius_data.get("title", "Incident Summary")
        severity = nebius_data.get("severity", "Unknown")
        impact = nebius_data.get("impact", "Not specified")
        root_cause = nebius_data.get("root_cause", "Not specified")
        actions = nebius_data.get("actions", [])
        if isinstance(actions, list):
            actions_text = "\n".join(f"- {a}" for a in actions)
        else:
            actions_text = str(actions)
        nebius_section = (
            f"**{title}** (severity: {severity})\n\n"
            f"**Impact:** {impact}\n\n"
            f"**Probable root cause:** {root_cause}\n\n"
            "**Log summary (recs-api, prod):**\n"
            f"{logs_summary_text}\n\n"
            "**Recommended actions:**\n"
            f"{actions_text}\n"
        )
    else:
        nebius_section = (
            "**Incident Summary:** Nebius summarization was not available.\n\n"
            "**Log summary (recs-api, prod):**\n"
            f"{logs_summary_text}\n"
        )

    steps_md = "\n".join(f"- {s}" for s in steps)
    report = (
        "**Agent Triage Report**\n\n"
        "**Steps taken:**\n"
        f"{steps_md}\n\n"
        "---\n\n"
        "### Incident Summary (Nebius)\n\n"
        f"{nebius_section}\n\n"
        "### Deep Log Analysis (Modal)\n\n"
        f"{modal_section}\n\n"
        "### Sandbox Diagnostics (Blaxel)\n\n"
        f"{blaxel_section}\n"
    )
    return report


async def _chat_fn(message: str, history: List[Dict[str, Any]]) -> str:
    """Chat handler: summarize logs, then ask Nebius for an incident summary.

    *history* is required by Gradio's ChatInterface signature but unused here.
    """
    try:
        logs_data = await _summarize_logs_via_mcp(service="recs-api", env="prod")
        logs_summary_text = logs_data.get("summary", "No logs summary available.")
    except Exception as exc:
        logs_summary_text = f"(Error fetching logs summary from MCP: {exc})"

    try:
        nebius_data = await _nebius_incident_summary_via_mcp(
            user_message=message,
            logs_summary_text=logs_summary_text,
        )
    except Exception as exc:
        # Fall back to a simpler message if Nebius is unavailable
        return (
            "Thanks for the incident description.\n\n"
            "Here is a synthetic log summary for service `recs-api` (prod):\n"
            f"{logs_summary_text}\n\n"
            f"(Nebius incident summary call failed: {exc})"
        )

    title = nebius_data.get("title", "Incident Summary")
    severity = nebius_data.get("severity", "Unknown")
    impact = nebius_data.get("impact", "Not specified")
    root_cause = nebius_data.get("root_cause", "Not specified")
    actions = nebius_data.get("actions", [])
    if isinstance(actions, list):
        actions_text = "\n".join(f"- {a}" for a in actions)
    else:
        actions_text = str(actions)

    reply = (
        f"**{title}** (severity: {severity})\n\n"
        f"**Impact:** {impact}\n\n"
        f"**Probable root cause:** {root_cause}\n\n"
        "**Log summary (recs-api, prod):**\n"
        f"{logs_summary_text}\n\n"
        "**Recommended actions:**\n"
        f"{actions_text}\n\n"
        "This summary was generated via Nebius Token Factory. "
        "You can click **Generate Voice Summary (ElevenLabs)** to hear an audio recap."
    )
    return reply


async def _voice_from_history(history: List[Dict[str, Any]]) -> str:
    """Synthesize the latest assistant message in *history* to an mp3 path."""
    last_assistant = None
    for msg in reversed(history or []):
        if msg.get("role") == "assistant":
            last_assistant = msg
            break
    if not last_assistant:
        raise ValueError("No assistant messages found to synthesize.")
    content = str(last_assistant.get("content", ""))
    spoken_text = _prepare_spoken_text(content)
    return await _generate_voice_summary_via_mcp(spoken_text)


def build_interface() -> gr.Blocks:
    """Build the Gradio Blocks UI: chat plus MCP-backed incident tool panels."""
    with gr.Blocks(
        title="Incident & Error Copilot",
    ) as demo:
        gr.Markdown("# Incident & Error Copilot", elem_classes=["incident-header"])
        gr.Markdown(
            "Enterprise incident assistant powered by MCP servers (logs, Nebius, Modal, Blaxel, ElevenLabs).",
            elem_classes=["incident-subheader"],
        )
        with gr.Row():
            with gr.Column(scale=3):
                chat = gr.ChatInterface(
                    fn=_chat_fn,
                    textbox=gr.Textbox(
                        placeholder="Describe an incident or paste an error stack trace...",
                        label="Incident description",
                    ),
                    title="Incident Copilot",
                )
            with gr.Column(scale=2):
                gr.Markdown("### Incident Tools")
                gr.Markdown(
                    "Use these MCP-backed tools to go deeper on the current incident.",
                )
                # Voice summary panel
                with gr.Group(elem_classes=["tool-panel"]):
                    gr.Markdown(
                        "**Play Voice Summary (ElevenLabs)** — listen to the latest assistant summary as audio.",
                    )
                    voice_button = gr.Button("Play Voice Summary (ElevenLabs)")
                    audio_player = gr.Audio(
                        label="Incident Voice Summary",
                        type="filepath",
                        interactive=False,
                    )
                # Agentic auto-triage panel
                with gr.Group(elem_classes=["tool-panel"]):
                    gr.Markdown(
                        "**Auto Triage Incident (Agent)** — runs logs, Nebius, Modal and Blaxel in sequence and produces a triage report.",
                    )
                    auto_button = gr.Button("Auto Triage Incident (Agent)")
                    auto_output = gr.Markdown(label="Agent Triage Report")
                # Modal deep analysis panel
                with gr.Group(elem_classes=["tool-panel"]):
                    gr.Markdown(
                        "**Deep Log Analysis (Modal)** — calls a Modal function for detailed log statistics and latest error context.",
                    )
                    modal_button = gr.Button("Deep Log Analysis (Modal)")
                    modal_output = gr.Markdown(label="Deep Analysis Result")
                # Blaxel sandbox diagnostics panel
                with gr.Group(elem_classes=["tool-panel"]):
                    gr.Markdown(
                        "**Sandbox Health Check (Blaxel)** — runs a lightweight command inside a Blaxel sandbox and shows its output.",
                    )
                    blaxel_button = gr.Button("Sandbox Health Check (Blaxel)")
                    blaxel_output = gr.Markdown(label="Diagnostics Output")

        voice_button.click(_voice_from_history, inputs=[chat.chatbot], outputs=[audio_player])
        auto_button.click(_auto_triage_incident, inputs=[chat.chatbot], outputs=[auto_output])
        modal_button.click(_modal_deep_analysis_via_mcp, inputs=None, outputs=[modal_output])
        blaxel_button.click(_blaxel_run_diagnostic_via_mcp, inputs=None, outputs=[blaxel_output])

    return demo


if __name__ == "__main__":
    demo = build_interface()
    demo.launch()  # For local dev; HF Spaces will call `demo = build_interface()` implicitly