fix re-extract: add heartbeat yield, batch index writes, handle HTTP errors in UI
- Generator now yields a 'status' event immediately so the client can distinguish 'working' from 'failed silently before first event' - Batch mode: call write_activity per file but write index.json and athlete.json only once at the end (was O(n²) — 2015 rewrites) - JS: check r.ok before reading the body stream; show HTTP error detail instead of staying stuck at 'Starting…' - Handle 'status' event type in the progress log
This commit is contained in:
+46
-8
@@ -648,13 +648,43 @@ async def admin_reextract_originals(
|
|||||||
|
|
||||||
def event_stream():
|
def event_stream():
|
||||||
from bincio.extract.strava_api import strava_to_parsed
|
from bincio.extract.strava_api import strava_to_parsed
|
||||||
from bincio.extract.ingest import ingest_parsed
|
from bincio.extract.metrics import compute as compute_metrics
|
||||||
|
from bincio.extract.writer import (
|
||||||
|
build_summary, make_activity_id, write_activity,
|
||||||
|
write_index, write_athlete_json,
|
||||||
|
)
|
||||||
from bincio.render.merge import merge_all
|
from bincio.render.merge import merge_all
|
||||||
|
|
||||||
|
# Immediate heartbeat so the client knows the connection is alive
|
||||||
|
yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n"
|
||||||
|
|
||||||
imported = 0
|
imported = 0
|
||||||
skipped = 0
|
skipped = 0
|
||||||
errors = 0
|
errors = 0
|
||||||
any_imported = False
|
|
||||||
|
# Load existing index once upfront (to preserve any non-Strava activities)
|
||||||
|
index_path = user_dir / "index.json"
|
||||||
|
if index_path.exists():
|
||||||
|
try:
|
||||||
|
existing_index = json.loads(index_path.read_text(encoding="utf-8"))
|
||||||
|
except Exception:
|
||||||
|
existing_index = {}
|
||||||
|
else:
|
||||||
|
existing_index = {}
|
||||||
|
|
||||||
|
owner = existing_index.get("owner", {"handle": handle})
|
||||||
|
# Seed summaries from whatever already exists in activities/
|
||||||
|
summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])}
|
||||||
|
|
||||||
|
_COMPUTED = {"bas_version", "generated_at", "power_curve", "records", "best_climbs"}
|
||||||
|
athlete_config: dict[str, Any] = {}
|
||||||
|
athlete_path = user_dir / "athlete.json"
|
||||||
|
if athlete_path.exists():
|
||||||
|
try:
|
||||||
|
existing_athlete = json.loads(athlete_path.read_text(encoding="utf-8"))
|
||||||
|
athlete_config = {k: v for k, v in existing_athlete.items() if k not in _COMPUTED}
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
for n, orig_path in enumerate(original_files, 1):
|
for n, orig_path in enumerate(original_files, 1):
|
||||||
try:
|
try:
|
||||||
@@ -664,28 +694,36 @@ async def admin_reextract_originals(
|
|||||||
name = meta.get("name", orig_path.stem)
|
name = meta.get("name", orig_path.stem)
|
||||||
|
|
||||||
parsed = strava_to_parsed(meta, streams)
|
parsed = strava_to_parsed(meta, streams)
|
||||||
|
|
||||||
from bincio.extract.writer import make_activity_id
|
|
||||||
activity_id = make_activity_id(parsed)
|
activity_id = make_activity_id(parsed)
|
||||||
|
|
||||||
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
||||||
skipped += 1
|
skipped += 1
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'skipped'})}\n\n"
|
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'skipped'})}\n\n"
|
||||||
continue
|
continue
|
||||||
|
|
||||||
ingest_parsed(parsed, user_dir, privacy="public", rdp_epsilon=0.0001)
|
metrics = compute_metrics(parsed)
|
||||||
|
effective_privacy = parsed.privacy if parsed.privacy is not None else "public"
|
||||||
|
write_activity(parsed, metrics, user_dir, privacy=effective_privacy, rdp_epsilon=0.0001)
|
||||||
|
summary = build_summary(parsed, metrics, activity_id, effective_privacy)
|
||||||
|
summaries[activity_id] = summary
|
||||||
|
|
||||||
imported += 1
|
imported += 1
|
||||||
any_imported = True
|
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'imported'})}\n\n"
|
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'imported'})}\n\n"
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
errors += 1
|
errors += 1
|
||||||
log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True)
|
log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True)
|
||||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n"
|
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n"
|
||||||
|
|
||||||
log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", handle, imported, skipped, errors)
|
# Write index and athlete.json once at the end (not once per activity)
|
||||||
if any_imported:
|
if imported > 0:
|
||||||
|
yield f"data: {json.dumps({'type': 'status', 'message': 'Writing index and athlete data…'})}\n\n"
|
||||||
|
write_index(list(summaries.values()), user_dir, owner)
|
||||||
|
write_athlete_json(list(summaries.values()), user_dir, athlete_config)
|
||||||
|
yield f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n"
|
||||||
merge_all(user_dir)
|
merge_all(user_dir)
|
||||||
_trigger_rebuild(handle)
|
_trigger_rebuild(handle)
|
||||||
|
|
||||||
|
log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", handle, imported, skipped, errors)
|
||||||
yield f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n"
|
yield f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n"
|
||||||
|
|
||||||
return StreamingResponse(
|
return StreamingResponse(
|
||||||
|
|||||||
@@ -190,13 +190,15 @@ import Base from '../../layouts/Base.astro';
|
|||||||
|
|
||||||
let imported = 0, skipped = 0, errors = 0;
|
let imported = 0, skipped = 0, errors = 0;
|
||||||
try {
|
try {
|
||||||
const es = new EventSource(`/api/admin/users/${h}/reextract-originals`);
|
|
||||||
// EventSource only does GET; use fetch + ReadableStream instead
|
|
||||||
es.close();
|
|
||||||
|
|
||||||
const r = await fetch(`/api/admin/users/${h}/reextract-originals`, {
|
const r = await fetch(`/api/admin/users/${h}/reextract-originals`, {
|
||||||
method: 'POST', credentials: 'include',
|
method: 'POST', credentials: 'include',
|
||||||
});
|
});
|
||||||
|
if (!r.ok) {
|
||||||
|
const errText = await r.text().catch(() => r.status.toString());
|
||||||
|
reextractSummary.textContent = `Error ${r.status}: ${errText}`;
|
||||||
|
reextractClose.disabled = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
const reader = r.body!.getReader();
|
const reader = r.body!.getReader();
|
||||||
const decoder = new TextDecoder();
|
const decoder = new TextDecoder();
|
||||||
let buf = '';
|
let buf = '';
|
||||||
@@ -210,7 +212,9 @@ import Base from '../../layouts/Base.astro';
|
|||||||
const dataLine = chunk.split('\n').find(l => l.startsWith('data: '));
|
const dataLine = chunk.split('\n').find(l => l.startsWith('data: '));
|
||||||
if (!dataLine) continue;
|
if (!dataLine) continue;
|
||||||
const ev = JSON.parse(dataLine.slice(6));
|
const ev = JSON.parse(dataLine.slice(6));
|
||||||
if (ev.type === 'progress') {
|
if (ev.type === 'status') {
|
||||||
|
reextractSummary.textContent = ev.message;
|
||||||
|
} else if (ev.type === 'progress') {
|
||||||
const color = ev.status === 'imported' ? 'text-green-400'
|
const color = ev.status === 'imported' ? 'text-green-400'
|
||||||
: ev.status === 'error' ? 'text-red-400'
|
: ev.status === 'error' ? 'text-red-400'
|
||||||
: 'text-zinc-500';
|
: 'text-zinc-500';
|
||||||
|
|||||||
Reference in New Issue
Block a user