From 1a563012e22f252766e3f52ba5f7c7a6902e3e55 Mon Sep 17 00:00:00 2001 From: Davide Scaini Date: Wed, 15 Apr 2026 09:42:31 +0200 Subject: [PATCH] reextract-originals: run as subprocess to avoid OOM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The in-process approach loaded all 2015 Strava originals into the server process memory, causing OOM kills. Now spawns `bincio reextract-originals` as a child process; heavy work runs in an isolated Python interpreter that exits when done, freeing all memory. Also adds `bincio reextract-originals` as a standalone CLI command that prints JSON-lines progress to stdout — useful for running directly on the VPS via SSH for large backlogs. --- bincio/cli.py | 2 + bincio/reextract_cmd.py | 108 ++++++++++++++++++++++++++++++ bincio/serve/server.py | 142 ++++++++++------------------------------ 3 files changed, 146 insertions(+), 106 deletions(-) create mode 100644 bincio/reextract_cmd.py diff --git a/bincio/cli.py b/bincio/cli.py index 3d73efc..017833a 100644 --- a/bincio/cli.py +++ b/bincio/cli.py @@ -18,6 +18,7 @@ from bincio.import_.cli import import_group # noqa: E402 from bincio.serve.init_cmd import init # noqa: E402 from bincio.serve.cli import serve # noqa: E402 from bincio.dev import dev # noqa: E402 +from bincio.reextract_cmd import reextract_originals # noqa: E402 main.add_command(extract) main.add_command(render) @@ -26,3 +27,4 @@ main.add_command(import_group) main.add_command(init) main.add_command(serve) main.add_command(dev) +main.add_command(reextract_originals) diff --git a/bincio/reextract_cmd.py b/bincio/reextract_cmd.py new file mode 100644 index 0000000..c25c328 --- /dev/null +++ b/bincio/reextract_cmd.py @@ -0,0 +1,108 @@ +"""bincio reextract-originals — re-extract activities from stored Strava originals.""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path +from typing import Optional + +import click + + +def _emit(obj: dict) -> None: + """Write a JSON progress line to stdout (flushed immediately).""" + print(json.dumps(obj), flush=True) + + +@click.command("reextract-originals") +@click.option("--data-dir", required=True, type=click.Path(), help="BAS data directory") +@click.option("--handle", required=True, help="User handle to re-extract for") +@click.option("--force", is_flag=True, default=False, help="Re-extract even if activity JSON already exists") +def reextract_originals(data_dir: str, handle: str, force: bool) -> None: + """Re-extract activities from stored Strava originals (originals/strava/*.json). + + Prints one JSON object per line to stdout for streaming progress: + {"type": "status", "message": "..."} + {"type": "progress", "n": 1, "total": 2015, "name": "...", "status": "imported"|"skipped"|"error", ["detail": "..."]} + {"type": "done", "imported": N, "skipped": N, "errors": N} + {"type": "error", "message": "..."} + """ + from bincio.extract.strava_api import strava_to_parsed + from bincio.extract.metrics import compute as compute_metrics + from bincio.extract.writer import ( + build_summary, make_activity_id, write_activity, write_index, + ) + from bincio.render.merge import merge_all + + dd = Path(data_dir).expanduser().resolve() + user_dir = dd / handle + originals_dir = user_dir / "originals" / "strava" + + if not originals_dir.exists(): + _emit({"type": "error", "message": f"No Strava originals directory at {originals_dir}"}) + sys.exit(1) + + original_files = sorted(originals_dir.glob("*.json")) + total = len(original_files) + if total == 0: + _emit({"type": "error", "message": "No Strava originals found"}) + sys.exit(1) + + _emit({"type": "status", "message": f"Found {total} originals, starting extraction…"}) + + # Load existing index to get owner info and existing summaries + index_path = user_dir / "index.json" + try: + existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {} + except Exception: + existing_index = {} + owner = existing_index.get("owner", {"handle": handle}) + summaries: dict[str, dict] = {s["id"]: s for s in existing_index.get("activities", [])} + + imported = skipped = errors = 0 + + for n, orig_path in enumerate(original_files, 1): + try: + raw = json.loads(orig_path.read_text(encoding="utf-8")) + meta = raw.get("meta", {}) + streams = raw.get("streams", {}) + name = meta.get("name", orig_path.stem) + + parsed = strava_to_parsed(meta, streams) + activity_id = make_activity_id(parsed) + + if not force and (user_dir / "activities" / f"{activity_id}.json").exists(): + skipped += 1 + _emit({"type": "progress", "n": n, "total": total, "name": name, "status": "skipped"}) + else: + metrics = compute_metrics(parsed) + ep = parsed.privacy if parsed.privacy is not None else "public" + write_activity(parsed, metrics, user_dir, privacy=ep, rdp_epsilon=0.0001) + summaries[activity_id] = build_summary(parsed, metrics, activity_id, ep) + imported += 1 + _emit({"type": "progress", "n": n, "total": total, "name": name, "status": "imported"}) + + # Explicitly free large objects to keep memory low + del parsed, metrics + except Exception as exc: + errors += 1 + _emit({"type": "progress", "n": n, "total": total, "name": orig_path.stem, + "status": "error", "detail": str(exc)}) + + if imported > 0: + _emit({"type": "status", "message": "Writing index…"}) + try: + write_index(list(summaries.values()), user_dir, owner) + except Exception as exc: + _emit({"type": "error", "message": f"write_index failed: {exc}"}) + sys.exit(1) + + _emit({"type": "status", "message": "Running merge…"}) + try: + merge_all(user_dir) + except Exception as exc: + _emit({"type": "error", "message": f"merge_all failed: {exc}"}) + sys.exit(1) + + _emit({"type": "done", "imported": imported, "skipped": skipped, "errors": errors}) diff --git a/bincio/serve/server.py b/bincio/serve/server.py index bd6420f..be5ff36 100644 --- a/bincio/serve/server.py +++ b/bincio/serve/server.py @@ -626,122 +626,52 @@ async def admin_reextract_originals( ) -> StreamingResponse: """Re-extract activities from stored Strava originals without hitting the API. - Reads each file in originals/strava/{id}.json (containing {"meta", "streams"}), - calls strava_to_parsed, and writes the activity JSON + GeoJSON. - - Skips activities that already have a valid JSON file in activities/. - Streams SSE progress. Calls merge_all + rebuild on completion. + Spawns `bincio reextract-originals` as a subprocess so heavy memory use + is isolated from the server process. Streams its JSON-lines output as SSE. + Triggers a full rebuild on completion. """ import asyncio + import shutil _require_admin(bincio_session) user_dir = _get_data_dir() / handle originals_dir = user_dir / "originals" / "strava" if not originals_dir.exists(): raise HTTPException(404, f"No Strava originals directory for '{handle}'") - original_files = sorted(originals_dir.glob("*.json")) - total = len(original_files) - log.info("reextract[%s]: starting, %d originals found", handle, total) - - loop = asyncio.get_event_loop() - # Queue carries SSE event strings; None is the sentinel for "done" - q: asyncio.Queue[str | None] = asyncio.Queue() - - def _run_extraction() -> None: - """Runs entirely in a thread pool worker. Puts SSE strings into q.""" - try: - from bincio.extract.strava_api import strava_to_parsed - from bincio.extract.metrics import compute as compute_metrics - from bincio.extract.writer import ( - build_summary, make_activity_id, write_activity, - write_index, - ) - from bincio.render.merge import merge_all - except Exception as exc: - log.exception("reextract[%s]: import error", handle) - loop.call_soon_threadsafe(q.put_nowait, - f"data: {json.dumps({'type': 'error', 'message': f'Import error: {exc}'})}\n\n") - loop.call_soon_threadsafe(q.put_nowait, None) - return - - index_path = user_dir / "index.json" - try: - existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {} - except Exception: - existing_index = {} - owner = existing_index.get("owner", {"handle": handle}) - summaries: dict[str, Any] = {s["id"]: s for s in existing_index.get("activities", [])} - - _COMPUTED = {"bas_version", "generated_at", "power_curve", "records", "best_climbs"} - athlete_config: dict[str, Any] = {} - athlete_path = user_dir / "athlete.json" - if athlete_path.exists(): - try: - existing_athlete = json.loads(athlete_path.read_text(encoding="utf-8")) - athlete_config = {k: v for k, v in existing_athlete.items() if k not in _COMPUTED} - except Exception: - pass - - imported = skipped = errors = 0 - - for n, orig_path in enumerate(original_files, 1): - try: - raw = json.loads(orig_path.read_text(encoding="utf-8")) - meta = raw.get("meta", {}) - streams = raw.get("streams", {}) - name = meta.get("name", orig_path.stem) - parsed = strava_to_parsed(meta, streams) - activity_id = make_activity_id(parsed) - - if (user_dir / "activities" / f"{activity_id}.json").exists(): - skipped += 1 - status = "skipped" - else: - metrics = compute_metrics(parsed) - ep = parsed.privacy if parsed.privacy is not None else "public" - write_activity(parsed, metrics, user_dir, privacy=ep, rdp_epsilon=0.0001) - summaries[activity_id] = build_summary(parsed, metrics, activity_id, ep) - imported += 1 - status = "imported" - - loop.call_soon_threadsafe(q.put_nowait, - f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': name, 'status': status})}\n\n") - except Exception as exc: - errors += 1 - log.error("reextract[%s]: failed on %s: %s", handle, orig_path.name, exc, exc_info=True) - loop.call_soon_threadsafe(q.put_nowait, - f"data: {json.dumps({'type': 'progress', 'n': n, 'total': total, 'name': orig_path.stem, 'status': 'error', 'detail': str(exc)})}\n\n") - - if imported > 0: - loop.call_soon_threadsafe(q.put_nowait, - f"data: {json.dumps({'type': 'status', 'message': 'Writing index…'})}\n\n") - try: - write_index(list(summaries.values()), user_dir, owner) - loop.call_soon_threadsafe(q.put_nowait, - f"data: {json.dumps({'type': 'status', 'message': 'Running merge and rebuild…'})}\n\n") - merge_all(user_dir) - _trigger_rebuild(handle) - except Exception as exc: - log.exception("reextract[%s]: error in final write/merge", handle) - loop.call_soon_threadsafe(q.put_nowait, - f"data: {json.dumps({'type': 'error', 'message': f'Final write failed: {exc}'})}\n\n") - - log.info("reextract[%s]: done — imported=%d skipped=%d errors=%d", - handle, imported, skipped, errors) - loop.call_soon_threadsafe(q.put_nowait, - f"data: {json.dumps({'type': 'done', 'imported': imported, 'skipped': skipped, 'errors': errors})}\n\n") - loop.call_soon_threadsafe(q.put_nowait, None) # sentinel + # Find the `uv` or `bincio` executable that launched us + uv_exe = shutil.which("uv") or "uv" + data_dir = str(_get_data_dir()) + log.info("reextract[%s]: spawning subprocess via %s", handle, uv_exe) async def event_stream(): - yield f"data: {json.dumps({'type': 'status', 'message': f'Found {total} originals, starting extraction…'})}\n\n" - # Kick off the extraction thread - loop.run_in_executor(None, _run_extraction) - # Drain the queue until sentinel - while True: - chunk = await q.get() - if chunk is None: - break - yield chunk + proc = await asyncio.create_subprocess_exec( + uv_exe, "run", "bincio", "reextract-originals", + "--data-dir", data_dir, + "--handle", handle, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + assert proc.stdout is not None + + async for raw_line in proc.stdout: + line = raw_line.decode(errors="replace").strip() + if not line: + continue + # Forward the JSON line as an SSE event + yield f"data: {line}\n\n" + + await proc.wait() + stderr_out = b"" + if proc.stderr: + stderr_out = await proc.stderr.read() + + if proc.returncode != 0: + log.error("reextract[%s]: subprocess exited %d — stderr: %s", + handle, proc.returncode, stderr_out.decode(errors="replace")[:500]) + yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n" + else: + log.info("reextract[%s]: subprocess done, triggering rebuild", handle) + _trigger_rebuild(handle) return StreamingResponse( event_stream(),