reextract: process in batches of 100 to bound subprocess memory

One Python process for 2015 activities exhausts all RAM + swap on a
cheap VPS. Split into sequential batches of 100: each subprocess handles
100 activities and exits, returning all memory to the OS before the
next batch starts. The server chains batches in the SSE event_stream
and triggers a single rebuild when all batches complete.
This commit is contained in:
Davide Scaini
2026-04-15 10:08:55 +02:00
parent a67b237161
commit 25d80c8132
2 changed files with 67 additions and 31 deletions
+16 -5
View File
@@ -33,7 +33,9 @@ _GC_EVERY = 50 # call gc.collect() + malloc_trim every N activities
@click.option("--data-dir", required=True, type=click.Path(), help="BAS data directory")
@click.option("--handle", required=True, help="User handle to re-extract for")
@click.option("--force", is_flag=True, default=False, help="Re-extract even if activity JSON already exists")
def reextract_originals(data_dir: str, handle: str, force: bool) -> None:
@click.option("--offset", default=0, type=int, help="Skip first N originals (for batch processing)")
@click.option("--limit", default=0, type=int, help="Process at most N originals then stop (0 = all)")
def reextract_originals(data_dir: str, handle: str, force: bool, offset: int, limit: int) -> None:
"""Re-extract activities from stored Strava originals (originals/strava/*.json).
Prints one JSON object per line to stdout for streaming progress:
@@ -57,13 +59,22 @@ def reextract_originals(data_dir: str, handle: str, force: bool) -> None:
_emit({"type": "error", "message": f"No Strava originals directory at {originals_dir}"})
sys.exit(1)
original_files = sorted(originals_dir.glob("*.json"))
total = len(original_files)
if total == 0:
all_files = sorted(originals_dir.glob("*.json"))
if not all_files:
_emit({"type": "error", "message": "No Strava originals found"})
sys.exit(1)
_emit({"type": "status", "message": f"Found {total} originals, starting extraction…"})
# Apply offset/limit for batch processing
batch = all_files[offset:] if not limit else all_files[offset: offset + limit]
total_all = len(all_files)
total = len(batch)
original_files = batch
_emit({"type": "status", "message": (
f"Batch {offset + 1}{offset + total} of {total_all}, starting extraction…"
if offset or limit else
f"Found {total_all} originals, starting extraction…"
)})
# Load existing index to get owner info and existing summaries
index_path = user_dir / "index.json"
+51 -26
View File
@@ -642,37 +642,62 @@ async def admin_reextract_originals(
import sys as _sys
bincio_exe = str(Path(_sys.executable).parent / "bincio")
data_dir = str(_get_data_dir())
log.info("reextract[%s]: spawning subprocess via %s", handle, bincio_exe)
# Count originals so we can split into memory-safe batches.
total_originals = len(list(originals_dir.glob("*.json")))
# Each activity can briefly peak at ~1030 MB; 100 per batch keeps RSS
# well under 3 GB even on a cheap VPS.
_BATCH = 100
log.info("reextract[%s]: %d originals, batch size %d, via %s",
handle, total_originals, _BATCH, bincio_exe)
async def event_stream():
proc = await asyncio.create_subprocess_exec(
bincio_exe, "reextract-originals",
"--data-dir", data_dir,
"--handle", handle,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert proc.stdout is not None
total_imported = total_skipped = total_errors = 0
offset = 0
async for raw_line in proc.stdout:
line = raw_line.decode(errors="replace").strip()
if not line:
continue
# Forward the JSON line as an SSE event
yield f"data: {line}\n\n"
while offset < total_originals:
limit = min(_BATCH, total_originals - offset)
proc = await asyncio.create_subprocess_exec(
bincio_exe, "reextract-originals",
"--data-dir", data_dir,
"--handle", handle,
"--offset", str(offset),
"--limit", str(limit),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert proc.stdout is not None
await proc.wait()
stderr_out = b""
if proc.stderr:
stderr_out = await proc.stderr.read()
async for raw_line in proc.stdout:
line = raw_line.decode(errors="replace").strip()
if not line:
continue
yield f"data: {line}\n\n"
try:
evt = json.loads(line)
if evt.get("type") == "done":
total_imported += evt.get("imported", 0)
total_skipped += evt.get("skipped", 0)
total_errors += evt.get("errors", 0)
except Exception:
pass
if proc.returncode != 0:
log.error("reextract[%s]: subprocess exited %d — stderr: %s",
handle, proc.returncode, stderr_out.decode(errors="replace")[:500])
yield f"data: {json.dumps({'type': 'error', 'message': f'Process exited with code {proc.returncode}'})}\n\n"
else:
log.info("reextract[%s]: subprocess done, triggering rebuild", handle)
_trigger_rebuild(handle)
await proc.wait()
if proc.returncode != 0:
stderr_out = await proc.stderr.read() if proc.stderr else b""
log.error("reextract[%s]: batch offset=%d exited %d — stderr: %s",
handle, offset, proc.returncode,
stderr_out.decode(errors="replace")[:500])
yield f"data: {json.dumps({'type': 'error', 'message': f'Batch {offset}{offset+limit} exited with code {proc.returncode}'})}\n\n"
return # stop on batch failure
offset += limit
# All batches complete
log.info("reextract[%s]: all batches done — imported=%d skipped=%d errors=%d; triggering rebuild",
handle, total_imported, total_skipped, total_errors)
_trigger_rebuild(handle)
yield f"data: {json.dumps({'type': 'done', 'imported': total_imported, 'skipped': total_skipped, 'errors': total_errors})}\n\n"
return StreamingResponse(
event_stream(),