Spaces:
Running
Running
| """Slyfox backend — serves index.html and persists chats to an HF Bucket. | |
| Layout in the bucket (one file per chat): | |
| users/<user_id>/chats/<chat_id>.json | |
| users/<user_id>/index.json # ordered list of chat ids + titles + topics | |
| jobs/<job_id>.json # slyfox-submitted hf jobs (extract / analyze) | |
| `<user_id>` is currently an anonymous UUID minted in the browser (stored in | |
| localStorage) and sent as `X-User-Id`. This is intentionally minimal — swap for | |
| HF OAuth identity later by reading the OAuth headers Spaces inject. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import json | |
| import logging | |
| import os | |
| import re | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| from fastapi import FastAPI, HTTPException, Header, Request | |
| from fastapi.responses import FileResponse, JSONResponse, RedirectResponse, Response | |
| from fastapi.staticfiles import StaticFiles | |
| from huggingface_hub import HfApi | |
| from huggingface_hub.errors import EntryNotFoundError | |
| logging.basicConfig(level=logging.INFO) | |
| log = logging.getLogger("slyfox") | |
| BUCKET_ID = os.getenv("SLYFOX_BUCKET", "HF-slyfox/slyfox-chats") | |
| HF_TOKEN = os.getenv("HF_TOKEN") # Spaces inject this automatically. | |
| # Job submission config — URLs of UV scripts that run on hf jobs infra. | |
| SLYFOX_JOBS_REF = os.getenv("SLYFOX_JOBS_REF", "arthur-work") # branch / tag of the slyfox repo (`main` once promoted) | |
| JOBS_SCRIPT_BASE = f"https://raw.githubusercontent.com/huggingface/slyfox/{SLYFOX_JOBS_REF}/jobs_scripts" | |
| EXTRACT_SCRIPT_URL = f"{JOBS_SCRIPT_BASE}/extract_vectors.py" | |
| ANALYZE_SCRIPT_URL = f"{JOBS_SCRIPT_BASE}/analyze_traces.py" | |
| VECTORS_BUCKET = os.getenv("SLYFOX_VECTORS_BUCKET", "HF-slyfox/emotion-vectors") | |
| api = HfApi(token=HF_TOKEN) | |
| ROOT = Path(__file__).parent | |
| USER_RE = re.compile(r"^[A-Za-z0-9_-]{8,64}$") | |
| CHAT_RE = re.compile(r"^[A-Za-z0-9_-]{8,64}$") | |
| MODEL_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._\-]*/[A-Za-z0-9][A-Za-z0-9._\-]*$") | |
| BUCKET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._\-]*/[A-Za-z0-9][A-Za-z0-9._\-]*$") | |
| def _user_prefix(user_id: str) -> str: | |
| if not USER_RE.match(user_id): | |
| raise HTTPException(400, "invalid user id") | |
| return f"users/{user_id}" | |
| def _chat_path(user_id: str, chat_id: str) -> str: | |
| if not CHAT_RE.match(chat_id): | |
| raise HTTPException(400, "invalid chat id") | |
| return f"{_user_prefix(user_id)}/chats/{chat_id}.json" | |
| def _index_path(user_id: str) -> str: | |
| return f"{_user_prefix(user_id)}/index.json" | |
| def _read_json(remote_path: str) -> Any | None: | |
| """Return the JSON object stored at `remote_path`, or None if missing.""" | |
| try: | |
| meta = api.get_bucket_file_metadata(BUCKET_ID, remote_path) | |
| except EntryNotFoundError: | |
| return None | |
| except Exception as e: # noqa: BLE001 | |
| log.warning("metadata failed for %s: %s", remote_path, e) | |
| return None | |
| import tempfile | |
| with tempfile.TemporaryDirectory() as tmp: | |
| local = Path(tmp) / "blob.json" | |
| try: | |
| api.download_bucket_files( | |
| BUCKET_ID, | |
| files=[(remote_path, str(local))], | |
| ) | |
| except EntryNotFoundError: | |
| return None | |
| return json.loads(local.read_text()) | |
| def _write_json(remote_path: str, payload: Any) -> None: | |
| data = json.dumps(payload, ensure_ascii=False).encode("utf-8") | |
| api.batch_bucket_files(BUCKET_ID, add=[(data, remote_path)]) | |
| def _list_json(prefix: str) -> list[dict]: | |
| """List JSON objects directly under `prefix/` in the bucket.""" | |
| try: | |
| files = list(api.list_bucket_files(BUCKET_ID, path=prefix)) | |
| except Exception as e: # noqa: BLE001 | |
| log.warning("list failed for %s: %s", prefix, e) | |
| return [] | |
| out: list[dict] = [] | |
| for f in files: | |
| path = getattr(f, "path", None) or getattr(f, "rfilename", None) or str(f) | |
| if not path.endswith(".json"): | |
| continue | |
| data = _read_json(path) | |
| if isinstance(data, dict): | |
| out.append(data) | |
| return out | |
| def _delete(remote_path: str) -> None: | |
| try: | |
| api.batch_bucket_files(BUCKET_ID, delete=[remote_path]) | |
| except Exception as e: # noqa: BLE001 | |
| log.warning("delete failed for %s: %s", remote_path, e) | |
| def _now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat(timespec="seconds") | |
| app = FastAPI(title="Slyfox") | |
| # ============================================================================= | |
| # Existing chat persistence API (unchanged) | |
| # ============================================================================= | |
| def list_chats(x_user_id: str = Header(...)) -> JSONResponse: | |
| """Return the per-user index: [{id, title, topic, updated_at}, ...].""" | |
| idx = _read_json(_index_path(x_user_id)) or [] | |
| return JSONResponse(idx) | |
| def get_chat(chat_id: str, x_user_id: str = Header(...)) -> JSONResponse: | |
| data = _read_json(_chat_path(x_user_id, chat_id)) | |
| if data is None: | |
| raise HTTPException(404, "chat not found") | |
| return JSONResponse(data) | |
| async def put_chat(chat_id: str, request: Request, x_user_id: str = Header(...)) -> JSONResponse: | |
| body = await request.json() | |
| if body.get("id") != chat_id: | |
| raise HTTPException(400, "chat id mismatch") | |
| _write_json(_chat_path(x_user_id, chat_id), body) | |
| idx = _read_json(_index_path(x_user_id)) or [] | |
| idx = [e for e in idx if e.get("id") != chat_id] | |
| idx.insert( | |
| 0, | |
| { | |
| "id": chat_id, | |
| "title": body.get("title", "Untitled"), | |
| "topic": body.get("topic"), | |
| "repo": body.get("repo"), | |
| "updated_at": body.get("updated_at"), | |
| }, | |
| ) | |
| _write_json(_index_path(x_user_id), idx) | |
| return JSONResponse({"ok": True}) | |
| def delete_chat(chat_id: str, x_user_id: str = Header(...)) -> JSONResponse: | |
| _delete(_chat_path(x_user_id, chat_id)) | |
| idx = _read_json(_index_path(x_user_id)) or [] | |
| idx = [e for e in idx if e.get("id") != chat_id] | |
| _write_json(_index_path(x_user_id), idx) | |
| return JSONResponse({"ok": True}) | |
| def healthz() -> dict: | |
| return {"ok": True, "bucket": BUCKET_ID} | |
| # ============================================================================= | |
| # Jobs API — emotion-vector extraction + trace analysis via `hf jobs` | |
| # ============================================================================= | |
| # | |
| # Two job kinds, both submitted via `HfApi.run_uv_job`: | |
| # | |
| # extract : take a model_id, run EmotionScope's vector extraction, write | |
| # the 20 emotion direction vectors to a bucket under | |
| # <VECTORS_BUCKET>/<model_id>/ | |
| # | |
| # analyze : take a traces source (bucket+prefix) and a model_id, replay | |
| # sessions through the model, project each turn's residual stream | |
| # onto the emotion vectors, write per-trace parquet results under | |
| # <source>/trace-analysis/<run_id>/ | |
| # | |
| # A small sidecar record is written to the chats bucket under `jobs/<job_id>.json` | |
| # so the Space can list its own jobs. Live status / logs are pulled directly | |
| # from the HF jobs API on demand. | |
| def _job_record(job_id: str, *, kind: str, params: dict, requester: str) -> dict: | |
| return { | |
| "job_id": job_id, | |
| "kind": kind, | |
| "params": params, | |
| "requester": requester, | |
| "submitted_at": _now_iso(), | |
| } | |
| def _hf_job_status(job_id: str) -> dict: | |
| """Live status snapshot for one job. Falls back gracefully on errors.""" | |
| try: | |
| info = api.inspect_job(job_id) | |
| except Exception as e: # noqa: BLE001 | |
| return {"job_id": job_id, "status": "unknown", "error": str(e)} | |
| started = getattr(info, "started_at", None) | |
| ended = getattr(info, "ended_at", None) | |
| elapsed = None | |
| if started: | |
| end = ended or datetime.now(timezone.utc) | |
| try: | |
| elapsed = int((end - started).total_seconds()) | |
| except Exception: # noqa: BLE001 | |
| elapsed = None | |
| return { | |
| "job_id": job_id, | |
| "status": getattr(info, "status", None) or getattr(info, "stage", "unknown"), | |
| "flavor": getattr(info, "flavor", None), | |
| "started_at": started.isoformat() if hasattr(started, "isoformat") else started, | |
| "ended_at": ended.isoformat() if hasattr(ended, "isoformat") else ended, | |
| "elapsed_s": elapsed, | |
| } | |
| async def request_model_extract(request: Request, x_user_id: str = Header(...)) -> JSONResponse: | |
| """Submit an emotion-vector extraction job for a HuggingFace model.""" | |
| _user_prefix(x_user_id) | |
| body = await request.json() | |
| model_id = (body.get("model_id") or "").strip() | |
| flavor = body.get("flavor") or "a10g-large" | |
| if not MODEL_RE.match(model_id): | |
| raise HTTPException(400, "model_id must look like 'org/name'") | |
| output = f"hf://buckets/{VECTORS_BUCKET}/{model_id}/" | |
| job = api.run_uv_job( | |
| script=EXTRACT_SCRIPT_URL, | |
| script_args=["--model", model_id, "--output", output], | |
| dependencies=[ | |
| "transformers>=4.45", | |
| "torch", | |
| "huggingface_hub>=1.7", | |
| "scikit-learn", | |
| "safetensors", | |
| "numpy", | |
| ], | |
| flavor=flavor, | |
| timeout="45m", | |
| labels={ | |
| "slyfox.io/type": "extract", | |
| "slyfox.io/model": model_id, | |
| "slyfox.io/requester": x_user_id, | |
| }, | |
| ) | |
| record = _job_record( | |
| job.id, | |
| kind="extract", | |
| params={"model_id": model_id, "flavor": flavor, "output": output}, | |
| requester=x_user_id, | |
| ) | |
| _write_json(f"jobs/{job.id}.json", record) | |
| return JSONResponse(record) | |
| async def request_analysis(request: Request, x_user_id: str = Header(...)) -> JSONResponse: | |
| """Submit a trace-analysis job: replay sessions through a model and emit emotion trajectories.""" | |
| _user_prefix(x_user_id) | |
| body = await request.json() | |
| source = (body.get("source") or "").strip() # bucket id, e.g. "HF-slyfox/traces" | |
| prefix = (body.get("prefix") or "").strip().strip("/") | |
| n_traces = int(body.get("n_traces") or 5) | |
| model_id = (body.get("model_id") or "").strip() | |
| flavor = body.get("flavor") or "a10g-large" | |
| if not BUCKET_RE.match(source): | |
| raise HTTPException(400, "source must look like 'namespace/bucket'") | |
| if not MODEL_RE.match(model_id): | |
| raise HTTPException(400, "model_id must look like 'org/name'") | |
| if n_traces < 1 or n_traces > 100: | |
| raise HTTPException(400, "n_traces must be 1..100") | |
| source_url = f"hf://buckets/{source}/{prefix}/" if prefix else f"hf://buckets/{source}/" | |
| output_url = f"hf://buckets/{source}/trace-analysis/" | |
| vectors_url = f"hf://buckets/{VECTORS_BUCKET}/{model_id}/" | |
| job = api.run_uv_job( | |
| script=ANALYZE_SCRIPT_URL, | |
| script_args=[ | |
| "--source", source_url, | |
| "--model", model_id, | |
| "--vectors", vectors_url, | |
| "--n-traces", str(n_traces), | |
| "--output", output_url, | |
| ], | |
| dependencies=[ | |
| "transformers>=4.45", | |
| "torch", | |
| "huggingface_hub>=1.7", | |
| "safetensors", | |
| "numpy", | |
| "pyarrow", | |
| ], | |
| flavor=flavor, | |
| timeout="2h", | |
| labels={ | |
| "slyfox.io/type": "analyze", | |
| "slyfox.io/model": model_id, | |
| "slyfox.io/source": source, | |
| "slyfox.io/requester": x_user_id, | |
| }, | |
| ) | |
| record = _job_record( | |
| job.id, | |
| kind="analyze", | |
| params={ | |
| "source": source, | |
| "prefix": prefix, | |
| "n_traces": n_traces, | |
| "model_id": model_id, | |
| "flavor": flavor, | |
| "output": output_url, | |
| "vectors": vectors_url, | |
| }, | |
| requester=x_user_id, | |
| ) | |
| _write_json(f"jobs/{job.id}.json", record) | |
| return JSONResponse(record) | |
| def list_slyfox_jobs() -> JSONResponse: | |
| """List slyfox-submitted jobs (newest first) with live status merged in.""" | |
| records = _list_json("jobs") | |
| records.sort(key=lambda r: r.get("submitted_at", ""), reverse=True) | |
| out: list[dict] = [] | |
| for r in records: | |
| out.append({**r, "live": _hf_job_status(r["job_id"])}) | |
| return JSONResponse(out) | |
| def get_job(job_id: str) -> JSONResponse: | |
| record = _read_json(f"jobs/{job_id}.json") or {"job_id": job_id} | |
| return JSONResponse({**record, "live": _hf_job_status(job_id)}) | |
| def get_job_logs(job_id: str, tail: int = 200) -> JSONResponse: | |
| try: | |
| chunks = list(api.fetch_job_logs(job_id)) | |
| except Exception as e: # noqa: BLE001 | |
| raise HTTPException(404, f"logs unavailable: {e}") | |
| text = "".join(chunks) | |
| lines = text.splitlines() | |
| if tail and tail > 0: | |
| lines = lines[-tail:] | |
| return JSONResponse({"job_id": job_id, "lines": lines}) | |
| def cancel_slyfox_job(job_id: str, x_user_id: str = Header(...)) -> JSONResponse: | |
| _user_prefix(x_user_id) | |
| try: | |
| api.cancel_job(job_id) | |
| except Exception as e: # noqa: BLE001 | |
| raise HTTPException(404, f"cancel failed: {e}") | |
| return JSONResponse({"ok": True, "job_id": job_id}) | |
| def jobs_page() -> RedirectResponse: | |
| return RedirectResponse(url="/?tab=jobs", status_code=302) | |
| # ============================================================================= | |
| # Community API — placeholder surface for org-level / public trace sharing. | |
| # ============================================================================= | |
| # | |
| # v0 stubs reserve the URL shape so future PRs can land without renaming. | |
| # The intent (see COMMUNITY.md) is that orgs and users *opt in* to publishing | |
| # their trace bundles + analysis runs to a public index, which the Space | |
| # surfaces here so anyone can browse other maintainers' personas, remix | |
| # analyses with their own model picks, and (eventually) export | |
| # fine-tuning-friendly datasets from the published traces. | |
| COMMUNITY_INDEX_BUCKET = os.getenv("SLYFOX_COMMUNITY_BUCKET", "HF-slyfox/community-index") | |
| def community_runs() -> JSONResponse: | |
| """Return the public list of opt-in published analysis runs. | |
| v0: returns whatever has been written to | |
| hf://buckets/<SLYFOX_COMMUNITY_BUCKET>/runs/*.json | |
| Each entry is a manifest with {run_id, model_id, source, owner, summary}. | |
| """ | |
| try: | |
| files = list(api.list_bucket_files(COMMUNITY_INDEX_BUCKET, path="runs")) | |
| except Exception as e: # noqa: BLE001 | |
| log.info("community index not available yet: %s", e) | |
| return JSONResponse({"runs": [], "note": "community index not initialized"}) | |
| runs: list[dict] = [] | |
| for f in files: | |
| path = getattr(f, "path", None) or getattr(f, "rfilename", None) or str(f) | |
| if not path.endswith(".json"): | |
| continue | |
| # Read each manifest. The community bucket is *public*, so anonymous | |
| # reads should work; we still go via HfApi for consistency. | |
| import tempfile | |
| with tempfile.TemporaryDirectory() as tmp: | |
| local = Path(tmp) / "m.json" | |
| try: | |
| api.download_bucket_files(COMMUNITY_INDEX_BUCKET, files=[(path, str(local))]) | |
| runs.append(json.loads(local.read_text())) | |
| except Exception as e: # noqa: BLE001 | |
| log.warning("skip community entry %s: %s", path, e) | |
| return JSONResponse({"runs": runs}) | |
| def community_page() -> FileResponse: | |
| return FileResponse(ROOT / "community.html") | |
| # ============================================================================= | |
| # Persona feature pages (mock-data v1; wire to real APIs as data lands) | |
| # ============================================================================= | |
| def _persona_page(filename: str, tab: str, embed: str | None) -> Response: | |
| if embed: | |
| return FileResponse(ROOT / filename) | |
| return RedirectResponse(url=f"/?tab={tab}", status_code=302) | |
| def cards_page(embed: Optional[str] = None) -> Response: | |
| return _persona_page("cards.html", "cards", embed) | |
| def arena_page(embed: Optional[str] = None) -> Response: | |
| return _persona_page("arena.html", "arena", embed) | |
| def buddy_page(embed: Optional[str] = None) -> Response: | |
| return _persona_page("buddy.html", "buddy", embed) | |
| def mirror_page(embed: Optional[str] = None) -> Response: | |
| return _persona_page("mirror.html", "mirror", embed) | |
| def quests_page(embed: Optional[str] = None) -> Response: | |
| return _persona_page("quests.html", "quests", embed) | |
| def mood_ci_page(embed: Optional[str] = None) -> Response: | |
| return _persona_page("mood-ci.html", "mood-ci", embed) | |
| def marketplace_page(embed: Optional[str] = None) -> Response: | |
| return _persona_page("marketplace.html", "marketplace", embed) | |
| # ============================================================================= | |
| # Static frontend — mounted last so /api/* routes win. | |
| # ============================================================================= | |
| def index() -> FileResponse: | |
| return FileResponse(ROOT / "index.html") | |
| app.mount("/", StaticFiles(directory=str(ROOT)), name="static") | |