Nexari-Server / app.py
Nexari-Research's picture
Update app.py
fe52abb verified
# app.py - FINAL: ensure "Reasoning (planner)..." shows during planning (before heavy analysis),
# then show "Generating β€” LLM (attempt N)..." only when invoking the LLM.
import re
import json
import asyncio
import logging
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from typing import List, Dict, Any
from ui import create_ui
from context_engine import get_smart_context
from cognitive_engine import get_time_context, get_thinking_strategy
from tools_engine import analyze_intent, perform_web_search
from behavior_model import analyze_flow
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import gradio as gr
import os
import time
logger = logging.getLogger("nexari")
logging.basicConfig(level=logging.INFO)
MODEL_ID = os.environ.get("MODEL_ID", "Piyush-boss/Nexari-Qwen-3B-Full")
tokenizer = None
model = None
device = "cpu"
app = FastAPI()
# -------------------------
# Helper: identity detection (SAFE REGEX)
# -------------------------
_identity_patterns = [
r"\bwho\s+created\s+you\b",
r"\bwho\s+made\s+you\b",
r"\byou\s+created\s+by\b",
r"\bwho\s+is\s+your\s+creator\b",
r"\bwho\s+built\s+you\b",
r"\bwho\s+developed\s+you\b",
r"\b(?:aap|tum)\s+(?:ne\s+)?kaun\s+bana(?:ya)?\b",
]
try:
_identity_re = re.compile("|".join(_identity_patterns), flags=re.IGNORECASE)
except re.error as rex:
logger.exception("Identity regex compile failed: %s. Falling back to english-only patterns.", rex)
_identity_re = re.compile(r"\b(?:who\s+created\s+you|who\s+made\s+you|who\s+is\s+your\s+creator)\b", flags=re.IGNORECASE)
CANONICAL_CREATOR_ANSWER = "I was created by Piyush. πŸ™‚"
def is_identity_question(text: str) -> bool:
if not text:
return False
t = text.strip()
direct_forms = {"who created you?", "who created you", "who made you?", "who made you"}
if t.lower() in direct_forms:
return True
try:
return bool(_identity_re.search(t))
except Exception:
short = t.lower()
return any(s in short for s in ["who created", "who made", "kaun bana"])
# -------------------------
# Safe provider replacer
# -------------------------
def safe_replace_providers(text: str) -> str:
if not text:
return text
replacements = {"Anthropic": "Piyush", "OpenAI": "Piyush", "Alibaba": "Piyush"}
for k, v in replacements.items():
text = re.sub(rf"\b{k}\b", v, text)
return text
# -------------------------
# Model load (lazy)
# -------------------------
@app.on_event("startup")
async def startup_event():
global tokenizer, model, device
logger.info("Startup: initiating background model load...")
try:
if torch.cuda.is_available():
device = "cuda"
else:
device = "cpu"
def sync_load():
tok = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
mdl = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
trust_remote_code=True,
low_cpu_mem_usage=(device == "cpu"),
device_map="auto" if device == "cuda" else None
)
if device == "cpu":
mdl.to("cpu")
return tok, mdl
tokenizer, model = await asyncio.to_thread(sync_load)
logger.info("Model loaded successfully on %s.", device)
except Exception as e:
logger.exception(f"Model loading failed at startup: {e}")
tokenizer, model = None, None
# -------------------------
# Prompt builder & utils
# -------------------------
def _build_prompt_from_messages(messages: List[Dict[str, str]]) -> str:
parts = []
for m in messages:
role = m.get("role","user")
content = m.get("content","")
if role == "system":
parts.append(f"[SYSTEM]\n{content}\n")
elif role == "user":
parts.append(f"[USER]\n{content}\n")
elif role == "assistant":
parts.append(f"[ASSISTANT]\n{content}\n")
else:
parts.append(content)
return "\n".join(parts)
def word_count(text: str) -> int:
if not text:
return 0
return len(re.findall(r"\w+", text))
def plan_response_requirements(messages: List[Dict[str,str]], last_user_msg: str, flow_context: Dict[str,Any], vibe_block: str) -> Dict[str,Any]:
min_words = 30
if "Deep Dive Mode" in vibe_block:
min_words = 70
elif "Standard Chat Mode" in vibe_block:
min_words = 30
elif "Ping-Pong Mode" in vibe_block:
min_words = 12
emoji_min, emoji_max = 0, 2
m = re.search(r"Use\s+(\d+)–(\d+)\s+emoji", vibe_block)
if m:
try:
emoji_min, emoji_max = int(m.group(1)), int(m.group(2))
except:
pass
flow_label = flow_context.get("flow_label","")
strictness = 0
if flow_label == "escalation":
strictness = 1
min_words = max(min_words, 40)
emoji_min, emoji_max = 0, min(emoji_max, 1)
elif flow_label == "clarification":
strictness = 1
min_words = max(min_words, 30)
elif flow_label == "task_request":
strictness = 1
min_words = max(min_words, 50)
if re.search(r"\b(short|brief|quick|short and simple)\b", last_user_msg, re.IGNORECASE):
min_words = 6
strictness = 0
return {"min_words": min_words, "emoji_min": emoji_min, "emoji_max": emoji_max, "strictness": strictness, "flow_label": flow_label, "flow_confidence": float(flow_context.get("confidence",0.0) or 0.0)}
# -------------------------
# Plan-extract & sanitize helper
# -------------------------
def extract_and_sanitize_plan(text: str, max_plan_chars: int = 240) -> (str, str):
if not text:
return None, text
patterns = [
r"(?:🧠\s*Plan\s*:\s*)(.+?)(?:\n{2,}|\n$|$)",
r"(?:\bPlan\s*:\s*)(.+?)(?:\n{2,}|\n$|$)"
]
for pat in patterns:
m = re.search(pat, text, flags=re.IGNORECASE | re.DOTALL)
if m:
plan_raw = m.group(1).strip()
plan_clean = re.sub(r"\s+", " ", plan_raw)[:max_plan_chars].strip()
cleaned_body = re.sub(pat, "", text, flags=re.IGNORECASE | re.DOTALL).strip()
cleaned_body = re.sub(r"^\s*[\:\-\–\β€”]+", "", cleaned_body).strip()
plan_label = f"🧠 Plan: {plan_clean}"
return plan_label, cleaned_body
return None, text
# -------------------------
# Streaming generator with corrected ordering:
# Emit "Reasoning (planner)..." first, THEN run planning analysis,
# then emit "Generating β€” LLM (attempt N)..." for model attempts.
# -------------------------
async def generate_response_stream(messages: List[Dict[str,str]], max_tokens=600, temperature=0.85):
try:
if not messages:
messages = [{"role":"user","content":""}]
last_user_msg = messages[-1].get("content","").strip()
# Deterministic identity preflight
if is_identity_question(last_user_msg):
reply_text = CANONICAL_CREATOR_ANSWER
follow_up = " Would you like to know more about how I work or my features?"
payload = json.dumps({"choices":[{"delta":{"content": reply_text + follow_up}}]})
yield f"data: {json.dumps({'status': 'Responding (identity)'} )}\n\n"
await asyncio.sleep(0.01)
yield f"data: {payload}\n\n"
yield "data: [DONE]\n\n"
return
# Quick initial indicator to keep UI responsive
yield f"data: {json.dumps({'status': 'Thinking...'})}\n\n"
await asyncio.sleep(0)
intent = analyze_intent(last_user_msg) or "general"
# Emit Reasoning indicator BEFORE heavy planning so UI shows it during planning
yield f"data: {json.dumps({'status': 'Reasoning (planner)...'})}\n\n"
# small pause to allow UI to render the status before we start analysis
await asyncio.sleep(0.15)
# ---------- PLANNING WORK (now executed while UI shows Reasoning) ----------
try:
flow_context = analyze_flow(messages)
except Exception as e:
logger.exception("Flow analysis failed: %s", e)
flow_context = {}
vibe_block = get_smart_context(last_user_msg)
plan_req = plan_response_requirements(messages, last_user_msg, flow_context, vibe_block)
min_words = plan_req["min_words"]
strictness = plan_req["strictness"]
# adjust tokens/temperature if strict
if strictness:
temperature = min(temperature + 0.05, 0.95)
max_tokens = max(max_tokens, min_words // 2 + 120)
strategy_data = get_thinking_strategy(is_complex=(intent=="coding_request" or min_words>50), detail=(min_words>50), min_words_hint=min_words)
time_data = get_time_context()
base_system_instruction = (
"### SYSTEM IDENTITY ###\n"
"You are Nexari G1, an expressive and helpful AI created by Piyush.\n"
"### RULES ###\n"
"1) If WEB_DATA is provided, prioritize it and cite sources.\n"
"2) Avoid chain-of-thought exposure. If requested to provide a short 'Plan', keep it concise (max 2 lines) and label it '🧠 Plan:'.\n"
"3) Use natural phrasing; follow emoji & verbosity guidance below.\n"
)
flow_desc = ""
if flow_context:
label = flow_context.get("flow_label","unknown")
conf = round(float(flow_context.get("confidence", 0.0)), 2)
expl = flow_context.get("explanation", "")
flow_desc = f"\n[FLOW] Detected: {label} (confidence {conf}). {expl}\n"
final_system_prompt = f"{base_system_instruction}\n{flow_desc}\n{vibe_block}\n{time_data}\n{strategy_data}"
if messages and messages[0].get("role") == "system":
messages[0]["content"] = final_system_prompt
else:
messages.insert(0, {"role":"system","content": final_system_prompt})
# web search if needed
tool_data_struct = None
if intent == "internet_search":
yield f"data: {json.dumps({'status': 'Searching the web...'})}\n\n"
await asyncio.sleep(0)
try:
tool_data_struct = perform_web_search(last_user_msg)
except Exception as e:
logger.exception("Web search failed: %s", e)
tool_data_struct = {"query": last_user_msg, "results": []}
if tool_data_struct:
web_block = "### WEB_DATA (from live search) ###\n"
items = tool_data_struct.get("results", [])
if items:
lines = []
for idx, it in enumerate(items, start=1):
title = it.get("title","(no title)").strip()
snippet = it.get("snippet","").replace("\n"," ").strip()
url = it.get("url","")
lines.append(f"{idx}. {title}\n {snippet}\n SOURCE: {url}")
web_block += "\n".join(lines)
web_block += "\n---\nINSTRUCTION: Use the WEB_DATA above to answer; cite relevant source numbers inline."
else:
web_block += "No results found."
messages.insert(1, {"role":"assistant","content": web_block})
if tokenizer is None or model is None:
err = "Model not loaded. Check server logs."
payload = json.dumps({"choices":[{"delta":{"content": err}}]})
yield f"data: {payload}\n\n"
yield "data: [DONE]\n\n"
return
try:
if hasattr(tokenizer, "apply_chat_template"):
text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
else:
text_prompt = _build_prompt_from_messages(messages)
except Exception:
text_prompt = _build_prompt_from_messages(messages)
# ---------- GENERATION STAGE ----------
max_attempts = 2
attempts = 0
last_meta = {}
generated_text = ""
while attempts < max_attempts:
attempts += 1
# Emit explicit generating label (after planning completed)
yield f"data: {json.dumps({'status': f'Generating LLM ({attempts})...'})}\n\n"
# tiny sleep to let UI update
await asyncio.sleep(0.06)
model_inputs = tokenizer(text_prompt, return_tensors="pt", truncation=True, max_length=4096).to(next(model.parameters()).device)
def sync_generate():
return model.generate(
**model_inputs,
max_new_tokens=max_tokens,
temperature=temperature,
do_sample=True,
top_k=50,
top_p=0.92,
repetition_penalty=1.08
)
try:
generated_ids = await asyncio.to_thread(sync_generate)
except RuntimeError as e:
logger.exception("Generation failed (possible OOM): %s", e)
err_payload = json.dumps({"choices":[{"delta":{"content": "Model generation failed due to resource limits."}}]})
yield f"data: {err_payload}\n\n"
yield "data: [DONE]\n\n"
return
input_len = model_inputs["input_ids"].shape[1]
new_tokens = generated_ids[0][input_len:]
raw_response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
cleaned = safe_replace_providers(raw_response)
forbidden = ["I am a human","I have a physical body","I am alive"]
for fc in forbidden:
if fc.lower() in cleaned.lower():
cleaned = re.sub(re.escape(fc), "I am an AI β€” expressive and interactive.", cleaned, flags=re.IGNORECASE)
plan_label, cleaned_body = extract_and_sanitize_plan(cleaned, max_plan_chars=240)
wc = word_count(cleaned_body)
last_meta = {"attempt": attempts, "word_count": wc, "raw_len": len(cleaned_body)}
if wc >= min_words or attempts >= max_attempts or plan_req["strictness"] == 0:
generated_text = cleaned_body
if plan_label:
generated_text = plan_label + "\n\n" + generated_text
break
else:
expand_note = f"\n\nEXPAND: The user's request needs ~{min_words} words. Expand previous answer (concise style) and avoid chain-of-thought."
if messages and messages[0].get("role") == "system":
messages[0]["content"] = messages[0]["content"] + "\n" + expand_note
else:
messages.insert(0, {"role":"system","content": expand_note})
temperature = min(temperature + 0.07, 0.98)
try:
if hasattr(tokenizer, "apply_chat_template"):
text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
else:
text_prompt = _build_prompt_from_messages(messages)
except Exception:
text_prompt = _build_prompt_from_messages(messages)
# allow a short break so UI shows the attempted generate label
await asyncio.sleep(0.02)
continue
if not generated_text:
plan_label, cleaned_body = extract_and_sanitize_plan(cleaned, max_plan_chars=240)
generated_text = (plan_label + "\n\n" if plan_label else "") + (cleaned_body or cleaned)
generated_text = re.sub(r"\bPlan\s*:\s*$", "", generated_text, flags=re.IGNORECASE).strip()
generated_text = generated_text.replace("I can help with that.", "I can help with that β€” let me explain. πŸ™‚")
payload = json.dumps({
"choices":[{"delta":{"content": generated_text}}],
"generation_attempts": attempts,
"last_attempt_meta": last_meta
})
yield f"data: {payload}\n\n"
yield "data: [DONE]\n\n"
return
except asyncio.CancelledError:
logger.warning("Streaming cancelled.")
return
except Exception as e:
logger.exception(f"Generator error: {e}")
err_payload = json.dumps({"choices":[{"delta":{"content": f"Internal error: {e}"}}]})
try:
yield f"data: {err_payload}\n\n"
yield "data: [DONE]\n\n"
except Exception:
return
# -------------------------
# Endpoints
# -------------------------
@app.get("/api/status")
def status():
ok = tokenizer is not None and model is not None
return {"status":"online" if ok else "degraded", "mode":"Smart Override Enabled", "model_loaded": ok}
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
try:
data = await request.json()
messages = data.get("messages", [])
return StreamingResponse(generate_response_stream(messages), media_type="text/event-stream")
except Exception as e:
logger.exception(f"chat_completions endpoint error: {e}")
return {"error": str(e)}
@app.post("/api/flow-debug")
async def flow_debug(request: Request):
try:
data = await request.json()
messages = data.get("messages", [])
flow_context = analyze_flow(messages)
last_msg = messages[-1].get("content","") if messages else ""
vibe_block = get_smart_context(last_msg)
m = re.search(r"Aim for ~(\d+)\s+words", vibe_block)
min_words = int(m.group(1)) if m else None
em = re.search(r"Use\s+(\d+)–(\d+)\s+emoji", vibe_block)
emoji_range = (int(em.group(1)), int(em.group(2))) if em else None
return JSONResponse({"flow_context": flow_context, "vibe_block": vibe_block, "min_words": min_words, "emoji_range": emoji_range})
except Exception as e:
logger.exception("flow-debug error: %s", e)
return JSONResponse({"error": str(e)}, status_code=500)
# Mount gradio UI (unchanged)
try:
demo = create_ui(lambda messages: "Use API")
app = gr.mount_gradio_app(app, demo, path="/")
logger.info("Gradio mounted.")
except Exception as e:
logger.exception(f"Failed to mount Gradio UI: {e}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))