diff --git a/bincio/serve/server.py b/bincio/serve/server.py index 3bc665c..4310f00 100644 --- a/bincio/serve/server.py +++ b/bincio/serve/server.py @@ -643,28 +643,32 @@ async def admin_reextract_originals( total = len(original_files) log.info("reextract[%s]: starting, %d originals found", handle, total) - # Imports at endpoint level so failures are visible before the stream starts - from bincio.extract.strava_api import strava_to_parsed - from bincio.extract.metrics import compute as compute_metrics - from bincio.extract.writer import ( - build_summary, make_activity_id, write_activity, - write_index, write_athlete_json, - ) - from bincio.render.merge import merge_all + loop = asyncio.get_event_loop() + # Queue carries SSE event strings; None is the sentinel for "done" + q: asyncio.Queue[str | None] = asyncio.Queue() - async def event_stream(): - yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n" - - imported = 0 - skipped = 0 - errors = 0 + def _run_extraction() -> None: + """Runs entirely in a thread pool worker. Puts SSE strings into q.""" + try: + from bincio.extract.strava_api import strava_to_parsed + from bincio.extract.metrics import compute as compute_metrics + from bincio.extract.writer import ( + build_summary, make_activity_id, write_activity, + write_index, write_athlete_json, + ) + from bincio.render.merge import merge_all + except Exception as exc: + log.exception("reextract[%s]: import error", handle) + loop.call_soon_threadsafe(q.put_nowait, + f"data: {json.dumps({'type': 'error', 'message': f'Import error: {exc}'})}\n\n") + loop.call_soon_threadsafe(q.put_nowait, None) + return index_path = user_dir / "index.json" try: existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {} except Exception: existing_index = {} - owner = existing_index.get("owner", {"handle": handle}) summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])} @@ -678,55 +682,67 @@ async def admin_reextract_originals( except Exception: pass - loop = asyncio.get_event_loop() + imported = skipped = errors = 0 for n, orig_path in enumerate(original_files, 1): try: - # Run CPU-intensive parsing + file writing in a thread - def _process_one(op=orig_path): - raw = json.loads(op.read_text(encoding="utf-8")) - meta = raw.get("meta", {}) - streams = raw.get("streams", {}) - name = meta.get("name", op.stem) - parsed = strava_to_parsed(meta, streams) - activity_id = make_activity_id(parsed) - if (user_dir / "activities" / f"{activity_id}.json").exists(): - return "skipped", name, activity_id, None - metrics = compute_metrics(parsed) - effective_privacy = parsed.privacy if parsed.privacy is not None else "public" - write_activity(parsed, metrics, user_dir, privacy=effective_privacy, rdp_epsilon=0.0001) - summary = build_summary(parsed, metrics, activity_id, effective_privacy) - return "imported", name, activity_id, summary + raw = json.loads(orig_path.read_text(encoding="utf-8")) + meta = raw.get("meta", {}) + streams = raw.get("streams", {}) + name = meta.get("name", orig_path.stem) + parsed = strava_to_parsed(meta, streams) + activity_id = make_activity_id(parsed) - status, name, activity_id, summary = await loop.run_in_executor(None, _process_one) - - if status == "skipped": + if (user_dir / "activities" / f"{activity_id}.json").exists(): skipped += 1 + status = "skipped" else: - summaries[activity_id] = summary + metrics = compute_metrics(parsed) + ep = parsed.privacy if parsed.privacy is not None else "public" + write_activity(parsed, metrics, user_dir, privacy=ep, rdp_epsilon=0.0001) + summaries[activity_id] = build_summary(parsed, metrics, activity_id, ep) imported += 1 + status = "imported" - yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n" - + loop.call_soon_threadsafe(q.put_nowait, + f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n") except Exception as exc: errors += 1 log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True) - yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n" + loop.call_soon_threadsafe(q.put_nowait, + f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n") if imported > 0: - yield f"data: {json.dumps({'type': 'status', 'message': 'Writing index and athlete data…'})}\n\n" + loop.call_soon_threadsafe(q.put_nowait, + f"data: {json.dumps({'type': 'status', 'message': 'Writing index and athlete data…'})}\n\n") try: - await loop.run_in_executor(None, lambda: write_index(list(summaries.values()), user_dir, owner)) - await loop.run_in_executor(None, lambda: write_athlete_json(list(summaries.values()), user_dir, athlete_config)) - yield f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n" - await loop.run_in_executor(None, lambda: merge_all(user_dir)) + write_index(list(summaries.values()), user_dir, owner) + write_athlete_json(list(summaries.values()), user_dir, athlete_config) + loop.call_soon_threadsafe(q.put_nowait, + f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n") + merge_all(user_dir) _trigger_rebuild(handle) except Exception as exc: log.exception("reextract[%s]: error in final write/merge", handle) - yield f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n" + loop.call_soon_threadsafe(q.put_nowait, + f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n") - log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", handle, imported, skipped, errors) - yield f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n" + log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", + handle, imported, skipped, errors) + loop.call_soon_threadsafe(q.put_nowait, + f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n") + loop.call_soon_threadsafe(q.put_nowait, None) # sentinel + + async def event_stream(): + yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n" + # Kick off the extraction thread + loop.run_in_executor(None, _run_extraction) + # Drain the queue until sentinel + while True: + chunk = await q.get() + if chunk is None: + break + yield chunk return StreamingResponse( event_stream(),