"""Activity file download endpoint. GET /api/activity/{activity_id}/download/{fmt} fmt: bas | original | gpx Permission: - If activity.download_disabled is true: only the owner (authenticated) may download. - Otherwise: no auth required — anyone who can see the activity can download. """ from __future__ import annotations import json from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Optional from fastapi import APIRouter, Cookie, HTTPException from fastapi.responses import FileResponse, Response from bincio.serve import deps router = APIRouter() def _find_activity(activity_id: str) -> tuple[str, Path] | None: """Return (handle, detail_path) for whichever user owns this activity.""" data_dir = deps._get_data_dir() for user_dir in sorted(data_dir.iterdir()): if not user_dir.is_dir() or user_dir.name.startswith(("_", ".")): continue for base in (user_dir / "_merged" / "activities", user_dir / "activities"): p = base / f"{activity_id}.json" if p.exists(): return user_dir.name, p return None def _check_download_permission( detail: dict, handle: str, bincio_session: Optional[str] ) -> None: if not detail.get("download_disabled"): return try: user = deps._require_user(bincio_session) except HTTPException: raise HTTPException(403, "Downloads are disabled for this activity") if user.handle != handle: raise HTTPException(403, "Downloads are disabled for this activity") def _generate_gpx(detail: dict, ts: dict) -> str: t_vals = ts.get("t") or [] lat_vals = ts.get("lat") or [] lon_vals = ts.get("lon") or [] ele_vals = ts.get("elevation_m") or [] hr_vals = ts.get("hr_bpm") or [] title = (detail.get("title") or "Activity").replace("&", "&").replace("<", "<").replace(">", ">") started = detail.get("started_at") or "1970-01-01T00:00:00+00:00" try: t0 = datetime.fromisoformat(started) except ValueError: t0 = datetime(1970, 1, 1, tzinfo=timezone.utc) lines = [ '', '', f' {title}', ] for i, t in enumerate(t_vals): lat = lat_vals[i] if i < len(lat_vals) else None lon = lon_vals[i] if i < len(lon_vals) else None if lat is None or lon is None: continue ele = ele_vals[i] if i < len(ele_vals) else None hr = hr_vals[i] if i < len(hr_vals) else None ts_str = (t0 + timedelta(seconds=t)).strftime("%Y-%m-%dT%H:%M:%SZ") trkpt = f' ' if ele is not None: trkpt += f"{ele}" trkpt += f"" if hr is not None: trkpt += ( f"" f"{hr}" f"" ) trkpt += "" lines.append(trkpt) lines += [" ", ""] return "\n".join(lines) @router.get("/api/activity/{activity_id}/download/{fmt}") async def download_activity( activity_id: str, fmt: str, bincio_session: Optional[str] = Cookie(default=None), ) -> Response: deps._check_id(activity_id) if fmt not in ("bas", "original", "gpx"): raise HTTPException(400, "fmt must be bas, original, or gpx") result = _find_activity(activity_id) if result is None: raise HTTPException(404, "Activity not found") handle, detail_path = result detail = json.loads(detail_path.read_text(encoding="utf-8")) _check_download_permission(detail, handle, bincio_session) if fmt == "bas": # Embed the timeseries so the downloaded file is self-contained. ts_path: Path | None = None data_dir = deps._get_data_dir() for base in ( data_dir / handle / "_merged" / "activities", data_dir / handle / "activities", ): p = base / f"{activity_id}.timeseries.json" if p.exists(): ts_path = p break if ts_path: try: detail["timeseries"] = json.loads(ts_path.read_text(encoding="utf-8")) detail.pop("timeseries_url", None) except (OSError, json.JSONDecodeError): pass content = json.dumps(detail, ensure_ascii=False, indent=2) return Response( content=content, media_type="application/json", headers={"Content-Disposition": f'attachment; filename="{activity_id}.json"'}, ) if fmt == "original": source = detail.get("source") or "" source_file = detail.get("source_file") or "" if source not in ("fit_file", "gpx_file") or not source_file: raise HTTPException(404, "No original file available for this activity") safe_name = Path(source_file).name # strip any directory traversal orig_path = deps._get_data_dir() / handle / "originals" / safe_name if not orig_path.exists(): raise HTTPException(404, "Original file not found on disk") media_type = "application/octet-stream" if safe_name.endswith(".fit"): media_type = "application/vnd.ant.fit" elif safe_name.endswith(".gpx"): media_type = "application/gpx+xml" return FileResponse( orig_path, media_type=media_type, filename=safe_name, headers={"Content-Disposition": f'attachment; filename="{safe_name}"'}, ) # fmt == "gpx" data_dir = deps._get_data_dir() ts_path: Path | None = None for base in ( data_dir / handle / "_merged" / "activities", data_dir / handle / "activities", ): p = base / f"{activity_id}.timeseries.json" if p.exists(): ts_path = p break if ts_path is None: raise HTTPException(404, "No GPS data available for this activity") ts = json.loads(ts_path.read_text(encoding="utf-8")) lat_vals = ts.get("lat") or [] if not any(v is not None for v in lat_vals): raise HTTPException(404, "No GPS data available for this activity") gpx_content = _generate_gpx(detail, ts) raw_title = detail.get("title") or activity_id safe_title = "".join(c for c in raw_title if c.isalnum() or c in " -_")[:50].strip() filename = f"{safe_title or activity_id}.gpx" return Response( content=gpx_content, media_type="application/gpx+xml", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, )