Files
Davide Scaini 5307ae287c Explore: personal GPS heatmap tab under Athlete page
- bincio/explore.py: bake_tracks() simplifies GPS coords (RDP ε=0.0001),
  strips to [lng,lat], groups by sport type, writes per-handle tracks.json
- bake-tracks CLI command; render CLI calls _bake_tracks() after each build;
  strava_zip runs it once at end of batch
- /api/me/tracks endpoint serves the baked file; wipe_user cleans it up
- Explore.svelte: MapLibre full-screen map with sidebar — type pills,
  year/month date filter, Lines / Heatmap (global or by-type) view modes
- AthleteView: Explore tab visible only to profile owner (checks __bincioMe)
- Base.astro: fullscreen prop + Planner nav link
2026-05-14 14:31:21 +02:00

507 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""File upload endpoints (/api/upload/*)."""
from __future__ import annotations
import json
import logging
import uuid
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Cookie, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from bincio.serve import deps, tasks
log = logging.getLogger("bincio.serve")
router = APIRouter()
_SUPPORTED_SUFFIXES = {".fit", ".gpx", ".tcx", ".fit.gz", ".gpx.gz", ".tcx.gz"}
def _file_suffix(name: str) -> str:
    """Return the lowercased extension of *name*, keeping a ``.gz`` double extension.

    ``"Ride.FIT.gz"`` gives ``".fit.gz"``, a bare ``"dump.gz"`` gives ``".gz"``,
    and any name not ending in ``.gz`` gives its ordinary last suffix
    (possibly the empty string).
    """
    lowered = Path(name.lower())
    ext = lowered.suffix
    if ext != ".gz":
        return ext
    # rpartition leaves the separator empty when the stem has no inner dot,
    # i.e. when there is no second extension to preserve.
    _, dot, inner = lowered.stem.rpartition(".")
    return f".{inner}.gz" if dot else ".gz"
def _upsert_index_summary(user_dir: Path, activity_id: str, activity: dict, geojson: Optional[dict] = None) -> None:
    """Add or update an activity summary in ``user_dir/index.json``.

    Called after writing BAS activity files so that merge_all can include the
    activity in year shards. Without this, uploaded activities exist on disk
    but never appear in the browser feed.

    Args:
        user_dir: The user's data directory; ``index.json`` lives directly in it
            and its name is used as the owner handle when the index is created.
        activity_id: Id of the activity; replaces any existing entry with the same id.
        activity: Activity detail dict the summary fields are copied from.
        geojson: Optional GeoJSON dict; up to nine points of its
            ``geometry.coordinates`` are stored as ``preview_coords``.

    The ``activities`` list is rewritten sorted by ``started_at``, newest first;
    entries whose ``started_at`` is missing or null sort last.
    """
    # Build preview coords from geojson if available ([lat, lng] order)
    preview: Optional[list] = None
    if geojson:
        try:
            coords = geojson.get("geometry", {}).get("coordinates", [])
            if coords:
                step = max(1, len(coords) // 9)
                preview = [[c[1], c[0]] for c in coords[::step]][:9]
        except (TypeError, IndexError, AttributeError):
            # Malformed geometry: the preview is optional, so store none
            # rather than failing the whole upload.
            preview = None

    has_track = (user_dir / "activities" / f"{activity_id}.geojson").exists()
    summary = {
        "id": activity_id,
        "title": activity.get("title", activity_id),
        "sport": activity.get("sport"),
        "sub_sport": activity.get("sub_sport"),
        "started_at": activity.get("started_at"),
        "distance_m": activity.get("distance_m"),
        "duration_s": activity.get("duration_s"),
        "moving_time_s": activity.get("moving_time_s"),
        "elevation_gain_m": activity.get("elevation_gain_m"),
        "avg_speed_kmh": activity.get("avg_speed_kmh"),
        "max_speed_kmh": activity.get("max_speed_kmh"),
        "avg_hr_bpm": activity.get("avg_hr_bpm"),
        "max_hr_bpm": activity.get("max_hr_bpm"),
        "avg_cadence_rpm": activity.get("avg_cadence_rpm"),
        "avg_power_w": activity.get("avg_power_w"),
        "mmp": activity.get("mmp"),
        "best_efforts": activity.get("best_efforts"),
        "best_climb_m": activity.get("best_climb_m"),
        "source": activity.get("source"),
        "privacy": activity.get("privacy", "public"),
        "detail_url": f"activities/{activity_id}.json",
        "track_url": f"activities/{activity_id}.geojson" if has_track else None,
        "preview_coords": preview,
    }

    index_path = user_dir / "index.json"
    if index_path.exists():
        index_data = json.loads(index_path.read_text(encoding="utf-8"))
    else:
        index_data = {
            "bas_version": "1.0",
            "owner": {"handle": user_dir.name},
            "generated_at": None,
            "activities": [],
        }

    existing = {a["id"]: a for a in index_data.get("activities", [])}
    existing[activity_id] = summary
    # The summary always carries a "started_at" key, possibly None, so a
    # .get() default is not enough: coerce None to "" to keep the sort keys
    # mutually comparable.
    index_data["activities"] = sorted(
        existing.values(),
        key=lambda a: a.get("started_at") or "",
        reverse=True,
    )
    index_path.write_text(json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8")
@router.post("/api/upload/bas")
async def upload_bas_activity(
    request: Request,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Accept a pre-extracted BAS activity JSON from the mobile app.

    Body (JSON):
      activity    full BAS activity dict (required, must have 'id')
      timeseries  timeseries dict (optional)
      geojson     GeoJSON dict (optional)

    Returns:
      {"ok": true, "id": "...", "status": "imported" | "duplicate"}

    Raises:
      HTTPException(400) when the body is not a JSON object or has no
      ``activity.id``. Authentication and id validation are delegated to
      ``deps._require_auth`` and ``deps._check_id``.
    """
    user = deps._require_auth(request, bincio_session)

    # Reject malformed payloads as client errors instead of letting a
    # ValueError/AttributeError surface as a 500.
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(400, "Invalid JSON body") from None
    if not isinstance(body, dict):
        raise HTTPException(400, "Invalid JSON body")

    activity = body.get("activity")
    if not isinstance(activity, dict) or not activity.get("id"):
        raise HTTPException(400, "Missing activity.id")
    activity_id = str(activity["id"])
    deps._check_id(activity_id)

    user_dir = deps._get_data_dir() / user.handle
    acts_dir = user_dir / "activities"
    acts_dir.mkdir(parents=True, exist_ok=True)

    out = acts_dir / f"{activity_id}.json"
    if out.exists():
        # Already stored: report it without touching any file.
        return JSONResponse({"ok": True, "id": activity_id, "status": "duplicate"})
    out.write_text(json.dumps(activity, ensure_ascii=False, indent=2), encoding="utf-8")

    if body.get("timeseries"):
        ts_path = acts_dir / f"{activity_id}.timeseries.json"
        if not ts_path.exists():
            ts_path.write_text(json.dumps(body["timeseries"], ensure_ascii=False), encoding="utf-8")

    geojson_body: Optional[dict] = body.get("geojson") or None
    if geojson_body:
        gj_path = acts_dir / f"{activity_id}.geojson"
        if not gj_path.exists():
            gj_path.write_text(json.dumps(geojson_body, ensure_ascii=False), encoding="utf-8")

    _upsert_index_summary(user_dir, activity_id, activity, geojson_body)

    # Merge and feed refresh are best effort: the activity is already on disk,
    # so a failure here must not fail the upload.
    try:
        from bincio.render.merge import merge_one, write_combined_feed
        merge_one(user_dir, activity_id)
        write_combined_feed(deps._get_data_dir())
    except Exception as exc:
        log.warning("upload/bas[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)

    log.info("upload/bas[%s]: imported %s", user.handle, activity_id)
    return JSONResponse({"ok": True, "id": activity_id, "status": "imported"})
@router.post("/api/upload/raw")
async def upload_raw_activity(
    request: Request,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Accept a raw FIT/GPX file (base64-encoded) from the mobile app, extract it
    server-side, store it in the user's activity library, and return the full
    extracted data so the mobile can cache it locally.

    Used when the device WebView is too old to run Pyodide (e.g. Karoo / Chrome <69).

    Body (JSON):
      filename    original filename (used only to determine file extension)
      base64      base64-encoded raw file bytes
      user_title  optional title; when given it is written to edits/<id>.md

    Auth: Authorization: Bearer <token>

    Returns:
      {"ok": true, "id": "...", "detail": {...}, "timeseries": {...}|null,
       "geojson": {...}|null, "source_hash": "<sha256-hex>"}

    Raises:
      HTTPException(400) for a malformed body or invalid base64;
      HTTPException(422) when extraction or storage of the activity fails.
    """
    import base64 as _b64
    import hashlib
    import shutil
    import tempfile

    user = deps._require_auth(request, bincio_session)

    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(400, "Invalid JSON body") from None
    if not isinstance(body, dict):
        raise HTTPException(400, "Invalid JSON body")

    filename_hint: str = str(body.get("filename") or "activity.fit")
    b64 = body.get("base64") or ""
    user_title: Optional[str] = body.get("user_title") or None

    if not b64:
        raise HTTPException(400, "Missing base64 field")
    try:
        raw = _b64.b64decode(b64)
    except (TypeError, ValueError):
        # binascii.Error is a ValueError; TypeError covers a non-string field.
        raise HTTPException(400, "Invalid base64 encoding") from None

    source_hash = hashlib.sha256(raw).hexdigest()

    # Use the shared helper so compressed uploads keep their full ".fit.gz"
    # style extension instead of being reduced to ".gz".
    suffix = _file_suffix(filename_hint) or ".fit"
    tmp_root = Path(tempfile.gettempdir())
    tmp_in = tmp_root / f"bincio_raw_{uuid.uuid4()}{suffix}"
    tmp_out = tmp_root / f"bincio_out_{uuid.uuid4()}"

    try:
        tmp_in.write_bytes(raw)
        tmp_out.mkdir()

        from bincio.extract.parsers.factory import parse_file
        from bincio.extract.metrics import compute
        from bincio.extract.writer import make_activity_id, write_activity
        from bincio.extract.timeseries import build_timeseries

        activity = parse_file(tmp_in)
        metrics = compute(activity)
        write_activity(activity, metrics, tmp_out, privacy="public", rdp_epsilon=0.0001)

        act_id = make_activity_id(activity)
        acts_tmp = tmp_out / "activities"
        detail_path = acts_tmp / f"{act_id}.json"
        ts_path = acts_tmp / f"{act_id}.timeseries.json"
        geojson_path = acts_tmp / f"{act_id}.geojson"

        # write_activity did not produce a timeseries file: build one here and
        # keep it only if it actually has samples.
        if not ts_path.exists():
            ts_data = build_timeseries(activity.points, activity.started_at, "public")
            if ts_data.get("t"):
                ts_path.write_text(json.dumps(ts_data), encoding="utf-8")

        detail = json.loads(detail_path.read_text(encoding="utf-8"))
        timeseries = json.loads(ts_path.read_text(encoding="utf-8")) if ts_path.exists() else None
        geojson = json.loads(geojson_path.read_text(encoding="utf-8")) if geojson_path.exists() else None

        # Also store on the server so the activity appears in the user's feed.
        user_dir = deps._get_data_dir() / user.handle
        acts_dir = user_dir / "activities"
        acts_dir.mkdir(parents=True, exist_ok=True)

        # Existing files are never overwritten.
        out = acts_dir / f"{act_id}.json"
        if not out.exists():
            out.write_text(json.dumps(detail, ensure_ascii=False, indent=2), encoding="utf-8")
        stored_ts = acts_dir / f"{act_id}.timeseries.json"
        if timeseries and not stored_ts.exists():
            stored_ts.write_text(json.dumps(timeseries), encoding="utf-8")
        stored_gj = acts_dir / f"{act_id}.geojson"
        if geojson and not stored_gj.exists():
            stored_gj.write_text(json.dumps(geojson), encoding="utf-8")

        _upsert_index_summary(user_dir, act_id, detail, geojson)

        if user_title:
            import yaml as _yaml
            edits_dir = user_dir / "edits"
            edits_dir.mkdir(parents=True, exist_ok=True)
            (edits_dir / f"{act_id}.md").write_text(
                f"---\n{_yaml.dump({'title': user_title}, allow_unicode=True)}---\n",
                encoding="utf-8",
            )

    except Exception as exc:
        log.warning("upload/raw[%s]: extraction failed: %s", user.handle, exc)
        raise HTTPException(422, f"Could not extract activity: {exc}") from exc
    finally:
        tmp_in.unlink(missing_ok=True)
        shutil.rmtree(tmp_out, ignore_errors=True)

    # Merge and update feed — best effort; a race or transient FS error here must
    # not turn a successful extraction into a 422 (the file is on disk; the mobile
    # would retry indefinitely and the activity would never be marked synced).
    try:
        from bincio.render.merge import merge_one, write_combined_feed
        merge_one(user_dir, act_id)
        write_combined_feed(deps._get_data_dir())
    except Exception as exc:
        log.warning("upload/raw[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)

    log.info("upload/raw[%s]: imported %s", user.handle, act_id)
    return JSONResponse({
        "ok": True,
        "id": act_id,
        "detail": detail,
        "timeseries": timeseries,
        "geojson": geojson,
        "source_hash": source_hash,
    })
def _purge_activity_for_overwrite(dd: Path, activity_id: str) -> None:
    """Remove every stored trace of *activity_id* under *dd* before re-ingesting it."""
    for ext in (".json", ".geojson", ".timeseries.json"):
        (dd / "activities" / f"{activity_id}{ext}").unlink(missing_ok=True)

    # Remove stale summary from index so ingest_parsed writes a clean one
    index_path = dd / "index.json"
    if index_path.exists():
        idx = json.loads(index_path.read_text(encoding="utf-8"))
        idx["activities"] = [a for a in idx.get("activities", []) if a.get("id") != activity_id]
        # Written with ensure_ascii=False, so the encoding must match the UTF-8 read.
        index_path.write_text(json.dumps(idx, indent=2, ensure_ascii=False), encoding="utf-8")

    # Remove from dedup hash cache so the new file isn't blocked
    cache_path = dd / ".bincio_cache.json"
    if cache_path.exists():
        try:
            cache = json.loads(cache_path.read_text(encoding="utf-8"))
            cache.pop(activity_id, None)
            cache_path.write_text(json.dumps(cache, ensure_ascii=False), encoding="utf-8")
        except (OSError, json.JSONDecodeError) as exc:
            # Best effort: an unreadable cache must not block the overwrite.
            log.warning("upload: could not update dedup cache %s: %s", cache_path, exc)

    # Remove merged copies (merge_all will regenerate them after ingest)
    merged_acts = dd / "_merged" / "activities"
    if merged_acts.exists():
        for ext in (".json", ".geojson", ".timeseries.json"):
            p = merged_acts / f"{activity_id}{ext}"
            if p.exists() or p.is_symlink():
                p.unlink(missing_ok=True)


@router.post("/api/upload")
async def upload_activity(
    files: list[UploadFile] = File(...),
    store_original: bool = Form(False),
    overwrite: bool = Form(False),
    bincio_session: Optional[str] = Cookie(default=None),
) -> StreamingResponse:
    """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing.

    activities.csv (Strava export format) can be included in the batch to:
      - Enrich activity files in the same batch (matched by filename)
      - Retroactively update sidecars for existing activities (matched by strava_id)

    Form fields:
      files           the uploaded files
      store_original  keep each successfully ingested file under originals/
      overwrite       replace an already-stored activity instead of reporting a duplicate

    SSE events:
      {"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"overwritten"|"duplicate"|"error"}
      {"type": "csv", "updates": N}   -- only when CSV was included
      {"type": "done", "added": N, "csv_updates": N, "duplicates": N, "overwritten": N, "errors": N}
    """
    from bincio.extract.ingest import ingest_parsed
    from bincio.extract.parsers.factory import parse_file
    from bincio.extract.writer import make_activity_id
    from bincio.render.merge import merge_all

    user = deps._require_user(bincio_session)
    dd = deps._get_data_dir() / user.handle
    staging = dd / "_uploads"
    staging.mkdir(parents=True, exist_ok=True)

    # Read all files into memory now (async), then process synchronously in the generator
    csv_bytes_list: list[bytes] = []
    activity_items: list[tuple[str, bytes]] = []  # (original_filename, bytes)
    for f in files:
        # Keep only the final path component of the client-supplied name.
        fname = Path(f.filename or "").name
        raw = await f.read()
        if fname.lower().endswith(".csv"):
            csv_bytes_list.append(raw)
        else:
            activity_items.append((fname, raw))

    # Build metadata from the first CSV
    metadata = None
    if csv_bytes_list:
        from bincio.extract.strava_csv import StravaMetadata
        import tempfile
        with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
            tmp.write(csv_bytes_list[0])
            tmp_path = Path(tmp.name)
        try:
            metadata = StravaMetadata(tmp_path)
        finally:
            tmp_path.unlink(missing_ok=True)

    total_files = len(activity_items)
    job_id = tasks._job_start(user.handle, total_files) if total_files > 0 else None
    max_bytes = 50 * 1024 * 1024  # per-file size limit

    def sse(payload: dict) -> str:
        """Format *payload* as one server-sent event."""
        return f"data: {json.dumps(payload)}\n\n"

    def progress(n: int, name: str, status: str, detail: Optional[str] = None) -> str:
        """Build a progress event; ``detail`` is included only for errors."""
        payload = {"type": "progress", "n": n, "total": total_files, "name": name, "status": status}
        if detail is not None:
            payload["detail"] = detail
        return sse(payload)

    def event_stream():
        added = 0
        overwritten = 0
        duplicates = 0
        errors = 0
        any_added = False
        try:
            for n, (name, contents) in enumerate(activity_items, 1):
                if job_id:
                    tasks._job_update(job_id, n - 1, name)

                suffix = _file_suffix(name)
                if suffix not in _SUPPORTED_SUFFIXES:
                    errors += 1
                    yield progress(n, name, "error", "unsupported type")
                    continue
                if len(contents) > max_bytes:
                    errors += 1
                    yield progress(n, name, "error", "file too large")
                    continue

                staged = staging / name
                staged.write_bytes(contents)
                kept = False
                try:
                    activity = parse_file(staged)
                    if metadata is not None:
                        metadata.enrich(name, activity)
                    activity_id = make_activity_id(activity)

                    was_overwrite = False
                    if (dd / "activities" / f"{activity_id}.json").exists():
                        if not overwrite:
                            duplicates += 1
                            yield progress(n, name, "duplicate")
                            continue
                        # Overwrite: delete existing files before re-ingesting.
                        _purge_activity_for_overwrite(dd, activity_id)
                        was_overwrite = True

                    ingest_parsed(activity, dd, privacy="public")

                    if store_original:
                        originals_dir = dd / "originals"
                        originals_dir.mkdir(exist_ok=True)
                        staged.rename(originals_dir / name)
                        kept = True

                    if was_overwrite:
                        overwritten += 1
                    else:
                        added += 1
                    # Overwrites count too: their merged copies were removed
                    # above and must be regenerated by merge_all.
                    any_added = True
                    yield progress(n, name, "overwritten" if was_overwrite else "imported")
                except Exception as exc:
                    # One bad file must not abort the batch: report it and move on.
                    errors += 1
                    log.error("upload[%s]: failed to process %s: %s", user.handle, name, exc, exc_info=True)
                    yield progress(n, name, "error", str(exc))
                finally:
                    if not kept:
                        staged.unlink(missing_ok=True)

            # Retroactively apply CSV metadata to existing activities
            csv_updates = 0
            if metadata is not None:
                from bincio.extract.strava_csv import apply_csv_to_data_dir
                csv_updates = apply_csv_to_data_dir(dd, metadata)
                if csv_updates:
                    yield sse({"type": "csv", "updates": csv_updates})

            if any_added or csv_updates:
                merge_all(dd)
            if any_added:
                tasks._trigger_rebuild(user.handle)

            yield sse({
                "type": "done",
                "added": added,
                "overwritten": overwritten,
                "csv_updates": csv_updates,
                "duplicates": duplicates,
                "errors": errors,
            })
        finally:
            # Runs on normal completion, on an unexpected error, and when the
            # client disconnects mid-stream, so the job is always closed.
            if job_id:
                tasks._job_finish(job_id)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
@router.post("/api/upload/strava-zip")
async def upload_strava_zip(
    file: UploadFile = File(...),
    private: str = Form(default="false"),
    bincio_session: Optional[str] = Cookie(default=None),
) -> StreamingResponse:
    """Accept a Strava bulk export ZIP and stream SSE progress while processing.

    The ZIP is written to a temp file, processed activity-by-activity, then deleted.
    Originals are never kept — the UI informs the user of this upfront.

    Form fields:
      file     the .zip upload (anything else is rejected with 400)
      private  "true" / "1" / "yes" (case-insensitive) imports as "unlisted",
               any other value as "public"

    The stream relays every event from ``strava_zip_iter``; an unexpected
    failure is reported as {"type": "error", "message": "..."}.
    """
    user = deps._require_user(bincio_session)

    if not file.filename or not file.filename.lower().endswith(".zip"):
        raise HTTPException(400, "Please upload a .zip file")

    privacy = "unlisted" if private.lower() in ("true", "1", "yes") else "public"
    dd = deps._get_data_dir() / user.handle

    # Import before creating the temp file so an import failure cannot leak it.
    import tempfile
    from bincio.extract.strava_zip import strava_zip_iter
    from bincio.render.merge import merge_all

    tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    zip_path = Path(tmp.name)
    try:
        with tmp:
            while chunk := await file.read(1024 * 1024):  # 1 MB chunks
                tmp.write(chunk)
    except BaseException:
        # The stream below normally removes the file; if the upload itself
        # fails it never runs, so clean up here.
        zip_path.unlink(missing_ok=True)
        raise

    log.info("strava-zip[%s]: received %s, privacy=%s", user.handle, file.filename, privacy)

    def event_stream():
        any_imported = False
        imported_count = 0
        error_count = 0
        try:
            for event in strava_zip_iter(zip_path, dd, privacy=privacy):
                yield f"data: {json.dumps(event)}\n\n"
                kind = event.get("type")
                if kind == "progress":
                    status = event.get("status")
                    if status == "imported":
                        any_imported = True
                        imported_count += 1
                    elif status == "error":
                        error_count += 1
                        log.warning("strava-zip[%s]: error on %s: %s",
                                    user.handle, event.get("name"), event.get("detail", ""))
                if kind == "done":
                    log.info("strava-zip[%s]: done — imported=%d errors=%d",
                             user.handle, imported_count, error_count)
                    if any_imported:
                        merge_all(dd)
                        # Track baking is best effort and must not fail the import.
                        try:
                            from bincio.explore import bake_tracks
                            bake_tracks(user.handle, deps._get_data_dir())
                        except Exception as exc:
                            log.warning("strava-zip[%s]: bake_tracks failed (non-fatal): %s", user.handle, exc)
                        tasks._trigger_rebuild(user.handle)
        except Exception as exc:
            log.error("strava-zip[%s]: fatal error: %s", user.handle, exc, exc_info=True)
            yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n"
        finally:
            zip_path.unlink(missing_ok=True)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )