Files
bincio-activity/bincio/serve/routers/activities.py
T

381 lines
15 KiB
Python

"""Activity CRUD and athlete endpoints."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Cookie, Depends, File, HTTPException, Request, UploadFile
from fastapi.responses import JSONResponse
from bincio.serve import deps, tasks
from bincio.serve.models import ActivityEditRequest, ActivityEditResponse, GenericResponse
from bincio.serve.db import User
from bincio.shared.images import (
ALLOWED_IMAGE_TYPES as _ALLOWED_IMAGE_TYPES,
MAX_IMAGE_BYTES as _MAX_IMAGE_BYTES,
unique_image_name as _unique_image_name,
)
router = APIRouter()
def _user_data_dir(handle: str) -> Path:
"""Return the merged data dir for a user, for reading activity files."""
dd = deps._get_data_dir()
merged = dd / handle / "_merged"
return merged if merged.exists() else dd / handle
def _require_owns(activity_id: str, user: User) -> Path:
"""Verify the user owns this activity (it lives in their data dir)."""
activity_path = _user_data_dir(user.handle) / "activities" / f"{activity_id}.json"
if not activity_path.exists():
raise HTTPException(404, "Activity not found")
return activity_path
@router.get("/api/activity/{activity_id}/geojson")
async def get_activity_geojson(
activity_id: str,
user: User = Depends(deps._require_auth),
) -> JSONResponse:
"""Return GeoJSON track for an activity (mobile detail screen)."""
deps._check_id(activity_id)
dd = deps._get_data_dir()
user_dir = dd / user.handle
for base in (user_dir / "_merged" / "activities", user_dir / "activities"):
p = base / f"{activity_id}.geojson"
if p.exists():
return JSONResponse(json.loads(p.read_text()))
raise HTTPException(404, "GeoJSON not found")
@router.get("/api/activity/{activity_id}/timeseries")
async def get_activity_timeseries(
activity_id: str,
user: User = Depends(deps._require_auth),
) -> JSONResponse:
"""Return timeseries for an activity (mobile detail screen)."""
deps._check_id(activity_id)
dd = deps._get_data_dir()
user_dir = dd / user.handle
for base in (user_dir / "_merged" / "activities", user_dir / "activities"):
p = base / f"{activity_id}.timeseries.json"
if p.exists():
return JSONResponse(json.loads(p.read_text()))
raise HTTPException(404, "Timeseries not found")
@router.get("/api/activity/{activity_id}")
async def get_activity(
activity_id: str,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
path = _require_owns(activity_id, user)
detail = json.loads(path.read_text())
# Normalise for EditDrawer: add `private` bool so the drawer works regardless
# of whether the raw JSON uses the old "private" or the new "unlisted" value.
detail["private"] = detail.get("privacy") in ("private", "unlisted")
return JSONResponse(detail)
@router.post("/api/activity/{activity_id}", response_model=ActivityEditResponse)
async def post_activity(
activity_id: str,
edit_req: ActivityEditRequest,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
dd = deps._get_data_dir() / user.handle
# Verify the activity belongs to this user before writing
if not (dd / "activities" / f"{activity_id}.json").exists():
raise HTTPException(404, "Activity not found")
from bincio.edit.ops import apply_sidecar_edit
body = edit_req.model_dump(exclude_none=True)
apply_sidecar_edit(activity_id, body, dd)
tasks._trigger_rebuild(user.handle)
return JSONResponse({"ok": True})
@router.post("/api/activity/{activity_id}/recalculate-elevation/dem")
async def recalculate_elevation_dem_endpoint(
activity_id: str,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
"""Replace GPS altitude with DEM terrain elevation and recompute gain/loss.
Requires --dem-url to be set when starting bincio serve.
"""
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
if not deps.dem_url:
raise HTTPException(503, "DEM URL not configured.")
dd = deps._get_data_dir() / user.handle
if not (dd / "activities" / f"{activity_id}.json").exists():
raise HTTPException(404, "Activity not found")
try:
from bincio.extract.dem import recalculate_elevation
from bincio.render.merge import merge_one
result = recalculate_elevation(dd, activity_id, deps.dem_url)
merge_one(dd, activity_id)
tasks._trigger_rebuild(user.handle)
return JSONResponse(result)
except FileNotFoundError as e:
raise HTTPException(404, str(e))
except ValueError as e:
raise HTTPException(422, str(e))
@router.post("/api/activity/{activity_id}/recalculate-elevation/hysteresis")
async def recalculate_elevation_hysteresis_endpoint(
activity_id: str,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
"""Recompute gain/loss from original recorded elevation using source-aware hysteresis."""
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
dd = deps._get_data_dir() / user.handle
if not (dd / "activities" / f"{activity_id}.json").exists():
raise HTTPException(404, "Activity not found")
try:
from bincio.extract.dem import recalculate_elevation_hysteresis
from bincio.render.merge import merge_one
result = recalculate_elevation_hysteresis(dd, activity_id)
merge_one(dd, activity_id)
tasks._trigger_rebuild(user.handle)
return JSONResponse(result)
except FileNotFoundError as e:
raise HTTPException(404, str(e))
except ValueError as e:
raise HTTPException(422, str(e))
@router.delete("/api/activity/{activity_id}", response_model=GenericResponse)
async def delete_activity(
activity_id: str,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
"""Delete a single activity and all associated files for the logged-in user."""
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
dd = deps._get_data_dir() / user.handle
acts_dir = dd / "activities"
json_path = acts_dir / f"{activity_id}.json"
if not json_path.exists():
raise HTTPException(404, "Activity not found")
import shutil
# Remove the source files (activities dir)
for suffix in (".json", ".geojson", ".timeseries.json"):
p = acts_dir / f"{activity_id}{suffix}"
p.unlink(missing_ok=True)
# Remove sidecar edit and images
sidecar = dd / "edits" / f"{activity_id}.md"
sidecar.unlink(missing_ok=True)
images_dir = dd / "edits" / "images" / activity_id
if images_dir.exists():
shutil.rmtree(images_dir)
# Remove from the extract-level flat index so merge_all doesn't re-add
# the summary even though the detail file is gone.
index_path = dd / "index.json"
if index_path.exists():
try:
idx = json.loads(index_path.read_text(encoding="utf-8"))
idx["activities"] = [a for a in idx.get("activities", []) if a.get("id") != activity_id]
index_path.write_text(json.dumps(idx, indent=2, ensure_ascii=False))
except (OSError, json.JSONDecodeError):
pass # corrupt index — merge_all will clean up on next run
# Remove from dedup cache so the file can be re-uploaded if needed
cache_path = dd / ".bincio_cache.json"
if cache_path.exists():
try:
cache = json.loads(cache_path.read_text(encoding="utf-8"))
if isinstance(cache, dict) and "activities" in cache:
cache["activities"] = [
a for a in cache["activities"] if a.get("id") != activity_id
]
cache_path.write_text(json.dumps(cache, indent=2, ensure_ascii=False))
except (OSError, json.JSONDecodeError):
pass # corrupt cache — leave it; next extract will rebuild
# Full merge needed: activity removed from index
from bincio.render.merge import merge_all
merge_all(dd)
tasks._trigger_rebuild(user.handle)
return JSONResponse({"ok": True})
@router.get("/api/activity/{activity_id}/images")
async def list_images(
activity_id: str,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
dd = deps._get_data_dir() / user.handle
images_dir = dd / "edits" / "images" / activity_id
images = sorted(p.name for p in images_dir.iterdir() if p.is_file()) if images_dir.exists() else []
return JSONResponse({"images": images})
@router.post("/api/activity/{activity_id}/images")
async def upload_image(
activity_id: str,
file: UploadFile = File(...),
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
dd = deps._get_data_dir() / user.handle
if not (dd / "activities" / f"{activity_id}.json").exists():
raise HTTPException(404, "Activity not found")
if not file.filename:
raise HTTPException(400, "No filename")
ct = file.content_type or ""
if ct not in _ALLOWED_IMAGE_TYPES:
raise HTTPException(400, "Only JPEG, PNG, WebP, or GIF images are accepted")
contents = await file.read()
if len(contents) > _MAX_IMAGE_BYTES:
raise HTTPException(413, f"Image too large (max {_MAX_IMAGE_BYTES // (1024*1024)} MB)")
images_dir = dd / "edits" / "images" / activity_id
images_dir.mkdir(parents=True, exist_ok=True)
safe_name = _unique_image_name(images_dir, Path(file.filename).name)
(images_dir / safe_name).write_bytes(contents)
from bincio.render.merge import merge_one
merge_one(dd, activity_id)
return JSONResponse({"ok": True, "filename": safe_name})
@router.delete("/api/activity/{activity_id}/images/{filename}")
async def delete_image(
activity_id: str,
filename: str,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
user = deps._require_user(bincio_session)
deps._check_id(activity_id)
dd = deps._get_data_dir() / user.handle
import shutil
safe_name = Path(filename).name
target = dd / "edits" / "images" / activity_id / safe_name
if target.exists() and target.is_file():
target.unlink()
if target.parent.exists() and not any(target.parent.iterdir()):
shutil.rmtree(target.parent)
from bincio.render.merge import merge_one
merge_one(dd, activity_id)
return JSONResponse({"ok": True})
@router.get("/api/athlete")
async def get_athlete(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
user = deps._require_user(bincio_session)
dd = deps._get_data_dir() / user.handle
athlete_path = dd / "athlete.json"
data: dict = {}
if athlete_path.exists():
data = json.loads(athlete_path.read_text(encoding="utf-8"))
# Layer edits/athlete.yaml on top
edits_path = dd / "edits" / "athlete.yaml"
if edits_path.exists():
import yaml
try:
edits = yaml.safe_load(edits_path.read_text(encoding="utf-8")) or {}
for k in ("max_hr", "ftp_w", "hr_zones", "power_zones", "seasons", "gear"):
if k in edits:
data[k] = edits[k]
except (OSError, yaml.YAMLError):
pass
return JSONResponse(data)
@router.post("/api/athlete")
async def save_athlete(
request: Request,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
user = deps._require_user(bincio_session)
dd = deps._get_data_dir() / user.handle
athlete_path = dd / "athlete.json"
if not athlete_path.exists():
from datetime import datetime, timezone
athlete_path.write_text(json.dumps({
"bas_version": "1.0",
"generated_at": datetime.now(timezone.utc).isoformat(),
"power_curve": {},
}), encoding="utf-8")
payload = await request.json()
edits_dir = dd / "edits"
edits_dir.mkdir(exist_ok=True)
overrides: dict[str, Any] = {}
if payload.get("max_hr") is not None:
overrides["max_hr"] = int(payload["max_hr"])
if payload.get("ftp_w") is not None:
overrides["ftp_w"] = int(payload["ftp_w"])
if payload.get("hr_zones") is not None:
overrides["hr_zones"] = [[int(lo), int(hi)] for lo, hi in payload["hr_zones"]]
if payload.get("power_zones") is not None:
overrides["power_zones"] = [[int(lo), int(hi)] for lo, hi in payload["power_zones"]]
if payload.get("seasons") is not None:
overrides["seasons"] = [
{"name": str(s["name"]), "start": str(s["start"]), "end": str(s["end"])}
for s in payload["seasons"]
]
if payload.get("gear") is not None:
overrides["gear"] = payload["gear"]
import yaml
(edits_dir / "athlete.yaml").write_text(
yaml.dump(overrides, allow_unicode=True, default_flow_style=False),
encoding="utf-8",
)
from bincio.render.merge import merge_all
merge_all(dd)
tasks._trigger_rebuild(user.handle)
return JSONResponse({"ok": True})
@router.get("/api/activities/{activity_id}/segment_efforts")
async def activity_segment_efforts(
activity_id: str,
bincio_session: str | None = Cookie(default=None),
) -> JSONResponse:
"""Return segment efforts that belong to a specific activity for the logged-in user."""
from bincio.segments import store as _seg_store
user = deps._require_user(bincio_session)
dd = deps._get_data_dir()
efforts_dir = dd / user.handle / "segment_efforts"
result = []
if efforts_dir.exists():
import json as _json
for ef_file in sorted(efforts_dir.glob("*.json")):
seg_id = ef_file.stem
all_efforts = _seg_store.load_efforts(dd, user.handle, seg_id)
matching = [e for e in all_efforts if e.activity_id == activity_id]
if not matching:
continue
seg = _seg_store.load_segment(dd, seg_id)
if not seg:
continue
pr_elapsed = min(e.elapsed_s for e in all_efforts)
for eff in matching:
result.append({
"segment_id": seg.id,
"segment_name": seg.name,
"segment_distance_m": seg.distance_m,
"elapsed_s": eff.elapsed_s,
"pr_elapsed_s": pr_elapsed,
"started_at": _seg_store._iso(eff.started_at),
})
return JSONResponse(result)