fix reextract: async generator + run_in_executor, imports at endpoint level
The sync generator was failing with a network error because Starlette's iterate_in_threadpool doesn't properly propagate exceptions from sync generators — the connection resets with no body. Fix: convert event_stream to an async generator (Starlette handles these natively without thread wrapping), move imports to the endpoint function scope so failures raise HTTPException before the stream starts, and run CPU-intensive work (parse + write) via loop.run_in_executor so the async generator can actually yield between activities.
This commit is contained in:
+46
-44
@@ -630,12 +630,9 @@ async def admin_reextract_originals(
|
||||
calls strava_to_parsed, and writes the activity JSON + GeoJSON.
|
||||
|
||||
Skips activities that already have a valid JSON file in activities/.
|
||||
Streams SSE progress; call /rebuild after completion if no --webroot.
|
||||
|
||||
SSE events:
|
||||
{"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"skipped"|"error", "detail": "..."}
|
||||
{"type": "done", "imported": N, "skipped": N, "errors": N}
|
||||
Streams SSE progress. Calls merge_all + rebuild on completion.
|
||||
"""
|
||||
import asyncio
|
||||
_require_admin(bincio_session)
|
||||
user_dir = _get_data_dir() / handle
|
||||
originals_dir = user_dir / "originals" / "strava"
|
||||
@@ -646,34 +643,29 @@ async def admin_reextract_originals(
|
||||
total = len(original_files)
|
||||
log.info("reextract[%s]: starting, %d originals found", handle, total)
|
||||
|
||||
def event_stream():
|
||||
from bincio.extract.strava_api import strava_to_parsed
|
||||
from bincio.extract.metrics import compute as compute_metrics
|
||||
from bincio.extract.writer import (
|
||||
build_summary, make_activity_id, write_activity,
|
||||
write_index, write_athlete_json,
|
||||
)
|
||||
from bincio.render.merge import merge_all
|
||||
# Imports at endpoint level so failures are visible before the stream starts
|
||||
from bincio.extract.strava_api import strava_to_parsed
|
||||
from bincio.extract.metrics import compute as compute_metrics
|
||||
from bincio.extract.writer import (
|
||||
build_summary, make_activity_id, write_activity,
|
||||
write_index, write_athlete_json,
|
||||
)
|
||||
from bincio.render.merge import merge_all
|
||||
|
||||
# Immediate heartbeat so the client knows the connection is alive
|
||||
async def event_stream():
|
||||
yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n"
|
||||
|
||||
imported = 0
|
||||
skipped = 0
|
||||
errors = 0
|
||||
|
||||
# Load existing index once upfront (to preserve any non-Strava activities)
|
||||
index_path = user_dir / "index.json"
|
||||
if index_path.exists():
|
||||
try:
|
||||
existing_index = json.loads(index_path.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
existing_index = {}
|
||||
else:
|
||||
try:
|
||||
existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {}
|
||||
except Exception:
|
||||
existing_index = {}
|
||||
|
||||
owner = existing_index.get("owner", {"handle": handle})
|
||||
# Seed summaries from whatever already exists in activities/
|
||||
summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])}
|
||||
|
||||
_COMPUTED = {"bas_version", "generated_at", "power_curve", "records", "best_climbs"}
|
||||
@@ -686,42 +678,52 @@ async def admin_reextract_originals(
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
for n, orig_path in enumerate(original_files, 1):
|
||||
try:
|
||||
raw = json.loads(orig_path.read_text(encoding="utf-8"))
|
||||
meta = raw.get("meta", {})
|
||||
streams = raw.get("streams", {})
|
||||
name = meta.get("name", orig_path.stem)
|
||||
# Run CPU-intensive parsing + file writing in a thread
|
||||
def _process_one(op=orig_path):
|
||||
raw = json.loads(op.read_text(encoding="utf-8"))
|
||||
meta = raw.get("meta", {})
|
||||
streams = raw.get("streams", {})
|
||||
name = meta.get("name", op.stem)
|
||||
parsed = strava_to_parsed(meta, streams)
|
||||
activity_id = make_activity_id(parsed)
|
||||
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
||||
return "skipped", name, activity_id, None
|
||||
metrics = compute_metrics(parsed)
|
||||
effective_privacy = parsed.privacy if parsed.privacy is not None else "public"
|
||||
write_activity(parsed, metrics, user_dir, privacy=effective_privacy, rdp_epsilon=0.0001)
|
||||
summary = build_summary(parsed, metrics, activity_id, effective_privacy)
|
||||
return "imported", name, activity_id, summary
|
||||
|
||||
parsed = strava_to_parsed(meta, streams)
|
||||
activity_id = make_activity_id(parsed)
|
||||
status, name, activity_id, summary = await loop.run_in_executor(None, _process_one)
|
||||
|
||||
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
||||
if status == "skipped":
|
||||
skipped += 1
|
||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'skipped'})}\n\n"
|
||||
continue
|
||||
else:
|
||||
summaries[activity_id] = summary
|
||||
imported += 1
|
||||
|
||||
metrics = compute_metrics(parsed)
|
||||
effective_privacy = parsed.privacy if parsed.privacy is not None else "public"
|
||||
write_activity(parsed, metrics, user_dir, privacy=effective_privacy, rdp_epsilon=0.0001)
|
||||
summary = build_summary(parsed, metrics, activity_id, effective_privacy)
|
||||
summaries[activity_id] = summary
|
||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n"
|
||||
|
||||
imported += 1
|
||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'imported'})}\n\n"
|
||||
except Exception as exc:
|
||||
errors += 1
|
||||
log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True)
|
||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n"
|
||||
|
||||
# Write index and athlete.json once at the end (not once per activity)
|
||||
if imported > 0:
|
||||
yield f"data: {json.dumps({'type': 'status', 'message': 'Writing index and athlete data…'})}\n\n"
|
||||
write_index(list(summaries.values()), user_dir, owner)
|
||||
write_athlete_json(list(summaries.values()), user_dir, athlete_config)
|
||||
yield f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n"
|
||||
merge_all(user_dir)
|
||||
_trigger_rebuild(handle)
|
||||
try:
|
||||
await loop.run_in_executor(None, lambda: write_index(list(summaries.values()), user_dir, owner))
|
||||
await loop.run_in_executor(None, lambda: write_athlete_json(list(summaries.values()), user_dir, athlete_config))
|
||||
yield f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n"
|
||||
await loop.run_in_executor(None, lambda: merge_all(user_dir))
|
||||
_trigger_rebuild(handle)
|
||||
except Exception as exc:
|
||||
log.exception("reextract[%s]: error in final write/merge", handle)
|
||||
yield f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n"
|
||||
|
||||
log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", handle, imported, skipped, errors)
|
||||
yield f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n"
|
||||
|
||||
Reference in New Issue
Block a user