"""FastAPI edit server — serves the activity edit UI and writes sidecar .md files.""" from __future__ import annotations import json import re import shutil import time from pathlib import Path from typing import Any from fastapi import FastAPI, File, HTTPException, Request, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse # Populated by the CLI before uvicorn starts data_dir: Path | None = None site_url: str = "http://localhost:4321" strava_client_id: str = "" strava_client_secret: str = "" app = FastAPI(title="BincioActivity Edit Server", docs_url=None, redoc_url=None) # Allow localhost origins only — this server is never meant to be public app.add_middleware( CORSMiddleware, allow_origin_regex=r"https?://localhost(:\d+)?", allow_methods=["GET", "POST", "DELETE"], allow_headers=["Content-Type"], ) _VALID_ACTIVITY_ID = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9\-]{0,250}$') def _check_id(activity_id: str) -> str: """Reject activity IDs that contain path traversal sequences.""" if not _VALID_ACTIVITY_ID.match(activity_id): raise HTTPException(400, "Invalid activity ID") return activity_id SPORTS = ["cycling", "running", "hiking", "walking", "swimming", "skiing", "other"] STAT_PANELS = ["elevation", "speed", "heart_rate", "cadence", "power"] # ── HTML UI ─────────────────────────────────────────────────────────────────── _HTML = """\ Edit Activity
← Back to site

Edit Activity

Identity

Description

Display

__STAT_CHECKBOXES__

Images

Drop images here or click to upload
""" # ── Routes ──────────────────────────────────────────────────────────────────── def _get_data_dir() -> Path: if data_dir is None: raise HTTPException(500, "Edit server not configured (data_dir is None)") return data_dir @app.get("/") async def root() -> RedirectResponse: return RedirectResponse(url=site_url) @app.get("/edit/{activity_id}", response_class=HTMLResponse) async def edit_page(activity_id: str) -> str: sport_opts = "\n".join( f'' for s in SPORTS ) stat_cbs = "\n".join( f'' for s in STAT_PANELS ) html = ( _HTML .replace("__SITE_URL__", site_url) .replace("__SPORT_OPTIONS__", sport_opts) .replace("__STAT_CHECKBOXES__", stat_cbs) ) return html @app.get("/api/activity/{activity_id}") async def get_activity(activity_id: str) -> JSONResponse: dd = _get_data_dir() _check_id(activity_id) json_path = dd / "activities" / f"{activity_id}.json" if not json_path.exists(): raise HTTPException(404, f"Activity {activity_id!r} not found") detail: dict[str, Any] = json.loads(json_path.read_text(encoding="utf-8")) # Read existing sidecar if any — these are the "user" values shown in the form from bincio.render.merge import parse_sidecar sidecar_path = dd / "edits" / f"{activity_id}.md" fm: dict = {} body = "" if sidecar_path.exists(): fm, body = parse_sidecar(sidecar_path) # Existing uploaded images for this activity images_dir = dd / "edits" / "images" / activity_id images = sorted(p.name for p in images_dir.iterdir() if p.is_file()) if images_dir.exists() else [] return JSONResponse({ "id": activity_id, "started_at": detail.get("started_at", ""), "title": fm.get("title", detail.get("title", "")), "sport": fm.get("sport", detail.get("sport", "other")), "gear": fm.get("gear", detail.get("gear") or ""), "description": body or fm.get("description") or detail.get("description") or "", "highlight": fm.get("highlight", detail.get("custom", {}).get("highlight", False)), "private": fm.get("private", detail.get("privacy") == "private"), "hide_stats": fm.get("hide_stats", detail.get("custom", {}).get("hide_stats", [])), "images": images, }) @app.post("/api/activity/{activity_id}") async def save_activity(activity_id: str, payload: dict[str, Any]) -> JSONResponse: dd = _get_data_dir() _check_id(activity_id) if not (dd / "activities" / f"{activity_id}.json").exists(): raise HTTPException(404, f"Activity {activity_id!r} not found") edits_dir = dd / "edits" edits_dir.mkdir(exist_ok=True) sidecar_path = edits_dir / f"{activity_id}.md" lines: list[str] = [] if payload.get("title"): lines.append(f"title: {json.dumps(payload['title'])}") if payload.get("sport") and payload["sport"] in SPORTS and payload["sport"] != "other": lines.append(f"sport: {payload['sport']}") if payload.get("gear"): lines.append(f"gear: {json.dumps(payload['gear'])}") if payload.get("highlight"): lines.append("highlight: true") if payload.get("private"): lines.append("private: true") hide = [s for s in (payload.get("hide_stats") or []) if s in STAT_PANELS] if hide: lines.append(f"hide_stats: [{', '.join(hide)}]") description = (payload.get("description") or "").strip() content = "---\n" + "\n".join(lines) + "\n---\n" if description: content += "\n" + description + "\n" sidecar_path.write_text(content, encoding="utf-8") # Re-merge so the Astro dev server immediately serves updated data from bincio.render.merge import merge_all merge_all(dd) return JSONResponse({"ok": True, "sidecar": str(sidecar_path)}) @app.post("/api/activity/{activity_id}/images") async def upload_image(activity_id: str, file: UploadFile = File(...)) -> JSONResponse: dd = _get_data_dir() _check_id(activity_id) if not (dd / "activities" / f"{activity_id}.json").exists(): raise HTTPException(404, f"Activity {activity_id!r} not found") if not file.filename: raise HTTPException(400, "No filename") images_dir = dd / "edits" / "images" / activity_id images_dir.mkdir(parents=True, exist_ok=True) safe_name = Path(file.filename).name # Only allow image content types ct = file.content_type or "" if not ct.startswith("image/"): raise HTTPException(400, f"Only image files are accepted (got {ct})") dest = images_dir / safe_name dest.write_bytes(await file.read()) return JSONResponse({"ok": True, "filename": dest.name}) @app.get("/api/athlete") async def get_athlete() -> JSONResponse: dd = _get_data_dir() athlete_path = dd / "athlete.json" if not athlete_path.exists(): raise HTTPException(404, "athlete.json not found — run bincio extract first") data = json.loads(athlete_path.read_text(encoding="utf-8")) # Layer edits/athlete.yaml overrides on top overrides = _read_athlete_edits(dd) for key in ("max_hr", "ftp_w", "hr_zones", "power_zones", "seasons", "gear"): if key in overrides: data[key] = overrides[key] return JSONResponse({ "max_hr": data.get("max_hr"), "ftp_w": data.get("ftp_w"), "hr_zones": data.get("hr_zones"), "power_zones": data.get("power_zones"), "seasons": data.get("seasons", []), "gear": data.get("gear", {}), }) @app.post("/api/athlete") async def save_athlete(payload: dict[str, Any]) -> JSONResponse: dd = _get_data_dir() athlete_path = dd / "athlete.json" if not athlete_path.exists(): raise HTTPException(404, "athlete.json not found — run bincio extract first") # Write edits/athlete.yaml with validated fields edits_dir = dd / "edits" edits_dir.mkdir(exist_ok=True) overrides: dict[str, Any] = {} if payload.get("max_hr") is not None: overrides["max_hr"] = int(payload["max_hr"]) if payload.get("ftp_w") is not None: overrides["ftp_w"] = int(payload["ftp_w"]) if payload.get("hr_zones") is not None: overrides["hr_zones"] = [[int(lo), int(hi)] for lo, hi in payload["hr_zones"]] if payload.get("power_zones") is not None: overrides["power_zones"] = [[int(lo), int(hi)] for lo, hi in payload["power_zones"]] if payload.get("seasons") is not None: overrides["seasons"] = [ {"name": str(s["name"]), "start": str(s["start"]), "end": str(s["end"])} for s in payload["seasons"] ] if payload.get("gear") is not None: overrides["gear"] = payload["gear"] import yaml (edits_dir / "athlete.yaml").write_text( yaml.dump(overrides, allow_unicode=True, default_flow_style=False), encoding="utf-8", ) # Re-merge — merge_all() applies edits/athlete.yaml on top of athlete.json from bincio.render.merge import merge_all merge_all(dd) return JSONResponse({"ok": True}) def _read_athlete_edits(data_dir: Path) -> dict: path = data_dir / "edits" / "athlete.yaml" if not path.exists(): return {} try: import yaml return yaml.safe_load(path.read_text(encoding="utf-8")) or {} except Exception: return {} _SUPPORTED_SUFFIXES = {".fit", ".gpx", ".tcx", ".fit.gz", ".gpx.gz", ".tcx.gz"} def _file_suffix(name: str) -> str: """Return the effective suffix, including .gz double-extension.""" p = Path(name.lower()) if p.suffix == ".gz": return p.stem.rsplit(".", 1)[-1].join([".", ".gz"]) if "." in p.stem else ".gz" return p.suffix @app.post("/api/upload") async def upload_activity(file: UploadFile = File(...)) -> JSONResponse: """Accept a FIT/GPX/TCX file, extract it, update index.json, and re-merge.""" dd = _get_data_dir() name = Path(file.filename or "upload.fit").name # strip any path components suffix = _file_suffix(name) if suffix not in _SUPPORTED_SUFFIXES: raise HTTPException(400, f"Unsupported file type '{Path(name).suffix}'. Expected FIT, GPX, or TCX.") _MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB contents = await file.read() if len(contents) > _MAX_UPLOAD_BYTES: raise HTTPException(413, f"File too large ({len(contents)} bytes). Maximum is 50 MB.") staging = dd / "_uploads" staging.mkdir(exist_ok=True) staged = staging / name staged.write_bytes(contents) try: from bincio.extract.metrics import compute from bincio.extract.parsers.factory import parse_file from bincio.extract.writer import build_summary, make_activity_id, write_activity, write_index activity = parse_file(staged) metrics = compute(activity) activity_id = make_activity_id(activity) existing_json = dd / "activities" / f"{activity_id}.json" if existing_json.exists(): raise HTTPException(409, f"Activity already exists: {activity_id}") write_activity(activity, metrics, dd, privacy="public", rdp_epsilon=0.0001) summary = build_summary(activity, metrics, activity_id, "public") # Read current index to preserve owner + existing summaries index_path = dd / "index.json" if index_path.exists(): index_data = json.loads(index_path.read_text(encoding="utf-8")) else: index_data = {"owner": {"handle": "unknown"}, "activities": []} owner = index_data.get("owner", {}) existing = {s["id"]: s for s in index_data.get("activities", [])} existing[activity_id] = summary write_index(list(existing.values()), dd, owner) from bincio.render.merge import merge_all merge_all(dd) except HTTPException: raise except Exception as exc: raise HTTPException(422, f"Failed to process activity file: {type(exc).__name__}") finally: staged.unlink(missing_ok=True) return JSONResponse({"ok": True, "id": activity_id}) @app.post("/api/import-bas") async def import_bas(payload: dict[str, Any]) -> JSONResponse: """Accept a pre-converted BAS detail JSON (from the /convert/ page) and save it.""" dd = _get_data_dir() detail = payload.get("detail") geojson = payload.get("geojson") if not isinstance(detail, dict) or not detail.get("id"): raise HTTPException(400, "Missing or invalid 'detail' field") activity_id = detail["id"] _check_id(activity_id) acts_dir = dd / "activities" acts_dir.mkdir(exist_ok=True) dest = acts_dir / f"{activity_id}.json" if dest.exists(): raise HTTPException(409, f"Activity already exists: {activity_id}") dest.write_text(json.dumps(detail, indent=2, ensure_ascii=False)) if geojson: (acts_dir / f"{activity_id}.geojson").write_text( json.dumps(geojson, indent=2, ensure_ascii=False) ) # Rebuild index index_path = dd / "index.json" if index_path.exists(): index_data = json.loads(index_path.read_text(encoding="utf-8")) else: index_data = {"owner": {"handle": "unknown"}, "activities": []} owner = index_data.get("owner", {}) existing = {s["id"]: s for s in index_data.get("activities", [])} # Build a minimal summary from the detail summary_keys = [ "id", "title", "sport", "sub_sport", "started_at", "distance_m", "duration_s", "moving_time_s", "elevation_gain_m", "avg_speed_kmh", "avg_hr_bpm", "avg_cadence_rpm", "avg_power_w", "privacy", "detail_url", "track_url", "preview_coords", "highlight", "duplicate_of", ] summary = {k: detail[k] for k in summary_keys if k in detail} existing[activity_id] = summary from bincio.extract.writer import write_index write_index(list(existing.values()), dd, owner) from bincio.render.merge import merge_all merge_all(dd) return JSONResponse({"ok": True, "id": activity_id}) @app.delete("/api/activity/{activity_id}/images/{filename}") async def delete_image(activity_id: str, filename: str) -> JSONResponse: dd = _get_data_dir() _check_id(activity_id) safe_name = Path(filename).name # strip any path traversal if not safe_name: raise HTTPException(400, "Invalid filename") target = dd / "edits" / "images" / activity_id / safe_name if target.exists() and target.is_file(): target.unlink() # Remove empty parent dir if not any(target.parent.iterdir()): shutil.rmtree(target.parent) return JSONResponse({"ok": True}) # ── Strava sync ─────────────────────────────────────────────────────────────── @app.get("/api/strava/status") async def strava_status() -> JSONResponse: """Return whether Strava is configured and whether a token is stored.""" dd = _get_data_dir() from bincio.extract.strava_api import load_token token = load_token(dd) return JSONResponse({ "configured": bool(strava_client_id), "connected": token is not None, "last_sync": token.get("last_sync_at") if token else None, }) @app.get("/api/strava/auth-url") async def strava_auth_url(request: Request) -> JSONResponse: """Return the Strava OAuth URL the browser should open.""" if not strava_client_id: raise HTTPException(400, "Strava client ID not configured. Pass --strava-client-id to bincio edit.") redirect_uri = str(request.url_for("strava_callback")) from bincio.extract.strava_api import auth_url return JSONResponse({"url": auth_url(strava_client_id, redirect_uri)}) @app.get("/api/strava/callback", name="strava_callback") async def strava_callback(code: str = "", error: str = "") -> RedirectResponse: """Strava OAuth callback — exchange code for token then redirect to the site.""" if error or not code: return RedirectResponse(f"{site_url}?strava=error") if not strava_client_id or not strava_client_secret: return RedirectResponse(f"{site_url}?strava=error") dd = _get_data_dir() from bincio.extract.strava_api import StravaError, exchange_code, save_token try: token = exchange_code(strava_client_id, strava_client_secret, code) except StravaError: return RedirectResponse(f"{site_url}?strava=error") # Stamp last_sync_at at connect time so the first sync only fetches new activities token.setdefault("last_sync_at", int(time.time())) save_token(dd, token) return RedirectResponse(f"{site_url}?strava=connected") @app.post("/api/strava/sync") async def strava_sync() -> JSONResponse: """Fetch new Strava activities since last sync and add them to the data store.""" if not strava_client_id or not strava_client_secret: raise HTTPException(400, "Strava not configured. Pass --strava-client-id and --strava-client-secret to bincio edit.") dd = _get_data_dir() from bincio.extract.strava_api import ( StravaError, ensure_fresh, fetch_activities, fetch_streams, save_token, strava_to_parsed, ) try: token = ensure_fresh(dd, strava_client_id, strava_client_secret) except StravaError as e: raise HTTPException(502, str(e)) after: int | None = token.get("last_sync_at") try: activities = fetch_activities(token["access_token"], after=after) except StravaError as e: raise HTTPException(502, str(e)) from bincio.extract.metrics import compute from bincio.extract.writer import build_summary, make_activity_id, write_activity, write_index from bincio.extract.strava_api import strava_meta_to_partial from bincio.render.merge import merge_all # Load existing index once index_path = dd / "index.json" if index_path.exists(): index_data = json.loads(index_path.read_text(encoding="utf-8")) else: index_data = {"owner": {"handle": "unknown"}, "activities": []} owner = index_data.get("owner", {}) summaries: dict[str, dict] = {s["id"]: s for s in index_data.get("activities", [])} imported = 0 skipped = 0 errors: list[str] = [] for meta in activities: try: # Compute ID from meta alone (no API call) to skip already-known activities activity_id = make_activity_id(strava_meta_to_partial(meta)) if (dd / "activities" / f"{activity_id}.json").exists(): skipped += 1 continue # Only fetch streams for genuinely new activities streams = fetch_streams(token["access_token"], meta["id"]) parsed = strava_to_parsed(meta, streams) metrics = compute(parsed) write_activity(parsed, metrics, dd, privacy="public", rdp_epsilon=0.0001) summaries[activity_id] = build_summary(parsed, metrics, activity_id, "public") imported += 1 except Exception as exc: errors.append(f"{meta.get('id')}: {type(exc).__name__}") if imported: write_index(list(summaries.values()), dd, owner) merge_all(dd) token["last_sync_at"] = int(time.time()) save_token(dd, token) return JSONResponse({"ok": True, "imported": imported, "skipped": skipped, "errors": errors[:5]})