import os
import subprocess
import tempfile
# subprocess.run('pip install flash-attn==2.8.0 --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True)
import threading
# subprocess.check_call([os.sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
import spaces
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoProcessor, AutoTokenizer, TextIteratorStreamer
from analytics import AnalyticsLogger
from kernels import get_kernel
from typing import Any
#vllm_flash_attn3 = get_kernel("kernels-community/vllm-flash-attn3")
#torch._dynamo.config.disable = True
# Superseded model IDs, kept for reference:
# MODEL_ID = "le-llm/lapa-v0.1-reasoning-only-32768"
# MODEL_ID = "le-llm/lapa-v0.1-instruct"
# MODEL_ID = "le-llm/lapa-v0.1-matt-instruction-5e06"
MODEL_ID = "le-llm/lapa-v0.1-reprojected"
logger = AnalyticsLogger()
def _begin_analytics_session():
# Called once per client on app load
_ = logger.start_session(MODEL_ID)
def load_model():
"""Lazy-load model, tokenizer, and optional processor (for zeroGPU)."""
device = "cuda" # if torch.cuda.is_available() else "cpu"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
processor = None
try:
processor = AutoProcessor.from_pretrained(MODEL_ID)
except Exception as err: # pragma: no cover - informative fallback
print(f"Warning: AutoProcessor not available ({err}). Falling back to tokenizer.")
model = AutoModelForCausalLM.from_pretrained(
MODEL_ID,
dtype=torch.bfloat16, # if device == "cuda" else torch.float32,
device_map="auto", # if device == "cuda" else None,
        attn_implementation="flash_attention_2",  # alternative: "kernels-community/vllm-flash-attn3"
) # .cuda()
    print(f"Selected device: {device}")
return model, tokenizer, processor, device
# Load model/tokenizer once at startup; the @spaces.GPU-decorated bot() below acquires a GPU per request (zeroGPU cold start & release).
model, tokenizer, processor, device = load_model()
def _ensure_image_object(image_data: Any) -> Any | None:
"""Return a PIL Image object for the provided image data."""
if image_data is None:
return None
try:
from PIL import Image
except ImportError: # pragma: no cover - PIL is bundled with Gradio's image component
return None
# Already a PIL Image
if isinstance(image_data, Image.Image):
return image_data
# Load from path
if isinstance(image_data, str) and os.path.exists(image_data):
return Image.open(image_data)
return None
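# Example behaviour of _ensure_image_object (illustrative paths):
#   _ensure_image_object(pil_img)          -> pil_img (returned unchanged)
#   _ensure_image_object("/tmp/photo.png") -> PIL.Image loaded from disk (only if the path exists)
#   _ensure_image_object(None)             -> None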
def user(user_message, image_data, history: list):
user_message = user_message or ""
updated_history = list(history)
has_content = False
stripped_message = user_message.strip()
image_obj = _ensure_image_object(image_data)
# Store image as temp file for Gradio display, but keep PIL object in metadata
if image_obj is not None:
fd, tmp_path = tempfile.mkstemp(suffix=".png")
os.close(fd)
image_obj.save(tmp_path, format="PNG")
else:
tmp_path = None
# If we have both text and image, combine them in a single message
if stripped_message and tmp_path is not None:
updated_history.append({
"role": "user",
"content": [
{"type": "text", "text": stripped_message},
{"type": "image", "path": tmp_path, "alt_text": "uploaded image", "_pil_image": image_obj}
]
})
has_content = True
elif stripped_message:
updated_history.append({"role": "user", "content": stripped_message})
has_content = True
elif tmp_path is not None:
updated_history.append({
"role": "user",
"content": [{"type": "image", "path": tmp_path, "alt_text": "uploaded image", "_pil_image": image_obj}]
})
has_content = True
if not has_content:
# Nothing to submit yet; keep inputs unchanged
return user_message, image_data, history
return "", None, updated_history
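# Shape of the history entry appended above when both text and an image are present
# (illustrative; the "_pil_image" key is internal and removed by _clean_history_for_display):
#   {"role": "user",
#    "content": [{"type": "text", "text": "..."},
#                {"type": "image", "path": "/tmp/....png", "alt_text": "uploaded image", "_pil_image": <PIL.Image>}]}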
def append_example_message(x: gr.SelectData, history):
print(x)
print(x.value)
print(x.value["text"])
if x.value["text"] is not None:
history.append({"role": "user", "content": x.value["text"]})
return history
def _message_contains_image(message: dict[str, Any]) -> bool:
content = message.get("content")
if isinstance(content, dict):
if "path" in content or "image" in content:
return True
if content.get("type") in {"image", "image_url"}:
return True
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") in {"image", "image_url"}:
return True
return False
def _content_to_text(content: Any) -> str:
if isinstance(content, dict):
if "text" in content:
return content.get("text", "")
if "path" in content:
alt_text = content.get("alt_text")
placeholder = alt_text or os.path.basename(content["path"]) or "image"
return f"[image: {placeholder}]"
if "image" in content:
return "[image]"
if content.get("type") == "image_url":
image_url = content.get("image_url")
if isinstance(image_url, dict):
image_url = image_url.get("url", "")
return f"[image: {image_url}]"
if content.get("type") == "text":
return content.get("text", "")
return str(content)
if isinstance(content, list):
text_parts: list[str] = []
for item in content:
if isinstance(item, dict):
item_type = item.get("type")
if item_type == "text":
text_parts.append(item.get("text", ""))
elif item_type == "image":
text_parts.append("[image]")
elif item_type == "image_url":
image_url = item.get("image_url")
if isinstance(image_url, dict):
image_url = image_url.get("url", "")
text_parts.append(f"[image: {image_url}]")
else:
text_parts.append(str(item))
else:
text_parts.append(str(item))
filtered = [part for part in text_parts if part]
return "\n".join(filtered) if filtered else "[image]"
return str(content)
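# Example (illustrative): a mixed text+image content list collapses to plain text,
#   _content_to_text([{"type": "text", "text": "Hi"}, {"type": "image"}]) -> "Hi\n[image]"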
def _collect_recent_user_contents(history: list[dict[str, Any]]) -> list[Any]:
"""Collect the trailing sequence of user messages prior to the assistant reply."""
chunks: list[Any] = []
for message in reversed(history):
if message.get("role") != "user":
break
chunks.append(message.get("content"))
chunks.reverse()
return chunks
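# Walks backwards from the end of the history until a non-user message is hit, so a
# multi-part user turn (e.g. separate text and image entries) is captured in full.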
def _prepare_text_history(history: list[dict[str, Any]]) -> list[dict[str, str]]:
text_history: list[dict[str, str]] = []
for message in history:
role = message.get("role", "user")
content_text = _content_to_text(message.get("content"))
if not content_text:
continue
if text_history and text_history[-1]["role"] == role:
text_history[-1]["content"] = text_history[-1]["content"] + "\n" + content_text
else:
text_history.append({"role": role, "content": content_text})
return text_history
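# Consecutive messages from the same role are merged with newlines above, so the
# text-only chat template never sees two adjacent messages from the same role.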
def _prepare_processor_history(history: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Prepare history for processor with proper image format."""
processor_history = []
for message in history:
role = message.get("role", "user")
content = message.get("content")
# Handle different content formats
if isinstance(content, str):
# Simple text message
processor_history.append({"role": role, "content": content})
elif isinstance(content, list):
# Multi-modal content (text + images)
formatted_content = []
for item in content:
if isinstance(item, dict):
item_type = item.get("type")
if item_type == "text":
formatted_content.append({"type": "text", "text": item.get("text", "")})
elif item_type == "image":
# Extract PIL Image from _pil_image field or load from path
pil_image = item.get("_pil_image")
if pil_image is None and "path" in item:
from PIL import Image
pil_image = Image.open(item["path"])
if pil_image is not None:
formatted_content.append({"type": "image", "image": pil_image})
if formatted_content:
processor_history.append({"role": role, "content": formatted_content})
elif isinstance(content, dict):
# Legacy format or single image
if "image" in content or "_pil_image" in content:
pil_image = content.get("_pil_image") or content.get("image")
if pil_image is None and "path" in content:
from PIL import Image
pil_image = Image.open(content["path"])
if pil_image is not None:
processor_history.append({
"role": role,
"content": [{"type": "image", "image": pil_image}]
})
else:
# Try to extract text
text = _content_to_text(content)
if text:
processor_history.append({"role": role, "content": text})
return processor_history
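# Entries are reshaped into {"type": "image", "image": <PIL.Image>} items for the processor;
# the in-memory "_pil_image" handle is preferred, with a fallback to re-opening the temp file path.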
def _clean_history_for_display(history: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Remove internal metadata fields like _pil_image before displaying in Gradio."""
cleaned = []
for message in history:
cleaned_message = {"role": message.get("role", "user")}
content = message.get("content")
if isinstance(content, str):
cleaned_message["content"] = content
elif isinstance(content, list):
cleaned_content = []
for item in content:
if isinstance(item, dict):
# Remove _pil_image and ensure alt_text is string or absent
cleaned_item = {}
for k, v in item.items():
if k == "_pil_image":
continue
if k == "alt_text" and not isinstance(v, str):
continue
cleaned_item[k] = v
cleaned_content.append(cleaned_item)
else:
cleaned_content.append(item)
cleaned_message["content"] = cleaned_content
elif isinstance(content, dict):
# Remove _pil_image and ensure alt_text is string or absent
cleaned_item = {}
for k, v in content.items():
if k == "_pil_image":
continue
if k == "alt_text" and not isinstance(v, str):
continue
cleaned_item[k] = v
cleaned_message["content"] = cleaned_item
else:
cleaned_message["content"] = content
cleaned.append(cleaned_message)
return cleaned
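# Stripping "_pil_image" (and any non-string alt_text) keeps the displayed messages serialisable
# for the gr.Chatbot component, while the original history retains the raw PIL objects.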
@spaces.GPU
def bot(
history: list[dict[str, Any]]
# max_tokens,
# temperature,
# top_p,
):
user_chunks = _collect_recent_user_contents(history)
if not user_chunks:
user_message_text = ""
else:
user_message_text = "\n".join(filter(None, (_content_to_text(chunk) for chunk in user_chunks)))
print('User message:', user_message_text)
# [{"role": "system", "content": system_message}] +
# Build conversation
max_tokens = 4096
temperature = 0.7
top_p = 0.95
text_history = _prepare_text_history(history)
# Handle empty history case
if not text_history:
input_text = ""
else:
input_text: str = tokenizer.apply_chat_template(
text_history,
tokenize=False,
add_generation_prompt=True,
# enable_thinking=True,
)
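    # The chat template already emits a BOS token; it is stripped below because the plain
    # tokenizer() call further down would otherwise prepend a second one (assumed intent).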
if input_text and tokenizer.bos_token:
input_text = input_text.replace(tokenizer.bos_token, "", 1)
print(input_text)
model_inputs = None
# Early return if no input
if not input_text and not any(_message_contains_image(msg) for msg in history):
return
if processor is not None and any(_message_contains_image(msg) for msg in history):
try:
processor_history = _prepare_processor_history(history)
model_inputs = processor(
messages=processor_history,
return_tensors="pt",
add_generation_prompt=True,
).to(model.device)
except Exception as exc: # pragma: no cover - diagnostic logging
print(f"Processor failed, using tokenizer pipeline instead: {exc}")
if model_inputs is None:
model_inputs = tokenizer(input_text, return_tensors="pt").to(model.device) # .to(device)
decoded_input = tokenizer.decode(model_inputs["input_ids"][0])
print("Decoded input:", decoded_input)
print([{int(token_id.item()): tokenizer.decode([int(token_id.item())])} for token_id in model_inputs["input_ids"][0]])
# Streamer setup
streamer = TextIteratorStreamer(
        tokenizer, skip_prompt=True,  # optionally: skip_special_tokens=True
)
# Run model.generate in background thread
generation_kwargs = dict(
**model_inputs,
max_new_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
top_k=64,
do_sample=True,
# eos_token_id=tokenizer.eos_token_id,
streamer=streamer,
)
thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
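    # model.generate() runs on the background thread; TextIteratorStreamer yields decoded text
    # chunks as they are produced, and each chunk is appended to the assistant message below
    # and re-yielded so Gradio can stream the partial reply.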
history.append({"role": "assistant", "content": ""})
# Yield tokens as they come in
for new_text in streamer:
history[-1]["content"] += new_text
yield _clean_history_for_display(history)
assistant_message = history[-1]["content"]
logger.log_interaction(user=user_message_text, answer=assistant_message)
# --- drop-in UI compatible with older Gradio versions ---
# (os, tempfile, and gradio are already imported at the top of the file)
# Ukrainian-inspired theme with deep, muted colors reflecting unbeatable spirit:
THEME = gr.themes.Soft(
primary_hue="blue", # Deep blue representing Ukrainian sky and resolve
secondary_hue="amber", # Warm amber representing golden fields and determination
neutral_hue="stone", # Earthy stone representing strength and foundation
)
# Load CSS from external file
def load_css():
try:
with open("static/style.css", "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
print("Warning: static/style.css not found")
return ""
CSS = load_css()
def _clear_chat():
return "", None, []
with gr.Blocks(theme=THEME, css=CSS, fill_height=True) as demo:
demo.load(fn=_begin_analytics_session, inputs=None, outputs=None)
# Header (no gr.Box to avoid version issues)
gr.HTML(
"""
<div id="app-header">
<div class="app-title">✨ LAPA</div>
<div class="app-subtitle">LLM for Ukrainian Language</div>
</div>
"""
)
with gr.Row(equal_height=True):
# Left side: Chat
with gr.Column(scale=7, elem_id="left-pane"):
with gr.Column(elem_id="chat-card"):
chatbot = gr.Chatbot(
type="messages",
height=560,
render_markdown=True,
show_copy_button=True,
show_label=False,
# likeable=True,
allow_tags=["think"],
elem_id="chatbot",
examples=[
{"text": i}
for i in [
"хто тримає цей район?",
"Напиши історію про Івасика-Телесика",
"Яка найвища гора в Україні?",
"Як звали батька Тараса Григоровича Шевченка?",
"Яка з цих гір не знаходиться у Європі? Говерла, Монблан, Гран-Парадізо, Еверест",
"Дай відповідь на питання\nЧому у качки жовті ноги?",
]
],
)
image_input = gr.Image(
label="Attach image (optional)",
type="pil",
sources=["upload", "clipboard"],
height=200,
interactive=True,
elem_id="image-input",
)
# ChatGPT-style input box with stop button
with gr.Row(elem_id="chat-input-row"):
msg = gr.Textbox(
label=None,
placeholder="Message… (Press Enter to send)",
autofocus=True,
lines=1,
max_lines=6,
container=False,
show_label=False,
elem_id="chat-input",
elem_classes=["chat-input-box"]
)
stop_btn_visible = gr.Button(
"⏹️",
variant="secondary",
elem_id="stop-btn-visible",
elem_classes=["stop-btn-chat"],
visible=False,
size="sm"
)
# Hidden buttons for functionality
with gr.Row(visible=True, elem_id="hidden-buttons"):
send_btn = gr.Button("Send", variant="primary", elem_id="send-btn")
stop_btn = gr.Button("Stop", variant="secondary", elem_id="stop-btn")
clear_btn = gr.Button("Clear", variant="secondary", elem_id="clear-btn")
# export_btn = gr.Button("Export chat (.md)", variant="secondary", elem_classes=["rounded-btn","secondary-btn"])
# exported_file = gr.File(label="", interactive=False, visible=True)
gr.HTML('<div class="footer-tip">Shortcuts: Enter to send • Shift+Enter for new line</div>')
# Helper functions for managing UI state
def show_stop_button():
return gr.update(visible=True)
def hide_stop_button():
return gr.update(visible=False)
# Events (preserve your original handlers)
e1 = msg.submit(fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True).then(
fn=show_stop_button, inputs=None, outputs=stop_btn_visible
).then(
fn=bot, inputs=chatbot, outputs=chatbot
).then(
fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
)
e2 = send_btn.click(fn=user, inputs=[msg, image_input, chatbot], outputs=[msg, image_input, chatbot], queue=True).then(
fn=show_stop_button, inputs=None, outputs=stop_btn_visible
).then(
fn=bot, inputs=chatbot, outputs=chatbot
).then(
fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
)
e3 = chatbot.example_select(fn=append_example_message, inputs=[chatbot], outputs=[chatbot], queue=True).then(
fn=show_stop_button, inputs=None, outputs=stop_btn_visible
).then(
fn=bot, inputs=chatbot, outputs=chatbot
).then(
fn=hide_stop_button, inputs=None, outputs=stop_btn_visible
)
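    # Each input path (Enter, Send button, example click) runs the same chain:
    # user/append -> show stop button -> bot (streaming) -> hide stop button.
    # The Stop buttons below cancel all three chains via cancels=[e1, e2, e3].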
# Stop cancels running events (both buttons work)
stop_btn.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)
stop_btn_visible.click(fn=hide_stop_button, inputs=None, outputs=stop_btn_visible, cancels=[e1, e2, e3], queue=True)
# Clear chat + input
clear_btn.click(fn=_clear_chat, inputs=None, outputs=[msg, image_input, chatbot])
# Export markdown
# export_btn.click(fn=_export_markdown, inputs=chatbot, outputs=exported_file)
# Load and inject external JavaScript
def load_javascript():
try:
with open("static/script.js", "r", encoding="utf-8") as f:
return f"<script>{f.read()}</script>"
except FileNotFoundError:
print("Warning: static/script.js not found")
return ""
gr.HTML(load_javascript())
if __name__ == "__main__":
demo.queue().launch()