From 82830222ba8ead3f199596979d28f581398349ff Mon Sep 17 00:00:00 2001 From: Davide Scaini Date: Sat, 11 Apr 2026 08:33:21 +0200 Subject: [PATCH] =?UTF-8?q?=20=20For=20users=20uploading:=20=20=20-=20POST?= =?UTF-8?q?=20/api/upload=20now=20returns=20text/event-stream=20instead=20?= =?UTF-8?q?of=20JSON=20=20=20-=20Per-file=20progress=20events=20stream=20b?= =?UTF-8?q?ack=20as=20each=20file=20is=20processed:=20=E2=86=93=203/47=20(?= =?UTF-8?q?6%)=20=E2=80=94=20morning=5Fride.fit=20=20=20-=20Final=20done?= =?UTF-8?q?=20event=20shows=20the=20summary:=20"12=20added,=2035=20duplica?= =?UTF-8?q?tes"=20=20=20-=20The=20Vite=20proxy=20is=20configured=20to=20st?= =?UTF-8?q?ream=20this=20properly=20(no=20buffering)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For the admin: - New GET /api/admin/jobs endpoint (admin-only) returns the list of active upload jobs, each with user, started_at, total, done, current (filename being processed) - A pulsing amber badge appears in the nav bar for admins when any user has an active upload running — it shows e.g. "2 uploads running" with a tooltip listing each user's progress (@alice: 12/50 files) - Polls every 5 seconds, disappears automatically when all jobs finish --- bincio/edit/server.py | 132 ++++++++++++------------ bincio/serve/server.py | 200 ++++++++++++++++++++++++------------ site/astro.config.mjs | 7 +- site/src/layouts/Base.astro | 116 +++++++++++++++++---- 4 files changed, 302 insertions(+), 153 deletions(-) diff --git a/bincio/edit/server.py b/bincio/edit/server.py index f70faee..cc3acd6 100644 --- a/bincio/edit/server.py +++ b/bincio/edit/server.py @@ -540,11 +540,11 @@ def _file_suffix(name: str) -> str: async def upload_activity( files: list[UploadFile] = File(...), store_original: bool = Form(False), -) -> JSONResponse: - """Accept FIT/GPX/TCX files and/or activities.csv, extract, update index, re-merge. +) -> StreamingResponse: + """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing. activities.csv (Strava export format) can be included in the batch to: - - Enrich activity files being uploaded in the same batch (matched by filename) + - Enrich activity files in the same batch (matched by filename) - Retroactively update sidecars for existing activities (matched by strava_id) """ from bincio.extract.ingest import ingest_parsed @@ -556,90 +556,96 @@ async def upload_activity( staging = dd / "_uploads" staging.mkdir(exist_ok=True) - _MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB + # Read all files into memory now (async), then process synchronously in the generator + csv_bytes_list: list[bytes] = [] + activity_items: list[tuple[str, bytes]] = [] - # Separate CSV files from activity files - csv_files: list[UploadFile] = [] - activity_files: list[UploadFile] = [] for f in files: - name = Path(f.filename or "").name.lower() - if name.endswith(".csv"): - csv_files.append(f) + fname = Path(f.filename or "").name + raw = await f.read() + if fname.lower().endswith(".csv"): + csv_bytes_list.append(raw) else: - activity_files.append(f) + activity_items.append((fname, raw)) # Build metadata from the first CSV found (activities.csv from Strava export) metadata = None - if csv_files: + if csv_bytes_list: from bincio.extract.strava_csv import StravaMetadata import tempfile - csv_upload = csv_files[0] - csv_bytes = await csv_upload.read() with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: - tmp.write(csv_bytes) + tmp.write(csv_bytes_list[0]) tmp_path = Path(tmp.name) try: metadata = StravaMetadata(tmp_path) finally: tmp_path.unlink(missing_ok=True) - results = [] - any_added = False + total_files = len(activity_items) - for file in activity_files: - name = Path(file.filename or "upload.fit").name - suffix = _file_suffix(name) - if suffix not in _SUPPORTED_SUFFIXES: - results.append({"name": name, "ok": False, "error": f"Unsupported file type '{Path(name).suffix}'"}) - continue + def event_stream(): + added = 0 + duplicates = 0 + errors = 0 + any_added = False - contents = await file.read() - if len(contents) > _MAX_UPLOAD_BYTES: - results.append({"name": name, "ok": False, "error": "File too large (max 50 MB)"}) - continue - - staged = staging / name - staged.write_bytes(contents) - kept = False - try: - activity = parse_file(staged) - - # Enrich with CSV metadata when available (matched by filename) - if metadata is not None: - metadata.enrich(name, activity) - - activity_id = make_activity_id(activity) - if (dd / "activities" / f"{activity_id}.json").exists(): - results.append({"name": name, "ok": False, "error": "duplicate"}) + for n, (name, contents) in enumerate(activity_items, 1): + suffix = _file_suffix(name) + if suffix not in _SUPPORTED_SUFFIXES: + errors += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'unsupported type'})}\n\n" continue - ingest_parsed(activity, dd, privacy="public") + if len(contents) > _MAX_UPLOAD_BYTES: + errors += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'file too large'})}\n\n" + continue - if store_original: - originals_dir = dd / "originals" - originals_dir.mkdir(exist_ok=True) - staged.rename(originals_dir / name) - kept = True + staged = staging / name + staged.write_bytes(contents) + kept = False + try: + activity = parse_file(staged) + if metadata is not None: + metadata.enrich(name, activity) + activity_id = make_activity_id(activity) + if (dd / "activities" / f"{activity_id}.json").exists(): + duplicates += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'duplicate'})}\n\n" + continue + ingest_parsed(activity, dd, privacy="public") + if store_original: + originals_dir = dd / "originals" + originals_dir.mkdir(exist_ok=True) + staged.rename(originals_dir / name) + kept = True + added += 1 + any_added = True + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'imported'})}\n\n" + except Exception: + errors += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error'})}\n\n" + finally: + if not kept: + staged.unlink(missing_ok=True) - results.append({"name": name, "ok": True, "id": activity_id}) - any_added = True - except Exception: - results.append({"name": name, "ok": False, "error": "Processing failed"}) - finally: - if not kept: - staged.unlink(missing_ok=True) + csv_updates = 0 + if metadata is not None: + from bincio.extract.strava_csv import apply_csv_to_data_dir + csv_updates = apply_csv_to_data_dir(dd, metadata) + if csv_updates: + yield f"data: {json.dumps({'type': 'csv', 'updates': csv_updates})}\n\n" - # Retroactively update sidecars for existing activities matched by strava_id - csv_updates = 0 - if metadata is not None: - from bincio.extract.strava_csv import apply_csv_to_data_dir - csv_updates = apply_csv_to_data_dir(dd, metadata) + if any_added or csv_updates: + merge_all(dd) - if any_added or csv_updates: - merge_all(dd) + yield f"data: {json.dumps({'type': 'done', 'added': added, 'csv_updates': csv_updates, 'duplicates': duplicates, 'errors': errors})}\n\n" - added = [r for r in results if r["ok"]] - return JSONResponse({"ok": True, "added": len(added), "csv_updates": csv_updates, "results": results}) + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) @app.post("/api/import-bas") diff --git a/bincio/serve/server.py b/bincio/serve/server.py index ef69c02..52057cf 100644 --- a/bincio/serve/server.py +++ b/bincio/serve/server.py @@ -13,7 +13,9 @@ import re import secrets import shutil import subprocess +import threading import time +import uuid from pathlib import Path from typing import Any, Optional @@ -42,6 +44,40 @@ from bincio.serve.db import ( use_invite, ) +# ── Active job tracker ─────────────────────────────────────────────────────── +# Tracks in-progress upload/processing jobs so admins can see what's running. +# Jobs are added when a streaming upload starts and removed when it finishes. + +_jobs_lock = threading.Lock() +_active_jobs: dict[str, dict] = {} + + +def _job_start(user_handle: str, total_files: int) -> str: + job_id = uuid.uuid4().hex[:8] + with _jobs_lock: + _active_jobs[job_id] = { + "id": job_id, + "user": user_handle, + "started_at": int(time.time()), + "total": total_files, + "done": 0, + "current": "", + } + return job_id + + +def _job_update(job_id: str, done: int, current: str) -> None: + with _jobs_lock: + if job_id in _active_jobs: + _active_jobs[job_id]["done"] = done + _active_jobs[job_id]["current"] = current + + +def _job_finish(job_id: str) -> None: + with _jobs_lock: + _active_jobs.pop(job_id, None) + + # ── Globals (set by CLI before uvicorn starts) ──────────────────────────────── data_dir: Path | None = None @@ -346,6 +382,15 @@ async def admin_users(bincio_session: Optional[str] = Cookie(default=None)) -> J } for u in users]) +@app.get("/api/admin/jobs") +async def admin_jobs(bincio_session: Optional[str] = Cookie(default=None)) -> JSONResponse: + """Return currently active upload/processing jobs. Admin only.""" + _require_admin(bincio_session) + with _jobs_lock: + jobs = list(_active_jobs.values()) + return JSONResponse(jobs) + + # ── Write API (ported from bincio edit, auth-gated) ─────────────────────────── def _user_data_dir(handle: str) -> Path: @@ -529,12 +574,17 @@ async def upload_activity( files: list[UploadFile] = File(...), store_original: bool = Form(False), bincio_session: Optional[str] = Cookie(default=None), -) -> JSONResponse: - """Accept FIT/GPX/TCX files and/or activities.csv, extract, update index, re-merge. +) -> StreamingResponse: + """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing. activities.csv (Strava export format) can be included in the batch to: - - Enrich activity files being uploaded in the same batch (matched by filename) + - Enrich activity files in the same batch (matched by filename) - Retroactively update sidecars for existing activities (matched by strava_id) + + SSE events: + {"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"duplicate"|"error"} + {"type": "csv", "updates": N} -- only when CSV was included + {"type": "done", "added": N, "csv_updates": N, "duplicates": N, "errors": N} """ from bincio.extract.ingest import ingest_parsed from bincio.extract.parsers.factory import parse_file @@ -546,86 +596,106 @@ async def upload_activity( staging = dd / "_uploads" staging.mkdir(exist_ok=True) - # Separate CSV files from activity files - csv_files: list[UploadFile] = [] - activity_files: list[UploadFile] = [] - for f in files: - fname = Path(f.filename or "").name.lower() - if fname.endswith(".csv"): - csv_files.append(f) - else: - activity_files.append(f) + # Read all files into memory now (async), then process synchronously in the generator + csv_bytes_list: list[bytes] = [] + activity_items: list[tuple[str, bytes]] = [] # (original_filename, bytes) - # Build metadata from the first CSV found (activities.csv from Strava export) + for f in files: + fname = Path(f.filename or "").name + raw = await f.read() + if fname.lower().endswith(".csv"): + csv_bytes_list.append(raw) + else: + activity_items.append((fname, raw)) + + # Build metadata from the first CSV metadata = None - if csv_files: + if csv_bytes_list: from bincio.extract.strava_csv import StravaMetadata import tempfile - csv_bytes = await csv_files[0].read() with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: - tmp.write(csv_bytes) + tmp.write(csv_bytes_list[0]) tmp_path = Path(tmp.name) try: metadata = StravaMetadata(tmp_path) finally: tmp_path.unlink(missing_ok=True) - results = [] - any_added = False + total_files = len(activity_items) + job_id = _job_start(user.handle, total_files) if total_files > 0 else None - for file in activity_files: - name = Path(file.filename or "upload.fit").name - suffix = _file_suffix(name) - if suffix not in _SUPPORTED_SUFFIXES: - results.append({"name": name, "ok": False, "error": f"Unsupported file type '{suffix}'"}) - continue + def event_stream(): + added = 0 + duplicates = 0 + errors = 0 + any_added = False - contents = await file.read() - if len(contents) > 50 * 1024 * 1024: - results.append({"name": name, "ok": False, "error": "File too large (max 50 MB)"}) - continue + for n, (name, contents) in enumerate(activity_items, 1): + if job_id: + _job_update(job_id, n - 1, name) - staged = staging / name - staged.write_bytes(contents) - kept = False - try: - activity = parse_file(staged) - - # Enrich with CSV metadata when available (matched by filename) - if metadata is not None: - metadata.enrich(name, activity) - - activity_id = make_activity_id(activity) - if (dd / "activities" / f"{activity_id}.json").exists(): - results.append({"name": name, "ok": False, "error": "duplicate"}) + suffix = _file_suffix(name) + if suffix not in _SUPPORTED_SUFFIXES: + errors += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'unsupported type'})}\n\n" continue - ingest_parsed(activity, dd, privacy="public") - if store_original: - originals_dir = dd / "originals" - originals_dir.mkdir(exist_ok=True) - staged.rename(originals_dir / name) - kept = True - results.append({"name": name, "ok": True, "id": activity_id}) - any_added = True - except Exception: - results.append({"name": name, "ok": False, "error": "Processing failed"}) - finally: - if not kept: - staged.unlink(missing_ok=True) - # Retroactively update sidecars for existing activities matched by strava_id - csv_updates = 0 - if metadata is not None: - from bincio.extract.strava_csv import apply_csv_to_data_dir - csv_updates = apply_csv_to_data_dir(dd, metadata) + if len(contents) > 50 * 1024 * 1024: + errors += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'file too large'})}\n\n" + continue - if any_added or csv_updates: - merge_all(dd) - if any_added: - _trigger_rebuild(user.handle) + staged = staging / name + staged.write_bytes(contents) + kept = False + try: + activity = parse_file(staged) + if metadata is not None: + metadata.enrich(name, activity) + activity_id = make_activity_id(activity) + if (dd / "activities" / f"{activity_id}.json").exists(): + duplicates += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'duplicate'})}\n\n" + continue + ingest_parsed(activity, dd, privacy="public") + if store_original: + originals_dir = dd / "originals" + originals_dir.mkdir(exist_ok=True) + staged.rename(originals_dir / name) + kept = True + added += 1 + any_added = True + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'imported'})}\n\n" + except Exception: + errors += 1 + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error'})}\n\n" + finally: + if not kept: + staged.unlink(missing_ok=True) - added = [r for r in results if r["ok"]] - return JSONResponse({"ok": True, "added": len(added), "csv_updates": csv_updates, "results": results}) + # Retroactively apply CSV metadata to existing activities + csv_updates = 0 + if metadata is not None: + from bincio.extract.strava_csv import apply_csv_to_data_dir + csv_updates = apply_csv_to_data_dir(dd, metadata) + if csv_updates: + yield f"data: {json.dumps({'type': 'csv', 'updates': csv_updates})}\n\n" + + if any_added or csv_updates: + merge_all(dd) + if any_added: + _trigger_rebuild(user.handle) + + yield f"data: {json.dumps({'type': 'done', 'added': added, 'csv_updates': csv_updates, 'duplicates': duplicates, 'errors': errors})}\n\n" + + if job_id: + _job_finish(job_id) + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) @app.post("/api/upload/strava-zip") diff --git a/site/astro.config.mjs b/site/astro.config.mjs index 5622396..6bd511f 100644 --- a/site/astro.config.mjs +++ b/site/astro.config.mjs @@ -25,10 +25,11 @@ export default defineConfig({ // In production nginx handles this — same pattern, no code change needed. server: { proxy: { - // SSE response to a POST — Vite's default proxy buffers the full body before - // forwarding, which breaks streaming and can cause EPIPE on long uploads. + // Both /api/upload and /api/upload/strava-zip return SSE streams in response + // to POST requests. Vite's default proxy buffers the full body before forwarding, + // which breaks streaming and causes EPIPE on long uploads. // selfHandleResponse + manual pipe sends chunks as they arrive. - '/api/upload/strava-zip': { + '/api/upload': { target: serveTarget, changeOrigin: true, selfHandleResponse: true, diff --git a/site/src/layouts/Base.astro b/site/src/layouts/Base.astro index a44c785..cccea1b 100644 --- a/site/src/layouts/Base.astro +++ b/site/src/layouts/Base.astro @@ -190,6 +190,14 @@ try {
{!isPublicPage && ( <> + + +