"""File upload endpoints (/api/upload/*).""" from __future__ import annotations import json import logging import uuid from pathlib import Path from typing import Optional from fastapi import APIRouter, Cookie, File, Form, HTTPException, Request, UploadFile from fastapi.responses import JSONResponse, StreamingResponse from bincio.serve import deps, tasks log = logging.getLogger("bincio.serve") router = APIRouter() _SUPPORTED_SUFFIXES = {".fit", ".gpx", ".tcx", ".fit.gz", ".gpx.gz", ".tcx.gz"} def _file_suffix(name: str) -> str: """Return the effective suffix, including .gz double-extension.""" p = Path(name.lower()) if p.suffix == ".gz": return p.stem.rsplit(".", 1)[-1].join([".", ".gz"]) if "." in p.stem else ".gz" return p.suffix def _upsert_index_summary(user_dir: Path, activity_id: str, activity: dict, geojson: Optional[dict] = None) -> None: """Add or update an activity summary in user_dir/index.json. Called after writing BAS activity files so that merge_all can include the activity in year shards. Without this, uploaded activities exist on disk but never appear in the browser feed. """ # Build preview coords from geojson if available ([lat, lng] order) preview: Optional[list] = None if geojson: try: coords = geojson.get("geometry", {}).get("coordinates", []) if coords: step = max(1, len(coords) // 9) preview = [[c[1], c[0]] for c in coords[::step]][:9] except (TypeError, IndexError, AttributeError): pass has_track = (user_dir / "activities" / f"{activity_id}.geojson").exists() summary = { "id": activity_id, "title": activity.get("title", activity_id), "sport": activity.get("sport"), "sub_sport": activity.get("sub_sport"), "started_at": activity.get("started_at"), "distance_m": activity.get("distance_m"), "duration_s": activity.get("duration_s"), "moving_time_s": activity.get("moving_time_s"), "elevation_gain_m": activity.get("elevation_gain_m"), "avg_speed_kmh": activity.get("avg_speed_kmh"), "max_speed_kmh": activity.get("max_speed_kmh"), "avg_hr_bpm": activity.get("avg_hr_bpm"), "max_hr_bpm": activity.get("max_hr_bpm"), "avg_cadence_rpm": activity.get("avg_cadence_rpm"), "avg_power_w": activity.get("avg_power_w"), "mmp": activity.get("mmp"), "best_efforts": activity.get("best_efforts"), "best_climb_m": activity.get("best_climb_m"), "source": activity.get("source"), "privacy": activity.get("privacy", "public"), "detail_url": f"activities/{activity_id}.json", "track_url": f"activities/{activity_id}.geojson" if has_track else None, "preview_coords": preview, } index_path = user_dir / "index.json" if index_path.exists(): index_data = json.loads(index_path.read_text(encoding="utf-8")) else: index_data = { "bas_version": "1.0", "owner": {"handle": user_dir.name}, "generated_at": None, "activities": [], } existing = {a["id"]: a for a in index_data.get("activities", [])} existing[activity_id] = summary index_data["activities"] = sorted(existing.values(), key=lambda a: a.get("started_at", ""), reverse=True) index_path.write_text(json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8") @router.post("/api/upload/bas") async def upload_bas_activity( request: Request, bincio_session: Optional[str] = Cookie(default=None), ) -> JSONResponse: """Accept a pre-extracted BAS activity JSON from the mobile app. Body (JSON): activity – full BAS activity dict (required, must have 'id') timeseries – timeseries dict (optional) geojson – GeoJSON dict (optional) Returns: {"ok": true, "id": "...", "status": "imported" | "duplicate"} """ user = deps._require_auth(request, bincio_session) body = await request.json() activity = body.get("activity") if not activity or not activity.get("id"): raise HTTPException(400, "Missing activity.id") activity_id = str(activity["id"]) deps._check_id(activity_id) user_dir = deps._get_data_dir() / user.handle acts_dir = user_dir / "activities" acts_dir.mkdir(parents=True, exist_ok=True) out = acts_dir / f"{activity_id}.json" if out.exists(): return JSONResponse({"ok": True, "id": activity_id, "status": "duplicate"}) out.write_text(json.dumps(activity, ensure_ascii=False, indent=2), encoding="utf-8") if body.get("timeseries"): ts_path = acts_dir / f"{activity_id}.timeseries.json" if not ts_path.exists(): ts_path.write_text(json.dumps(body["timeseries"], ensure_ascii=False), encoding="utf-8") geojson_body: Optional[dict] = body.get("geojson") or None if geojson_body: gj_path = acts_dir / f"{activity_id}.geojson" if not gj_path.exists(): gj_path.write_text(json.dumps(geojson_body, ensure_ascii=False), encoding="utf-8") _upsert_index_summary(user_dir, activity_id, activity, geojson_body) try: from bincio.render.merge import merge_one, write_combined_feed merge_one(user_dir, activity_id) write_combined_feed(deps._get_data_dir()) except Exception as exc: log.warning("upload/bas[%s]: merge/feed failed (non-fatal): %s", user.handle, exc) log.info("upload/bas[%s]: imported %s", user.handle, activity_id) return JSONResponse({"ok": True, "id": activity_id, "status": "imported"}) @router.post("/api/upload/raw") async def upload_raw_activity( request: Request, bincio_session: Optional[str] = Cookie(default=None), ) -> JSONResponse: """Accept a raw FIT/GPX file (base64-encoded) from the mobile app, extract it server-side, store it in the user's activity library, and return the full extracted data so the mobile can cache it locally. Used when the device WebView is too old to run Pyodide (e.g. Karoo / Chrome <69). Body (JSON): filename – original filename (used only to determine file extension) base64 – base64-encoded raw file bytes Auth: Authorization: Bearer Returns: {"ok": true, "id": "...", "detail": {...}, "timeseries": {...}|null, "geojson": {...}|null, "source_hash": ""} """ import base64 as _b64 import hashlib import shutil user = deps._require_auth(request, bincio_session) body = await request.json() filename_hint: str = body.get("filename") or "activity.fit" b64: str = body.get("base64") or "" user_title: Optional[str] = body.get("user_title") or None if not b64: raise HTTPException(400, "Missing base64 field") try: raw = _b64.b64decode(b64) except ValueError: raise HTTPException(400, "Invalid base64 encoding") source_hash = hashlib.sha256(raw).hexdigest() suffix = Path(filename_hint).suffix or ".fit" tmp_in = Path(f"/tmp/bincio_raw_{uuid.uuid4()}{suffix}") tmp_out = Path(f"/tmp/bincio_out_{uuid.uuid4()}") try: tmp_in.write_bytes(raw) tmp_out.mkdir() from bincio.extract.parsers.factory import parse_file from bincio.extract.metrics import compute from bincio.extract.writer import make_activity_id, write_activity from bincio.extract.timeseries import build_timeseries activity = parse_file(tmp_in) metrics = compute(activity) write_activity(activity, metrics, tmp_out, privacy="public", rdp_epsilon=0.0001) act_id = make_activity_id(activity) acts_tmp = tmp_out / "activities" detail_path = acts_tmp / f"{act_id}.json" ts_path = acts_tmp / f"{act_id}.timeseries.json" geojson_path = acts_tmp / f"{act_id}.geojson" if not ts_path.exists(): ts_data = build_timeseries(activity.points, activity.started_at, "public") if ts_data.get("t"): ts_path.write_text(json.dumps(ts_data)) detail = json.loads(detail_path.read_text()) timeseries = json.loads(ts_path.read_text()) if ts_path.exists() else None geojson = json.loads(geojson_path.read_text()) if geojson_path.exists() else None # Also store on the server so the activity appears in the user's feed. user_dir = deps._get_data_dir() / user.handle acts_dir = user_dir / "activities" acts_dir.mkdir(parents=True, exist_ok=True) out = acts_dir / f"{act_id}.json" if not out.exists(): out.write_text(json.dumps(detail, ensure_ascii=False, indent=2), encoding="utf-8") if timeseries and not (acts_dir / f"{act_id}.timeseries.json").exists(): (acts_dir / f"{act_id}.timeseries.json").write_text(json.dumps(timeseries), encoding="utf-8") if geojson and not (acts_dir / f"{act_id}.geojson").exists(): (acts_dir / f"{act_id}.geojson").write_text(json.dumps(geojson), encoding="utf-8") _upsert_index_summary(user_dir, act_id, detail, geojson) if user_title: import yaml as _yaml edits_dir = user_dir / "edits" edits_dir.mkdir(parents=True, exist_ok=True) (edits_dir / f"{act_id}.md").write_text( f"---\n{_yaml.dump({'title': user_title}, allow_unicode=True)}---\n", encoding="utf-8", ) except Exception as exc: log.warning("upload/raw[%s]: extraction failed: %s", user.handle, exc) raise HTTPException(422, f"Could not extract activity: {exc}") from exc finally: tmp_in.unlink(missing_ok=True) shutil.rmtree(tmp_out, ignore_errors=True) # Merge and update feed — best effort; a race or transient FS error here must # not turn a successful extraction into a 422 (the file is on disk; the mobile # would retry indefinitely and the activity would never be marked synced). try: from bincio.render.merge import merge_one, write_combined_feed merge_one(user_dir, act_id) write_combined_feed(deps._get_data_dir()) except Exception as exc: log.warning("upload/raw[%s]: merge/feed failed (non-fatal): %s", user.handle, exc) log.info("upload/raw[%s]: imported %s", user.handle, act_id) return JSONResponse({ "ok": True, "id": act_id, "detail": detail, "timeseries": timeseries, "geojson": geojson, "source_hash": source_hash, }) @router.post("/api/upload") async def upload_activity( files: list[UploadFile] = File(...), store_original: bool = Form(False), overwrite: bool = Form(False), bincio_session: Optional[str] = Cookie(default=None), ) -> StreamingResponse: """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing. activities.csv (Strava export format) can be included in the batch to: - Enrich activity files in the same batch (matched by filename) - Retroactively update sidecars for existing activities (matched by strava_id) SSE events: {"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"overwritten"|"duplicate"|"error"} {"type": "csv", "updates": N} -- only when CSV was included {"type": "done", "added": N, "csv_updates": N, "duplicates": N, "overwritten": N, "errors": N} """ from bincio.extract.ingest import ingest_parsed from bincio.extract.parsers.factory import parse_file from bincio.extract.writer import make_activity_id from bincio.render.merge import merge_all user = deps._require_user(bincio_session) dd = deps._get_data_dir() / user.handle staging = dd / "_uploads" staging.mkdir(exist_ok=True) # Read all files into memory now (async), then process synchronously in the generator csv_bytes_list: list[bytes] = [] activity_items: list[tuple[str, bytes]] = [] # (original_filename, bytes) for f in files: fname = Path(f.filename or "").name raw = await f.read() if fname.lower().endswith(".csv"): csv_bytes_list.append(raw) else: activity_items.append((fname, raw)) # Build metadata from the first CSV metadata = None if csv_bytes_list: from bincio.extract.strava_csv import StravaMetadata import tempfile with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: tmp.write(csv_bytes_list[0]) tmp_path = Path(tmp.name) try: metadata = StravaMetadata(tmp_path) finally: tmp_path.unlink(missing_ok=True) total_files = len(activity_items) job_id = tasks._job_start(user.handle, total_files) if total_files > 0 else None def event_stream(): added = 0 overwritten = 0 duplicates = 0 errors = 0 any_added = False for n, (name, contents) in enumerate(activity_items, 1): if job_id: tasks._job_update(job_id, n - 1, name) suffix = _file_suffix(name) if suffix not in _SUPPORTED_SUFFIXES: errors += 1 yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'unsupported type'})}\n\n" continue if len(contents) > 50 * 1024 * 1024: errors += 1 yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'file too large'})}\n\n" continue staged = staging / name staged.write_bytes(contents) kept = False try: activity = parse_file(staged) if metadata is not None: metadata.enrich(name, activity) activity_id = make_activity_id(activity) was_overwrite = False if (dd / "activities" / f"{activity_id}.json").exists(): if not overwrite: duplicates += 1 yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'duplicate'})}\n\n" continue # Overwrite: delete existing files before re-ingesting. for ext in (".json", ".geojson", ".timeseries.json"): (dd / "activities" / f"{activity_id}{ext}").unlink(missing_ok=True) # Remove stale summary from index so ingest_parsed writes a clean one index_path = dd / "index.json" if index_path.exists(): idx = json.loads(index_path.read_text(encoding="utf-8")) idx["activities"] = [a for a in idx.get("activities", []) if a.get("id") != activity_id] index_path.write_text(json.dumps(idx, indent=2, ensure_ascii=False)) # Remove from dedup hash cache so the new file isn't blocked cache_path = dd / ".bincio_cache.json" if cache_path.exists(): try: cache = json.loads(cache_path.read_text(encoding="utf-8")) cache.pop(activity_id, None) cache_path.write_text(json.dumps(cache, ensure_ascii=False)) except (OSError, json.JSONDecodeError): pass # Remove merged copies (merge_all will regenerate them after ingest) merged_acts = dd / "_merged" / "activities" if merged_acts.exists(): for ext in (".json", ".geojson", ".timeseries.json"): p = merged_acts / f"{activity_id}{ext}" if p.exists() or p.is_symlink(): p.unlink(missing_ok=True) was_overwrite = True ingest_parsed(activity, dd, privacy="public") if store_original: originals_dir = dd / "originals" originals_dir.mkdir(exist_ok=True) staged.rename(originals_dir / name) kept = True if was_overwrite: overwritten += 1 else: added += 1 any_added = True status = 'overwritten' if was_overwrite else 'imported' yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': status})}\n\n" except Exception as exc: errors += 1 log.error("upload[%s]: failed to process %s: %s", user.handle, name, exc, exc_info=True) yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': str(exc)})}\n\n" finally: if not kept: staged.unlink(missing_ok=True) # Retroactively apply CSV metadata to existing activities csv_updates = 0 if metadata is not None: from bincio.extract.strava_csv import apply_csv_to_data_dir csv_updates = apply_csv_to_data_dir(dd, metadata) if csv_updates: yield f"data: {json.dumps({'type': 'csv', 'updates': csv_updates})}\n\n" if any_added or csv_updates: merge_all(dd) if any_added: tasks._trigger_rebuild(user.handle) yield f"data: {json.dumps({'type': 'done', 'added': added, 'overwritten': overwritten, 'csv_updates': csv_updates, 'duplicates': duplicates, 'errors': errors})}\n\n" if job_id: tasks._job_finish(job_id) return StreamingResponse( event_stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) @router.post("/api/upload/strava-zip") async def upload_strava_zip( file: UploadFile = File(...), private: str = Form(default="false"), bincio_session: Optional[str] = Cookie(default=None), ) -> StreamingResponse: """Accept a Strava bulk export ZIP and stream SSE progress while processing. The ZIP is written to a temp file, processed activity-by-activity, then deleted. Originals are never kept — the UI informs the user of this upfront. """ user = deps._require_user(bincio_session) if not file.filename or not file.filename.lower().endswith(".zip"): raise HTTPException(400, "Please upload a .zip file") privacy = "unlisted" if private.lower() in ("true", "1", "yes") else "public" dd = deps._get_data_dir() / user.handle import tempfile tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False) zip_path = Path(tmp.name) try: while chunk := await file.read(1024 * 1024): # 1 MB chunks tmp.write(chunk) finally: tmp.close() from bincio.extract.strava_zip import strava_zip_iter from bincio.render.merge import merge_all log.info("strava-zip[%s]: received %s, privacy=%s", user.handle, file.filename, privacy) def event_stream(): any_imported = False imported_count = 0 error_count = 0 try: for event in strava_zip_iter(zip_path, dd, privacy=privacy): yield f"data: {json.dumps(event)}\n\n" if event.get("type") == "progress": status = event.get("status") if status == "imported": any_imported = True imported_count += 1 elif status == "error": error_count += 1 log.warning("strava-zip[%s]: error on %s: %s", user.handle, event.get("name"), event.get("detail", "")) if event.get("type") == "done": log.info("strava-zip[%s]: done — imported=%d errors=%d", user.handle, imported_count, error_count) if any_imported: merge_all(dd) try: from bincio.explore import bake_tracks bake_tracks(user.handle, deps._get_data_dir()) except Exception as exc: log.warning("strava-zip[%s]: bake_tracks failed (non-fatal): %s", user.handle, exc) tasks._trigger_rebuild(user.handle) except Exception as exc: log.error("strava-zip[%s]: fatal error: %s", user.handle, exc, exc_info=True) yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n" finally: zip_path.unlink(missing_ok=True) return StreamingResponse( event_stream(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, )