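"""Gradio chat demo for LAPA, an LLM for the Ukrainian language.

Runs as a Hugging Face Space on ZeroGPU hardware ("Running on Zero") and
streams responses from a transformers causal LM, with optional image input
when an AutoProcessor is available for the model.
"""
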
import os
import subprocess
import tempfile
# subprocess.run('pip install flash-attn==2.8.0 --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True)
import threading
# subprocess.check_call([os.sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
import spaces
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoProcessor, AutoTokenizer, TextIteratorStreamer

from analytics import AnalyticsLogger
from kernels import get_kernel
from typing import Any

# vllm_flash_attn3 = get_kernel("kernels-community/vllm-flash-attn3")
# torch._dynamo.config.disable = True

# Earlier checkpoints, kept for reference; only the last assignment is used.
# MODEL_ID = "le-llm/lapa-v0.1-reasoning-only-32768"
# MODEL_ID = "le-llm/lapa-v0.1-instruct"
# MODEL_ID = "le-llm/lapa-v0.1-matt-instruction-5e06"
MODEL_ID = "le-llm/lapa-v0.1-reprojected"

logger = AnalyticsLogger()


def _begin_analytics_session():
    # Called once per client on app load
    _ = logger.start_session(MODEL_ID)

def load_model():
    """Lazy-load model, tokenizer, and optional processor (for ZeroGPU)."""
    device = "cuda"  # if torch.cuda.is_available() else "cpu"

    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)

    processor = None
    try:
        processor = AutoProcessor.from_pretrained(MODEL_ID)
    except Exception as err:  # pragma: no cover - informative fallback
        print(f"Warning: AutoProcessor not available ({err}). Falling back to tokenizer.")

    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        dtype=torch.bfloat16,  # if device == "cuda" else torch.float32,
        device_map="auto",  # if device == "cuda" else None,
        attn_implementation="flash_attention_2",  # "kernels-community/vllm-flash-attn3",
    )  # .cuda()
    print("Selected device:", device)
    return model, tokenizer, processor, device


# Model, tokenizer, and processor are loaded once at import time; on ZeroGPU the
# GPU itself is attached and released per request.
model, tokenizer, processor, device = load_model()

def _ensure_image_object(image_data: Any) -> Any | None:
    """Return a PIL Image object for the provided image data."""
    if image_data is None:
        return None

    try:
        from PIL import Image
    except ImportError:  # pragma: no cover - PIL is bundled with Gradio's image component
        return None

    # Already a PIL Image
    if isinstance(image_data, Image.Image):
        return image_data

    # Load from path
    if isinstance(image_data, str) and os.path.exists(image_data):
        return Image.open(image_data)

    return None

def user(user_message, image_data, history: list):
    """Append the pending user message (text and/or image) to the chat history."""
    user_message = user_message or ""
    updated_history = list(history)
    has_content = False

    stripped_message = user_message.strip()
    image_obj = _ensure_image_object(image_data)

    # Store the image as a temp file for Gradio display, but keep the PIL object in metadata
    if image_obj is not None:
        fd, tmp_path = tempfile.mkstemp(suffix=".png")
        os.close(fd)
        image_obj.save(tmp_path, format="PNG")
    else:
        tmp_path = None

    # If we have both text and image, combine them in a single message
    if stripped_message and tmp_path is not None:
        updated_history.append({
            "role": "user",
            "content": [
                {"type": "text", "text": stripped_message},
                {"type": "image", "path": tmp_path, "alt_text": "uploaded image", "_pil_image": image_obj},
            ],
        })
        has_content = True
    elif stripped_message:
        updated_history.append({"role": "user", "content": stripped_message})
        has_content = True
    elif tmp_path is not None:
        updated_history.append({
            "role": "user",
            "content": [{"type": "image", "path": tmp_path, "alt_text": "uploaded image", "_pil_image": image_obj}],
        })
        has_content = True

    if not has_content:
        # Nothing to submit yet; keep inputs unchanged
        return user_message, image_data, history

    return "", None, updated_history

def append_example_message(x: gr.SelectData, history):
    print(x)
    print(x.value)
    print(x.value["text"])
    if x.value["text"] is not None:
        history.append({"role": "user", "content": x.value["text"]})
    return history


def _message_contains_image(message: dict[str, Any]) -> bool:
    content = message.get("content")
    if isinstance(content, dict):
        if "path" in content or "image" in content:
            return True
        if content.get("type") in {"image", "image_url"}:
            return True
    if isinstance(content, list):
        for item in content:
            if isinstance(item, dict) and item.get("type") in {"image", "image_url"}:
                return True
    return False

def _content_to_text(content: Any) -> str:
    if isinstance(content, dict):
        if "text" in content:
            return content.get("text", "")
        if "path" in content:
            alt_text = content.get("alt_text")
            placeholder = alt_text or os.path.basename(content["path"]) or "image"
            return f"[image: {placeholder}]"
        if "image" in content:
            return "[image]"
        if content.get("type") == "image_url":
            image_url = content.get("image_url")
            if isinstance(image_url, dict):
                image_url = image_url.get("url", "")
            return f"[image: {image_url}]"
        if content.get("type") == "text":
            return content.get("text", "")
        return str(content)
    if isinstance(content, list):
        text_parts: list[str] = []
        for item in content:
            if isinstance(item, dict):
                item_type = item.get("type")
                if item_type == "text":
                    text_parts.append(item.get("text", ""))
                elif item_type == "image":
                    text_parts.append("[image]")
                elif item_type == "image_url":
                    image_url = item.get("image_url")
                    if isinstance(image_url, dict):
                        image_url = image_url.get("url", "")
                    text_parts.append(f"[image: {image_url}]")
                else:
                    text_parts.append(str(item))
            else:
                text_parts.append(str(item))
        filtered = [part for part in text_parts if part]
        return "\n".join(filtered) if filtered else "[image]"
    return str(content)

def _collect_recent_user_contents(history: list[dict[str, Any]]) -> list[Any]:
    """Collect the trailing sequence of user messages prior to the assistant reply."""
    chunks: list[Any] = []
    for message in reversed(history):
        if message.get("role") != "user":
            break
        chunks.append(message.get("content"))
    chunks.reverse()
    return chunks


def _prepare_text_history(history: list[dict[str, Any]]) -> list[dict[str, str]]:
    text_history: list[dict[str, str]] = []
    for message in history:
        role = message.get("role", "user")
        content_text = _content_to_text(message.get("content"))
        if not content_text:
            continue
        if text_history and text_history[-1]["role"] == role:
            text_history[-1]["content"] = text_history[-1]["content"] + "\n" + content_text
        else:
            text_history.append({"role": role, "content": content_text})
    return text_history

def _prepare_processor_history(history: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Prepare history for processor with proper image format."""
    processor_history = []
    for message in history:
        role = message.get("role", "user")
        content = message.get("content")

        # Handle different content formats
        if isinstance(content, str):
            # Simple text message
            processor_history.append({"role": role, "content": content})
        elif isinstance(content, list):
            # Multi-modal content (text + images)
            formatted_content = []
            for item in content:
                if isinstance(item, dict):
                    item_type = item.get("type")
                    if item_type == "text":
                        formatted_content.append({"type": "text", "text": item.get("text", "")})
                    elif item_type == "image":
                        # Extract PIL Image from _pil_image field or load from path
                        pil_image = item.get("_pil_image")
                        if pil_image is None and "path" in item:
                            from PIL import Image
                            pil_image = Image.open(item["path"])
                        if pil_image is not None:
                            formatted_content.append({"type": "image", "image": pil_image})
            if formatted_content:
                processor_history.append({"role": role, "content": formatted_content})
        elif isinstance(content, dict):
            # Legacy format or single image
            if "image" in content or "_pil_image" in content:
                pil_image = content.get("_pil_image") or content.get("image")
                if pil_image is None and "path" in content:
                    from PIL import Image
                    pil_image = Image.open(content["path"])
                if pil_image is not None:
                    processor_history.append({
                        "role": role,
                        "content": [{"type": "image", "image": pil_image}],
                    })
            else:
                # Try to extract text
                text = _content_to_text(content)
                if text:
                    processor_history.append({"role": role, "content": text})
    return processor_history

def _clean_history_for_display(history: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Remove internal metadata fields like _pil_image before displaying in Gradio."""
    cleaned = []
    for message in history:
        cleaned_message = {"role": message.get("role", "user")}
        content = message.get("content")

        if isinstance(content, str):
            cleaned_message["content"] = content
        elif isinstance(content, list):
            cleaned_content = []
            for item in content:
                if isinstance(item, dict):
                    # Remove _pil_image and ensure alt_text is a string or absent
                    cleaned_item = {}
                    for k, v in item.items():
                        if k == "_pil_image":
                            continue
                        if k == "alt_text" and not isinstance(v, str):
                            continue
                        cleaned_item[k] = v
                    cleaned_content.append(cleaned_item)
                else:
                    cleaned_content.append(item)
            cleaned_message["content"] = cleaned_content
        elif isinstance(content, dict):
            # Remove _pil_image and ensure alt_text is a string or absent
            cleaned_item = {}
            for k, v in content.items():
                if k == "_pil_image":
                    continue
                if k == "alt_text" and not isinstance(v, str):
                    continue
                cleaned_item[k] = v
            cleaned_message["content"] = cleaned_item
        else:
            cleaned_message["content"] = content

        cleaned.append(cleaned_message)
    return cleaned

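# A minimal sketch (assumption, not confirmed by this snippet): on ZeroGPU
# Spaces the function that touches the GPU is usually wrapped with the
# `spaces.GPU` decorator from the `spaces` package imported above, e.g.
#
#     @spaces.GPU(duration=120)  # duration in seconds is illustrative only
#     def bot(history):
#         ...
#
# so that a GPU is attached for the duration of each generation call.
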
def bot(
    history: list[dict[str, Any]],
    # max_tokens,
    # temperature,
    # top_p,
):
    user_chunks = _collect_recent_user_contents(history)
    if not user_chunks:
        user_message_text = ""
    else:
        user_message_text = "\n".join(filter(None, (_content_to_text(chunk) for chunk in user_chunks)))
    print("User message:", user_message_text)

    # [{"role": "system", "content": system_message}] +
    # Build conversation
    max_tokens = 4096
    temperature = 0.7
    top_p = 0.95

    text_history = _prepare_text_history(history)

    # Handle empty history case
    if not text_history:
        input_text = ""
    else:
        input_text: str = tokenizer.apply_chat_template(
            text_history,
            tokenize=False,
            add_generation_prompt=True,
            # enable_thinking=True,
        )

    # Drop the leading BOS token from the rendered template; the tokenizer
    # adds it again when encoding below.
    if input_text and tokenizer.bos_token:
        input_text = input_text.replace(tokenizer.bos_token, "", 1)
    print(input_text)

    model_inputs = None

    # Early return if there is no text and no image to work with
    if not input_text and not any(_message_contains_image(msg) for msg in history):
        return

    if processor is not None and any(_message_contains_image(msg) for msg in history):
        try:
            processor_history = _prepare_processor_history(history)
            model_inputs = processor(
                messages=processor_history,
                return_tensors="pt",
                add_generation_prompt=True,
            ).to(model.device)
        except Exception as exc:  # pragma: no cover - diagnostic logging
            print(f"Processor failed, using tokenizer pipeline instead: {exc}")

    if model_inputs is None:
        model_inputs = tokenizer(input_text, return_tensors="pt").to(model.device)  # .to(device)

    decoded_input = tokenizer.decode(model_inputs["input_ids"][0])
    print("Decoded input:", decoded_input)
    print([{int(token_id.item()): tokenizer.decode([int(token_id.item())])} for token_id in model_inputs["input_ids"][0]])

    # Streamer setup
    streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True  # skip_special_tokens=True
    )

    # Run model.generate in a background thread
    generation_kwargs = dict(
        **model_inputs,
        max_new_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=64,
        do_sample=True,
        # eos_token_id=tokenizer.eos_token_id,
        streamer=streamer,
    )
    thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    history.append({"role": "assistant", "content": ""})

    # Yield tokens as they come in
    for new_text in streamer:
        history[-1]["content"] += new_text
        yield _clean_history_for_display(history)

    assistant_message = history[-1]["content"]
    logger.log_interaction(user=user_message_text, answer=assistant_message)

# --- drop-in UI compatible with older Gradio versions ---
# (os, tempfile, and gradio are already imported at the top of the file.)

# Ukrainian-inspired theme with deep, muted colors reflecting unbeatable spirit:
THEME = gr.themes.Soft(
    primary_hue="blue",     # deep blue for the Ukrainian sky and resolve
    secondary_hue="amber",  # warm amber for golden fields and determination
    neutral_hue="stone",    # earthy stone for strength and foundation
)


# Load CSS from an external file
def load_css():
    try:
        with open("static/style.css", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print("Warning: static/style.css not found")
        return ""


CSS = load_css()


def _clear_chat():
    return "", None, []

with gr.Blocks(theme=THEME, css=CSS, fill_height=True) as demo:
    demo.load(fn=_begin_analytics_session, inputs=None, outputs=None)

    # Header (no gr.Box to avoid version issues)
    gr.HTML(
        """
        <div id="app-header">
            <div class="app-title">✨ LAPA</div>
            <div class="app-subtitle">LLM for Ukrainian Language</div>
        </div>
        """
    )

    with gr.Row(equal_height=True):
        # Left side: Chat
        with gr.Column(scale=7, elem_id="left-pane"):
            with gr.Column(elem_id="chat-card"):
                chatbot = gr.Chatbot(
                    type="messages",
                    height=560,
                    render_markdown=True,
                    show_copy_button=True,
                    show_label=False,
                    # likeable=True,
                    allow_tags=["think"],
                    elem_id="chatbot",
                    examples=[
                        {"text": i}
                        for i in [
                            "хто тримає цей район?",
                            "Напиши історію про Івасика-Телесика",
                            "Яка найвища гора в Україні?",
                            "Як звали батька Тараса Григоровича Шевченка?",
                            "Яка з цих гір не знаходиться у Європі? Говерла, Монблан, Гран-Парадізо, Еверест",
                            "Дай відповідь на питання\nЧому у качки жовті ноги?",
                        ]
                    ],
                )

                image_input = gr.Image(
                    label="Attach image (optional)",
                    type="pil",
                    sources=["upload", "clipboard"],
                    height=200,
                    interactive=True,
                    elem_id="image-input",
                )

                # ChatGPT-style input box with stop button
                with gr.Row(elem_id="chat-input-row"):
                    msg = gr.Textbox(
                        label=None,
                        placeholder="Message… (Press Enter to send)",
                        autofocus=True,
                        lines=1,
                        max_lines=6,
                        container=False,
                        show_label=False,
                        elem_id="chat-input",
                        elem_classes=["chat-input-box"],
                    )
                    stop_btn_visible = gr.Button(
                        "⏹️",
                        variant="secondary",
                        elem_id="stop-btn-visible",
                        elem_classes=["stop-btn-chat"],
                        visible=False,
                        size="sm",
                    )

                # Hidden buttons for functionality
                with gr.Row(visible=True, elem_id="hidden-buttons"):
                    send_btn = gr.Button("Send", variant="primary", elem_id="send-btn")
                    stop_btn = gr.Button("Stop", variant="secondary", elem_id="stop-btn")
                    clear_btn = gr.Button("Clear", variant="secondary", elem_id="clear-btn")
                    # export_btn = gr.Button("Export chat (.md)", variant="secondary", elem_classes=["rounded-btn", "secondary-btn"])
                    # exported_file = gr.File(label="", interactive=False, visible=True)

                gr.HTML('<div class="footer-tip">Shortcuts: Enter to send • Shift+Enter for new line</div>')

    # Helper functions for managing UI state
    def show_stop_button():
        return gr.update(visible=True)

    def hide_stop_button():
        return gr.update(visible=False)

    # Events (preserve the original handlers)
    e1 = msg.submit(
        fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True
    ).then(
        fn=show_stop_button, inputs=None, outputs=stop_btn_visible
    ).then(
        fn=bot, inputs=chatbot, outputs=chatbot
    ).then(
        fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
    )
    e2 = send_btn.click(
        fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True
    ).then(
        fn=show_stop_button, inputs=None, outputs=stop_btn_visible
    ).then(
        fn=bot, inputs=chatbot, outputs=chatbot
    ).then(
        fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
    )
    e3 = chatbot.example_select(
        fn=append_example_message, inputs=[chatbot], outputs=[chatbot], queue=True
    ).then(
        fn=show_stop_button, inputs=None, outputs=stop_btn_visible
    ).then(
        fn=bot, inputs=chatbot, outputs=chatbot
    ).then(
        fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
    )

    # Stop cancels running events (both buttons work)
    stop_btn.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)
    stop_btn_visible.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)

    # Clear chat + input
    clear_btn.click(fn=_clear_chat, inputs=None, outputs=[msg, image_input, chatbot])

    # Export markdown
    # export_btn.click(fn=_export_markdown, inputs=chatbot, outputs=exported_file)

    # Load and inject external JavaScript
    def load_javascript():
        try:
            with open("static/script.js", "r", encoding="utf-8") as f:
                return f"<script>{f.read()}</script>"
        except FileNotFoundError:
            print("Warning: static/script.js not found")
            return ""

    gr.HTML(load_javascript())


if __name__ == "__main__":
    demo.queue().launch()