Files
bincio-activity/bincio/serve/routers/uploads.py
T
Davide Scaini 27f6d141f7 Refactor step 4: narrow broad except Exception catches
Replaced 28 broad `except Exception` catches across 8 files with specific
exception types reflecting the actual failure modes:

- JSON file reads → (OSError, json.JSONDecodeError)
- datetime parsing → ValueError
- base64 decoding → ValueError
- YAML parsing → (OSError, yaml.YAMLError); import moved above try
- GeoJSON coord extraction → (TypeError, IndexError, AttributeError)
- Startup temp-file cleanup → OSError
- Single JSON line parsing (SSE batch) → json.JSONDecodeError

Kept broad catches only where intentional:
- Background thread top-level guards (tasks.py, admin.py) with log.exception
- SSE stream generator tops (strava.py, garmin.py, uploads.py)
- Per-item batch loops that must not abort the whole operation
- Explicitly non-fatal post-upload merge steps with log.warning
2026-05-13 23:58:14 +02:00

502 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""File upload endpoints (/api/upload/*)."""
from __future__ import annotations
import json
import logging
import uuid
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Cookie, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from bincio.serve import deps, tasks
# Logger named "bincio.serve"; every endpoint in this module logs through it.
log = logging.getLogger("bincio.serve")
# Router on which the /api/upload/* endpoints below are registered.
router = APIRouter()
# Lower-case effective suffixes (as returned by _file_suffix) accepted by /api/upload.
_SUPPORTED_SUFFIXES = {".fit", ".gpx", ".tcx", ".fit.gz", ".gpx.gz", ".tcx.gz"}
def _file_suffix(name: str) -> str:
    """Return the lower-cased effective suffix of *name*.

    For gzip-compressed names the inner extension is kept, so
    ``"ride.fit.gz"`` yields ``".fit.gz"``. A ``.gz`` name with no inner
    extension yields ``".gz"``; a name without any extension yields ``""``.
    """
    lowered = Path(name.lower())
    ext = lowered.suffix
    if ext != ".gz":
        return ext

    base = lowered.stem
    if "." not in base:
        return ".gz"

    # Keep only the last inner extension ("a.b.tcx" -> "tcx").
    inner = base.rsplit(".", 1)[-1]
    return f".{inner}.gz"
def _upsert_index_summary(user_dir: Path, activity_id: str, activity: dict, geojson: Optional[dict] = None) -> None:
    """Add or update an activity summary in user_dir/index.json.

    Called after writing BAS activity files so that merge_all can include the
    activity in year shards. Without this, uploaded activities exist on disk
    but never appear in the browser feed.

    Args:
        user_dir: Per-user data directory holding ``index.json`` and ``activities/``.
        activity_id: Identifier used as the summary id and in its file URLs.
        activity: Activity detail dict; summary fields are copied from it with ``.get``.
        geojson: Optional GeoJSON dict. When given, up to nine points are
            sampled from ``geometry.coordinates`` as a preview.

    Raises:
        OSError: if ``index.json`` cannot be read or written.
        json.JSONDecodeError: if an existing ``index.json`` is not valid JSON.
    """
    # Build preview coords from geojson if available ([lat, lng] order)
    preview: Optional[list] = None
    if geojson:
        try:
            coords = geojson.get("geometry", {}).get("coordinates", [])
            if coords:
                step = max(1, len(coords) // 9)
                preview = [[c[1], c[0]] for c in coords[::step]][:9]
        except (TypeError, IndexError, KeyError, AttributeError):
            # The preview is best-effort: a malformed geometry just means no preview.
            pass

    has_track = (user_dir / "activities" / f"{activity_id}.geojson").exists()

    # Fields copied verbatim from the activity (missing ones become None).
    copied_fields = (
        "sport",
        "sub_sport",
        "started_at",
        "distance_m",
        "duration_s",
        "moving_time_s",
        "elevation_gain_m",
        "avg_speed_kmh",
        "max_speed_kmh",
        "avg_hr_bpm",
        "max_hr_bpm",
        "avg_cadence_rpm",
        "avg_power_w",
        "mmp",
        "best_efforts",
        "best_climb_m",
        "source",
    )
    summary = {
        "id": activity_id,
        "title": activity.get("title", activity_id),
        **{field: activity.get(field) for field in copied_fields},
        "privacy": activity.get("privacy", "public"),
        "detail_url": f"activities/{activity_id}.json",
        "track_url": f"activities/{activity_id}.geojson" if has_track else None,
        "preview_coords": preview,
    }

    index_path = user_dir / "index.json"
    if index_path.exists():
        index_data = json.loads(index_path.read_text(encoding="utf-8"))
    else:
        index_data = {
            "bas_version": "1.0",
            "owner": {"handle": user_dir.name},
            "generated_at": None,
            "activities": [],
        }

    existing = {a["id"]: a for a in index_data.get("activities", [])}
    existing[activity_id] = summary
    # Summaries always carry a "started_at" key, possibly None. Use `or ""` so
    # None sorts as the oldest entry instead of raising TypeError against strings.
    index_data["activities"] = sorted(
        existing.values(),
        key=lambda a: a.get("started_at") or "",
        reverse=True,
    )
    index_path.write_text(json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8")
@router.post("/api/upload/bas")
async def upload_bas_activity(
    request: Request,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Accept a pre-extracted BAS activity JSON from the mobile app.

    Body (JSON):
        activity    full BAS activity dict (required, must have 'id')
        timeseries  timeseries dict (optional)
        geojson     GeoJSON dict (optional)

    Returns:
        {"ok": true, "id": "...", "status": "imported" | "duplicate"}

    Raises:
        HTTPException(400): the body is not a JSON object, or ``activity.id``
            is missing.
    """
    user = deps._require_auth(request, bincio_session)

    # A malformed or non-object body is a client error; report it as 400
    # instead of letting the decode/attribute error surface as a 500.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "Invalid JSON body") from exc
    if not isinstance(body, dict):
        raise HTTPException(400, "Invalid JSON body")

    activity = body.get("activity")
    if not isinstance(activity, dict) or not activity.get("id"):
        raise HTTPException(400, "Missing activity.id")

    activity_id = str(activity["id"])
    deps._check_id(activity_id)

    user_dir = deps._get_data_dir() / user.handle
    acts_dir = user_dir / "activities"
    acts_dir.mkdir(parents=True, exist_ok=True)

    out = acts_dir / f"{activity_id}.json"
    if out.exists():
        # An existing detail file means the activity was already uploaded; nothing is rewritten.
        return JSONResponse({"ok": True, "id": activity_id, "status": "duplicate"})

    out.write_text(json.dumps(activity, ensure_ascii=False, indent=2), encoding="utf-8")

    if body.get("timeseries"):
        ts_path = acts_dir / f"{activity_id}.timeseries.json"
        if not ts_path.exists():
            ts_path.write_text(json.dumps(body["timeseries"], ensure_ascii=False), encoding="utf-8")

    geojson_body: Optional[dict] = body.get("geojson") or None
    if geojson_body:
        gj_path = acts_dir / f"{activity_id}.geojson"
        if not gj_path.exists():
            gj_path.write_text(json.dumps(geojson_body, ensure_ascii=False), encoding="utf-8")

    _upsert_index_summary(user_dir, activity_id, activity, geojson_body)

    # Merge and feed update are best effort: the activity is already on disk,
    # so a failure here is logged and the upload is still reported as imported.
    try:
        from bincio.render.merge import merge_one, write_combined_feed

        merge_one(user_dir, activity_id)
        write_combined_feed(deps._get_data_dir())
    except Exception as exc:
        log.warning("upload/bas[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)

    log.info("upload/bas[%s]: imported %s", user.handle, activity_id)
    return JSONResponse({"ok": True, "id": activity_id, "status": "imported"})
@router.post("/api/upload/raw")
async def upload_raw_activity(
    request: Request,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Accept a raw FIT/GPX file (base64-encoded) from the mobile app, extract it
    server-side, store it in the user's activity library, and return the full
    extracted data so the mobile can cache it locally.

    Used when the device WebView is too old to run Pyodide (e.g. Karoo / Chrome <69).

    Body (JSON):
        filename    original filename (used only to determine file extension)
        base64      base64-encoded raw file bytes
        user_title  optional title; when set it is written to edits/<id>.md

    Auth: Authorization: Bearer <token>

    Returns:
        {"ok": true, "id": "...", "detail": {...}, "timeseries": {...}|null,
         "geojson": {...}|null, "source_hash": "<sha256-hex>"}

    Raises:
        HTTPException(400): body is not a JSON object, or base64 is missing/invalid.
        HTTPException(422): parsing, extraction or storing the activity failed.
    """
    import base64 as _b64
    import hashlib
    import shutil

    user = deps._require_auth(request, bincio_session)

    # A malformed or non-object body is a client error; report it as 400
    # instead of letting the decode/attribute error surface as a 500.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "Invalid JSON body") from exc
    if not isinstance(body, dict):
        raise HTTPException(400, "Invalid JSON body")

    filename_hint: str = body.get("filename") or "activity.fit"
    b64: str = body.get("base64") or ""
    user_title: Optional[str] = body.get("user_title") or None
    if not b64:
        raise HTTPException(400, "Missing base64 field")
    try:
        raw = _b64.b64decode(b64)
    except (ValueError, TypeError) as exc:
        # binascii.Error is a ValueError; TypeError covers a non-string payload.
        raise HTTPException(400, "Invalid base64 encoding") from exc

    source_hash = hashlib.sha256(raw).hexdigest()

    # Use the module's suffix helper so gzip double extensions (".fit.gz")
    # survive; Path.suffix alone would reduce them to ".gz" and drop the format.
    suffix = _file_suffix(str(filename_hint)) or ".fit"
    # NOTE(review): paths stay under /tmp with these prefixes on purpose;
    # cleanup code outside this module may look for them — confirm before changing.
    tmp_in = Path(f"/tmp/bincio_raw_{uuid.uuid4()}{suffix}")
    tmp_out = Path(f"/tmp/bincio_out_{uuid.uuid4()}")

    try:
        tmp_in.write_bytes(raw)
        tmp_out.mkdir()

        from bincio.extract.parsers.factory import parse_file
        from bincio.extract.metrics import compute
        from bincio.extract.writer import make_activity_id, write_activity
        from bincio.extract.timeseries import build_timeseries

        activity = parse_file(tmp_in)
        metrics = compute(activity)
        write_activity(activity, metrics, tmp_out, privacy="public", rdp_epsilon=0.0001)
        act_id = make_activity_id(activity)

        acts_tmp = tmp_out / "activities"
        detail_path = acts_tmp / f"{act_id}.json"
        ts_path = acts_tmp / f"{act_id}.timeseries.json"
        geojson_path = acts_tmp / f"{act_id}.geojson"

        # If the writer produced no timeseries file, build one here when there are samples.
        if not ts_path.exists():
            ts_data = build_timeseries(activity.points, activity.started_at, "public")
            if ts_data.get("t"):
                ts_path.write_text(json.dumps(ts_data), encoding="utf-8")

        # Read as UTF-8 explicitly, matching how JSON is read and written
        # elsewhere in this module, rather than relying on the locale default.
        detail = json.loads(detail_path.read_text(encoding="utf-8"))
        timeseries = json.loads(ts_path.read_text(encoding="utf-8")) if ts_path.exists() else None
        geojson = json.loads(geojson_path.read_text(encoding="utf-8")) if geojson_path.exists() else None

        # Also store on the server so the activity appears in the user's feed.
        user_dir = deps._get_data_dir() / user.handle
        acts_dir = user_dir / "activities"
        acts_dir.mkdir(parents=True, exist_ok=True)

        out = acts_dir / f"{act_id}.json"
        if not out.exists():
            out.write_text(json.dumps(detail, ensure_ascii=False, indent=2), encoding="utf-8")
            stored_ts = acts_dir / f"{act_id}.timeseries.json"
            if timeseries and not stored_ts.exists():
                stored_ts.write_text(json.dumps(timeseries), encoding="utf-8")
            stored_gj = acts_dir / f"{act_id}.geojson"
            if geojson and not stored_gj.exists():
                stored_gj.write_text(json.dumps(geojson), encoding="utf-8")
            _upsert_index_summary(user_dir, act_id, detail, geojson)

        if user_title:
            import yaml as _yaml

            edits_dir = user_dir / "edits"
            edits_dir.mkdir(parents=True, exist_ok=True)
            (edits_dir / f"{act_id}.md").write_text(
                f"---\n{_yaml.dump({'title': user_title}, allow_unicode=True)}---\n",
                encoding="utf-8",
            )
    except Exception as exc:
        log.warning("upload/raw[%s]: extraction failed: %s", user.handle, exc)
        raise HTTPException(422, f"Could not extract activity: {exc}") from exc
    finally:
        # Scratch files are removed whether or not extraction succeeded.
        tmp_in.unlink(missing_ok=True)
        shutil.rmtree(tmp_out, ignore_errors=True)

    # Merge and update feed — best effort; a race or transient FS error here must
    # not turn a successful extraction into a 422 (the file is on disk; the mobile
    # would retry indefinitely and the activity would never be marked synced).
    try:
        from bincio.render.merge import merge_one, write_combined_feed

        merge_one(user_dir, act_id)
        write_combined_feed(deps._get_data_dir())
    except Exception as exc:
        log.warning("upload/raw[%s]: merge/feed failed (non-fatal): %s", user.handle, exc)

    log.info("upload/raw[%s]: imported %s", user.handle, act_id)
    return JSONResponse({
        "ok": True,
        "id": act_id,
        "detail": detail,
        "timeseries": timeseries,
        "geojson": geojson,
        "source_hash": source_hash,
    })
@router.post("/api/upload")
async def upload_activity(
    files: list[UploadFile] = File(...),
    store_original: bool = Form(False),
    overwrite: bool = Form(False),
    bincio_session: Optional[str] = Cookie(default=None),
) -> StreamingResponse:
    """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing.

    activities.csv (Strava export format) can be included in the batch to:
      - Enrich activity files in the same batch (matched by filename)
      - Retroactively update sidecars for existing activities (matched by strava_id)

    Form fields:
        files           one or more uploads; names ending in .csv are treated as metadata
        store_original  keep the uploaded file under originals/ after a successful import
        overwrite       replace an already-imported activity instead of reporting a duplicate

    SSE events:
      {"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"overwritten"|"duplicate"|"error"}
      {"type": "csv", "updates": N}   -- only when CSV was included
      {"type": "done", "added": N, "csv_updates": N, "duplicates": N, "overwritten": N, "errors": N}
    """
    from bincio.extract.ingest import ingest_parsed
    from bincio.extract.parsers.factory import parse_file
    from bincio.extract.writer import make_activity_id
    from bincio.render.merge import merge_all

    user = deps._require_user(bincio_session)
    dd = deps._get_data_dir() / user.handle
    staging = dd / "_uploads"
    # parents=True: the user's data dir may not exist yet on a first upload.
    staging.mkdir(parents=True, exist_ok=True)

    # Read all files into memory now (async), then process synchronously in the generator
    csv_bytes_list: list[bytes] = []
    activity_items: list[tuple[str, bytes]] = []  # (original_filename, bytes)
    for f in files:
        # .name strips any directory components the client put in the filename.
        fname = Path(f.filename or "").name
        raw = await f.read()
        if fname.lower().endswith(".csv"):
            csv_bytes_list.append(raw)
        else:
            activity_items.append((fname, raw))

    # Build metadata from the first CSV
    metadata = None
    if csv_bytes_list:
        from bincio.extract.strava_csv import StravaMetadata
        import tempfile

        with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
            tmp.write(csv_bytes_list[0])
            tmp_path = Path(tmp.name)
        try:
            metadata = StravaMetadata(tmp_path)
        finally:
            tmp_path.unlink(missing_ok=True)

    total_files = len(activity_items)
    job_id = tasks._job_start(user.handle, total_files) if total_files > 0 else None

    max_bytes = 50 * 1024 * 1024
    activity_exts = (".json", ".geojson", ".timeseries.json")

    def sse(payload: dict) -> str:
        """Serialise one event as an SSE ``data:`` frame."""
        return f"data: {json.dumps(payload)}\n\n"

    def progress(n: int, name: str, status: str, detail: Optional[str] = None) -> str:
        """Build a per-file progress frame; ``detail`` is only sent when given."""
        payload = {"type": "progress", "n": n, "total": total_files, "name": name, "status": status}
        if detail is not None:
            payload["detail"] = detail
        return sse(payload)

    def purge_existing(activity_id: str) -> None:
        """Remove every stored trace of *activity_id* so it can be re-ingested cleanly."""
        for ext in activity_exts:
            (dd / "activities" / f"{activity_id}{ext}").unlink(missing_ok=True)

        # Remove stale summary from index so ingest_parsed writes a clean one
        index_path = dd / "index.json"
        if index_path.exists():
            idx = json.loads(index_path.read_text(encoding="utf-8"))
            idx["activities"] = [a for a in idx.get("activities", []) if a.get("id") != activity_id]
            # Written with ensure_ascii=False and read back as UTF-8, so the
            # encoding must be explicit rather than the locale default.
            index_path.write_text(json.dumps(idx, indent=2, ensure_ascii=False), encoding="utf-8")

        # Remove from dedup hash cache so the new file isn't blocked
        cache_path = dd / ".bincio_cache.json"
        if cache_path.exists():
            try:
                cache = json.loads(cache_path.read_text(encoding="utf-8"))
                cache.pop(activity_id, None)
                cache_path.write_text(json.dumps(cache, ensure_ascii=False), encoding="utf-8")
            except (OSError, json.JSONDecodeError):
                # An unreadable or corrupt cache is left untouched; the purge continues.
                pass

        # Remove merged copies (merge_all will regenerate them after ingest)
        merged_acts = dd / "_merged" / "activities"
        if merged_acts.exists():
            for ext in activity_exts:
                # missing_ok also covers broken symlinks and absent files.
                (merged_acts / f"{activity_id}{ext}").unlink(missing_ok=True)

    def event_stream():
        added = 0
        overwritten = 0
        duplicates = 0
        errors = 0
        try:
            for n, (name, contents) in enumerate(activity_items, 1):
                if job_id:
                    tasks._job_update(job_id, n - 1, name)

                if _file_suffix(name) not in _SUPPORTED_SUFFIXES:
                    errors += 1
                    yield progress(n, name, "error", "unsupported type")
                    continue
                if len(contents) > max_bytes:
                    errors += 1
                    yield progress(n, name, "error", "file too large")
                    continue

                staged = staging / name
                staged.write_bytes(contents)
                kept = False
                try:
                    activity = parse_file(staged)
                    if metadata is not None:
                        metadata.enrich(name, activity)
                    activity_id = make_activity_id(activity)

                    was_overwrite = False
                    if (dd / "activities" / f"{activity_id}.json").exists():
                        if not overwrite:
                            duplicates += 1
                            yield progress(n, name, "duplicate")
                            continue
                        # Overwrite: delete existing files before re-ingesting.
                        purge_existing(activity_id)
                        was_overwrite = True

                    ingest_parsed(activity, dd, privacy="public")

                    if store_original:
                        originals_dir = dd / "originals"
                        originals_dir.mkdir(exist_ok=True)
                        staged.rename(originals_dir / name)
                        kept = True

                    if was_overwrite:
                        overwritten += 1
                    else:
                        added += 1
                    yield progress(n, name, "overwritten" if was_overwrite else "imported")
                except Exception as exc:
                    # Per-item guard: one bad file must not abort the whole batch.
                    errors += 1
                    log.error("upload[%s]: failed to process %s: %s", user.handle, name, exc, exc_info=True)
                    yield progress(n, name, "error", str(exc))
                finally:
                    if not kept:
                        staged.unlink(missing_ok=True)

            # Retroactively apply CSV metadata to existing activities
            csv_updates = 0
            if metadata is not None:
                from bincio.extract.strava_csv import apply_csv_to_data_dir

                csv_updates = apply_csv_to_data_dir(dd, metadata)
                if csv_updates:
                    yield sse({"type": "csv", "updates": csv_updates})

            # Overwrites must trigger a merge too: their merged copies were
            # deleted above and would otherwise never be regenerated.
            if added or overwritten or csv_updates:
                merge_all(dd)
            if added:
                tasks._trigger_rebuild(user.handle)

            yield sse({
                "type": "done",
                "added": added,
                "overwritten": overwritten,
                "csv_updates": csv_updates,
                "duplicates": duplicates,
                "errors": errors,
            })
        finally:
            # Runs on normal completion, on errors, and when the client
            # disconnects (generator closed), so the job never stays "running".
            if job_id:
                tasks._job_finish(job_id)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
@router.post("/api/upload/strava-zip")
async def upload_strava_zip(
    file: UploadFile = File(...),
    private: str = Form(default="false"),
    bincio_session: Optional[str] = Cookie(default=None),
) -> StreamingResponse:
    """Accept a Strava bulk export ZIP and stream SSE progress while processing.

    The ZIP is written to a temp file, processed activity-by-activity, then deleted.
    Originals are never kept — the UI informs the user of this upfront.

    Form fields:
        file     the .zip upload
        private  "true"/"1"/"yes" (case-insensitive) imports as "unlisted";
                 anything else imports as "public"

    Raises:
        HTTPException(400): the upload has no filename or it does not end in .zip.
    """
    user = deps._require_user(bincio_session)
    if not file.filename or not file.filename.lower().endswith(".zip"):
        raise HTTPException(400, "Please upload a .zip file")

    privacy = "unlisted" if private.lower() in ("true", "1", "yes") else "public"
    dd = deps._get_data_dir() / user.handle

    # Import before creating the temp file so an import failure cannot leak it.
    import tempfile

    from bincio.extract.strava_zip import strava_zip_iter
    from bincio.render.merge import merge_all

    tmp = tempfile.NamedTemporaryFile(suffix=".zip", delete=False)
    zip_path = Path(tmp.name)
    try:
        with tmp:
            while chunk := await file.read(1024 * 1024):  # 1 MB chunks
                tmp.write(chunk)
    except BaseException:
        # Includes task cancellation on client disconnect: the generator's own
        # cleanup never runs in that case, so remove the partial ZIP here.
        zip_path.unlink(missing_ok=True)
        raise

    log.info("strava-zip[%s]: received %s, privacy=%s", user.handle, file.filename, privacy)

    def event_stream():
        any_imported = False
        imported_count = 0
        error_count = 0
        try:
            for event in strava_zip_iter(zip_path, dd, privacy=privacy):
                yield f"data: {json.dumps(event)}\n\n"
                kind = event.get("type")
                if kind == "progress":
                    status = event.get("status")
                    if status == "imported":
                        any_imported = True
                        imported_count += 1
                    elif status == "error":
                        error_count += 1
                        log.warning("strava-zip[%s]: error on %s: %s",
                                    user.handle, event.get("name"), event.get("detail", ""))
                if kind == "done":
                    log.info("strava-zip[%s]: done — imported=%d errors=%d",
                             user.handle, imported_count, error_count)
            if any_imported:
                merge_all(dd)
                tasks._trigger_rebuild(user.handle)
        except Exception as exc:
            # Top-level stream guard: report the failure to the client as an SSE event.
            log.error("strava-zip[%s]: fatal error: %s", user.handle, exc, exc_info=True)
            yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n"
        finally:
            zip_path.unlink(missing_ok=True)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )