From 10dd1185b9d86945d7880e1fba62b64cf65159ed Mon Sep 17 00:00:00 2001 From: Davide Scaini Date: Wed, 15 Apr 2026 09:05:29 +0200 Subject: [PATCH] fix reextract: async generator + run_in_executor, imports at endpoint level MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The sync generator was failing with a network error because Starlette's iterate_in_threadpool doesn't properly propagate exceptions from sync generators — the connection resets with no body. Fix: convert event_stream to an async generator (Starlette handles these natively without thread wrapping), move imports to the endpoint function scope so failures raise HTTPException before the stream starts, and run CPU-intensive work (parse + write) via loop.run_in_executor so the async generator can actually yield between activities. --- bincio/serve/server.py | 90 +++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/bincio/serve/server.py b/bincio/serve/server.py index 551021d..3bc665c 100644 --- a/bincio/serve/server.py +++ b/bincio/serve/server.py @@ -630,12 +630,9 @@ async def admin_reextract_originals( calls strava_to_parsed, and writes the activity JSON + GeoJSON. Skips activities that already have a valid JSON file in activities/. - Streams SSE progress; call /rebuild after completion if no --webroot. - - SSE events: - {"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"skipped"|"error", "detail": "..."} - {"type": "done", "imported": N, "skipped": N, "errors": N} + Streams SSE progress. Calls merge_all + rebuild on completion. """ + import asyncio _require_admin(bincio_session) user_dir = _get_data_dir() / handle originals_dir = user_dir / "originals" / "strava" @@ -646,34 +643,29 @@ async def admin_reextract_originals( total = len(original_files) log.info("reextract[%s]: starting, %d originals found", handle, total) - def event_stream(): - from bincio.extract.strava_api import strava_to_parsed - from bincio.extract.metrics import compute as compute_metrics - from bincio.extract.writer import ( - build_summary, make_activity_id, write_activity, - write_index, write_athlete_json, - ) - from bincio.render.merge import merge_all + # Imports at endpoint level so failures are visible before the stream starts + from bincio.extract.strava_api import strava_to_parsed + from bincio.extract.metrics import compute as compute_metrics + from bincio.extract.writer import ( + build_summary, make_activity_id, write_activity, + write_index, write_athlete_json, + ) + from bincio.render.merge import merge_all - # Immediate heartbeat so the client knows the connection is alive + async def event_stream(): yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n" imported = 0 skipped = 0 errors = 0 - # Load existing index once upfront (to preserve any non-Strava activities) index_path = user_dir / "index.json" - if index_path.exists(): - try: - existing_index = json.loads(index_path.read_text(encoding="utf-8")) - except Exception: - existing_index = {} - else: + try: + existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {} + except Exception: existing_index = {} owner = existing_index.get("owner", {"handle": handle}) - # Seed summaries from whatever already exists in activities/ summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])} _COMPUTED = {"bas_version", "generated_at", "power_curve", "records", "best_climbs"} @@ -686,42 +678,52 @@ async def admin_reextract_originals( except Exception: pass + loop = asyncio.get_event_loop() + for n, orig_path in enumerate(original_files, 1): try: - raw = json.loads(orig_path.read_text(encoding="utf-8")) - meta = raw.get("meta", {}) - streams = raw.get("streams", {}) - name = meta.get("name", orig_path.stem) + # Run CPU-intensive parsing + file writing in a thread + def _process_one(op=orig_path): + raw = json.loads(op.read_text(encoding="utf-8")) + meta = raw.get("meta", {}) + streams = raw.get("streams", {}) + name = meta.get("name", op.stem) + parsed = strava_to_parsed(meta, streams) + activity_id = make_activity_id(parsed) + if (user_dir / "activities" / f"{activity_id}.json").exists(): + return "skipped", name, activity_id, None + metrics = compute_metrics(parsed) + effective_privacy = parsed.privacy if parsed.privacy is not None else "public" + write_activity(parsed, metrics, user_dir, privacy=effective_privacy, rdp_epsilon=0.0001) + summary = build_summary(parsed, metrics, activity_id, effective_privacy) + return "imported", name, activity_id, summary - parsed = strava_to_parsed(meta, streams) - activity_id = make_activity_id(parsed) + status, name, activity_id, summary = await loop.run_in_executor(None, _process_one) - if (user_dir / "activities" / f"{activity_id}.json").exists(): + if status == "skipped": skipped += 1 - yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'skipped'})}\n\n" - continue + else: + summaries[activity_id] = summary + imported += 1 - metrics = compute_metrics(parsed) - effective_privacy = parsed.privacy if parsed.privacy is not None else "public" - write_activity(parsed, metrics, user_dir, privacy=effective_privacy, rdp_epsilon=0.0001) - summary = build_summary(parsed, metrics, activity_id, effective_privacy) - summaries[activity_id] = summary + yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n" - imported += 1 - yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'imported'})}\n\n" except Exception as exc: errors += 1 log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True) yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n" - # Write index and athlete.json once at the end (not once per activity) if imported > 0: yield f"data: {json.dumps({'type': 'status', 'message': 'Writing index and athlete data…'})}\n\n" - write_index(list(summaries.values()), user_dir, owner) - write_athlete_json(list(summaries.values()), user_dir, athlete_config) - yield f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n" - merge_all(user_dir) - _trigger_rebuild(handle) + try: + await loop.run_in_executor(None, lambda: write_index(list(summaries.values()), user_dir, owner)) + await loop.run_in_executor(None, lambda: write_athlete_json(list(summaries.values()), user_dir, athlete_config)) + yield f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n" + await loop.run_in_executor(None, lambda: merge_all(user_dir)) + _trigger_rebuild(handle) + except Exception as exc: + log.exception("reextract[%s]: error in final write/merge", handle) + yield f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n" log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", handle, imported, skipped, errors) yield f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n"