"""Slyfox backend — serves index.html and persists chats to an HF Bucket. Layout in the bucket (one file per chat): users//chats/.json users//index.json # ordered list of chat ids + titles + topics jobs/.json # slyfox-submitted hf jobs (extract / analyze) `` is currently an anonymous UUID minted in the browser (stored in localStorage) and sent as `X-User-Id`. This is intentionally minimal — swap for HF OAuth identity later by reading the OAuth headers Spaces inject. """ from __future__ import annotations import io import json import logging import os import re from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional from fastapi import FastAPI, HTTPException, Header, Request from fastapi.responses import FileResponse, JSONResponse, RedirectResponse, Response from fastapi.staticfiles import StaticFiles from huggingface_hub import HfApi from huggingface_hub.errors import EntryNotFoundError logging.basicConfig(level=logging.INFO) log = logging.getLogger("slyfox") BUCKET_ID = os.getenv("SLYFOX_BUCKET", "HF-slyfox/slyfox-chats") HF_TOKEN = os.getenv("HF_TOKEN") # Spaces inject this automatically. # Job submission config — URLs of UV scripts that run on hf jobs infra. SLYFOX_JOBS_REF = os.getenv("SLYFOX_JOBS_REF", "arthur-work") # branch / tag of the slyfox repo (`main` once promoted) JOBS_SCRIPT_BASE = f"https://raw.githubusercontent.com/huggingface/slyfox/{SLYFOX_JOBS_REF}/jobs_scripts" EXTRACT_SCRIPT_URL = f"{JOBS_SCRIPT_BASE}/extract_vectors.py" ANALYZE_SCRIPT_URL = f"{JOBS_SCRIPT_BASE}/analyze_traces.py" VECTORS_BUCKET = os.getenv("SLYFOX_VECTORS_BUCKET", "HF-slyfox/emotion-vectors") api = HfApi(token=HF_TOKEN) ROOT = Path(__file__).parent USER_RE = re.compile(r"^[A-Za-z0-9_-]{8,64}$") CHAT_RE = re.compile(r"^[A-Za-z0-9_-]{8,64}$") MODEL_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._\-]*/[A-Za-z0-9][A-Za-z0-9._\-]*$") BUCKET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._\-]*/[A-Za-z0-9][A-Za-z0-9._\-]*$") def _user_prefix(user_id: str) -> str: if not USER_RE.match(user_id): raise HTTPException(400, "invalid user id") return f"users/{user_id}" def _chat_path(user_id: str, chat_id: str) -> str: if not CHAT_RE.match(chat_id): raise HTTPException(400, "invalid chat id") return f"{_user_prefix(user_id)}/chats/{chat_id}.json" def _index_path(user_id: str) -> str: return f"{_user_prefix(user_id)}/index.json" def _read_json(remote_path: str) -> Any | None: """Return the JSON object stored at `remote_path`, or None if missing.""" try: meta = api.get_bucket_file_metadata(BUCKET_ID, remote_path) except EntryNotFoundError: return None except Exception as e: # noqa: BLE001 log.warning("metadata failed for %s: %s", remote_path, e) return None import tempfile with tempfile.TemporaryDirectory() as tmp: local = Path(tmp) / "blob.json" try: api.download_bucket_files( BUCKET_ID, files=[(remote_path, str(local))], ) except EntryNotFoundError: return None return json.loads(local.read_text()) def _write_json(remote_path: str, payload: Any) -> None: data = json.dumps(payload, ensure_ascii=False).encode("utf-8") api.batch_bucket_files(BUCKET_ID, add=[(data, remote_path)]) def _list_json(prefix: str) -> list[dict]: """List JSON objects directly under `prefix/` in the bucket.""" try: files = list(api.list_bucket_files(BUCKET_ID, path=prefix)) except Exception as e: # noqa: BLE001 log.warning("list failed for %s: %s", prefix, e) return [] out: list[dict] = [] for f in files: path = getattr(f, "path", None) or getattr(f, "rfilename", None) or str(f) if not path.endswith(".json"): continue data = _read_json(path) if isinstance(data, dict): out.append(data) return out def _delete(remote_path: str) -> None: try: api.batch_bucket_files(BUCKET_ID, delete=[remote_path]) except Exception as e: # noqa: BLE001 log.warning("delete failed for %s: %s", remote_path, e) def _now_iso() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") app = FastAPI(title="Slyfox") # ============================================================================= # Existing chat persistence API (unchanged) # ============================================================================= @app.get("/api/chats") def list_chats(x_user_id: str = Header(...)) -> JSONResponse: """Return the per-user index: [{id, title, topic, updated_at}, ...].""" idx = _read_json(_index_path(x_user_id)) or [] return JSONResponse(idx) @app.get("/api/chats/{chat_id}") def get_chat(chat_id: str, x_user_id: str = Header(...)) -> JSONResponse: data = _read_json(_chat_path(x_user_id, chat_id)) if data is None: raise HTTPException(404, "chat not found") return JSONResponse(data) @app.put("/api/chats/{chat_id}") async def put_chat(chat_id: str, request: Request, x_user_id: str = Header(...)) -> JSONResponse: body = await request.json() if body.get("id") != chat_id: raise HTTPException(400, "chat id mismatch") _write_json(_chat_path(x_user_id, chat_id), body) idx = _read_json(_index_path(x_user_id)) or [] idx = [e for e in idx if e.get("id") != chat_id] idx.insert( 0, { "id": chat_id, "title": body.get("title", "Untitled"), "topic": body.get("topic"), "repo": body.get("repo"), "updated_at": body.get("updated_at"), }, ) _write_json(_index_path(x_user_id), idx) return JSONResponse({"ok": True}) @app.delete("/api/chats/{chat_id}") def delete_chat(chat_id: str, x_user_id: str = Header(...)) -> JSONResponse: _delete(_chat_path(x_user_id, chat_id)) idx = _read_json(_index_path(x_user_id)) or [] idx = [e for e in idx if e.get("id") != chat_id] _write_json(_index_path(x_user_id), idx) return JSONResponse({"ok": True}) @app.get("/healthz") def healthz() -> dict: return {"ok": True, "bucket": BUCKET_ID} # ============================================================================= # Jobs API — emotion-vector extraction + trace analysis via `hf jobs` # ============================================================================= # # Two job kinds, both submitted via `HfApi.run_uv_job`: # # extract : take a model_id, run EmotionScope's vector extraction, write # the 20 emotion direction vectors to a bucket under # // # # analyze : take a traces source (bucket+prefix) and a model_id, replay # sessions through the model, project each turn's residual stream # onto the emotion vectors, write per-trace parquet results under # /trace-analysis// # # A small sidecar record is written to the chats bucket under `jobs/.json` # so the Space can list its own jobs. Live status / logs are pulled directly # from the HF jobs API on demand. def _job_record(job_id: str, *, kind: str, params: dict, requester: str) -> dict: return { "job_id": job_id, "kind": kind, "params": params, "requester": requester, "submitted_at": _now_iso(), } def _hf_job_status(job_id: str) -> dict: """Live status snapshot for one job. Falls back gracefully on errors.""" try: info = api.inspect_job(job_id) except Exception as e: # noqa: BLE001 return {"job_id": job_id, "status": "unknown", "error": str(e)} started = getattr(info, "started_at", None) ended = getattr(info, "ended_at", None) elapsed = None if started: end = ended or datetime.now(timezone.utc) try: elapsed = int((end - started).total_seconds()) except Exception: # noqa: BLE001 elapsed = None return { "job_id": job_id, "status": getattr(info, "status", None) or getattr(info, "stage", "unknown"), "flavor": getattr(info, "flavor", None), "started_at": started.isoformat() if hasattr(started, "isoformat") else started, "ended_at": ended.isoformat() if hasattr(ended, "isoformat") else ended, "elapsed_s": elapsed, } @app.post("/api/models/extract") async def request_model_extract(request: Request, x_user_id: str = Header(...)) -> JSONResponse: """Submit an emotion-vector extraction job for a HuggingFace model.""" _user_prefix(x_user_id) body = await request.json() model_id = (body.get("model_id") or "").strip() flavor = body.get("flavor") or "a10g-large" if not MODEL_RE.match(model_id): raise HTTPException(400, "model_id must look like 'org/name'") output = f"hf://buckets/{VECTORS_BUCKET}/{model_id}/" job = api.run_uv_job( script=EXTRACT_SCRIPT_URL, script_args=["--model", model_id, "--output", output], dependencies=[ "transformers>=4.45", "torch", "huggingface_hub>=1.7", "scikit-learn", "safetensors", "numpy", ], flavor=flavor, timeout="45m", labels={ "slyfox.io/type": "extract", "slyfox.io/model": model_id, "slyfox.io/requester": x_user_id, }, ) record = _job_record( job.id, kind="extract", params={"model_id": model_id, "flavor": flavor, "output": output}, requester=x_user_id, ) _write_json(f"jobs/{job.id}.json", record) return JSONResponse(record) @app.post("/api/analyses") async def request_analysis(request: Request, x_user_id: str = Header(...)) -> JSONResponse: """Submit a trace-analysis job: replay sessions through a model and emit emotion trajectories.""" _user_prefix(x_user_id) body = await request.json() source = (body.get("source") or "").strip() # bucket id, e.g. "HF-slyfox/traces" prefix = (body.get("prefix") or "").strip().strip("/") n_traces = int(body.get("n_traces") or 5) model_id = (body.get("model_id") or "").strip() flavor = body.get("flavor") or "a10g-large" if not BUCKET_RE.match(source): raise HTTPException(400, "source must look like 'namespace/bucket'") if not MODEL_RE.match(model_id): raise HTTPException(400, "model_id must look like 'org/name'") if n_traces < 1 or n_traces > 100: raise HTTPException(400, "n_traces must be 1..100") source_url = f"hf://buckets/{source}/{prefix}/" if prefix else f"hf://buckets/{source}/" output_url = f"hf://buckets/{source}/trace-analysis/" vectors_url = f"hf://buckets/{VECTORS_BUCKET}/{model_id}/" job = api.run_uv_job( script=ANALYZE_SCRIPT_URL, script_args=[ "--source", source_url, "--model", model_id, "--vectors", vectors_url, "--n-traces", str(n_traces), "--output", output_url, ], dependencies=[ "transformers>=4.45", "torch", "huggingface_hub>=1.7", "safetensors", "numpy", "pyarrow", ], flavor=flavor, timeout="2h", labels={ "slyfox.io/type": "analyze", "slyfox.io/model": model_id, "slyfox.io/source": source, "slyfox.io/requester": x_user_id, }, ) record = _job_record( job.id, kind="analyze", params={ "source": source, "prefix": prefix, "n_traces": n_traces, "model_id": model_id, "flavor": flavor, "output": output_url, "vectors": vectors_url, }, requester=x_user_id, ) _write_json(f"jobs/{job.id}.json", record) return JSONResponse(record) @app.get("/api/jobs") def list_slyfox_jobs() -> JSONResponse: """List slyfox-submitted jobs (newest first) with live status merged in.""" records = _list_json("jobs") records.sort(key=lambda r: r.get("submitted_at", ""), reverse=True) out: list[dict] = [] for r in records: out.append({**r, "live": _hf_job_status(r["job_id"])}) return JSONResponse(out) @app.get("/api/jobs/{job_id}") def get_job(job_id: str) -> JSONResponse: record = _read_json(f"jobs/{job_id}.json") or {"job_id": job_id} return JSONResponse({**record, "live": _hf_job_status(job_id)}) @app.get("/api/jobs/{job_id}/logs") def get_job_logs(job_id: str, tail: int = 200) -> JSONResponse: try: chunks = list(api.fetch_job_logs(job_id)) except Exception as e: # noqa: BLE001 raise HTTPException(404, f"logs unavailable: {e}") text = "".join(chunks) lines = text.splitlines() if tail and tail > 0: lines = lines[-tail:] return JSONResponse({"job_id": job_id, "lines": lines}) @app.post("/api/jobs/{job_id}/cancel") def cancel_slyfox_job(job_id: str, x_user_id: str = Header(...)) -> JSONResponse: _user_prefix(x_user_id) try: api.cancel_job(job_id) except Exception as e: # noqa: BLE001 raise HTTPException(404, f"cancel failed: {e}") return JSONResponse({"ok": True, "job_id": job_id}) @app.get("/jobs", include_in_schema=False) def jobs_page() -> RedirectResponse: return RedirectResponse(url="/?tab=jobs", status_code=302) # ============================================================================= # Community API — placeholder surface for org-level / public trace sharing. # ============================================================================= # # v0 stubs reserve the URL shape so future PRs can land without renaming. # The intent (see COMMUNITY.md) is that orgs and users *opt in* to publishing # their trace bundles + analysis runs to a public index, which the Space # surfaces here so anyone can browse other maintainers' personas, remix # analyses with their own model picks, and (eventually) export # fine-tuning-friendly datasets from the published traces. COMMUNITY_INDEX_BUCKET = os.getenv("SLYFOX_COMMUNITY_BUCKET", "HF-slyfox/community-index") @app.get("/api/community/runs") def community_runs() -> JSONResponse: """Return the public list of opt-in published analysis runs. v0: returns whatever has been written to hf://buckets//runs/*.json Each entry is a manifest with {run_id, model_id, source, owner, summary}. """ try: files = list(api.list_bucket_files(COMMUNITY_INDEX_BUCKET, path="runs")) except Exception as e: # noqa: BLE001 log.info("community index not available yet: %s", e) return JSONResponse({"runs": [], "note": "community index not initialized"}) runs: list[dict] = [] for f in files: path = getattr(f, "path", None) or getattr(f, "rfilename", None) or str(f) if not path.endswith(".json"): continue # Read each manifest. The community bucket is *public*, so anonymous # reads should work; we still go via HfApi for consistency. import tempfile with tempfile.TemporaryDirectory() as tmp: local = Path(tmp) / "m.json" try: api.download_bucket_files(COMMUNITY_INDEX_BUCKET, files=[(path, str(local))]) runs.append(json.loads(local.read_text())) except Exception as e: # noqa: BLE001 log.warning("skip community entry %s: %s", path, e) return JSONResponse({"runs": runs}) @app.get("/community", include_in_schema=False) def community_page() -> FileResponse: return FileResponse(ROOT / "community.html") # ============================================================================= # Persona feature pages (mock-data v1; wire to real APIs as data lands) # ============================================================================= def _persona_page(filename: str, tab: str, embed: str | None) -> Response: if embed: return FileResponse(ROOT / filename) return RedirectResponse(url=f"/?tab={tab}", status_code=302) @app.get("/cards", include_in_schema=False, response_model=None) def cards_page(embed: Optional[str] = None) -> Response: return _persona_page("cards.html", "cards", embed) @app.get("/arena", include_in_schema=False, response_model=None) def arena_page(embed: Optional[str] = None) -> Response: return _persona_page("arena.html", "arena", embed) @app.get("/buddy", include_in_schema=False, response_model=None) def buddy_page(embed: Optional[str] = None) -> Response: return _persona_page("buddy.html", "buddy", embed) @app.get("/mirror", include_in_schema=False, response_model=None) def mirror_page(embed: Optional[str] = None) -> Response: return _persona_page("mirror.html", "mirror", embed) @app.get("/quests", include_in_schema=False, response_model=None) def quests_page(embed: Optional[str] = None) -> Response: return _persona_page("quests.html", "quests", embed) @app.get("/mood-ci", include_in_schema=False, response_model=None) def mood_ci_page(embed: Optional[str] = None) -> Response: return _persona_page("mood-ci.html", "mood-ci", embed) @app.get("/marketplace", include_in_schema=False, response_model=None) def marketplace_page(embed: Optional[str] = None) -> Response: return _persona_page("marketplace.html", "marketplace", embed) # ============================================================================= # Static frontend — mounted last so /api/* routes win. # ============================================================================= @app.get("/") def index() -> FileResponse: return FileResponse(ROOT / "index.html") app.mount("/", StaticFiles(directory=str(ROOT)), name="static")