diff --git a/bincio/reextract_cmd.py b/bincio/reextract_cmd.py index ff73287..8c68231 100644 --- a/bincio/reextract_cmd.py +++ b/bincio/reextract_cmd.py @@ -33,7 +33,9 @@ _GC_EVERY = 50 # call gc.collect() + malloc_trim every N activities @click.option("--data-dir", required=True, type=click.Path(), help="BAS data directory") @click.option("--handle", required=True, help="User handle to re-extract for") @click.option("--force", is_flag=True, default=False, help="Re-extract even if activity JSON already exists") -def reextract_originals(data_dir: str, handle: str, force: bool) -> None: +@click.option("--offset", default=0, type=int, help="Skip first N originals (for batch processing)") +@click.option("--limit", default=0, type=int, help="Process at most N originals then stop (0 = all)") +def reextract_originals(data_dir: str, handle: str, force: bool, offset: int, limit: int) -> None: """Re-extract activities from stored Strava originals (originals/strava/*.json). Prints one JSON object per line to stdout for streaming progress: @@ -57,13 +59,22 @@ def reextract_originals(data_dir: str, handle: str, force: bool) -> None: _emit({"type": "error", "message": f"No Strava originals directory at {originals_dir}"}) sys.exit(1) - original_files = sorted(originals_dir.glob("*.json")) - total = len(original_files) - if total == 0: + all_files = sorted(originals_dir.glob("*.json")) + if not all_files: _emit({"type": "error", "message": "No Strava originals found"}) sys.exit(1) - _emit({"type": "status", "message": f"Found {total} originals, starting extraction…"}) + # Apply offset/limit for batch processing + batch = all_files[offset:] if not limit else all_files[offset: offset + limit] + total_all = len(all_files) + total = len(batch) + original_files = batch + + _emit({"type": "status", "message": ( + f"Batch {offset + 1}–{offset + total} of {total_all}, starting extraction…" + if offset or limit else + f"Found {total_all} originals, starting extraction…" + )}) # Load existing index to get owner info and existing summaries index_path = user_dir / "index.json" diff --git a/bincio/serve/server.py b/bincio/serve/server.py index 052d008..723f641 100644 --- a/bincio/serve/server.py +++ b/bincio/serve/server.py @@ -642,37 +642,62 @@ async def admin_reextract_originals( import sys as _sys bincio_exe = str(Path(_sys.executable).parent / "bincio") data_dir = str(_get_data_dir()) - log.info("reextract[%s]: spawning subprocess via %s", handle, bincio_exe) + + # Count originals so we can split into memory-safe batches. + total_originals = len(list(originals_dir.glob("*.json"))) + # Each activity can briefly peak at ~10–30 MB; 100 per batch keeps RSS + # well under 3 GB even on a cheap VPS. + _BATCH = 100 + log.info("reextract[%s]: %d originals, batch size %d, via %s", + handle, total_originals, _BATCH, bincio_exe) async def event_stream(): - proc = await asyncio.create_subprocess_exec( - bincio_exe, "reextract-originals", - "--data-dir", data_dir, - "--handle", handle, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - assert proc.stdout is not None + total_imported = total_skipped = total_errors = 0 + offset = 0 - async for raw_line in proc.stdout: - line = raw_line.decode(errors="replace").strip() - if not line: - continue - # Forward the JSON line as an SSE event - yield f"data: {line}\n\n" + while offset < total_originals: + limit = min(_BATCH, total_originals - offset) + proc = await asyncio.create_subprocess_exec( + bincio_exe, "reextract-originals", + "--data-dir", data_dir, + "--handle", handle, + "--offset", str(offset), + "--limit", str(limit), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + assert proc.stdout is not None - await proc.wait() - stderr_out = b"" - if proc.stderr: - stderr_out = await proc.stderr.read() + async for raw_line in proc.stdout: + line = raw_line.decode(errors="replace").strip() + if not line: + continue + yield f"data: {line}\n\n" + try: + evt = json.loads(line) + if evt.get("type") == "done": + total_imported += evt.get("imported", 0) + total_skipped += evt.get("skipped", 0) + total_errors += evt.get("errors", 0) + except Exception: + pass - if proc.returncode != 0: - log.error("reextract[%s]: subprocess exited %d — stderr: %s", - handle, proc.returncode, stderr_out.decode(errors="replace")[:500]) - yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n" - else: - log.info("reextract[%s]: subprocess done, triggering rebuild", handle) - _trigger_rebuild(handle) + await proc.wait() + if proc.returncode != 0: + stderr_out = await proc.stderr.read() if proc.stderr else b"" + log.error("reextract[%s]: batch offset=%d exited %d — stderr: %s", + handle, offset, proc.returncode, + stderr_out.decode(errors="replace")[:500]) + yield f"data: {json.dumps({'type': 'error', 'message': f'Batch {offset}–{offset+limit} exited with code {proc.returncode}'})}\n\n" + return # stop on batch failure + + offset += limit + + # All batches complete + log.info("reextract[%s]: all batches done — imported=%d skipped=%d errors=%d; triggering rebuild", + handle, total_imported, total_skipped, total_errors) + _trigger_rebuild(handle) + yield f"data: {json.dumps({'type': 'done', 'imported': total_imported, 'skipped': total_skipped, 'errors': total_errors})}\n\n" return StreamingResponse( event_stream(),