"""Activity CRUD and athlete endpoints.""" from __future__ import annotations import json from pathlib import Path from typing import Any from fastapi import APIRouter, Cookie, Depends, File, HTTPException, Request, UploadFile from fastapi.responses import JSONResponse from bincio.serve import deps, tasks from bincio.serve.models import ActivityEditRequest, ActivityEditResponse, GenericResponse from bincio.serve.db import User from bincio.shared.images import ( ALLOWED_IMAGE_TYPES as _ALLOWED_IMAGE_TYPES, MAX_IMAGE_BYTES as _MAX_IMAGE_BYTES, unique_image_name as _unique_image_name, ) router = APIRouter() def _user_data_dir(handle: str) -> Path: """Return the merged data dir for a user, for reading activity files.""" dd = deps._get_data_dir() merged = dd / handle / "_merged" return merged if merged.exists() else dd / handle def _require_owns(activity_id: str, user: User) -> Path: """Verify the user owns this activity (it lives in their data dir).""" activity_path = _user_data_dir(user.handle) / "activities" / f"{activity_id}.json" if not activity_path.exists(): raise HTTPException(404, "Activity not found") return activity_path @router.get("/api/activity/{activity_id}/geojson") async def get_activity_geojson( activity_id: str, user: User = Depends(deps._require_auth), ) -> JSONResponse: """Return GeoJSON track for an activity (mobile detail screen).""" deps._check_id(activity_id) dd = deps._get_data_dir() user_dir = dd / user.handle for base in (user_dir / "_merged" / "activities", user_dir / "activities"): p = base / f"{activity_id}.geojson" if p.exists(): return JSONResponse(json.loads(p.read_text())) raise HTTPException(404, "GeoJSON not found") @router.get("/api/activity/{activity_id}/timeseries") async def get_activity_timeseries( activity_id: str, user: User = Depends(deps._require_auth), ) -> JSONResponse: """Return timeseries for an activity (mobile detail screen).""" deps._check_id(activity_id) dd = deps._get_data_dir() user_dir = dd / user.handle for base in (user_dir / "_merged" / "activities", user_dir / "activities"): p = base / f"{activity_id}.timeseries.json" if p.exists(): return JSONResponse(json.loads(p.read_text())) raise HTTPException(404, "Timeseries not found") @router.get("/api/activity/{activity_id}") async def get_activity( activity_id: str, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) deps._check_id(activity_id) path = _require_owns(activity_id, user) detail = json.loads(path.read_text()) # Normalise for EditDrawer: add `private` bool so the drawer works regardless # of whether the raw JSON uses the old "private" or the new "unlisted" value. detail["private"] = detail.get("privacy") in ("private", "unlisted") return JSONResponse(detail) @router.post("/api/activity/{activity_id}", response_model=ActivityEditResponse) async def post_activity( activity_id: str, edit_req: ActivityEditRequest, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) deps._check_id(activity_id) dd = deps._get_data_dir() / user.handle # Verify the activity belongs to this user before writing if not (dd / "activities" / f"{activity_id}.json").exists(): raise HTTPException(404, "Activity not found") from bincio.edit.ops import apply_sidecar_edit body = edit_req.model_dump(exclude_none=True) apply_sidecar_edit(activity_id, body, dd) tasks._trigger_rebuild(user.handle) return JSONResponse({"ok": True}) @router.post("/api/activity/{activity_id}/recalculate-elevation/dem") async def recalculate_elevation_dem_endpoint( activity_id: str, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: """Replace GPS altitude with DEM terrain elevation and recompute gain/loss. Requires --dem-url to be set when starting bincio serve. """ user = deps._require_user(bincio_session) deps._check_id(activity_id) if not deps.dem_url: raise HTTPException(503, "DEM URL not configured.") dd = deps._get_data_dir() / user.handle if not (dd / "activities" / f"{activity_id}.json").exists(): raise HTTPException(404, "Activity not found") try: from bincio.extract.dem import recalculate_elevation from bincio.render.merge import merge_one result = recalculate_elevation(dd, activity_id, deps.dem_url) merge_one(dd, activity_id) tasks._trigger_rebuild(user.handle) return JSONResponse(result) except FileNotFoundError as e: raise HTTPException(404, str(e)) except ValueError as e: raise HTTPException(422, str(e)) @router.post("/api/activity/{activity_id}/recalculate-elevation/hysteresis") async def recalculate_elevation_hysteresis_endpoint( activity_id: str, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: """Recompute gain/loss from original recorded elevation using source-aware hysteresis.""" user = deps._require_user(bincio_session) deps._check_id(activity_id) dd = deps._get_data_dir() / user.handle if not (dd / "activities" / f"{activity_id}.json").exists(): raise HTTPException(404, "Activity not found") try: from bincio.extract.dem import recalculate_elevation_hysteresis from bincio.render.merge import merge_one result = recalculate_elevation_hysteresis(dd, activity_id) merge_one(dd, activity_id) tasks._trigger_rebuild(user.handle) return JSONResponse(result) except FileNotFoundError as e: raise HTTPException(404, str(e)) except ValueError as e: raise HTTPException(422, str(e)) @router.delete("/api/activity/{activity_id}", response_model=GenericResponse) async def delete_activity( activity_id: str, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: """Delete a single activity and all associated files for the logged-in user.""" user = deps._require_user(bincio_session) deps._check_id(activity_id) dd = deps._get_data_dir() / user.handle acts_dir = dd / "activities" json_path = acts_dir / f"{activity_id}.json" if not json_path.exists(): raise HTTPException(404, "Activity not found") import shutil # Remove the source files (activities dir) for suffix in (".json", ".geojson", ".timeseries.json"): p = acts_dir / f"{activity_id}{suffix}" p.unlink(missing_ok=True) # Remove sidecar edit and images sidecar = dd / "edits" / f"{activity_id}.md" sidecar.unlink(missing_ok=True) images_dir = dd / "edits" / "images" / activity_id if images_dir.exists(): shutil.rmtree(images_dir) # Remove from the extract-level flat index so merge_all doesn't re-add # the summary even though the detail file is gone. index_path = dd / "index.json" if index_path.exists(): try: idx = json.loads(index_path.read_text(encoding="utf-8")) idx["activities"] = [a for a in idx.get("activities", []) if a.get("id") != activity_id] index_path.write_text(json.dumps(idx, indent=2, ensure_ascii=False)) except (OSError, json.JSONDecodeError): pass # corrupt index — merge_all will clean up on next run # Remove from dedup cache so the file can be re-uploaded if needed cache_path = dd / ".bincio_cache.json" if cache_path.exists(): try: cache = json.loads(cache_path.read_text(encoding="utf-8")) if isinstance(cache, dict) and "activities" in cache: cache["activities"] = [ a for a in cache["activities"] if a.get("id") != activity_id ] cache_path.write_text(json.dumps(cache, indent=2, ensure_ascii=False)) except (OSError, json.JSONDecodeError): pass # corrupt cache — leave it; next extract will rebuild # Full merge needed: activity removed from index from bincio.render.merge import merge_all merge_all(dd) tasks._trigger_rebuild(user.handle) return JSONResponse({"ok": True}) @router.get("/api/activity/{activity_id}/images") async def list_images( activity_id: str, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) deps._check_id(activity_id) dd = deps._get_data_dir() / user.handle images_dir = dd / "edits" / "images" / activity_id images = sorted(p.name for p in images_dir.iterdir() if p.is_file()) if images_dir.exists() else [] return JSONResponse({"images": images}) @router.post("/api/activity/{activity_id}/images") async def upload_image( activity_id: str, file: UploadFile = File(...), bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) deps._check_id(activity_id) dd = deps._get_data_dir() / user.handle if not (dd / "activities" / f"{activity_id}.json").exists(): raise HTTPException(404, "Activity not found") if not file.filename: raise HTTPException(400, "No filename") ct = file.content_type or "" if ct not in _ALLOWED_IMAGE_TYPES: raise HTTPException(400, "Only JPEG, PNG, WebP, or GIF images are accepted") contents = await file.read() if len(contents) > _MAX_IMAGE_BYTES: raise HTTPException(413, f"Image too large (max {_MAX_IMAGE_BYTES // (1024*1024)} MB)") images_dir = dd / "edits" / "images" / activity_id images_dir.mkdir(parents=True, exist_ok=True) safe_name = _unique_image_name(images_dir, Path(file.filename).name) (images_dir / safe_name).write_bytes(contents) from bincio.render.merge import merge_one merge_one(dd, activity_id) return JSONResponse({"ok": True, "filename": safe_name}) @router.delete("/api/activity/{activity_id}/images/{filename}") async def delete_image( activity_id: str, filename: str, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) deps._check_id(activity_id) dd = deps._get_data_dir() / user.handle import shutil safe_name = Path(filename).name target = dd / "edits" / "images" / activity_id / safe_name if target.exists() and target.is_file(): target.unlink() if target.parent.exists() and not any(target.parent.iterdir()): shutil.rmtree(target.parent) from bincio.render.merge import merge_one merge_one(dd, activity_id) return JSONResponse({"ok": True}) @router.get("/api/athlete") async def get_athlete(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: user = deps._require_user(bincio_session) dd = deps._get_data_dir() / user.handle athlete_path = dd / "athlete.json" data: dict = {} if athlete_path.exists(): try: data = json.loads(athlete_path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): pass # Layer edits/athlete.yaml on top edits_path = dd / "edits" / "athlete.yaml" if edits_path.exists(): import yaml try: edits = yaml.safe_load(edits_path.read_text(encoding="utf-8")) or {} for k in ("max_hr", "ftp_w", "hr_zones", "power_zones", "seasons", "gear"): if k in edits: data[k] = edits[k] except (OSError, yaml.YAMLError): pass return JSONResponse(data) @router.post("/api/athlete") async def save_athlete( request: Request, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: user = deps._require_user(bincio_session) dd = deps._get_data_dir() / user.handle athlete_path = dd / "athlete.json" if not athlete_path.exists(): from datetime import datetime, timezone athlete_path.write_text(json.dumps({ "bas_version": "1.0", "generated_at": datetime.now(timezone.utc).isoformat(), "power_curve": {}, }), encoding="utf-8") payload = await request.json() edits_dir = dd / "edits" edits_dir.mkdir(exist_ok=True) overrides: dict[str, Any] = {} if payload.get("max_hr") is not None: overrides["max_hr"] = int(payload["max_hr"]) if payload.get("ftp_w") is not None: overrides["ftp_w"] = int(payload["ftp_w"]) if payload.get("hr_zones") is not None: overrides["hr_zones"] = [[int(lo), int(hi)] for lo, hi in payload["hr_zones"]] if payload.get("power_zones") is not None: overrides["power_zones"] = [[int(lo), int(hi)] for lo, hi in payload["power_zones"]] if payload.get("seasons") is not None: overrides["seasons"] = [ {"name": str(s["name"]), "start": str(s["start"]), "end": str(s["end"])} for s in payload["seasons"] ] if payload.get("gear") is not None: overrides["gear"] = payload["gear"] import yaml (edits_dir / "athlete.yaml").write_text( yaml.dump(overrides, allow_unicode=True, default_flow_style=False), encoding="utf-8", ) from bincio.render.merge import merge_all merge_all(dd) tasks._trigger_rebuild(user.handle) return JSONResponse({"ok": True}) @router.get("/api/activities/{activity_id}/segment_efforts") async def activity_segment_efforts( activity_id: str, bincio_session: str | None = Cookie(default=None), ) -> JSONResponse: """Return segment efforts that belong to a specific activity for the logged-in user.""" import asyncio from bincio.segments import store as _seg_store user = deps._require_user(bincio_session) dd = deps._get_data_dir() def _collect() -> list[dict]: efforts_dir = dd / user.handle / "segment_efforts" result: list[dict] = [] if not efforts_dir.exists(): return result for ef_file in sorted(efforts_dir.glob("*.json")): seg_id = ef_file.stem all_efforts = _seg_store.load_efforts(dd, user.handle, seg_id) matching = [e for e in all_efforts if e.activity_id == activity_id] if not matching: continue seg = _seg_store.load_segment(dd, seg_id) if not seg: continue pr_elapsed = min(e.elapsed_s for e in all_efforts) for eff in matching: result.append({ "segment_id": seg.id, "segment_name": seg.name, "segment_distance_m": seg.distance_m, "elapsed_s": eff.elapsed_s, "pr_elapsed_s": pr_elapsed, "started_at": _seg_store._iso(eff.started_at), }) return result return JSONResponse(await asyncio.to_thread(_collect))