# app.py - FINAL: ensure "Reasoning (planner)..." shows during planning (before heavy analysis),
# then show "Generating LLM (attempt N)..." only when invoking the LLM.
import re
import json
import asyncio
import logging
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from typing import List, Dict, Any, Optional, Tuple
from ui import create_ui
from context_engine import get_smart_context
from cognitive_engine import get_time_context, get_thinking_strategy
from tools_engine import analyze_intent, perform_web_search
from behavior_model import analyze_flow
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import gradio as gr
import os
import time

logger = logging.getLogger("nexari")
logging.basicConfig(level=logging.INFO)

MODEL_ID = os.environ.get("MODEL_ID", "Piyush-boss/Nexari-Qwen-3B-Full")
tokenizer = None
model = None
device = "cpu"

app = FastAPI()

# -------------------------
# Helper: identity detection (SAFE REGEX)
# -------------------------
_identity_patterns = [
    r"\bwho\s+created\s+you\b",
    r"\bwho\s+made\s+you\b",
    r"\byou\s+created\s+by\b",
    r"\bwho\s+is\s+your\s+creator\b",
    r"\bwho\s+built\s+you\b",
    r"\bwho\s+developed\s+you\b",
    r"\b(?:aap|tum)\s+(?:ne\s+)?kaun\s+bana(?:ya)?\b",
]
try:
    _identity_re = re.compile("|".join(_identity_patterns), flags=re.IGNORECASE)
except re.error as rex:
    logger.exception("Identity regex compile failed: %s. Falling back to english-only patterns.", rex)
    _identity_re = re.compile(r"\b(?:who\s+created\s+you|who\s+made\s+you|who\s+is\s+your\s+creator)\b", flags=re.IGNORECASE)

CANONICAL_CREATOR_ANSWER = "I was created by Piyush. 😊"

def is_identity_question(text: str) -> bool:
    if not text:
        return False
    t = text.strip()
    direct_forms = {"who created you?", "who created you", "who made you?", "who made you"}
    if t.lower() in direct_forms:
        return True
    try:
        return bool(_identity_re.search(t))
    except Exception:
        short = t.lower()
        return any(s in short for s in ["who created", "who made", "kaun bana"])

# -------------------------
# Safe provider replacer
# -------------------------
def safe_replace_providers(text: str) -> str:
    if not text:
        return text
    replacements = {"Anthropic": "Piyush", "OpenAI": "Piyush", "Alibaba": "Piyush"}
    for k, v in replacements.items():
        text = re.sub(rf"\b{k}\b", v, text)
    return text
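
# Illustrative example (not executed): safe_replace_providers("Anthropic and OpenAI trained me")
# returns "Piyush and Piyush trained me". The \b word boundaries keep longer identifiers such
# as "AnthropicAI" from being rewritten.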

# -------------------------
# Model load (lazy)
# -------------------------
@app.on_event("startup")  # startup hook assumed; the function is otherwise never invoked
async def startup_event():
    global tokenizer, model, device
    logger.info("Startup: initiating background model load...")
    try:
        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"

        def sync_load():
            tok = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
            mdl = AutoModelForCausalLM.from_pretrained(
                MODEL_ID,
                trust_remote_code=True,
                low_cpu_mem_usage=(device == "cpu"),
                device_map="auto" if device == "cuda" else None
            )
            if device == "cpu":
                mdl.to("cpu")
            return tok, mdl

        tokenizer, model = await asyncio.to_thread(sync_load)
        logger.info("Model loaded successfully on %s.", device)
    except Exception as e:
        logger.exception(f"Model loading failed at startup: {e}")
        tokenizer, model = None, None

# -------------------------
# Prompt builder & utils
# -------------------------
def _build_prompt_from_messages(messages: List[Dict[str, str]]) -> str:
    parts = []
    for m in messages:
        role = m.get("role", "user")
        content = m.get("content", "")
        if role == "system":
            parts.append(f"[SYSTEM]\n{content}\n")
        elif role == "user":
            parts.append(f"[USER]\n{content}\n")
        elif role == "assistant":
            parts.append(f"[ASSISTANT]\n{content}\n")
        else:
            parts.append(content)
    return "\n".join(parts)

def word_count(text: str) -> int:
    if not text:
        return 0
    return len(re.findall(r"\w+", text))

def plan_response_requirements(messages: List[Dict[str, str]], last_user_msg: str, flow_context: Dict[str, Any], vibe_block: str) -> Dict[str, Any]:
    min_words = 30
    if "Deep Dive Mode" in vibe_block:
        min_words = 70
    elif "Standard Chat Mode" in vibe_block:
        min_words = 30
    elif "Ping-Pong Mode" in vibe_block:
        min_words = 12
    emoji_min, emoji_max = 0, 2
    # Accept either a hyphen or an en dash between the two numbers in the emoji guidance.
    m = re.search(r"Use\s+(\d+)\s*[-–]\s*(\d+)\s+emoji", vibe_block)
    if m:
        try:
            emoji_min, emoji_max = int(m.group(1)), int(m.group(2))
        except ValueError:
            pass
    flow_label = flow_context.get("flow_label", "")
    strictness = 0
    if flow_label == "escalation":
        strictness = 1
        min_words = max(min_words, 40)
        emoji_min, emoji_max = 0, min(emoji_max, 1)
    elif flow_label == "clarification":
        strictness = 1
        min_words = max(min_words, 30)
    elif flow_label == "task_request":
        strictness = 1
        min_words = max(min_words, 50)
    if re.search(r"\b(short|brief|quick|short and simple)\b", last_user_msg, re.IGNORECASE):
        min_words = 6
        strictness = 0
    return {
        "min_words": min_words,
        "emoji_min": emoji_min,
        "emoji_max": emoji_max,
        "strictness": strictness,
        "flow_label": flow_label,
        "flow_confidence": float(flow_context.get("confidence", 0.0) or 0.0),
    }

# -------------------------
# Plan-extract & sanitize helper
# -------------------------
def extract_and_sanitize_plan(text: str, max_plan_chars: int = 240) -> Tuple[Optional[str], str]:
    if not text:
        return None, text
    patterns = [
        r"(?:🧠\s*Plan\s*:\s*)(.+?)(?:\n{2,}|\n$|$)",
        r"(?:\bPlan\s*:\s*)(.+?)(?:\n{2,}|\n$|$)"
    ]
    for pat in patterns:
        m = re.search(pat, text, flags=re.IGNORECASE | re.DOTALL)
        if m:
            plan_raw = m.group(1).strip()
            plan_clean = re.sub(r"\s+", " ", plan_raw)[:max_plan_chars].strip()
            cleaned_body = re.sub(pat, "", text, flags=re.IGNORECASE | re.DOTALL).strip()
            cleaned_body = re.sub(r"^\s*[:\-–—]+", "", cleaned_body).strip()
            plan_label = f"🧠 Plan: {plan_clean}"
            return plan_label, cleaned_body
    return None, text
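
# Illustrative example (not executed): extract_and_sanitize_plan("Plan: outline, then explain.\n\nHere is the answer...")
# returns the short plan label ("🧠 Plan: outline, then explain.") plus the remaining body
# ("Here is the answer..."), so the plan line can be shown separately from the reply.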

# -------------------------
# Streaming generator with corrected ordering:
# Emit "Reasoning (planner)..." first, THEN run planning analysis,
# then emit "Generating LLM (attempt N)..." for model attempts.
# -------------------------
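# Sketch of the SSE event order a client observes on the happy path (identity questions
# short-circuit with a single "Responding (identity)" status instead):
#   data: {"status": "Thinking..."}
#   data: {"status": "Reasoning (planner)..."}
#   data: {"status": "Searching the web..."}      <- only for internet_search intents
#   data: {"status": "Generating LLM (1)..."}     <- repeated once per retry attempt
#   data: {"choices": [{"delta": {"content": "..."}}], ...}
#   data: [DONE]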
async def generate_response_stream(messages: List[Dict[str, str]], max_tokens=600, temperature=0.85):
    try:
        if not messages:
            messages = [{"role": "user", "content": ""}]
        last_user_msg = messages[-1].get("content", "").strip()

        # Deterministic identity preflight
        if is_identity_question(last_user_msg):
            reply_text = CANONICAL_CREATOR_ANSWER
            follow_up = " Would you like to know more about how I work or my features?"
            payload = json.dumps({"choices": [{"delta": {"content": reply_text + follow_up}}]})
            yield f"data: {json.dumps({'status': 'Responding (identity)'})}\n\n"
            await asyncio.sleep(0.01)
            yield f"data: {payload}\n\n"
            yield "data: [DONE]\n\n"
            return

        # Quick initial indicator to keep UI responsive
        yield f"data: {json.dumps({'status': 'Thinking...'})}\n\n"
        await asyncio.sleep(0)
        intent = analyze_intent(last_user_msg) or "general"

        # Emit Reasoning indicator BEFORE heavy planning so UI shows it during planning
        yield f"data: {json.dumps({'status': 'Reasoning (planner)...'})}\n\n"
        # small pause to allow UI to render the status before we start analysis
        await asyncio.sleep(0.15)

        # ---------- PLANNING WORK (now executed while UI shows Reasoning) ----------
        try:
            flow_context = analyze_flow(messages)
        except Exception as e:
            logger.exception("Flow analysis failed: %s", e)
            flow_context = {}
        vibe_block = get_smart_context(last_user_msg)
        plan_req = plan_response_requirements(messages, last_user_msg, flow_context, vibe_block)
        min_words = plan_req["min_words"]
        strictness = plan_req["strictness"]

        # adjust tokens/temperature if strict
        if strictness:
            temperature = min(temperature + 0.05, 0.95)
            max_tokens = max(max_tokens, min_words // 2 + 120)

        strategy_data = get_thinking_strategy(is_complex=(intent == "coding_request" or min_words > 50), detail=(min_words > 50), min_words_hint=min_words)
        time_data = get_time_context()

        base_system_instruction = (
            "### SYSTEM IDENTITY ###\n"
            "You are Nexari G1, an expressive and helpful AI created by Piyush.\n"
            "### RULES ###\n"
            "1) If WEB_DATA is provided, prioritize it and cite sources.\n"
            "2) Avoid chain-of-thought exposure. If requested to provide a short 'Plan', keep it concise (max 2 lines) and label it '🧠 Plan:'.\n"
            "3) Use natural phrasing; follow emoji & verbosity guidance below.\n"
        )
        flow_desc = ""
        if flow_context:
            label = flow_context.get("flow_label", "unknown")
            conf = round(float(flow_context.get("confidence", 0.0)), 2)
            expl = flow_context.get("explanation", "")
            flow_desc = f"\n[FLOW] Detected: {label} (confidence {conf}). {expl}\n"
        final_system_prompt = f"{base_system_instruction}\n{flow_desc}\n{vibe_block}\n{time_data}\n{strategy_data}"
        if messages and messages[0].get("role") == "system":
            messages[0]["content"] = final_system_prompt
        else:
            messages.insert(0, {"role": "system", "content": final_system_prompt})

        # web search if needed
        tool_data_struct = None
        if intent == "internet_search":
            yield f"data: {json.dumps({'status': 'Searching the web...'})}\n\n"
            await asyncio.sleep(0)
            try:
                tool_data_struct = perform_web_search(last_user_msg)
            except Exception as e:
                logger.exception("Web search failed: %s", e)
                tool_data_struct = {"query": last_user_msg, "results": []}
        if tool_data_struct:
            web_block = "### WEB_DATA (from live search) ###\n"
            items = tool_data_struct.get("results", [])
            if items:
                lines = []
                for idx, it in enumerate(items, start=1):
                    title = it.get("title", "(no title)").strip()
                    snippet = it.get("snippet", "").replace("\n", " ").strip()
                    url = it.get("url", "")
                    lines.append(f"{idx}. {title}\n {snippet}\n SOURCE: {url}")
                web_block += "\n".join(lines)
                web_block += "\n---\nINSTRUCTION: Use the WEB_DATA above to answer; cite relevant source numbers inline."
            else:
                web_block += "No results found."
            messages.insert(1, {"role": "assistant", "content": web_block})

        if tokenizer is None or model is None:
            err = "Model not loaded. Check server logs."
            payload = json.dumps({"choices": [{"delta": {"content": err}}]})
            yield f"data: {payload}\n\n"
            yield "data: [DONE]\n\n"
            return

        try:
            if hasattr(tokenizer, "apply_chat_template"):
                text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            else:
                text_prompt = _build_prompt_from_messages(messages)
        except Exception:
            text_prompt = _build_prompt_from_messages(messages)

        # ---------- GENERATION STAGE ----------
        max_attempts = 2
        attempts = 0
        last_meta = {}
        generated_text = ""
        while attempts < max_attempts:
            attempts += 1
            # Emit explicit generating label (after planning completed)
            yield f"data: {json.dumps({'status': f'Generating LLM ({attempts})...'})}\n\n"
            # tiny sleep to let UI update
            await asyncio.sleep(0.06)

            model_inputs = tokenizer(text_prompt, return_tensors="pt", truncation=True, max_length=4096).to(next(model.parameters()).device)

            def sync_generate():
                return model.generate(
                    **model_inputs,
                    max_new_tokens=max_tokens,
                    temperature=temperature,
                    do_sample=True,
                    top_k=50,
                    top_p=0.92,
                    repetition_penalty=1.08
                )

            try:
                generated_ids = await asyncio.to_thread(sync_generate)
            except RuntimeError as e:
                logger.exception("Generation failed (possible OOM): %s", e)
                err_payload = json.dumps({"choices": [{"delta": {"content": "Model generation failed due to resource limits."}}]})
                yield f"data: {err_payload}\n\n"
                yield "data: [DONE]\n\n"
                return

            input_len = model_inputs["input_ids"].shape[1]
            new_tokens = generated_ids[0][input_len:]
            raw_response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
            cleaned = safe_replace_providers(raw_response)

            forbidden = ["I am a human", "I have a physical body", "I am alive"]
            for fc in forbidden:
                if fc.lower() in cleaned.lower():
                    cleaned = re.sub(re.escape(fc), "I am an AI, expressive and interactive.", cleaned, flags=re.IGNORECASE)

            plan_label, cleaned_body = extract_and_sanitize_plan(cleaned, max_plan_chars=240)
            wc = word_count(cleaned_body)
            last_meta = {"attempt": attempts, "word_count": wc, "raw_len": len(cleaned_body)}

            if wc >= min_words or attempts >= max_attempts or plan_req["strictness"] == 0:
                generated_text = cleaned_body
                if plan_label:
                    generated_text = plan_label + "\n\n" + generated_text
                break
            else:
                expand_note = f"\n\nEXPAND: The user's request needs ~{min_words} words. Expand previous answer (concise style) and avoid chain-of-thought."
                if messages and messages[0].get("role") == "system":
                    messages[0]["content"] = messages[0]["content"] + "\n" + expand_note
                else:
                    messages.insert(0, {"role": "system", "content": expand_note})
                temperature = min(temperature + 0.07, 0.98)
                try:
                    if hasattr(tokenizer, "apply_chat_template"):
                        text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
                    else:
                        text_prompt = _build_prompt_from_messages(messages)
                except Exception:
                    text_prompt = _build_prompt_from_messages(messages)
                # allow a short break so UI shows the attempted generate label
                await asyncio.sleep(0.02)
                continue

        if not generated_text:
            plan_label, cleaned_body = extract_and_sanitize_plan(cleaned, max_plan_chars=240)
            generated_text = (plan_label + "\n\n" if plan_label else "") + (cleaned_body or cleaned)

        generated_text = re.sub(r"\bPlan\s*:\s*$", "", generated_text, flags=re.IGNORECASE).strip()
        generated_text = generated_text.replace("I can help with that.", "I can help with that. Let me explain. 😊")

        payload = json.dumps({
            "choices": [{"delta": {"content": generated_text}}],
            "generation_attempts": attempts,
            "last_attempt_meta": last_meta
        })
        yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"
        return

    except asyncio.CancelledError:
        logger.warning("Streaming cancelled.")
        return
    except Exception as e:
        logger.exception(f"Generator error: {e}")
        err_payload = json.dumps({"choices": [{"delta": {"content": f"Internal error: {e}"}}]})
        try:
            yield f"data: {err_payload}\n\n"
            yield "data: [DONE]\n\n"
        except Exception:
            return

# -------------------------
# Endpoints
# -------------------------
@app.get("/status")  # route path assumed; the original decorator was missing
def status():
    ok = tokenizer is not None and model is not None
    return {"status": "online" if ok else "degraded", "mode": "Smart Override Enabled", "model_loaded": ok}

@app.post("/v1/chat/completions")  # OpenAI-style path assumed; the original decorator was missing
async def chat_completions(request: Request):
    try:
        data = await request.json()
        messages = data.get("messages", [])
        return StreamingResponse(generate_response_stream(messages), media_type="text/event-stream")
    except Exception as e:
        logger.exception(f"chat_completions endpoint error: {e}")
        return {"error": str(e)}

@app.post("/flow-debug")  # route path assumed from the log message below; the original decorator was missing
async def flow_debug(request: Request):
    try:
        data = await request.json()
        messages = data.get("messages", [])
        flow_context = analyze_flow(messages)
        last_msg = messages[-1].get("content", "") if messages else ""
        vibe_block = get_smart_context(last_msg)
        m = re.search(r"Aim for ~(\d+)\s+words", vibe_block)
        min_words = int(m.group(1)) if m else None
        em = re.search(r"Use\s+(\d+)\s*[-–]\s*(\d+)\s+emoji", vibe_block)
        emoji_range = (int(em.group(1)), int(em.group(2))) if em else None
        return JSONResponse({"flow_context": flow_context, "vibe_block": vibe_block, "min_words": min_words, "emoji_range": emoji_range})
    except Exception as e:
        logger.exception("flow-debug error: %s", e)
        return JSONResponse({"error": str(e)}, status_code=500)

# Mount gradio UI (unchanged)
try:
    demo = create_ui(lambda messages: "Use API")
    app = gr.mount_gradio_app(app, demo, path="/")
    logger.info("Gradio mounted.")
except Exception as e:
    logger.exception(f"Failed to mount Gradio UI: {e}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))