5307ae287c
- bincio/explore.py: bake_tracks() simplifies GPS coords (RDP ε=0.0001), strips to [lng,lat], groups by sport type, writes per-handle tracks.json - bake-tracks CLI command; render CLI calls _bake_tracks() after each build; strava_zip runs it once at end of batch - /api/me/tracks endpoint serves the baked file; wipe_user cleans it up - Explore.svelte: MapLibre full-screen map with sidebar — type pills, year/month date filter, Lines / Heatmap (global or by-type) view modes - AthleteView: Explore tab visible only to profile owner (checks __bincioMe) - Base.astro: fullscreen prop + Planner nav link
507 lines
21 KiB
Python
507 lines
21 KiB
Python
"""File upload endpoints (/api/upload/*)."""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, Cookie, File, Form, HTTPException, Request, UploadFile
|
||
from fastapi.responses import JSONResponse, StreamingResponse
|
||
|
||
from bincio.serve import deps, tasks
|
||
|
||
log = logging.getLogger("bincio.serve")
|
||
|
||
router = APIRouter()
|
||
|
||
_SUPPORTED_SUFFIXES = {".fit", ".gpx", ".tcx", ".fit.gz", ".gpx.gz", ".tcx.gz"}
|
||
|
||
|
||
def _file_suffix(name: str) -> str:
    """Return the effective, lower-cased suffix of *name*.

    A trailing ``.gz`` is combined with the extension in front of it, so
    ``"Ride.FIT.gz"`` yields ``".fit.gz"``.  A name whose only extension is
    ``.gz`` yields ``".gz"``, and a name without any extension yields ``""``.

    Args:
        name: File name (or path) to inspect; case is ignored.

    Returns:
        The suffix including its leading dot(s), suitable for a membership
        test against ``_SUPPORTED_SUFFIXES``.
    """
    p = Path(name.lower())
    if p.suffix != ".gz":
        return p.suffix
    if "." not in p.stem:
        return ".gz"
    # Only the last dotted component before ".gz" counts:
    # "a.b.fit.gz" -> ".fit.gz".
    inner = p.stem.rsplit(".", 1)[-1]
    return f".{inner}.gz"
|
||
|
||
|
||
def _upsert_index_summary(user_dir: Path, activity_id: str, activity: dict, geojson: Optional[dict] = None) -> None:
    """Add or update an activity summary in user_dir/index.json.

    Called after writing BAS activity files so that merge_all can include the
    activity in year shards.  Without this, uploaded activities exist on disk
    but never appear in the browser feed.

    Args:
        user_dir: Per-user data directory; ``index.json`` is read from and
            written to this directory, and its name is used as the owner
            handle when a new index has to be created.
        activity_id: Id of the activity; any existing summary with the same
            id is replaced.
        activity: Activity detail dict the summary fields are copied from.
        geojson: Optional GeoJSON dict; when it carries
            ``geometry.coordinates`` up to 9 evenly spaced points are stored
            as ``preview_coords``.
    """
    # Build preview coords from geojson if available ([lat, lng] order)
    preview: Optional[list] = None
    if geojson:
        try:
            coords = geojson.get("geometry", {}).get("coordinates", [])
            if coords:
                step = max(1, len(coords) // 9)
                preview = [[c[1], c[0]] for c in coords[::step]][:9]
        except (TypeError, IndexError, AttributeError):
            # Best effort: a malformed geometry only costs the preview.
            pass

    has_track = (user_dir / "activities" / f"{activity_id}.geojson").exists()
    summary = {
        "id": activity_id,
        "title": activity.get("title", activity_id),
        "sport": activity.get("sport"),
        "sub_sport": activity.get("sub_sport"),
        "started_at": activity.get("started_at"),
        "distance_m": activity.get("distance_m"),
        "duration_s": activity.get("duration_s"),
        "moving_time_s": activity.get("moving_time_s"),
        "elevation_gain_m": activity.get("elevation_gain_m"),
        "avg_speed_kmh": activity.get("avg_speed_kmh"),
        "max_speed_kmh": activity.get("max_speed_kmh"),
        "avg_hr_bpm": activity.get("avg_hr_bpm"),
        "max_hr_bpm": activity.get("max_hr_bpm"),
        "avg_cadence_rpm": activity.get("avg_cadence_rpm"),
        "avg_power_w": activity.get("avg_power_w"),
        "mmp": activity.get("mmp"),
        "best_efforts": activity.get("best_efforts"),
        "best_climb_m": activity.get("best_climb_m"),
        "source": activity.get("source"),
        "privacy": activity.get("privacy", "public"),
        "detail_url": f"activities/{activity_id}.json",
        "track_url": f"activities/{activity_id}.geojson" if has_track else None,
        "preview_coords": preview,
    }

    index_path = user_dir / "index.json"
    if index_path.exists():
        index_data = json.loads(index_path.read_text(encoding="utf-8"))
    else:
        index_data = {
            "bas_version": "1.0",
            "owner": {"handle": user_dir.name},
            "generated_at": None,
            "activities": [],
        }
    existing = {a["id"]: a for a in index_data.get("activities", [])}
    existing[activity_id] = summary
    # Summaries always carry a "started_at" key, possibly set to None, so a
    # .get() default is not enough: coerce None to "" to keep the sort keys
    # comparable.  Newest first; activities without a start time sort last.
    index_data["activities"] = sorted(
        existing.values(),
        key=lambda a: a.get("started_at") or "",
        reverse=True,
    )
    index_path.write_text(json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8")
|
||
|
||
|
||
@router.post("/api/upload/bas")
async def upload_bas_activity(
    request: Request,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Accept a pre-extracted BAS activity JSON from the mobile app.

    Body (JSON):
      activity   – full BAS activity dict (required, must have 'id')
      timeseries – timeseries dict (optional)
      geojson    – GeoJSON dict (optional)

    Returns:
      {"ok": true, "id": "...", "status": "imported" | "duplicate"}

    Raises:
      HTTPException(400) when the body is not a JSON object or
      ``activity.id`` is missing.
    """
    user = deps._require_auth(request, bincio_session)

    # Reject malformed payloads with a 400 instead of letting a decode error
    # or an AttributeError on .get() surface as a 500.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "Request body is not valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(400, "Request body must be a JSON object")

    activity = body.get("activity")
    if not isinstance(activity, dict) or not activity.get("id"):
        raise HTTPException(400, "Missing activity.id")

    activity_id = str(activity["id"])
    deps._check_id(activity_id)

    user_dir = deps._get_data_dir() / user.handle
    acts_dir = user_dir / "activities"
    acts_dir.mkdir(parents=True, exist_ok=True)

    out = acts_dir / f"{activity_id}.json"
    if out.exists():
        # An existing detail file is never overwritten by this endpoint.
        return JSONResponse({"ok": True, "id": activity_id, "status": "duplicate"})

    out.write_text(json.dumps(activity, ensure_ascii=False, indent=2), encoding="utf-8")

    if body.get("timeseries"):
        ts_path = acts_dir / f"{activity_id}.timeseries.json"
        if not ts_path.exists():
            ts_path.write_text(json.dumps(body["timeseries"], ensure_ascii=False), encoding="utf-8")

    geojson_body: Optional[dict] = body.get("geojson") or None
    if geojson_body:
        gj_path = acts_dir / f"{activity_id}.geojson"
        if not gj_path.exists():
            gj_path.write_text(json.dumps(geojson_body, ensure_ascii=False), encoding="utf-8")

    _upsert_index_summary(user_dir, activity_id, activity, geojson_body)

    # Merge and feed refresh are best effort: the activity files are already
    # on disk, so a failure here is logged and the upload still succeeds.
    try:
        from bincio.render.merge import merge_one, write_combined_feed

        merge_one(user_dir, activity_id)
        write_combined_feed(deps._get_data_dir())
    except Exception as exc:
        log.warning("upload/bas[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)

    log.info("upload/bas[%s]: imported %s", user.handle, activity_id)
    return JSONResponse({"ok": True, "id": activity_id, "status": "imported"})
|
||
|
||
|
||
@router.post("/api/upload/raw")
async def upload_raw_activity(
    request: Request,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Accept a raw FIT/GPX file (base64-encoded) from the mobile app, extract it
    server-side, store it in the user's activity library, and return the full
    extracted data so the mobile can cache it locally.

    Used when the device WebView is too old to run Pyodide (e.g. Karoo / Chrome <69).

    Body (JSON):
      filename   – original filename (used only to determine file extension)
      base64     – base64-encoded raw file bytes
      user_title – optional title; when given it is written to the user's
                   edits/<id>.md front matter

    Auth: Authorization: Bearer <token>

    Returns:
      {"ok": true, "id": "...", "detail": {...}, "timeseries": {...}|null,
       "geojson": {...}|null, "source_hash": "<sha256-hex>"}

    Raises:
      HTTPException(400) for a malformed body or invalid base64,
      HTTPException(422) when extraction or storing the result fails.
    """
    import base64 as _b64
    import hashlib
    import shutil

    user = deps._require_auth(request, bincio_session)

    # Reject malformed payloads with a 400 instead of an unhandled error.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "Request body is not valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(400, "Request body must be a JSON object")

    raw_name = body.get("filename")
    filename_hint: str = raw_name if isinstance(raw_name, str) and raw_name else "activity.fit"
    b64 = body.get("base64")
    user_title: Optional[str] = body.get("user_title") or None
    if not isinstance(b64, str) or not b64:
        raise HTTPException(400, "Missing base64 field")

    try:
        raw = _b64.b64decode(b64)
    except ValueError:
        raise HTTPException(400, "Invalid base64 encoding")

    source_hash = hashlib.sha256(raw).hexdigest()

    # NOTE(review): only the last extension is kept, so "ride.fit.gz" is staged
    # as "*.gz"; confirm parse_file can still detect the format in that case.
    suffix = Path(filename_hint).suffix or ".fit"
    tmp_in = Path(f"/tmp/bincio_raw_{uuid.uuid4()}{suffix}")
    tmp_out = Path(f"/tmp/bincio_out_{uuid.uuid4()}")
    try:
        tmp_in.write_bytes(raw)
        tmp_out.mkdir()

        from bincio.extract.parsers.factory import parse_file
        from bincio.extract.metrics import compute
        from bincio.extract.writer import make_activity_id, write_activity
        from bincio.extract.timeseries import build_timeseries

        activity = parse_file(tmp_in)
        metrics = compute(activity)
        write_activity(activity, metrics, tmp_out, privacy="public", rdp_epsilon=0.0001)
        act_id = make_activity_id(activity)

        acts_tmp = tmp_out / "activities"
        detail_path = acts_tmp / f"{act_id}.json"
        ts_path = acts_tmp / f"{act_id}.timeseries.json"
        geojson_path = acts_tmp / f"{act_id}.geojson"

        # Build the timeseries here if write_activity did not produce one.
        if not ts_path.exists():
            ts_data = build_timeseries(activity.points, activity.started_at, "public")
            if ts_data.get("t"):
                ts_path.write_text(json.dumps(ts_data))

        detail = json.loads(detail_path.read_text())
        timeseries = json.loads(ts_path.read_text()) if ts_path.exists() else None
        geojson = json.loads(geojson_path.read_text()) if geojson_path.exists() else None

        # Also store on the server so the activity appears in the user's feed.
        # Files that already exist are left untouched.
        user_dir = deps._get_data_dir() / user.handle
        acts_dir = user_dir / "activities"
        acts_dir.mkdir(parents=True, exist_ok=True)
        out = acts_dir / f"{act_id}.json"
        if not out.exists():
            out.write_text(json.dumps(detail, ensure_ascii=False, indent=2), encoding="utf-8")
        if timeseries and not (acts_dir / f"{act_id}.timeseries.json").exists():
            (acts_dir / f"{act_id}.timeseries.json").write_text(json.dumps(timeseries), encoding="utf-8")
        if geojson and not (acts_dir / f"{act_id}.geojson").exists():
            (acts_dir / f"{act_id}.geojson").write_text(json.dumps(geojson), encoding="utf-8")

        _upsert_index_summary(user_dir, act_id, detail, geojson)

        if user_title:
            import yaml as _yaml

            edits_dir = user_dir / "edits"
            edits_dir.mkdir(parents=True, exist_ok=True)
            (edits_dir / f"{act_id}.md").write_text(
                f"---\n{_yaml.dump({'title': user_title}, allow_unicode=True)}---\n",
                encoding="utf-8",
            )

    except Exception as exc:
        log.warning("upload/raw[%s]: extraction failed: %s", user.handle, exc)
        raise HTTPException(422, f"Could not extract activity: {exc}") from exc
    finally:
        # Scratch files are removed whether or not extraction succeeded.
        tmp_in.unlink(missing_ok=True)
        shutil.rmtree(tmp_out, ignore_errors=True)

    # Merge and update feed — best effort; a race or transient FS error here must
    # not turn a successful extraction into a 422 (the file is on disk; the mobile
    # would retry indefinitely and the activity would never be marked synced).
    try:
        from bincio.render.merge import merge_one, write_combined_feed

        merge_one(user_dir, act_id)
        write_combined_feed(deps._get_data_dir())
    except Exception as exc:
        log.warning("upload/raw[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)

    log.info("upload/raw[%s]: imported %s", user.handle, act_id)
    return JSONResponse({
        "ok": True,
        "id": act_id,
        "detail": detail,
        "timeseries": timeseries,
        "geojson": geojson,
        "source_hash": source_hash,
    })
|
||
|
||
|
||
@router.post("/api/upload")
async def upload_activity(
    files: list[UploadFile] = File(...),
    store_original: bool = Form(False),
    overwrite: bool = Form(False),
    bincio_session: Optional[str] = Cookie(default=None),
) -> StreamingResponse:
    """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing.

    activities.csv (Strava export format) can be included in the batch to:
      - Enrich activity files in the same batch (matched by filename)
      - Retroactively update sidecars for existing activities (matched by strava_id)

    Form fields:
      files          – the uploaded files
      store_original – keep each successfully ingested source file under
                       <user>/originals instead of deleting it
      overwrite      – replace an already-imported activity instead of
                       reporting it as a duplicate

    SSE events:
      {"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"overwritten"|"duplicate"|"error"}
      {"type": "csv", "updates": N}   -- only when CSV was included
      {"type": "done", "added": N, "csv_updates": N, "duplicates": N, "overwritten": N, "errors": N}
    """
    from bincio.extract.ingest import ingest_parsed
    from bincio.extract.parsers.factory import parse_file
    from bincio.extract.writer import make_activity_id
    from bincio.render.merge import merge_all

    user = deps._require_user(bincio_session)
    dd = deps._get_data_dir() / user.handle
    staging = dd / "_uploads"
    # parents=True: nothing above guarantees the user directory exists yet
    # (the other upload endpoints create their directories the same way).
    staging.mkdir(parents=True, exist_ok=True)

    # Read all files into memory now (async), then process synchronously in the generator
    csv_bytes_list: list[bytes] = []
    activity_items: list[tuple[str, bytes]] = []  # (original_filename, bytes)

    for f in files:
        # .name drops any directory part a client may have sent.
        fname = Path(f.filename or "").name
        raw = await f.read()
        if fname.lower().endswith(".csv"):
            csv_bytes_list.append(raw)
        else:
            activity_items.append((fname, raw))

    # Build metadata from the first CSV
    metadata = None
    if csv_bytes_list:
        from bincio.extract.strava_csv import StravaMetadata
        import tempfile

        with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
            tmp.write(csv_bytes_list[0])
            tmp_path = Path(tmp.name)
        try:
            metadata = StravaMetadata(tmp_path)
        finally:
            tmp_path.unlink(missing_ok=True)

    total_files = len(activity_items)
    job_id = tasks._job_start(user.handle, total_files) if total_files > 0 else None

    def event_stream():
        added = 0
        overwritten = 0
        duplicates = 0
        errors = 0
        any_added = False

        try:
            for n, (name, contents) in enumerate(activity_items, 1):
                if job_id:
                    tasks._job_update(job_id, n - 1, name)

                suffix = _file_suffix(name)
                if suffix not in _SUPPORTED_SUFFIXES:
                    errors += 1
                    yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'unsupported type'})}\n\n"
                    continue

                if len(contents) > 50 * 1024 * 1024:
                    errors += 1
                    yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'file too large'})}\n\n"
                    continue

                staged = staging / name
                staged.write_bytes(contents)
                kept = False
                try:
                    activity = parse_file(staged)
                    if metadata is not None:
                        metadata.enrich(name, activity)
                    activity_id = make_activity_id(activity)
                    was_overwrite = False
                    if (dd / "activities" / f"{activity_id}.json").exists():
                        if not overwrite:
                            duplicates += 1
                            yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'duplicate'})}\n\n"
                            continue
                        # Overwrite: delete existing files before re-ingesting.
                        for ext in (".json", ".geojson", ".timeseries.json"):
                            (dd / "activities" / f"{activity_id}{ext}").unlink(missing_ok=True)
                        # Remove stale summary from index so ingest_parsed writes a clean one
                        index_path = dd / "index.json"
                        if index_path.exists():
                            idx = json.loads(index_path.read_text(encoding="utf-8"))
                            idx["activities"] = [a for a in idx.get("activities", []) if a.get("id") != activity_id]
                            # Written as UTF-8 explicitly: the file is read back as
                            # UTF-8 and ensure_ascii=False may emit non-ASCII text.
                            index_path.write_text(json.dumps(idx, indent=2, ensure_ascii=False), encoding="utf-8")
                        # Remove from dedup hash cache so the new file isn't blocked
                        cache_path = dd / ".bincio_cache.json"
                        if cache_path.exists():
                            try:
                                cache = json.loads(cache_path.read_text(encoding="utf-8"))
                                cache.pop(activity_id, None)
                                cache_path.write_text(json.dumps(cache, ensure_ascii=False), encoding="utf-8")
                            except (OSError, json.JSONDecodeError):
                                # Best effort: an unreadable cache is left as it is.
                                pass
                        # Remove merged copies (merge_all will regenerate them after ingest)
                        merged_acts = dd / "_merged" / "activities"
                        if merged_acts.exists():
                            for ext in (".json", ".geojson", ".timeseries.json"):
                                p = merged_acts / f"{activity_id}{ext}"
                                # is_symlink() also catches dangling links, for
                                # which exists() is False.
                                if p.exists() or p.is_symlink():
                                    p.unlink(missing_ok=True)
                        was_overwrite = True
                    ingest_parsed(activity, dd, privacy="public")
                    if store_original:
                        originals_dir = dd / "originals"
                        originals_dir.mkdir(exist_ok=True)
                        staged.rename(originals_dir / name)
                        kept = True
                    if was_overwrite:
                        overwritten += 1
                    else:
                        added += 1
                    any_added = True
                    status = 'overwritten' if was_overwrite else 'imported'
                    yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': status})}\n\n"
                except Exception as exc:
                    errors += 1
                    log.error("upload[%s]: failed to process %s: %s", user.handle, name, exc, exc_info=True)
                    yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': str(exc)})}\n\n"
                finally:
                    # Also runs on the duplicate `continue` above.
                    if not kept:
                        staged.unlink(missing_ok=True)

            # Retroactively apply CSV metadata to existing activities
            csv_updates = 0
            if metadata is not None:
                from bincio.extract.strava_csv import apply_csv_to_data_dir

                csv_updates = apply_csv_to_data_dir(dd, metadata)
                if csv_updates:
                    yield f"data: {json.dumps({'type': 'csv', 'updates': csv_updates})}\n\n"

            if any_added or csv_updates:
                merge_all(dd)
            if any_added:
                tasks._trigger_rebuild(user.handle)

            yield f"data: {json.dumps({'type': 'done', 'added': added, 'overwritten': overwritten, 'csv_updates': csv_updates, 'duplicates': duplicates, 'errors': errors})}\n\n"
        finally:
            # Close the job even when the stream ends early (client
            # disconnect closes the generator, or merge_all raises).
            if job_id:
                tasks._job_finish(job_id)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|
||
|
||
|
||
@router.post("/api/upload/strava-zip")
async def upload_strava_zip(
    file: UploadFile = File(...),
    private: str = Form(default="false"),
    bincio_session: Optional[str] = Cookie(default=None),
) -> StreamingResponse:
    """Accept a Strava bulk export ZIP and stream SSE progress while processing.

    The ZIP is written to a temp file, processed activity-by-activity, then deleted.
    Originals are never kept — the UI informs the user of this upfront.

    Form fields:
      file    – the export archive; its name must end in ``.zip``
      private – "true", "1" or "yes" (case-insensitive) imports activities
                with privacy "unlisted"; anything else uses "public"

    Raises:
      HTTPException(400) when the upload is not a ``.zip`` file.
    """
    user = deps._require_user(bincio_session)
    if not file.filename or not file.filename.lower().endswith(".zip"):
        raise HTTPException(400, "Please upload a .zip file")

    privacy = "unlisted" if private.lower() in ("true", "1", "yes") else "public"

    dd = deps._get_data_dir() / user.handle

    # Resolve project imports before creating the temp file, so a failing
    # import cannot leave an orphaned archive behind.
    from bincio.extract.strava_zip import strava_zip_iter
    from bincio.render.merge import merge_all

    import tempfile

    tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    zip_path = Path(tmp.name)
    try:
        with tmp:
            while chunk := await file.read(1024 * 1024):  # 1 MB chunks
                tmp.write(chunk)
    except BaseException:
        # Upload failed or was cancelled: the stream below will never run, so
        # its cleanup cannot remove the partial archive.  BaseException is
        # deliberate so that task cancellation is covered as well.
        zip_path.unlink(missing_ok=True)
        raise

    log.info("strava-zip[%s]: received %s, privacy=%s", user.handle, file.filename, privacy)

    def event_stream():
        any_imported = False
        imported_count = 0
        error_count = 0
        try:
            for event in strava_zip_iter(zip_path, dd, privacy=privacy):
                yield f"data: {json.dumps(event)}\n\n"
                if event.get("type") == "progress":
                    status = event.get("status")
                    if status == "imported":
                        any_imported = True
                        imported_count += 1
                    elif status == "error":
                        error_count += 1
                        log.warning("strava-zip[%s]: error on %s: %s",
                                    user.handle, event.get("name"), event.get("detail", ""))
                if event.get("type") == "done":
                    log.info("strava-zip[%s]: done — imported=%d errors=%d",
                             user.handle, imported_count, error_count)
            # Post-processing runs once per batch, after the last event.
            if any_imported:
                merge_all(dd)
                # Track baking is best effort; a failure must not block the rebuild.
                try:
                    from bincio.explore import bake_tracks

                    bake_tracks(user.handle, deps._get_data_dir())
                except Exception as exc:
                    log.warning("strava-zip[%s]: bake_tracks failed (non-fatal): %s", user.handle, exc)
                tasks._trigger_rebuild(user.handle)
        except Exception as exc:
            log.error("strava-zip[%s]: fatal error: %s", user.handle, exc, exc_info=True)
            yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n"
        finally:
            # The uploaded archive is never kept.
            zip_path.unlink(missing_ok=True)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|