reextract-originals: run as subprocess to avoid OOM
The in-process approach loaded all 2015 Strava originals into the server process memory, causing OOM kills. Now spawns `bincio reextract-originals` as a child process; heavy work runs in an isolated Python interpreter that exits when done, freeing all memory. Also adds `bincio reextract-originals` as a standalone CLI command that prints JSON-lines progress to stdout — useful for running directly on the VPS via SSH for large backlogs.
This commit is contained in:
@@ -18,6 +18,7 @@ from bincio.import_.cli import import_group # noqa: E402
|
||||
from bincio.serve.init_cmd import init # noqa: E402
|
||||
from bincio.serve.cli import serve # noqa: E402
|
||||
from bincio.dev import dev # noqa: E402
|
||||
from bincio.reextract_cmd import reextract_originals # noqa: E402
|
||||
|
||||
main.add_command(extract)
|
||||
main.add_command(render)
|
||||
@@ -26,3 +27,4 @@ main.add_command(import_group)
|
||||
main.add_command(init)
|
||||
main.add_command(serve)
|
||||
main.add_command(dev)
|
||||
main.add_command(reextract_originals)
|
||||
|
||||
@@ -0,0 +1,108 @@
|
||||
"""bincio reextract-originals — re-extract activities from stored Strava originals."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import click
|
||||
|
||||
|
||||
def _emit(obj: dict) -> None:
|
||||
"""Write a JSON progress line to stdout (flushed immediately)."""
|
||||
print(json.dumps(obj), flush=True)
|
||||
|
||||
|
||||
@click.command("reextract-originals")
|
||||
@click.option("--data-dir", required=True, type=click.Path(), help="BAS data directory")
|
||||
@click.option("--handle", required=True, help="User handle to re-extract for")
|
||||
@click.option("--force", is_flag=True, default=False, help="Re-extract even if activity JSON already exists")
|
||||
def reextract_originals(data_dir: str, handle: str, force: bool) -> None:
|
||||
"""Re-extract activities from stored Strava originals (originals/strava/*.json).
|
||||
|
||||
Prints one JSON object per line to stdout for streaming progress:
|
||||
{"type": "status", "message": "..."}
|
||||
{"type": "progress", "n": 1, "total": 2015, "name": "...", "status": "imported"|"skipped"|"error", ["detail": "..."]}
|
||||
{"type": "done", "imported": N, "skipped": N, "errors": N}
|
||||
{"type": "error", "message": "..."}
|
||||
"""
|
||||
from bincio.extract.strava_api import strava_to_parsed
|
||||
from bincio.extract.metrics import compute as compute_metrics
|
||||
from bincio.extract.writer import (
|
||||
build_summary, make_activity_id, write_activity, write_index,
|
||||
)
|
||||
from bincio.render.merge import merge_all
|
||||
|
||||
dd = Path(data_dir).expanduser().resolve()
|
||||
user_dir = dd / handle
|
||||
originals_dir = user_dir / "originals" / "strava"
|
||||
|
||||
if not originals_dir.exists():
|
||||
_emit({"type": "error", "message": f"No Strava originals directory at {originals_dir}"})
|
||||
sys.exit(1)
|
||||
|
||||
original_files = sorted(originals_dir.glob("*.json"))
|
||||
total = len(original_files)
|
||||
if total == 0:
|
||||
_emit({"type": "error", "message": "No Strava originals found"})
|
||||
sys.exit(1)
|
||||
|
||||
_emit({"type": "status", "message": f"Found {total} originals, starting extraction…"})
|
||||
|
||||
# Load existing index to get owner info and existing summaries
|
||||
index_path = user_dir / "index.json"
|
||||
try:
|
||||
existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {}
|
||||
except Exception:
|
||||
existing_index = {}
|
||||
owner = existing_index.get("owner", {"handle": handle})
|
||||
summaries: dict[str, dict] = {s["id"]: s for s in existing_index.get("activities", [])}
|
||||
|
||||
imported = skipped = errors = 0
|
||||
|
||||
for n, orig_path in enumerate(original_files, 1):
|
||||
try:
|
||||
raw = json.loads(orig_path.read_text(encoding="utf-8"))
|
||||
meta = raw.get("meta", {})
|
||||
streams = raw.get("streams", {})
|
||||
name = meta.get("name", orig_path.stem)
|
||||
|
||||
parsed = strava_to_parsed(meta, streams)
|
||||
activity_id = make_activity_id(parsed)
|
||||
|
||||
if not force and (user_dir / "activities" / f"{activity_id}.json").exists():
|
||||
skipped += 1
|
||||
_emit({"type": "progress", "n": n, "total": total, "name": name, "status": "skipped"})
|
||||
else:
|
||||
metrics = compute_metrics(parsed)
|
||||
ep = parsed.privacy if parsed.privacy is not None else "public"
|
||||
write_activity(parsed, metrics, user_dir, privacy=ep, rdp_epsilon=0.0001)
|
||||
summaries[activity_id] = build_summary(parsed, metrics, activity_id, ep)
|
||||
imported += 1
|
||||
_emit({"type": "progress", "n": n, "total": total, "name": name, "status": "imported"})
|
||||
|
||||
# Explicitly free large objects to keep memory low
|
||||
del parsed, metrics
|
||||
except Exception as exc:
|
||||
errors += 1
|
||||
_emit({"type": "progress", "n": n, "total": total, "name": orig_path.stem,
|
||||
"status": "error", "detail": str(exc)})
|
||||
|
||||
if imported > 0:
|
||||
_emit({"type": "status", "message": "Writing index…"})
|
||||
try:
|
||||
write_index(list(summaries.values()), user_dir, owner)
|
||||
except Exception as exc:
|
||||
_emit({"type": "error", "message": f"write_index failed: {exc}"})
|
||||
sys.exit(1)
|
||||
|
||||
_emit({"type": "status", "message": "Running merge…"})
|
||||
try:
|
||||
merge_all(user_dir)
|
||||
except Exception as exc:
|
||||
_emit({"type": "error", "message": f"merge_all failed: {exc}"})
|
||||
sys.exit(1)
|
||||
|
||||
_emit({"type": "done", "imported": imported, "skipped": skipped, "errors": errors})
|
||||
+36
-106
@@ -626,122 +626,52 @@ async def admin_reextract_originals(
|
||||
) -> StreamingResponse:
|
||||
"""Re-extract activities from stored Strava originals without hitting the API.
|
||||
|
||||
Reads each file in originals/strava/{id}.json (containing {"meta", "streams"}),
|
||||
calls strava_to_parsed, and writes the activity JSON + GeoJSON.
|
||||
|
||||
Skips activities that already have a valid JSON file in activities/.
|
||||
Streams SSE progress. Calls merge_all + rebuild on completion.
|
||||
Spawns `bincio reextract-originals` as a subprocess so heavy memory use
|
||||
is isolated from the server process. Streams its JSON-lines output as SSE.
|
||||
Triggers a full rebuild on completion.
|
||||
"""
|
||||
import asyncio
|
||||
import shutil
|
||||
_require_admin(bincio_session)
|
||||
user_dir = _get_data_dir() / handle
|
||||
originals_dir = user_dir / "originals" / "strava"
|
||||
if not originals_dir.exists():
|
||||
raise HTTPException(404, f"No Strava originals directory for '{handle}'")
|
||||
|
||||
original_files = sorted(originals_dir.glob("*.json"))
|
||||
total = len(original_files)
|
||||
log.info("reextract[%s]: starting, %d originals found", handle, total)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
# Queue carries SSE event strings; None is the sentinel for "done"
|
||||
q: asyncio.Queue[str | None] = asyncio.Queue()
|
||||
|
||||
def _run_extraction() -> None:
|
||||
"""Runs entirely in a thread pool worker. Puts SSE strings into q."""
|
||||
try:
|
||||
from bincio.extract.strava_api import strava_to_parsed
|
||||
from bincio.extract.metrics import compute as compute_metrics
|
||||
from bincio.extract.writer import (
|
||||
build_summary, make_activity_id, write_activity,
|
||||
write_index,
|
||||
)
|
||||
from bincio.render.merge import merge_all
|
||||
except Exception as exc:
|
||||
log.exception("reextract[%s]: import error", handle)
|
||||
loop.call_soon_threadsafe(q.put_nowait,
|
||||
f"data: {json.dumps({'type': 'error', 'message': f'Import error: {exc}'})}\n\n")
|
||||
loop.call_soon_threadsafe(q.put_nowait, None)
|
||||
return
|
||||
|
||||
index_path = user_dir / "index.json"
|
||||
try:
|
||||
existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {}
|
||||
except Exception:
|
||||
existing_index = {}
|
||||
owner = existing_index.get("owner", {"handle": handle})
|
||||
summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])}
|
||||
|
||||
_COMPUTED = {"bas_version", "generated_at", "power_curve", "records", "best_climbs"}
|
||||
athlete_config: dict[str, Any] = {}
|
||||
athlete_path = user_dir / "athlete.json"
|
||||
if athlete_path.exists():
|
||||
try:
|
||||
existing_athlete = json.loads(athlete_path.read_text(encoding="utf-8"))
|
||||
athlete_config = {k: v for k, v in existing_athlete.items() if k not in _COMPUTED}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
imported = skipped = errors = 0
|
||||
|
||||
for n, orig_path in enumerate(original_files, 1):
|
||||
try:
|
||||
raw = json.loads(orig_path.read_text(encoding="utf-8"))
|
||||
meta = raw.get("meta", {})
|
||||
streams = raw.get("streams", {})
|
||||
name = meta.get("name", orig_path.stem)
|
||||
parsed = strava_to_parsed(meta, streams)
|
||||
activity_id = make_activity_id(parsed)
|
||||
|
||||
if (user_dir / "activities" / f"{activity_id}.json").exists():
|
||||
skipped += 1
|
||||
status = "skipped"
|
||||
else:
|
||||
metrics = compute_metrics(parsed)
|
||||
ep = parsed.privacy if parsed.privacy is not None else "public"
|
||||
write_activity(parsed, metrics, user_dir, privacy=ep, rdp_epsilon=0.0001)
|
||||
summaries[activity_id] = build_summary(parsed, metrics, activity_id, ep)
|
||||
imported += 1
|
||||
status = "imported"
|
||||
|
||||
loop.call_soon_threadsafe(q.put_nowait,
|
||||
f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n")
|
||||
except Exception as exc:
|
||||
errors += 1
|
||||
log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True)
|
||||
loop.call_soon_threadsafe(q.put_nowait,
|
||||
f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n")
|
||||
|
||||
if imported > 0:
|
||||
loop.call_soon_threadsafe(q.put_nowait,
|
||||
f"data: {json.dumps({'type': 'status', 'message': 'Writing index…'})}\n\n")
|
||||
try:
|
||||
write_index(list(summaries.values()), user_dir, owner)
|
||||
loop.call_soon_threadsafe(q.put_nowait,
|
||||
f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n")
|
||||
merge_all(user_dir)
|
||||
_trigger_rebuild(handle)
|
||||
except Exception as exc:
|
||||
log.exception("reextract[%s]: error in final write/merge", handle)
|
||||
loop.call_soon_threadsafe(q.put_nowait,
|
||||
f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n")
|
||||
|
||||
log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d",
|
||||
handle, imported, skipped, errors)
|
||||
loop.call_soon_threadsafe(q.put_nowait,
|
||||
f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n")
|
||||
loop.call_soon_threadsafe(q.put_nowait, None) # sentinel
|
||||
# Find the `uv` or `bincio` executable that launched us
|
||||
uv_exe = shutil.which("uv") or "uv"
|
||||
data_dir = str(_get_data_dir())
|
||||
log.info("reextract[%s]: spawning subprocess via %s", handle, uv_exe)
|
||||
|
||||
async def event_stream():
|
||||
yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n"
|
||||
# Kick off the extraction thread
|
||||
loop.run_in_executor(None, _run_extraction)
|
||||
# Drain the queue until sentinel
|
||||
while True:
|
||||
chunk = await q.get()
|
||||
if chunk is None:
|
||||
break
|
||||
yield chunk
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
uv_exe, "run", "bincio", "reextract-originals",
|
||||
"--data-dir", data_dir,
|
||||
"--handle", handle,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
assert proc.stdout is not None
|
||||
|
||||
async for raw_line in proc.stdout:
|
||||
line = raw_line.decode(errors="replace").strip()
|
||||
if not line:
|
||||
continue
|
||||
# Forward the JSON line as an SSE event
|
||||
yield f"data: {line}\n\n"
|
||||
|
||||
await proc.wait()
|
||||
stderr_out = b""
|
||||
if proc.stderr:
|
||||
stderr_out = await proc.stderr.read()
|
||||
|
||||
if proc.returncode != 0:
|
||||
log.error("reextract[%s]: subprocess exited %d — stderr: %s",
|
||||
handle, proc.returncode, stderr_out.decode(errors="replace")[:500])
|
||||
yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n"
|
||||
else:
|
||||
log.info("reextract[%s]: subprocess done, triggering rebuild", handle)
|
||||
_trigger_rebuild(handle)
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
|
||||
Reference in New Issue
Block a user