1a563012e2
The in-process approach loaded all 2015 Strava originals into the server process memory, causing OOM kills. Now spawns `bincio reextract-originals` as a child process; heavy work runs in an isolated Python interpreter that exits when done, freeing all memory. Also adds `bincio reextract-originals` as a standalone CLI command that prints JSON-lines progress to stdout — useful for running directly on the VPS via SSH for large backlogs.
109 lines
4.4 KiB
Python
109 lines
4.4 KiB
Python
"""bincio reextract-originals — re-extract activities from stored Strava originals."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import click
|
|
|
|
|
|
def _emit(obj: dict) -> None:
|
|
"""Write a JSON progress line to stdout (flushed immediately)."""
|
|
print(json.dumps(obj), flush=True)
|
|
|
|
|
|
@click.command("reextract-originals")
|
|
@click.option("--data-dir", required=True, type=click.Path(), help="BAS data directory")
|
|
@click.option("--handle", required=True, help="User handle to re-extract for")
|
|
@click.option("--force", is_flag=True, default=False, help="Re-extract even if activity JSON already exists")
|
|
def reextract_originals(data_dir: str, handle: str, force: bool) -> None:
|
|
"""Re-extract activities from stored Strava originals (originals/strava/*.json).
|
|
|
|
Prints one JSON object per line to stdout for streaming progress:
|
|
{"type": "status", "message": "..."}
|
|
{"type": "progress", "n": 1, "total": 2015, "name": "...", "status": "imported"|"skipped"|"error", ["detail": "..."]}
|
|
{"type": "done", "imported": N, "skipped": N, "errors": N}
|
|
{"type": "error", "message": "..."}
|
|
"""
|
|
from bincio.extract.strava_api import strava_to_parsed
|
|
from bincio.extract.metrics import compute as compute_metrics
|
|
from bincio.extract.writer import (
|
|
build_summary, make_activity_id, write_activity, write_index,
|
|
)
|
|
from bincio.render.merge import merge_all
|
|
|
|
dd = Path(data_dir).expanduser().resolve()
|
|
user_dir = dd / handle
|
|
originals_dir = user_dir / "originals" / "strava"
|
|
|
|
if not originals_dir.exists():
|
|
_emit({"type": "error", "message": f"No Strava originals directory at {originals_dir}"})
|
|
sys.exit(1)
|
|
|
|
original_files = sorted(originals_dir.glob("*.json"))
|
|
total = len(original_files)
|
|
if total == 0:
|
|
_emit({"type": "error", "message": "No Strava originals found"})
|
|
sys.exit(1)
|
|
|
|
_emit({"type": "status", "message": f"Found {total} originals, starting extraction…"})
|
|
|
|
# Load existing index to get owner info and existing summaries
|
|
index_path = user_dir / "index.json"
|
|
try:
|
|
existing_index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {}
|
|
except Exception:
|
|
existing_index = {}
|
|
owner = existing_index.get("owner", {"handle": handle})
|
|
summaries: dict[str, dict] = {s["id"]: s for s in existing_index.get("activities", [])}
|
|
|
|
imported = skipped = errors = 0
|
|
|
|
for n, orig_path in enumerate(original_files, 1):
|
|
try:
|
|
raw = json.loads(orig_path.read_text(encoding="utf-8"))
|
|
meta = raw.get("meta", {})
|
|
streams = raw.get("streams", {})
|
|
name = meta.get("name", orig_path.stem)
|
|
|
|
parsed = strava_to_parsed(meta, streams)
|
|
activity_id = make_activity_id(parsed)
|
|
|
|
if not force and (user_dir / "activities" / f"{activity_id}.json").exists():
|
|
skipped += 1
|
|
_emit({"type": "progress", "n": n, "total": total, "name": name, "status": "skipped"})
|
|
else:
|
|
metrics = compute_metrics(parsed)
|
|
ep = parsed.privacy if parsed.privacy is not None else "public"
|
|
write_activity(parsed, metrics, user_dir, privacy=ep, rdp_epsilon=0.0001)
|
|
summaries[activity_id] = build_summary(parsed, metrics, activity_id, ep)
|
|
imported += 1
|
|
_emit({"type": "progress", "n": n, "total": total, "name": name, "status": "imported"})
|
|
|
|
# Explicitly free large objects to keep memory low
|
|
del parsed, metrics
|
|
except Exception as exc:
|
|
errors += 1
|
|
_emit({"type": "progress", "n": n, "total": total, "name": orig_path.stem,
|
|
"status": "error", "detail": str(exc)})
|
|
|
|
if imported > 0:
|
|
_emit({"type": "status", "message": "Writing index…"})
|
|
try:
|
|
write_index(list(summaries.values()), user_dir, owner)
|
|
except Exception as exc:
|
|
_emit({"type": "error", "message": f"write_index failed: {exc}"})
|
|
sys.exit(1)
|
|
|
|
_emit({"type": "status", "message": "Running merge…"})
|
|
try:
|
|
merge_all(user_dir)
|
|
except Exception as exc:
|
|
_emit({"type": "error", "message": f"merge_all failed: {exc}"})
|
|
sys.exit(1)
|
|
|
|
_emit({"type": "done", "imported": imported, "skipped": skipped, "errors": errors})
|