rewrite reextract with queue-based thread/async bridge
The per-call run_in_executor pattern caused network errors. New approach: one thread runs the entire extraction loop and puts SSE strings into an asyncio.Queue via call_soon_threadsafe; the async generator drains the queue. This is the correct pattern for background-thread + SSE streaming in FastAPI.
This commit is contained in:
+62
-46
@@ -643,28 +643,32 @@ async def admin_reextract_originals(
|
|||||||
total = len(original_files)
|
total = len(original_files)
|
||||||
log.info("reextract[%s]: starting, %d originals found", handle, total)
|
log.info("reextract[%s]: starting, %d originals found", handle, total)
|
||||||
|
|
||||||
# Imports at endpoint level so failures are visible before the stream starts
|
loop = asyncio.get_event_loop()
|
||||||
from bincio.extract.strava_api import strava_to_parsed
|
# Queue carries SSE event strings; None is the sentinel for "done"
|
||||||
from bincio.extract.metrics import compute as compute_metrics
|
q: asyncio.Queue[str | None] = asyncio.Queue()
|
||||||
from bincio.extract.writer import (
|
|
||||||
build_summary, make_activity_id, write_activity,
|
|
||||||
write_index, write_athlete_json,
|
|
||||||
)
|
|
||||||
from bincio.render.merge import merge_all
|
|
||||||
|
|
||||||
async def event_stream():
|
def _run_extraction() -> None:
|
||||||
yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n"
|
"""Runs entirely in a thread pool worker. Puts SSE strings into q."""
|
||||||
|
try:
|
||||||
imported = 0
|
from bincio.extract.strava_api import strava_to_parsed
|
||||||
skipped = 0
|
from bincio.extract.metrics import compute as compute_metrics
|
||||||
errors = 0
|
from bincio.extract.writer import (
|
||||||
|
build_summary, make_activity_id, write_activity,
|
||||||
|
write_index, write_athlete_json,
|
||||||
|
)
|
||||||
|
from bincio.render.merge import merge_all
|
||||||
|
except Exception as exc:
|
||||||
|
log.exception("reextract[%s]: import error", handle)
|
||||||
|
loop.call_soon_threadsafe(q.put_nowait,
|
||||||
|
f"data: {json.dumps({'type': 'error', 'message': f'Import error: {exc}'})}\n\n")
|
||||||
|
loop.call_soon_threadsafe(q.put_nowait, None)
|
||||||
|
return
|
||||||
|
|
||||||
index_path = user_dir / "index.json"
|
index_path = user_dir / "index.json"
|
||||||
try:
|
try:
|
||||||
existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {}
|
existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {}
|
||||||
except Exception:
|
except Exception:
|
||||||
existing_index = {}
|
existing_index = {}
|
||||||
|
|
||||||
owner = existing_index.get("owner", {"handle": handle})
|
owner = existing_index.get("owner", {"handle": handle})
|
||||||
summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])}
|
summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])}
|
||||||
|
|
||||||
@@ -678,55 +682,67 @@ async def admin_reextract_originals(
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
imported = skipped = errors = 0
|
||||||
|
|
||||||
for n, orig_path in enumerate(original_files, 1):
|
for n, orig_path in enumerate(original_files, 1):
|
||||||
try:
|
try:
|
||||||
# Run CPU-intensive parsing + file writing in a thread
|
raw = json.loads(orig_path.read_text(encoding="utf-8"))
|
||||||
def _process_one(op=orig_path):
|
meta = raw.get("meta", {})
|
||||||
raw = json.loads(op.read_text(encoding="utf-8"))
|
streams = raw.get("streams", {})
|
||||||
meta = raw.get("meta", {})
|
name = meta.get("name", orig_path.stem)
|
||||||
streams = raw.get("streams", {})
|
parsed = strava_to_parsed(meta, streams)
|
||||||
name = meta.get("name", op.stem)
|
activity_id = make_activity_id(parsed)
|
||||||
parsed = strava_to_parsed(meta, streams)
|
|
||||||
activity_id = make_activity_id(parsed)
|
|
||||||
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
|
||||||
return "skipped", name, activity_id, None
|
|
||||||
metrics = compute_metrics(parsed)
|
|
||||||
effective_privacy = parsed.privacy if parsed.privacy is not None else "public"
|
|
||||||
write_activity(parsed, metrics, user_dir, privacy=effective_privacy, rdp_epsilon=0.0001)
|
|
||||||
summary = build_summary(parsed, metrics, activity_id, effective_privacy)
|
|
||||||
return "imported", name, activity_id, summary
|
|
||||||
|
|
||||||
status, name, activity_id, summary = await loop.run_in_executor(None, _process_one)
|
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
||||||
|
|
||||||
if status == "skipped":
|
|
||||||
skipped += 1
|
skipped += 1
|
||||||
|
status = "skipped"
|
||||||
else:
|
else:
|
||||||
summaries[activity_id] = summary
|
metrics = compute_metrics(parsed)
|
||||||
|
ep = parsed.privacy if parsed.privacy is not None else "public"
|
||||||
|
write_activity(parsed, metrics, user_dir, privacy=ep, rdp_epsilon=0.0001)
|
||||||
|
summaries[activity_id] = build_summary(parsed, metrics, activity_id, ep)
|
||||||
imported += 1
|
imported += 1
|
||||||
|
status = "imported"
|
||||||
|
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n"
|
loop.call_soon_threadsafe(q.put_nowait,
|
||||||
|
f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n")
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
errors += 1
|
errors += 1
|
||||||
log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True)
|
log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True)
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n"
|
loop.call_soon_threadsafe(q.put_nowait,
|
||||||
|
f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n")
|
||||||
|
|
||||||
if imported > 0:
|
if imported > 0:
|
||||||
yield f"data: {json.dumps({'type': 'status', 'message': 'Writing index and athlete data…'})}\n\n"
|
loop.call_soon_threadsafe(q.put_nowait,
|
||||||
|
f"data: {json.dumps({'type': 'status', 'message': 'Writing index and athlete data…'})}\n\n")
|
||||||
try:
|
try:
|
||||||
await loop.run_in_executor(None, lambda: write_index(list(summaries.values()), user_dir, owner))
|
write_index(list(summaries.values()), user_dir, owner)
|
||||||
await loop.run_in_executor(None, lambda: write_athlete_json(list(summaries.values()), user_dir, athlete_config))
|
write_athlete_json(list(summaries.values()), user_dir, athlete_config)
|
||||||
yield f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n"
|
loop.call_soon_threadsafe(q.put_nowait,
|
||||||
await loop.run_in_executor(None, lambda: merge_all(user_dir))
|
f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n")
|
||||||
|
merge_all(user_dir)
|
||||||
_trigger_rebuild(handle)
|
_trigger_rebuild(handle)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.exception("reextract[%s]: error in final write/merge", handle)
|
log.exception("reextract[%s]: error in final write/merge", handle)
|
||||||
yield f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n"
|
loop.call_soon_threadsafe(q.put_nowait,
|
||||||
|
f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n")
|
||||||
|
|
||||||
log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", handle, imported, skipped, errors)
|
log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d",
|
||||||
yield f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n"
|
handle, imported, skipped, errors)
|
||||||
|
loop.call_soon_threadsafe(q.put_nowait,
|
||||||
|
f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n")
|
||||||
|
loop.call_soon_threadsafe(q.put_nowait, None) # sentinel
|
||||||
|
|
||||||
|
async def event_stream():
|
||||||
|
yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n"
|
||||||
|
# Kick off the extraction thread
|
||||||
|
loop.run_in_executor(None, _run_extraction)
|
||||||
|
# Drain the queue until sentinel
|
||||||
|
while True:
|
||||||
|
chunk = await q.get()
|
||||||
|
if chunk is None:
|
||||||
|
break
|
||||||
|
yield chunk
|
||||||
|
|
||||||
return StreamingResponse(
|
return StreamingResponse(
|
||||||
event_stream(),
|
event_stream(),
|
||||||
|
|||||||
Reference in New Issue
Block a user