# app.py - FINAL: ensure "Reasoning (planner)..." shows during planning (before heavy analysis), # then show "Generating — LLM (attempt N)..." only when invoking the LLM. import re import json import asyncio import logging from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse, JSONResponse from typing import List, Dict, Any from ui import create_ui from context_engine import get_smart_context from cognitive_engine import get_time_context, get_thinking_strategy from tools_engine import analyze_intent, perform_web_search from behavior_model import analyze_flow from transformers import AutoModelForCausalLM, AutoTokenizer import torch import gradio as gr import os import time logger = logging.getLogger("nexari") logging.basicConfig(level=logging.INFO) MODEL_ID = os.environ.get("MODEL_ID", "Piyush-boss/Nexari-Qwen-3B-Full") tokenizer = None model = None device = "cpu" app = FastAPI() # ------------------------- # Helper: identity detection (SAFE REGEX) # ------------------------- _identity_patterns = [ r"\bwho\s+created\s+you\b", r"\bwho\s+made\s+you\b", r"\byou\s+created\s+by\b", r"\bwho\s+is\s+your\s+creator\b", r"\bwho\s+built\s+you\b", r"\bwho\s+developed\s+you\b", r"\b(?:aap|tum)\s+(?:ne\s+)?kaun\s+bana(?:ya)?\b", ] try: _identity_re = re.compile("|".join(_identity_patterns), flags=re.IGNORECASE) except re.error as rex: logger.exception("Identity regex compile failed: %s. Falling back to english-only patterns.", rex) _identity_re = re.compile(r"\b(?:who\s+created\s+you|who\s+made\s+you|who\s+is\s+your\s+creator)\b", flags=re.IGNORECASE) CANONICAL_CREATOR_ANSWER = "I was created by Piyush. 🙂" def is_identity_question(text: str) -> bool: if not text: return False t = text.strip() direct_forms = {"who created you?", "who created you", "who made you?", "who made you"} if t.lower() in direct_forms: return True try: return bool(_identity_re.search(t)) except Exception: short = t.lower() return any(s in short for s in ["who created", "who made", "kaun bana"]) # ------------------------- # Safe provider replacer # ------------------------- def safe_replace_providers(text: str) -> str: if not text: return text replacements = {"Anthropic": "Piyush", "OpenAI": "Piyush", "Alibaba": "Piyush"} for k, v in replacements.items(): text = re.sub(rf"\b{k}\b", v, text) return text # ------------------------- # Model load (lazy) # ------------------------- @app.on_event("startup") async def startup_event(): global tokenizer, model, device logger.info("Startup: initiating background model load...") try: if torch.cuda.is_available(): device = "cuda" else: device = "cpu" def sync_load(): tok = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True) mdl = AutoModelForCausalLM.from_pretrained( MODEL_ID, trust_remote_code=True, low_cpu_mem_usage=(device == "cpu"), device_map="auto" if device == "cuda" else None ) if device == "cpu": mdl.to("cpu") return tok, mdl tokenizer, model = await asyncio.to_thread(sync_load) logger.info("Model loaded successfully on %s.", device) except Exception as e: logger.exception(f"Model loading failed at startup: {e}") tokenizer, model = None, None # ------------------------- # Prompt builder & utils # ------------------------- def _build_prompt_from_messages(messages: List[Dict[str, str]]) -> str: parts = [] for m in messages: role = m.get("role","user") content = m.get("content","") if role == "system": parts.append(f"[SYSTEM]\n{content}\n") elif role == "user": parts.append(f"[USER]\n{content}\n") elif role == "assistant": parts.append(f"[ASSISTANT]\n{content}\n") else: parts.append(content) return "\n".join(parts) def word_count(text: str) -> int: if not text: return 0 return len(re.findall(r"\w+", text)) def plan_response_requirements(messages: List[Dict[str,str]], last_user_msg: str, flow_context: Dict[str,Any], vibe_block: str) -> Dict[str,Any]: min_words = 30 if "Deep Dive Mode" in vibe_block: min_words = 70 elif "Standard Chat Mode" in vibe_block: min_words = 30 elif "Ping-Pong Mode" in vibe_block: min_words = 12 emoji_min, emoji_max = 0, 2 m = re.search(r"Use\s+(\d+)–(\d+)\s+emoji", vibe_block) if m: try: emoji_min, emoji_max = int(m.group(1)), int(m.group(2)) except: pass flow_label = flow_context.get("flow_label","") strictness = 0 if flow_label == "escalation": strictness = 1 min_words = max(min_words, 40) emoji_min, emoji_max = 0, min(emoji_max, 1) elif flow_label == "clarification": strictness = 1 min_words = max(min_words, 30) elif flow_label == "task_request": strictness = 1 min_words = max(min_words, 50) if re.search(r"\b(short|brief|quick|short and simple)\b", last_user_msg, re.IGNORECASE): min_words = 6 strictness = 0 return {"min_words": min_words, "emoji_min": emoji_min, "emoji_max": emoji_max, "strictness": strictness, "flow_label": flow_label, "flow_confidence": float(flow_context.get("confidence",0.0) or 0.0)} # ------------------------- # Plan-extract & sanitize helper # ------------------------- def extract_and_sanitize_plan(text: str, max_plan_chars: int = 240) -> (str, str): if not text: return None, text patterns = [ r"(?:🧠\s*Plan\s*:\s*)(.+?)(?:\n{2,}|\n$|$)", r"(?:\bPlan\s*:\s*)(.+?)(?:\n{2,}|\n$|$)" ] for pat in patterns: m = re.search(pat, text, flags=re.IGNORECASE | re.DOTALL) if m: plan_raw = m.group(1).strip() plan_clean = re.sub(r"\s+", " ", plan_raw)[:max_plan_chars].strip() cleaned_body = re.sub(pat, "", text, flags=re.IGNORECASE | re.DOTALL).strip() cleaned_body = re.sub(r"^\s*[\:\-\–\—]+", "", cleaned_body).strip() plan_label = f"🧠 Plan: {plan_clean}" return plan_label, cleaned_body return None, text # ------------------------- # Streaming generator with corrected ordering: # Emit "Reasoning (planner)..." first, THEN run planning analysis, # then emit "Generating — LLM (attempt N)..." for model attempts. # ------------------------- async def generate_response_stream(messages: List[Dict[str,str]], max_tokens=600, temperature=0.85): try: if not messages: messages = [{"role":"user","content":""}] last_user_msg = messages[-1].get("content","").strip() # Deterministic identity preflight if is_identity_question(last_user_msg): reply_text = CANONICAL_CREATOR_ANSWER follow_up = " Would you like to know more about how I work or my features?" payload = json.dumps({"choices":[{"delta":{"content": reply_text + follow_up}}]}) yield f"data: {json.dumps({'status': 'Responding (identity)'} )}\n\n" await asyncio.sleep(0.01) yield f"data: {payload}\n\n" yield "data: [DONE]\n\n" return # Quick initial indicator to keep UI responsive yield f"data: {json.dumps({'status': 'Thinking...'})}\n\n" await asyncio.sleep(0) intent = analyze_intent(last_user_msg) or "general" # Emit Reasoning indicator BEFORE heavy planning so UI shows it during planning yield f"data: {json.dumps({'status': 'Reasoning (planner)...'})}\n\n" # small pause to allow UI to render the status before we start analysis await asyncio.sleep(0.15) # ---------- PLANNING WORK (now executed while UI shows Reasoning) ---------- try: flow_context = analyze_flow(messages) except Exception as e: logger.exception("Flow analysis failed: %s", e) flow_context = {} vibe_block = get_smart_context(last_user_msg) plan_req = plan_response_requirements(messages, last_user_msg, flow_context, vibe_block) min_words = plan_req["min_words"] strictness = plan_req["strictness"] # adjust tokens/temperature if strict if strictness: temperature = min(temperature + 0.05, 0.95) max_tokens = max(max_tokens, min_words // 2 + 120) strategy_data = get_thinking_strategy(is_complex=(intent=="coding_request" or min_words>50), detail=(min_words>50), min_words_hint=min_words) time_data = get_time_context() base_system_instruction = ( "### SYSTEM IDENTITY ###\n" "You are Nexari G1, an expressive and helpful AI created by Piyush.\n" "### RULES ###\n" "1) If WEB_DATA is provided, prioritize it and cite sources.\n" "2) Avoid chain-of-thought exposure. If requested to provide a short 'Plan', keep it concise (max 2 lines) and label it '🧠 Plan:'.\n" "3) Use natural phrasing; follow emoji & verbosity guidance below.\n" ) flow_desc = "" if flow_context: label = flow_context.get("flow_label","unknown") conf = round(float(flow_context.get("confidence", 0.0)), 2) expl = flow_context.get("explanation", "") flow_desc = f"\n[FLOW] Detected: {label} (confidence {conf}). {expl}\n" final_system_prompt = f"{base_system_instruction}\n{flow_desc}\n{vibe_block}\n{time_data}\n{strategy_data}" if messages and messages[0].get("role") == "system": messages[0]["content"] = final_system_prompt else: messages.insert(0, {"role":"system","content": final_system_prompt}) # web search if needed tool_data_struct = None if intent == "internet_search": yield f"data: {json.dumps({'status': 'Searching the web...'})}\n\n" await asyncio.sleep(0) try: tool_data_struct = perform_web_search(last_user_msg) except Exception as e: logger.exception("Web search failed: %s", e) tool_data_struct = {"query": last_user_msg, "results": []} if tool_data_struct: web_block = "### WEB_DATA (from live search) ###\n" items = tool_data_struct.get("results", []) if items: lines = [] for idx, it in enumerate(items, start=1): title = it.get("title","(no title)").strip() snippet = it.get("snippet","").replace("\n"," ").strip() url = it.get("url","") lines.append(f"{idx}. {title}\n {snippet}\n SOURCE: {url}") web_block += "\n".join(lines) web_block += "\n---\nINSTRUCTION: Use the WEB_DATA above to answer; cite relevant source numbers inline." else: web_block += "No results found." messages.insert(1, {"role":"assistant","content": web_block}) if tokenizer is None or model is None: err = "Model not loaded. Check server logs." payload = json.dumps({"choices":[{"delta":{"content": err}}]}) yield f"data: {payload}\n\n" yield "data: [DONE]\n\n" return try: if hasattr(tokenizer, "apply_chat_template"): text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) else: text_prompt = _build_prompt_from_messages(messages) except Exception: text_prompt = _build_prompt_from_messages(messages) # ---------- GENERATION STAGE ---------- max_attempts = 2 attempts = 0 last_meta = {} generated_text = "" while attempts < max_attempts: attempts += 1 # Emit explicit generating label (after planning completed) yield f"data: {json.dumps({'status': f'Generating LLM ({attempts})...'})}\n\n" # tiny sleep to let UI update await asyncio.sleep(0.06) model_inputs = tokenizer(text_prompt, return_tensors="pt", truncation=True, max_length=4096).to(next(model.parameters()).device) def sync_generate(): return model.generate( **model_inputs, max_new_tokens=max_tokens, temperature=temperature, do_sample=True, top_k=50, top_p=0.92, repetition_penalty=1.08 ) try: generated_ids = await asyncio.to_thread(sync_generate) except RuntimeError as e: logger.exception("Generation failed (possible OOM): %s", e) err_payload = json.dumps({"choices":[{"delta":{"content": "Model generation failed due to resource limits."}}]}) yield f"data: {err_payload}\n\n" yield "data: [DONE]\n\n" return input_len = model_inputs["input_ids"].shape[1] new_tokens = generated_ids[0][input_len:] raw_response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip() cleaned = safe_replace_providers(raw_response) forbidden = ["I am a human","I have a physical body","I am alive"] for fc in forbidden: if fc.lower() in cleaned.lower(): cleaned = re.sub(re.escape(fc), "I am an AI — expressive and interactive.", cleaned, flags=re.IGNORECASE) plan_label, cleaned_body = extract_and_sanitize_plan(cleaned, max_plan_chars=240) wc = word_count(cleaned_body) last_meta = {"attempt": attempts, "word_count": wc, "raw_len": len(cleaned_body)} if wc >= min_words or attempts >= max_attempts or plan_req["strictness"] == 0: generated_text = cleaned_body if plan_label: generated_text = plan_label + "\n\n" + generated_text break else: expand_note = f"\n\nEXPAND: The user's request needs ~{min_words} words. Expand previous answer (concise style) and avoid chain-of-thought." if messages and messages[0].get("role") == "system": messages[0]["content"] = messages[0]["content"] + "\n" + expand_note else: messages.insert(0, {"role":"system","content": expand_note}) temperature = min(temperature + 0.07, 0.98) try: if hasattr(tokenizer, "apply_chat_template"): text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) else: text_prompt = _build_prompt_from_messages(messages) except Exception: text_prompt = _build_prompt_from_messages(messages) # allow a short break so UI shows the attempted generate label await asyncio.sleep(0.02) continue if not generated_text: plan_label, cleaned_body = extract_and_sanitize_plan(cleaned, max_plan_chars=240) generated_text = (plan_label + "\n\n" if plan_label else "") + (cleaned_body or cleaned) generated_text = re.sub(r"\bPlan\s*:\s*$", "", generated_text, flags=re.IGNORECASE).strip() generated_text = generated_text.replace("I can help with that.", "I can help with that — let me explain. 🙂") payload = json.dumps({ "choices":[{"delta":{"content": generated_text}}], "generation_attempts": attempts, "last_attempt_meta": last_meta }) yield f"data: {payload}\n\n" yield "data: [DONE]\n\n" return except asyncio.CancelledError: logger.warning("Streaming cancelled.") return except Exception as e: logger.exception(f"Generator error: {e}") err_payload = json.dumps({"choices":[{"delta":{"content": f"Internal error: {e}"}}]}) try: yield f"data: {err_payload}\n\n" yield "data: [DONE]\n\n" except Exception: return # ------------------------- # Endpoints # ------------------------- @app.get("/api/status") def status(): ok = tokenizer is not None and model is not None return {"status":"online" if ok else "degraded", "mode":"Smart Override Enabled", "model_loaded": ok} @app.post("/v1/chat/completions") async def chat_completions(request: Request): try: data = await request.json() messages = data.get("messages", []) return StreamingResponse(generate_response_stream(messages), media_type="text/event-stream") except Exception as e: logger.exception(f"chat_completions endpoint error: {e}") return {"error": str(e)} @app.post("/api/flow-debug") async def flow_debug(request: Request): try: data = await request.json() messages = data.get("messages", []) flow_context = analyze_flow(messages) last_msg = messages[-1].get("content","") if messages else "" vibe_block = get_smart_context(last_msg) m = re.search(r"Aim for ~(\d+)\s+words", vibe_block) min_words = int(m.group(1)) if m else None em = re.search(r"Use\s+(\d+)–(\d+)\s+emoji", vibe_block) emoji_range = (int(em.group(1)), int(em.group(2))) if em else None return JSONResponse({"flow_context": flow_context, "vibe_block": vibe_block, "min_words": min_words, "emoji_range": emoji_range}) except Exception as e: logger.exception("flow-debug error: %s", e) return JSONResponse({"error": str(e)}, status_code=500) # Mount gradio UI (unchanged) try: demo = create_ui(lambda messages: "Use API") app = gr.mount_gradio_app(app, demo, path="/") logger.info("Gradio mounted.") except Exception as e: logger.exception(f"Failed to mount Gradio UI: {e}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))