add re-extract from Strava originals endpoint and improve diag
- POST /api/admin/users/{handle}/reextract-originals: reads stored
originals/strava/*.json and re-runs strava_to_parsed + ingest_parsed
without hitting the Strava API; streams SSE progress; calls merge_all
and rebuild on completion
- GET /api/admin/users/{handle}/diag: now shows _merged/activities/
file counts, a sample of filenames in activities/ (with symlink flag),
and lists pending_files by name
- Admin page: Re-extract button per user with live SSE progress modal
This commit is contained in:
+101
-1
@@ -619,6 +619,82 @@ async def admin_rebuild_sync(
|
||||
return JSONResponse(resp)
|
||||
|
||||
|
||||
@app.post("/api/admin/users/{handle}/reextract-originals")
|
||||
async def admin_reextract_originals(
|
||||
handle: str,
|
||||
bincio_session: Optional[str] = Cookie(default=None),
|
||||
) -> StreamingResponse:
|
||||
"""Re-extract activities from stored Strava originals without hitting the API.
|
||||
|
||||
Reads each file in originals/strava/{id}.json (containing {"meta", "streams"}),
|
||||
calls strava_to_parsed, and writes the activity JSON + GeoJSON.
|
||||
|
||||
Skips activities that already have a valid JSON file in activities/.
|
||||
Streams SSE progress; call /rebuild after completion if no --webroot.
|
||||
|
||||
SSE events:
|
||||
{"type": "progress", "n": N, "total": T, "name": "...", "status": "imported"|"skipped"|"error", "detail": "..."}
|
||||
{"type": "done", "imported": N, "skipped": N, "errors": N}
|
||||
"""
|
||||
_require_admin(bincio_session)
|
||||
user_dir = _get_data_dir() / handle
|
||||
originals_dir = user_dir / "originals" / "strava"
|
||||
if not originals_dir.exists():
|
||||
raise HTTPException(404, f"No Strava originals directory for '{handle}'")
|
||||
|
||||
original_files = sorted(originals_dir.glob("*.json"))
|
||||
total = len(original_files)
|
||||
log.info("reextract[%s]: starting, %d originals found", handle, total)
|
||||
|
||||
def event_stream():
|
||||
from bincio.extract.strava_api import strava_to_parsed
|
||||
from bincio.extract.ingest import ingest_parsed
|
||||
from bincio.render.merge import merge_all
|
||||
|
||||
imported = 0
|
||||
skipped = 0
|
||||
errors = 0
|
||||
any_imported = False
|
||||
|
||||
for n, orig_path in enumerate(original_files, 1):
|
||||
try:
|
||||
raw = json.loads(orig_path.read_text(encoding="utf-8"))
|
||||
meta = raw.get("meta", {})
|
||||
streams = raw.get("streams", {})
|
||||
name = meta.get("name", orig_path.stem)
|
||||
|
||||
parsed = strava_to_parsed(meta, streams)
|
||||
|
||||
from bincio.extract.writer import make_activity_id
|
||||
activity_id = make_activity_id(parsed)
|
||||
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
||||
skipped += 1
|
||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'skipped'})}\n\n"
|
||||
continue
|
||||
|
||||
ingest_parsed(parsed, user_dir, privacy="public", rdp_epsilon=0.0001)
|
||||
imported += 1
|
||||
any_imported = True
|
||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': 'imported'})}\n\n"
|
||||
except Exception as exc:
|
||||
errors += 1
|
||||
log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True)
|
||||
yield f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n"
|
||||
|
||||
log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", handle, imported, skipped, errors)
|
||||
if any_imported:
|
||||
merge_all(user_dir)
|
||||
_trigger_rebuild(handle)
|
||||
|
||||
yield f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/api/admin/users/{handle}/diag")
|
||||
async def admin_diag(
|
||||
handle: str,
|
||||
@@ -662,6 +738,25 @@ async def admin_diag(
|
||||
except Exception:
|
||||
root_activity_count = -1
|
||||
|
||||
# Peek at a few filenames in activities/ to understand the actual state
|
||||
acts_sample: list[str] = []
|
||||
acts_symlinks = 0
|
||||
if activities_dir.exists():
|
||||
for f in sorted(activities_dir.iterdir())[:10]:
|
||||
acts_sample.append(f.name + (" → symlink" if f.is_symlink() else ""))
|
||||
if f.is_symlink():
|
||||
acts_symlinks += 1
|
||||
|
||||
# Check _merged/activities/ separately
|
||||
merged_acts_dir = merged_dir / "activities"
|
||||
merged_acts_json = _count(merged_acts_dir, "*.json")
|
||||
merged_acts_geojson = _count(merged_acts_dir, "*.geojson")
|
||||
|
||||
# List pending files
|
||||
pending_files: list[str] = []
|
||||
if uploads_dir.exists():
|
||||
pending_files = [f.name for f in uploads_dir.iterdir() if f.is_file()]
|
||||
|
||||
return JSONResponse({
|
||||
"handle": handle,
|
||||
"user_dir": str(user_dir),
|
||||
@@ -669,6 +764,8 @@ async def admin_diag(
|
||||
"json_files": _count(activities_dir, "*.json"),
|
||||
"geojson_files": _count(activities_dir, "*.geojson"),
|
||||
"size_mb": round(_size_mb(activities_dir), 2),
|
||||
"sample": acts_sample,
|
||||
"symlink_count": acts_symlinks,
|
||||
},
|
||||
"originals": {
|
||||
"exists": originals_dir.exists(),
|
||||
@@ -679,12 +776,15 @@ async def admin_diag(
|
||||
"exists": merged_dir.exists(),
|
||||
"activity_count_in_index": merged_activity_count,
|
||||
"size_mb": round(_size_mb(merged_dir), 2),
|
||||
"activities_json": merged_acts_json,
|
||||
"activities_geojson": merged_acts_geojson,
|
||||
},
|
||||
"root_index": {
|
||||
"exists": root_index.exists(),
|
||||
"activity_count": root_activity_count,
|
||||
},
|
||||
"pending_uploads": _count(uploads_dir),
|
||||
"pending_uploads": len(pending_files),
|
||||
"pending_files": pending_files,
|
||||
"dedup_cache_exists": (user_dir / ".bincio_cache.json").exists(),
|
||||
"athlete_json_exists": (user_dir / "athlete.json").exists(),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user