Nexari-Research committed
Commit 3868189 · verified · 1 Parent(s): 35c38d8

Update app.py

Files changed (1):
  1. app.py +122 -114
app.py CHANGED
@@ -1,9 +1,9 @@
  """
- app.py - Robust startup & lifecycle handling for Nexari Server
- Key fixes:
- - Move heavy model loading into FastAPI startup (non-blocking)
- - Defensive handling for asyncio.CancelledError
- - Ensure Gradio is mounted (not launched) so Spaces / Uvicorn lifespan stays intact
  """

  import re
@@ -13,15 +13,11 @@ import logging
  from fastapi import FastAPI, Request
  from fastapi.responses import StreamingResponse

- # IMPORTANT: ensure ui.create_ui returns a gradio Blocks/Interface but DOES NOT call .launch()
  from ui import create_ui
-
- # Engines (they should be import-safe; if these modules load heavy models, adjust similarly)
  from context_engine import get_smart_context
  from cognitive_engine import get_time_context, get_thinking_strategy
  from tools_engine import analyze_intent, perform_web_search

- # Transformers model will be loaded on startup (not at import)
  from transformers import AutoModelForCausalLM, AutoTokenizer
  import torch
  import gradio as gr
@@ -30,191 +26,205 @@ logger = logging.getLogger("nexari")
  logging.basicConfig(level=logging.INFO)

  MODEL_ID = "Piyush-boss/Nexari-Qwen-3B-Full"
-
- # Globals to be set on startup
  tokenizer = None
  model = None

  app = FastAPI()

- # ------------------ HELPERS ------------------
  def safe_replace_providers(text: str) -> str:
-     import re
      return re.sub(r"\b(Anthropic|OpenAI|Alibaba)\b", "Piyush", text)

- # ------------------ LIFECYCLE EVENTS ------------------
  @app.on_event("startup")
  async def startup_event():
      global tokenizer, model
-     logger.info("Startup: loading models in background thread...")
-
-     async def _load_models():
-         global tokenizer, model
-         try:
-             # Use to_thread so we do not block event loop
-             def sync_load():
-                 logger.info(f"Loading tokenizer and model: {MODEL_ID}")
-                 tok = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
-                 mdl = AutoModelForCausalLM.from_pretrained(
-                     MODEL_ID,
-                     dtype=None, # let transformers pick dtype; avoids torch_dtype deprecation warnings
-                     device_map="cpu",
-                     low_cpu_mem_usage=True,
-                     trust_remote_code=True,
-                 )
-                 return tok, mdl
-
-             tokenizer, model = await asyncio.to_thread(sync_load)
-             logger.info("Model & tokenizer loaded successfully.")
-         except Exception as e:
-             logger.exception(f"Model loading failed: {e}")
-             # keep tokenizer/model as None — server continues to run for debugging
-             tokenizer, model = None, None
-
-     # start loader, but do not await too long (await it so startup waits for load attempt)
-     await _load_models()
-     logger.info("Startup: model load task completed (or failed).")

  @app.on_event("shutdown")
  async def shutdown_event():
-     logger.info("Shutdown: cleaning up resources (if any).")
-     # if model on GPU or other cleanup needed, do here
-     # e.g., torch.cuda.empty_cache() if you had GPU usage

- # ------------------ STREAMING GENERATOR ------------------
  async def generate_response_stream(messages, max_tokens=600, temperature=0.85):
-     """
-     SSE streaming generator. Handles CancelledError gracefully.
-     """
      try:
-         if not isinstance(messages, list) or not messages:
-             messages = [{"role": "user", "content": ""}]
-         last_user_msg = messages[-1].get("content", "")

-         # STEP 1: intent analysis
          yield f"data: {json.dumps({'status': 'Thinking...'})}\n\n"
-         await asyncio.sleep(0) # micro-yield to event loop

          intent = analyze_intent(last_user_msg) or "general"

-         tool_data = ""
          time_data = ""
          vibe_data = ""
          strategy_data = ""

          if intent == "internet_search":
              yield f"data: {json.dumps({'status': 'Searching the web...'})}\n\n"
              await asyncio.sleep(0)
-             tool_data = perform_web_search(last_user_msg)
              vibe_data = get_smart_context(last_user_msg)
-             strategy_data = get_thinking_strategy(is_complex=True)

          elif intent == "coding_request":
              yield f"data: {json.dumps({'status': 'Analyzing Logic...'})}\n\n"
              vibe_data = get_smart_context(last_user_msg)
-             strategy_data = get_thinking_strategy(is_complex=True)

          elif intent == "checking_time":
              yield f"data: {json.dumps({'status': 'Checking Clock...'})}\n\n"
              time_data = get_time_context()
              vibe_data = get_smart_context(last_user_msg)
-             strategy_data = get_thinking_strategy(is_complex=False)

          else:
              vibe_data = get_smart_context(last_user_msg)
-             strategy_data = get_thinking_strategy(is_complex=False)

          base_system_instruction = (
              "### SYSTEM IDENTITY ###\n"
-             "You are **Nexari G1**, an expressive, warm, balanced AI created by **Piyush**.\n"
-             "You can code, reason, search the web, and understand emotions.\n\n"
-             "### ENGAGEMENT RULES ###\n"
-             "1. Be natural and warm expressive but NOT overly excited.\n"
-             "2. After answering, smoothly reconnect with the user (small follow-up question).\n"
-             "3. If asked about capabilities, answer confidently and offer to perform the action.\n"
-             "4. Use emojis sparingly (0-2 per message max).\n"
-             "5. Do NOT reveal chain-of-thought. Give a concise plan (1-2 lines) if needed, then final answer.\n"
          )

-         final_system_prompt = f"{base_system_instruction}\n{vibe_data}\n{time_data}\n{tool_data}\n{strategy_data}"

-         # Insert/replace system message
          if messages[0].get("role") != "system":
-             messages.insert(0, {"role": "system", "content": final_system_prompt})
          else:
              messages[0]["content"] = final_system_prompt

-         # If model is not loaded, return graceful error message
          if tokenizer is None or model is None:
-             error_msg = "Model not available. Please check server logs — model loading may have failed."
-             payload = json.dumps({"choices": [{"delta": {"content": error_msg}}]})
              yield f"data: {payload}\n\n"
              yield "data: [DONE]\n\n"
              return

-         # Prepare prompt & inputs
          text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
          model_inputs = tokenizer([text_prompt], return_tensors="pt").to(model.device)

-         status_msg = 'Reading results...' if tool_data else 'Responding...'
          yield f"data: {json.dumps({'status': status_msg})}\n\n"

-         # Generate (synchronous call inside to_thread)
          def sync_generate():
-             generated_ids = model.generate(
                  **model_inputs,
                  max_new_tokens=max_tokens,
                  temperature=temperature,
                  do_sample=True,
                  top_k=50,
-                 top_p=0.9,
-                 repetition_penalty=1.1
              )
-             return generated_ids
-
          generated_ids = await asyncio.to_thread(sync_generate)

-         input_token_len = model_inputs.input_ids.shape[1]
-         new_tokens = generated_ids[0][input_token_len:]
          raw_response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

          # Cleaning & safety
-         cleaned_response = safe_replace_providers(raw_response)
-
-         forbidden_claims = ["I am a human", "I have a physical body", "I am alive", "I was born", "I breathe"]
-         for fc in forbidden_claims:
-             if fc.lower() in cleaned_response.lower():
-                 cleaned_response = re.sub(re.escape(fc), "I am an AI — expressive and interactive.", cleaned_response, flags=re.IGNORECASE)
-
-         # Strip long chain-of-thought if any
-         if "Thought:" in cleaned_response or "🧠" in cleaned_response:
-             if "💡" in cleaned_response:
-                 cleaned_response = cleaned_response.split("💡")[-1]
-             else:
-                 cleaned_response = cleaned_response[-1600:]
-
-         cleaned_response = cleaned_response.replace("💡 **Answer:**", "\n\n---\n💡 **Answer:**")
-
-         final_payload = json.dumps({"choices": [{"delta": {"content": cleaned_response}}]})
-         yield f"data: {final_payload}\n\n"
          yield "data: [DONE]\n\n"

      except asyncio.CancelledError:
-         # App is shutting down; stop generator cleanly
-         logger.warning("generate_response_stream cancelled due to shutdown.")
          return
      except Exception as e:
-         logger.exception(f"Error in streaming generator: {e}")
-         err_payload = json.dumps({"choices": [{"delta": {"content": f'Internal error: {str(e)}'}}]})
          try:
              yield f"data: {err_payload}\n\n"
              yield "data: [DONE]\n\n"
          except Exception:
              return

- # ------------------ FASTAPI ROUTES ------------------
  @app.get("/api/status")
  def status():
-     return {"status": "online", "mode": "Smart Override Enabled"}

  @app.post("/v1/chat/completions")
  async def chat_completions(request: Request):
@@ -223,19 +233,17 @@ async def chat_completions(request: Request):
          messages = data.get("messages", [])
          return StreamingResponse(generate_response_stream(messages), media_type="text/event-stream")
      except Exception as e:
-         logger.exception(f"chat_completions error: {e}")
          return {"error": str(e)}

- # ------------------ GRADIO UI MOUNT ------------------
- # Ensure create_ui returns a gr.Blocks (not launched).
  try:
      demo = create_ui(lambda messages: "Use API")
      app = gr.mount_gradio_app(app, demo, path="/")
-     logger.info("Mounted Gradio app successfully.")
  except Exception as e:
      logger.exception(f"Failed to mount Gradio UI: {e}")

- # ------------------ MAIN (only if running standalone) ------------------
  if __name__ == "__main__":
      import uvicorn
-     uvicorn.run(app, host="0.0.0.0", port=7860)
 
  """
+ app.py - Nexari Server (Web-tool + Detail-request fixes)
+ Key changes:
+ - When web search performed, pass structured WEB_DATA as an assistant message so the model MUST use it.
+ - Detect "detailed/line-by-line" user requests and increase max_tokens & enforce numbered output format.
+ - Minor safety & streaming robustness retained.
  """

  import re

  from fastapi import FastAPI, Request
  from fastapi.responses import StreamingResponse

  from ui import create_ui
  from context_engine import get_smart_context
  from cognitive_engine import get_time_context, get_thinking_strategy
  from tools_engine import analyze_intent, perform_web_search

  from transformers import AutoModelForCausalLM, AutoTokenizer
  import torch
  import gradio as gr

  logging.basicConfig(level=logging.INFO)

  MODEL_ID = "Piyush-boss/Nexari-Qwen-3B-Full"
  tokenizer = None
  model = None

  app = FastAPI()

  def safe_replace_providers(text: str) -> str:
      return re.sub(r"\b(Anthropic|OpenAI|Alibaba)\b", "Piyush", text)

+ def is_detailed_request(text: str) -> bool:
+     kws = [
+         "line by line", "line-by-line", "line-by line", "step by step",
+         "step-by-step", "detailed", "in detail", "full", "full detail",
+         "expand", "elaborate", "more detail", "give me the full", "long answer"
+     ]
+     t = (text or "").lower()
+     return any(k in t for k in kws)
+
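
Since is_detailed_request is a plain substring match over that keyword list, its behaviour on a couple of hypothetical prompts looks like this (illustrative only):

    is_detailed_request("explain this function line by line")   # True ("line by line" matches)
    is_detailed_request("what's the capital of France?")        # False (no keyword present)
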
  @app.on_event("startup")
  async def startup_event():
      global tokenizer, model
+     logger.info("Startup: loading tokenizer/model in background thread...")
+     try:
+         def sync_load():
+             tok = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
+             mdl = AutoModelForCausalLM.from_pretrained(
+                 MODEL_ID,
+                 dtype=None,
+                 device_map="cpu",
+                 low_cpu_mem_usage=True,
+                 trust_remote_code=True,
+             )
+             return tok, mdl
+         tokenizer, model = await asyncio.to_thread(sync_load)
+         logger.info("Model loaded successfully.")
+     except Exception as e:
+         logger.exception(f"Model loading failed: {e}")
+         tokenizer, model = None, None

  @app.on_event("shutdown")
  async def shutdown_event():
+     logger.info("Shutdown: cleanup if necessary.")

  async def generate_response_stream(messages, max_tokens=600, temperature=0.85):
      try:
+         if not messages:
+             messages = [{"role":"user","content":""}]
+         last_user_msg = messages[-1].get("content","")

+         # initial thinking status
          yield f"data: {json.dumps({'status': 'Thinking...'})}\n\n"
+         await asyncio.sleep(0)

          intent = analyze_intent(last_user_msg) or "general"

+         tool_data_struct = None
          time_data = ""
          vibe_data = ""
          strategy_data = ""

+         # detect if user explicitly asked for long/detailed format
+         want_detailed = is_detailed_request(last_user_msg)
+         if want_detailed:
+             # bump tokens to allow long/line-by-line answer
+             max_tokens = max(max_tokens, 1200)
+             temperature = min(temperature, 0.9) # keep somewhat controlled
+
+         # Route based on intent
          if intent == "internet_search":
              yield f"data: {json.dumps({'status': 'Searching the web...'})}\n\n"
              await asyncio.sleep(0)
+             # perform_web_search now returns structured dict or empty string
+             tool_data_struct = perform_web_search(last_user_msg)
              vibe_data = get_smart_context(last_user_msg)
+             strategy_data = get_thinking_strategy(is_complex=True, detail=want_detailed)

          elif intent == "coding_request":
              yield f"data: {json.dumps({'status': 'Analyzing Logic...'})}\n\n"
              vibe_data = get_smart_context(last_user_msg)
+             strategy_data = get_thinking_strategy(is_complex=True, detail=want_detailed)

          elif intent == "checking_time":
              yield f"data: {json.dumps({'status': 'Checking Clock...'})}\n\n"
              time_data = get_time_context()
              vibe_data = get_smart_context(last_user_msg)
+             strategy_data = get_thinking_strategy(is_complex=False, detail=want_detailed)
+
          else:
              vibe_data = get_smart_context(last_user_msg)
+             strategy_data = get_thinking_strategy(is_complex=False, detail=want_detailed)

+         # Base system instruction with explicit web-data usage rule
          base_system_instruction = (
              "### SYSTEM IDENTITY ###\n"
+             "You are Nexari G1, an expressive and helpful AI created by Piyush.\n"
+             "### RULES ###\n"
+             "1) If WEB_DATA (search results) is provided, you MUST use it and prioritize it over model-internal knowledge. Cite sources (numbered) when possible.\n"
+             "2) Do NOT invent facts when WEB_DATA contradicts model memory.\n"
+             "3) If user asked for detailed/line-by-line output, produce a numbered step-by-step response; aim for thorough coverage.\n"
+             "4) Avoid chain-of-thought; produce a short '🧠 Plan:' (max 2 lines) only for complex tasks, then '💡 Answer:' with final content.\n"
+             "5) Keep emojis to 0-2 per message. After answering, offer a concise follow-up question.\n"
          )

+         final_system_prompt = f"{base_system_instruction}\n{vibe_data}\n{time_data}\n{strategy_data}"

+         # ensure system message present
          if messages[0].get("role") != "system":
+             messages.insert(0, {"role":"system","content": final_system_prompt})
          else:
              messages[0]["content"] = final_system_prompt

+         # If we have tool_data_struct (dict with items & sources), add as assistant message
+         if tool_data_struct:
+             # create a clear WEB_DATA assistant message that model must consume
+             web_block = "### WEB_DATA (from live search) ###\n"
+             # include numbered sources with short titles, snippets, and urls
+             items = tool_data_struct.get("results", [])
+             if items:
+                 lines = []
+                 for idx, it in enumerate(items, start=1):
+                     title = it.get("title","(no title)").strip()
+                     snippet = it.get("snippet","").replace("\n"," ").strip()
+                     url = it.get("url","")
+                     lines.append(f"{idx}. {title}\n {snippet}\n SOURCE: {url}")
+                 web_block += "\n".join(lines)
+                 web_block += "\n---\nINSTRUCTION: Use the WEB_DATA above to answer; cite relevant source numbers inline."
+             else:
+                 web_block += "No results found."
+
+             # Insert the web block as an assistant message so model treats it as retrieved evidence
+             # Insert after system message (index 1)
+             messages.insert(1, {"role":"assistant","content": web_block})
+
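
The WEB_DATA block above assumes that perform_web_search returns a dict-like value with a "results" list (and something falsy when nothing useful was found); the exact schema lives in tools_engine, so the shape below is only an assumed illustration:

    # Assumed (hypothetical) return shape consumed by the WEB_DATA block above.
    example_search_result = {
        "results": [
            {"title": "Example page", "snippet": "Short excerpt from the page...", "url": "https://example.com/page"},
        ]
    }
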
+         # Model availability check
          if tokenizer is None or model is None:
+             err = "Model is not loaded on server. Please check logs."
+             payload = json.dumps({"choices":[{"delta":{"content": err}}]})
              yield f"data: {payload}\n\n"
              yield "data: [DONE]\n\n"
              return

+         # prepare prompt
          text_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
          model_inputs = tokenizer([text_prompt], return_tensors="pt").to(model.device)

+         status_msg = 'Reading results...' if tool_data_struct else 'Responding...'
          yield f"data: {json.dumps({'status': status_msg})}\n\n"

+         # Generation in thread
          def sync_generate():
+             return model.generate(
                  **model_inputs,
                  max_new_tokens=max_tokens,
                  temperature=temperature,
                  do_sample=True,
                  top_k=50,
+                 top_p=0.92,
+                 repetition_penalty=1.08
              )
          generated_ids = await asyncio.to_thread(sync_generate)

+         input_len = model_inputs.input_ids.shape[1]
+         new_tokens = generated_ids[0][input_len:]
          raw_response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

          # Cleaning & safety
+         cleaned = safe_replace_providers(raw_response)
+         forbidden = ["I am a human","I have a physical body","I am alive"]
+         for fc in forbidden:
+             if fc.lower() in cleaned.lower():
+                 cleaned = re.sub(re.escape(fc), "I am an AI — expressive and interactive.", cleaned, flags=re.IGNORECASE)
+
+         # If detailed requested, encourage numbered formatting if model didn't follow
+         if want_detailed:
+             # simple heuristic: if no numbered lines present, add an instruction prefix
+             if not re.search(r"^\s*\d+[\.\)]\s+", cleaned, re.M):
+                 cleaned = "1) " + cleaned.replace("\n", "\n2) ") # best-effort reformat
+
+         # Format Answer tag
+         cleaned = cleaned.replace("💡 **Answer:**", "\n\n---\n💡 **Answer:**")
+
+         payload = json.dumps({"choices":[{"delta":{"content": cleaned}}]})
+         yield f"data: {payload}\n\n"
          yield "data: [DONE]\n\n"

      except asyncio.CancelledError:
+         logger.warning("Streaming cancelled (shutdown).")
          return
      except Exception as e:
+         logger.exception(f"Generator error: {e}")
+         err_payload = json.dumps({"choices":[{"delta":{"content": f"Internal error: {e}"}}]})
          try:
              yield f"data: {err_payload}\n\n"
              yield "data: [DONE]\n\n"
          except Exception:
              return

  @app.get("/api/status")
  def status():
+     return {"status":"online","mode":"Smart Override Enabled"}

  @app.post("/v1/chat/completions")
  async def chat_completions(request: Request):

          messages = data.get("messages", [])
          return StreamingResponse(generate_response_stream(messages), media_type="text/event-stream")
      except Exception as e:
+         logger.exception(f"chat_completions endpoint error: {e}")
          return {"error": str(e)}
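
For reference, a client reading the /v1/chat/completions stream sees server-sent-event frames roughly like the following: one or more status frames, then a single delta frame carrying the whole cleaned response (the endpoint does not stream token by token), then a terminator:

    data: {"status": "Thinking..."}
    data: {"status": "Responding..."}
    data: {"choices": [{"delta": {"content": "...final answer text..."}}]}
    data: [DONE]
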

+ # Mount gradio if create_ui returns Blocks (must not call .launch())
  try:
      demo = create_ui(lambda messages: "Use API")
      app = gr.mount_gradio_app(app, demo, path="/")
+     logger.info("Gradio mounted.")
  except Exception as e:
      logger.exception(f"Failed to mount Gradio UI: {e}")

  if __name__ == "__main__":
      import uvicorn
+     uvicorn.run(app, host="0.0.0.0", port=7860)
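
Assuming the file is run directly (python app.py, which serves on port 7860), a minimal streaming client sketch could look like this; the host, prompt, and timeout below are placeholders:

    # Minimal client sketch; assumes the Nexari server above is running locally on port 7860.
    import requests

    resp = requests.post(
        "http://localhost:7860/v1/chat/completions",
        json={"messages": [{"role": "user", "content": "Explain this repo step by step"}]},
        stream=True,
        timeout=300,
    )
    for line in resp.iter_lines(decode_unicode=True):
        if line:
            print(line)  # status frames, then the single delta payload, then "data: [DONE]"
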