For users uploading:

- POST /api/upload now returns text/event-stream instead of JSON
  - Per-file progress events stream back as each file is processed: ↓ 3/47 (6%) — morning_ride.fit
  - Final done event shows the summary: "12 added, 35 duplicates"
  - The Vite proxy is configured to stream this properly (no buffering)

  For the admin:
  - New GET /api/admin/jobs endpoint (admin-only) returns the list of active upload jobs, each with
  user, started_at, total, done, current (filename being processed)
  - A pulsing amber badge appears in the nav bar for admins when any user has an active upload running
   — it shows e.g. "2 uploads running" with a tooltip listing each user's progress (@alice: 12/50
  files)
  - Polls every 5 seconds, disappears automatically when all jobs finish
This commit is contained in:
Davide Scaini
2026-04-11 08:33:21 +02:00
parent 01db4eb9ae
commit 82830222ba
4 changed files with 302 additions and 153 deletions
+69 -63
View File
@@ -540,11 +540,11 @@ def _file_suffix(name: str) -> str:
async def upload_activity( async def upload_activity(
files: list[UploadFile] = File(...), files: list[UploadFile] = File(...),
store_original: bool = Form(False), store_original: bool = Form(False),
) -> JSONResponse: ) -> StreamingResponse:
"""Accept FIT/GPX/TCX files and/or activities.csv, extract, update index, re-merge. """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing.
activities.csv (Strava export format) can be included in the batch to: activities.csv (Strava export format) can be included in the batch to:
- Enrich activity files being uploaded in the same batch (matched by filename) - Enrich activity files in the same batch (matched by filename)
- Retroactively update sidecars for existing activities (matched by strava_id) - Retroactively update sidecars for existing activities (matched by strava_id)
""" """
from bincio.extract.ingest import ingest_parsed from bincio.extract.ingest import ingest_parsed
@@ -556,90 +556,96 @@ async def upload_activity(
staging = dd / "_uploads" staging = dd / "_uploads"
staging.mkdir(exist_ok=True) staging.mkdir(exist_ok=True)
_MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB # Read all files into memory now (async), then process synchronously in the generator
csv_bytes_list: list[bytes] = []
activity_items: list[tuple[str, bytes]] = []
# Separate CSV files from activity files
csv_files: list[UploadFile] = []
activity_files: list[UploadFile] = []
for f in files: for f in files:
name = Path(f.filename or "").name.lower() fname = Path(f.filename or "").name
if name.endswith(".csv"): raw = await f.read()
csv_files.append(f) if fname.lower().endswith(".csv"):
csv_bytes_list.append(raw)
else: else:
activity_files.append(f) activity_items.append((fname, raw))
# Build metadata from the first CSV found (activities.csv from Strava export) # Build metadata from the first CSV found (activities.csv from Strava export)
metadata = None metadata = None
if csv_files: if csv_bytes_list:
from bincio.extract.strava_csv import StravaMetadata from bincio.extract.strava_csv import StravaMetadata
import tempfile import tempfile
csv_upload = csv_files[0]
csv_bytes = await csv_upload.read()
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
tmp.write(csv_bytes) tmp.write(csv_bytes_list[0])
tmp_path = Path(tmp.name) tmp_path = Path(tmp.name)
try: try:
metadata = StravaMetadata(tmp_path) metadata = StravaMetadata(tmp_path)
finally: finally:
tmp_path.unlink(missing_ok=True) tmp_path.unlink(missing_ok=True)
results = [] total_files = len(activity_items)
any_added = False
for file in activity_files: def event_stream():
name = Path(file.filename or "upload.fit").name added = 0
suffix = _file_suffix(name) duplicates = 0
if suffix not in _SUPPORTED_SUFFIXES: errors = 0
results.append({"name": name, "ok": False, "error": f"Unsupported file type '{Path(name).suffix}'"}) any_added = False
continue
contents = await file.read() for n, (name, contents) in enumerate(activity_items, 1):
if len(contents) > _MAX_UPLOAD_BYTES: suffix = _file_suffix(name)
results.append({"name": name, "ok": False, "error": "File too large (max 50 MB)"}) if suffix not in _SUPPORTED_SUFFIXES:
continue errors += 1
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'unsupported type'})}\n\n"
staged = staging / name
staged.write_bytes(contents)
kept = False
try:
activity = parse_file(staged)
# Enrich with CSV metadata when available (matched by filename)
if metadata is not None:
metadata.enrich(name, activity)
activity_id = make_activity_id(activity)
if (dd / "activities" / f"{activity_id}.json").exists():
results.append({"name": name, "ok": False, "error": "duplicate"})
continue continue
ingest_parsed(activity, dd, privacy="public") if len(contents) > _MAX_UPLOAD_BYTES:
errors += 1
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'file too large'})}\n\n"
continue
if store_original: staged = staging / name
originals_dir = dd / "originals" staged.write_bytes(contents)
originals_dir.mkdir(exist_ok=True) kept = False
staged.rename(originals_dir / name) try:
kept = True activity = parse_file(staged)
if metadata is not None:
metadata.enrich(name, activity)
activity_id = make_activity_id(activity)
if (dd / "activities" / f"{activity_id}.json").exists():
duplicates += 1
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'duplicate'})}\n\n"
continue
ingest_parsed(activity, dd, privacy="public")
if store_original:
originals_dir = dd / "originals"
originals_dir.mkdir(exist_ok=True)
staged.rename(originals_dir / name)
kept = True
added += 1
any_added = True
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'imported'})}\n\n"
except Exception:
errors += 1
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error'})}\n\n"
finally:
if not kept:
staged.unlink(missing_ok=True)
results.append({"name": name, "ok": True, "id": activity_id}) csv_updates = 0
any_added = True if metadata is not None:
except Exception: from bincio.extract.strava_csv import apply_csv_to_data_dir
results.append({"name": name, "ok": False, "error": "Processing failed"}) csv_updates = apply_csv_to_data_dir(dd, metadata)
finally: if csv_updates:
if not kept: yield f"data: {json.dumps({'type': 'csv', 'updates': csv_updates})}\n\n"
staged.unlink(missing_ok=True)
# Retroactively update sidecars for existing activities matched by strava_id if any_added or csv_updates:
csv_updates = 0 merge_all(dd)
if metadata is not None:
from bincio.extract.strava_csv import apply_csv_to_data_dir
csv_updates = apply_csv_to_data_dir(dd, metadata)
if any_added or csv_updates: yield f"data: {json.dumps({'type': 'done', 'added': added, 'csv_updates': csv_updates, 'duplicates': duplicates, 'errors': errors})}\n\n"
merge_all(dd)
added = [r for r in results if r["ok"]] return StreamingResponse(
return JSONResponse({"ok": True, "added": len(added), "csv_updates": csv_updates, "results": results}) event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
@app.post("/api/import-bas") @app.post("/api/import-bas")
+135 -65
View File
@@ -13,7 +13,9 @@ import re
import secrets import secrets
import shutil import shutil
import subprocess import subprocess
import threading
import time import time
import uuid
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Optional
@@ -42,6 +44,40 @@ from bincio.serve.db import (
use_invite, use_invite,
) )
# ── Active job tracker ───────────────────────────────────────────────────────
# Tracks in-progress upload/processing jobs so admins can see what's running.
# Jobs are added when a streaming upload starts and removed when it finishes.
_jobs_lock = threading.Lock()
_active_jobs: dict[str, dict] = {}
def _job_start(user_handle: str, total_files: int) -> str:
job_id = uuid.uuid4().hex[:8]
with _jobs_lock:
_active_jobs[job_id] = {
"id": job_id,
"user": user_handle,
"started_at": int(time.time()),
"total": total_files,
"done": 0,
"current": "",
}
return job_id
def _job_update(job_id: str, done: int, current: str) -> None:
with _jobs_lock:
if job_id in _active_jobs:
_active_jobs[job_id]["done"] = done
_active_jobs[job_id]["current"] = current
def _job_finish(job_id: str) -> None:
with _jobs_lock:
_active_jobs.pop(job_id, None)
# ── Globals (set by CLI before uvicorn starts) ──────────────────────────────── # ── Globals (set by CLI before uvicorn starts) ────────────────────────────────
data_dir: Path | None = None data_dir: Path | None = None
@@ -346,6 +382,15 @@ async def admin_users(bincio_session: Optional[str] = Cookie(default=None)) -> J
} for u in users]) } for u in users])
@app.get("/api/admin/jobs")
async def admin_jobs(bincio_session: Optional[str] = Cookie(default=None)) -> JSONResponse:
"""Return currently active upload/processing jobs. Admin only."""
_require_admin(bincio_session)
with _jobs_lock:
jobs = list(_active_jobs.values())
return JSONResponse(jobs)
# ── Write API (ported from bincio edit, auth-gated) ─────────────────────────── # ── Write API (ported from bincio edit, auth-gated) ───────────────────────────
def _user_data_dir(handle: str) -> Path: def _user_data_dir(handle: str) -> Path:
@@ -529,12 +574,17 @@ async def upload_activity(
files: list[UploadFile] = File(...), files: list[UploadFile] = File(...),
store_original: bool = Form(False), store_original: bool = Form(False),
bincio_session: Optional[str] = Cookie(default=None), bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse: ) -> StreamingResponse:
"""Accept FIT/GPX/TCX files and/or activities.csv, extract, update index, re-merge. """Accept FIT/GPX/TCX files and/or activities.csv; stream SSE progress while processing.
activities.csv (Strava export format) can be included in the batch to: activities.csv (Strava export format) can be included in the batch to:
- Enrich activity files being uploaded in the same batch (matched by filename) - Enrich activity files in the same batch (matched by filename)
- Retroactively update sidecars for existing activities (matched by strava_id) - Retroactively update sidecars for existing activities (matched by strava_id)
SSE events:
{"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"duplicate"|"error"}
{"type": "csv", "updates": N} -- only when CSV was included
{"type": "done", "added": N, "csv_updates": N, "duplicates": N, "errors": N}
""" """
from bincio.extract.ingest import ingest_parsed from bincio.extract.ingest import ingest_parsed
from bincio.extract.parsers.factory import parse_file from bincio.extract.parsers.factory import parse_file
@@ -546,86 +596,106 @@ async def upload_activity(
staging = dd / "_uploads" staging = dd / "_uploads"
staging.mkdir(exist_ok=True) staging.mkdir(exist_ok=True)
# Separate CSV files from activity files # Read all files into memory now (async), then process synchronously in the generator
csv_files: list[UploadFile] = [] csv_bytes_list: list[bytes] = []
activity_files: list[UploadFile] = [] activity_items: list[tuple[str, bytes]] = [] # (original_filename, bytes)
for f in files:
fname = Path(f.filename or "").name.lower()
if fname.endswith(".csv"):
csv_files.append(f)
else:
activity_files.append(f)
# Build metadata from the first CSV found (activities.csv from Strava export) for f in files:
fname = Path(f.filename or "").name
raw = await f.read()
if fname.lower().endswith(".csv"):
csv_bytes_list.append(raw)
else:
activity_items.append((fname, raw))
# Build metadata from the first CSV
metadata = None metadata = None
if csv_files: if csv_bytes_list:
from bincio.extract.strava_csv import StravaMetadata from bincio.extract.strava_csv import StravaMetadata
import tempfile import tempfile
csv_bytes = await csv_files[0].read()
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
tmp.write(csv_bytes) tmp.write(csv_bytes_list[0])
tmp_path = Path(tmp.name) tmp_path = Path(tmp.name)
try: try:
metadata = StravaMetadata(tmp_path) metadata = StravaMetadata(tmp_path)
finally: finally:
tmp_path.unlink(missing_ok=True) tmp_path.unlink(missing_ok=True)
results = [] total_files = len(activity_items)
any_added = False job_id = _job_start(user.handle, total_files) if total_files > 0 else None
for file in activity_files: def event_stream():
name = Path(file.filename or "upload.fit").name added = 0
suffix = _file_suffix(name) duplicates = 0
if suffix not in _SUPPORTED_SUFFIXES: errors = 0
results.append({"name": name, "ok": False, "error": f"Unsupported file type '{suffix}'"}) any_added = False
continue
contents = await file.read() for n, (name, contents) in enumerate(activity_items, 1):
if len(contents) > 50 * 1024 * 1024: if job_id:
results.append({"name": name, "ok": False, "error": "File too large (max 50 MB)"}) _job_update(job_id, n - 1, name)
continue
staged = staging / name suffix = _file_suffix(name)
staged.write_bytes(contents) if suffix not in _SUPPORTED_SUFFIXES:
kept = False errors += 1
try: yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'unsupported type'})}\n\n"
activity = parse_file(staged)
# Enrich with CSV metadata when available (matched by filename)
if metadata is not None:
metadata.enrich(name, activity)
activity_id = make_activity_id(activity)
if (dd / "activities" / f"{activity_id}.json").exists():
results.append({"name": name, "ok": False, "error": "duplicate"})
continue continue
ingest_parsed(activity, dd, privacy="public")
if store_original:
originals_dir = dd / "originals"
originals_dir.mkdir(exist_ok=True)
staged.rename(originals_dir / name)
kept = True
results.append({"name": name, "ok": True, "id": activity_id})
any_added = True
except Exception:
results.append({"name": name, "ok": False, "error": "Processing failed"})
finally:
if not kept:
staged.unlink(missing_ok=True)
# Retroactively update sidecars for existing activities matched by strava_id if len(contents) > 50 * 1024 * 1024:
csv_updates = 0 errors += 1
if metadata is not None: yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error', 'detail': 'file too large'})}\n\n"
from bincio.extract.strava_csv import apply_csv_to_data_dir continue
csv_updates = apply_csv_to_data_dir(dd, metadata)
if any_added or csv_updates: staged = staging / name
merge_all(dd) staged.write_bytes(contents)
if any_added: kept = False
_trigger_rebuild(user.handle) try:
activity = parse_file(staged)
if metadata is not None:
metadata.enrich(name, activity)
activity_id = make_activity_id(activity)
if (dd / "activities" / f"{activity_id}.json").exists():
duplicates += 1
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'duplicate'})}\n\n"
continue
ingest_parsed(activity, dd, privacy="public")
if store_original:
originals_dir = dd / "originals"
originals_dir.mkdir(exist_ok=True)
staged.rename(originals_dir / name)
kept = True
added += 1
any_added = True
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'imported'})}\n\n"
except Exception:
errors += 1
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total_files, 'name': name, 'status': 'error'})}\n\n"
finally:
if not kept:
staged.unlink(missing_ok=True)
added = [r for r in results if r["ok"]] # Retroactively apply CSV metadata to existing activities
return JSONResponse({"ok": True, "added": len(added), "csv_updates": csv_updates, "results": results}) csv_updates = 0
if metadata is not None:
from bincio.extract.strava_csv import apply_csv_to_data_dir
csv_updates = apply_csv_to_data_dir(dd, metadata)
if csv_updates:
yield f"data: {json.dumps({'type': 'csv', 'updates': csv_updates})}\n\n"
if any_added or csv_updates:
merge_all(dd)
if any_added:
_trigger_rebuild(user.handle)
yield f"data: {json.dumps({'type': 'done', 'added': added, 'csv_updates': csv_updates, 'duplicates': duplicates, 'errors': errors})}\n\n"
if job_id:
_job_finish(job_id)
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
@app.post("/api/upload/strava-zip") @app.post("/api/upload/strava-zip")
+4 -3
View File
@@ -25,10 +25,11 @@ export default defineConfig({
// In production nginx handles this — same pattern, no code change needed. // In production nginx handles this — same pattern, no code change needed.
server: { server: {
proxy: { proxy: {
// SSE response to a POST — Vite's default proxy buffers the full body before // Both /api/upload and /api/upload/strava-zip return SSE streams in response
// forwarding, which breaks streaming and can cause EPIPE on long uploads. // to POST requests. Vite's default proxy buffers the full body before forwarding,
// which breaks streaming and causes EPIPE on long uploads.
// selfHandleResponse + manual pipe sends chunks as they arrive. // selfHandleResponse + manual pipe sends chunks as they arrive.
'/api/upload/strava-zip': { '/api/upload': {
target: serveTarget, target: serveTarget,
changeOrigin: true, changeOrigin: true,
selfHandleResponse: true, selfHandleResponse: true,
+94 -22
View File
@@ -190,6 +190,14 @@ try {
<div class="ml-auto shrink-0 flex items-center gap-1"> <div class="ml-auto shrink-0 flex items-center gap-1">
{!isPublicPage && ( {!isPublicPage && (
<> <>
<!-- Admin: active upload jobs badge (hidden until jobs exist) -->
<span
id="admin-jobs-badge"
style="display:none"
title=""
class="text-xs px-2 py-0.5 rounded-full bg-amber-900/60 text-amber-300 border border-amber-700/50 animate-pulse cursor-default"
></span>
<!-- Logout button — hidden until logged in --> <!-- Logout button — hidden until logged in -->
<button <button
id="nav-logout" id="nav-logout"
@@ -423,6 +431,31 @@ try {
// Pre-populate the "keep original" checkbox from the instance default // Pre-populate the "keep original" checkbox from the instance default
const chk = document.getElementById('upload-keep-original'); const chk = document.getElementById('upload-keep-original');
if (chk && user.store_originals_default) chk.checked = true; if (chk && user.store_originals_default) chk.checked = true;
// Admin: poll for active jobs and show a badge in the nav
if (user.is_admin) {
const badge = document.getElementById('admin-jobs-badge');
async function pollJobs() {
try {
const jr = await fetch('/api/admin/jobs', { credentials: 'include' });
if (!jr.ok) return;
const jobs = await jr.json();
if (!badge) return;
if (jobs.length === 0) {
badge.style.display = 'none';
} else {
const summary = jobs.map(j =>
`@${j.user}: ${j.done}/${j.total} files`
).join(' · ');
badge.title = summary;
badge.textContent = `${jobs.length} upload${jobs.length > 1 ? 's' : ''} running`;
badge.style.display = '';
}
} catch (_) {}
}
pollJobs();
setInterval(pollJobs, 5000);
}
} catch (_) {} } catch (_) {}
})(); })();
@@ -510,38 +543,77 @@ try {
}); });
input.addEventListener('change', () => { if (input.files?.length) doUpload(input.files); }); input.addEventListener('change', () => { if (input.files?.length) doUpload(input.files); });
async function doUpload(files) { function doUpload(files) {
const n = files.length; const n = files.length;
label.textContent = n === 1 ? files[0].name : `${n} files selected`; label.textContent = n === 1 ? files[0].name : `${n} files selected`;
fileStatus.textContent = `Uploading ${n} file${n > 1 ? 's' : ''}…`; fileStatus.textContent = `Uploading…`;
fileStatus.style.color = 'var(--text-4)'; fileStatus.style.color = 'var(--text-4)';
drop.style.pointerEvents = 'none'; drop.style.pointerEvents = 'none';
const fd = new FormData(); const fd = new FormData();
for (const f of files) fd.append('files', f); for (const f of files) fd.append('files', f);
fd.append('store_original', keepOriginalChk?.checked ? 'true' : 'false'); fd.append('store_original', keepOriginalChk?.checked ? 'true' : 'false');
try {
const r = await fetch(`${editUrl}/api/upload`, { method: 'POST', credentials: 'include', body: fd }); const xhr = new XMLHttpRequest();
if (!r.ok) throw new Error(await r.text()); xhr.open('POST', `${editUrl}/api/upload`);
const d = await r.json(); xhr.withCredentials = true;
const dupes = d.results.filter(r => r.error === 'duplicate').length; xhr.setRequestHeader('Accept', 'text/event-stream');
const errors = d.results.filter(r => !r.ok && r.error !== 'duplicate').length;
const parts = []; let buf = '';
if (d.added > 0) parts.push(`${d.added} added`); let added = 0, dupes = 0, errors = 0, csvUpdates = 0;
if (d.csv_updates > 0) parts.push(`${d.csv_updates} updated from CSV`);
if (dupes) parts.push(`${dupes} duplicate${dupes > 1 ? 's' : ''}`); xhr.onprogress = () => {
if (errors) parts.push(`${errors} failed`); const newText = xhr.responseText.slice(buf.length);
if (parts.length === 0) parts.push('nothing to add'); buf = xhr.responseText;
fileStatus.textContent = parts.join(', '); for (const line of newText.split('\n')) {
const anyGood = d.added > 0 || d.csv_updates > 0; if (!line.startsWith('data: ')) continue;
fileStatus.style.color = anyGood ? '#4ade80' : '#a1a1aa'; try {
if (anyGood) setTimeout(() => { window.location.reload(); }, 1200); const ev = JSON.parse(line.slice(6));
else drop.style.pointerEvents = ''; if (ev.type === 'progress') {
} catch (e) { const pct = Math.round((ev.n / ev.total) * 100);
fileStatus.textContent = 'Error: ' + e.message; const icon = ev.status === 'imported' ? '↓' : ev.status === 'duplicate' ? '·' : '✗';
fileStatus.textContent = `${icon} ${ev.n}/${ev.total} (${pct}%) — ${ev.name}`;
if (ev.status === 'imported') added++;
else if (ev.status === 'duplicate') dupes++;
else errors++;
} else if (ev.type === 'csv') {
csvUpdates = ev.updates;
} else if (ev.type === 'done') {
added = ev.added; dupes = ev.duplicates; errors = ev.errors; csvUpdates = ev.csv_updates;
const parts = [];
if (added > 0) parts.push(`${added} added`);
if (csvUpdates > 0) parts.push(`${csvUpdates} updated from CSV`);
if (dupes) parts.push(`${dupes} duplicate${dupes > 1 ? 's' : ''}`);
if (errors) parts.push(`${errors} failed`);
if (parts.length === 0) parts.push('nothing to add');
fileStatus.textContent = parts.join(', ');
const anyGood = added > 0 || csvUpdates > 0;
fileStatus.style.color = anyGood ? '#4ade80' : '#a1a1aa';
if (anyGood) setTimeout(() => window.location.reload(), 1200);
else drop.style.pointerEvents = '';
input.value = '';
}
} catch (_) {}
}
};
xhr.onload = () => {
if (xhr.status !== 200) {
fileStatus.textContent = `Upload failed (${xhr.status}).`;
fileStatus.style.color = '#f87171';
drop.style.pointerEvents = '';
input.value = '';
}
};
xhr.onerror = () => {
fileStatus.textContent = 'Upload failed — check your connection.';
fileStatus.style.color = '#f87171'; fileStatus.style.color = '#f87171';
drop.style.pointerEvents = ''; drop.style.pointerEvents = '';
input.value = ''; input.value = '';
} };
xhr.send(fd);
} }
// ── Strava ──────────────────────────────────────────────────────────── // ── Strava ────────────────────────────────────────────────────────────