226 lines
8.0 KiB
Python
226 lines
8.0 KiB
Python
"""Backfill MMP and best-effort records into existing BAS activity JSONs.
|
|
|
|
Reads 1Hz timeseries (power_w, speed_kmh, elevation_m) from already-extracted
|
|
detail JSONs — no need to re-parse source FIT/GPX/TCX files.
|
|
|
|
Run once after upgrading to the MMP + records extract pipeline, or whenever
|
|
the computation logic changes and you want to refresh all activities.
|
|
|
|
Usage:
|
|
uv run python scripts/backfill.py [--data-dir ~/src/bincio_data]
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import click
|
|
from rich.console import Console
|
|
from rich.progress import BarColumn, MofNCompleteColumn, Progress, TextColumn, TimeElapsedColumn
|
|
|
|
# Shared Rich console: used for every status message and handed to the progress bar.
console = Console()
|
|
|
|
# ── MMP ───────────────────────────────────────────────────────────────────────
|
|
|
|
MMP_DURATIONS_S = [1, 2, 5, 10, 15, 20, 30, 60, 120, 180, 300, 600, 1200, 1800, 3600]
|
|
|
|
|
|
def _compute_mmp(power_w: list) -> list[list[int]] | None:
|
|
samples = [w for w in power_w if w is not None]
|
|
if len(samples) < 2:
|
|
return None
|
|
n = len(samples)
|
|
results = []
|
|
for d in MMP_DURATIONS_S:
|
|
if d > n:
|
|
break
|
|
window_sum = sum(samples[:d])
|
|
best = window_sum
|
|
for i in range(1, n - d + 1):
|
|
window_sum += samples[i + d - 1] - samples[i - 1]
|
|
if window_sum > best:
|
|
best = window_sum
|
|
results.append([d, round(best / d)])
|
|
return results if results else None
|
|
|
|
|
|
# ── Best efforts ──────────────────────────────────────────────────────────────
|
|
|
|
# Target distances, in kilometres, for which a fastest time is searched, keyed
# by the activity's "sport" value.  Sports without an entry here get no best
# efforts at all (see _compute_best_efforts).
BEST_EFFORT_DISTANCES: dict[str, list[float]] = {
    "running": [0.4, 1.0, 1.609, 5.0, 10.0, 21.097, 42.195],
    "cycling": [5.0, 10.0, 20.0, 50.0, 100.0],
    "swimming": [0.1, 0.2, 0.5, 1.0, 2.0],
}
|
|
|
|
|
|
def _fastest_time(speed_kmh: list, target_km: float) -> int | None:
|
|
left = 0
|
|
window_dist = 0.0
|
|
best_s = None
|
|
for right, spd in enumerate(speed_kmh):
|
|
window_dist += (spd or 0.0) / 3600.0
|
|
while window_dist >= target_km and left <= right:
|
|
window_s = right - left + 1
|
|
if best_s is None or window_s < best_s:
|
|
best_s = window_s
|
|
window_dist -= (speed_kmh[left] or 0.0) / 3600.0
|
|
left += 1
|
|
return best_s
|
|
|
|
|
|
def _compute_best_efforts(speed_kmh: list, sport: str) -> list[list[float]] | None:
    """Return the fastest times over the standard distances of *sport*.

    Args:
        speed_kmh: Speed samples in km/h; entries may be ``None``.
        sport: Key into ``BEST_EFFORT_DISTANCES``.

    Returns:
        A list of ``[distance_km, time]`` pairs for every target distance the
        activity actually covers, or ``None`` when the sport has no targets,
        the series is empty, or no target distance was reached.
    """
    distances = BEST_EFFORT_DISTANCES.get(sport, [])
    if not (distances and speed_kmh):
        return None

    timed = ((km, _fastest_time(speed_kmh, km)) for km in distances)
    # Distances longer than the activity yield None and are left out.
    efforts = [[km, elapsed] for km, elapsed in timed if elapsed is not None]
    return efforts or None
|
|
|
|
|
|
def _compute_best_climb(elevation_m: list) -> float | None:
|
|
valid = [e for e in elevation_m if e is not None]
|
|
if len(valid) < 2:
|
|
return None
|
|
max_gain = current = 0.0
|
|
for a, b in zip(valid, valid[1:]):
|
|
current = max(0.0, current + (b - a))
|
|
if current > max_gain:
|
|
max_gain = current
|
|
return round(max_gain, 1) if max_gain > 0 else None
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def _backfill_detail(detail: dict, force: bool) -> bool:
    """Compute missing record fields on one activity detail dict, in place.

    Reads ``sport`` and the ``timeseries`` arrays (``power_w``, ``speed_kmh``,
    ``elevation_m``) from *detail* and stores ``mmp``, ``best_efforts`` and,
    for cycling only, ``best_climb_m``.  With *force* every field is
    recomputed even when it is already present.

    Returns:
        True when a key was added or its value changed, i.e. the file has to
        be rewritten.
    """
    sport = detail.get("sport", "other")
    series = detail.get("timeseries") or {}

    changed = False

    if force or detail.get("mmp") is None:
        mmp = _compute_mmp(series.get("power_w") or [])
        # A None result never overwrites an existing value.
        if mmp is not None and detail.get("mmp") != mmp:
            detail["mmp"] = mmp
            changed = True

    if force or detail.get("best_efforts") is None:
        efforts = _compute_best_efforts(series.get("speed_kmh") or [], sport)
        # The key is always stored (None included), but it only counts as a
        # change when it was absent or differs.  Otherwise every activity
        # without best efforts would be rewritten on each run.
        if "best_efforts" not in detail or detail["best_efforts"] != efforts:
            detail["best_efforts"] = efforts
            changed = True

    if sport == "cycling" and (force or detail.get("best_climb_m") is None):
        climb = _compute_best_climb(series.get("elevation_m") or [])
        if climb is not None and detail.get("best_climb_m") != climb:
            detail["best_climb_m"] = climb
            changed = True

    return changed


def _load_record_lookup(acts_dir: Path) -> dict[str, dict]:
    """Map activity id -> its record fields, read back from the detail JSONs.

    Files that cannot be read, parsed, or that carry no usable ``id`` are
    left out (best effort).
    """
    lookup: dict[str, dict] = {}
    for path in acts_dir.glob("*.json"):
        try:
            d = json.loads(path.read_text(encoding="utf-8"))
            lookup[d["id"]] = {
                "mmp": d.get("mmp"),
                "best_efforts": d.get("best_efforts"),
                "best_climb_m": d.get("best_climb_m"),
            }
        except (OSError, ValueError, KeyError, TypeError):
            # Unreadable / malformed file or missing id: nothing to patch from it.
            continue
    return lookup


def _patch_summary(summary: dict, row: dict, force: bool) -> int:
    """Copy record fields from *row* into one index summary; return fields changed.

    A field is copied when the summary lacks it.  With *force* it is also
    copied when the summary holds a different value, so recomputed records
    reach the index instead of leaving stale ones behind.
    """
    patched = 0
    for key in ("mmp", "best_efforts", "best_climb_m"):
        new = row.get(key)
        old = summary.get(key)
        if key == "best_efforts":
            # An empty list is a meaningful value here, so test against None.
            available, missing = new is not None, old is None
        else:
            available, missing = bool(new), not old
        if available and (missing or (force and old != new)):
            summary[key] = new
            patched += 1
    return patched


@click.command()
@click.option("--data-dir", default="~/src/bincio_data", show_default=True)
@click.option("--dry-run", is_flag=True)
@click.option("--force", is_flag=True, help="Recompute even if fields already present.")
def main(data_dir: str, dry_run: bool, force: bool) -> None:
    """Backfill mmp, best_efforts, and best_climb_m into existing activity JSONs."""
    # NOTE: the docstring above is the text click shows for --help; keep it short.
    #
    # Steps: (1) update every <data-dir>/activities/*.json, (2) patch the
    # summaries in index.json, (3) rebuild athlete.json, (4) re-run merge_all.
    # With --dry-run only step 1 is simulated and nothing is written.
    data = Path(data_dir).expanduser()
    acts_dir = data / "activities"

    if not acts_dir.exists():
        console.print(f"[red]Activities dir not found: {acts_dir}[/red]")
        sys.exit(1)

    jsons = sorted(acts_dir.glob("*.json"))
    console.print(f"Found [bold]{len(jsons)}[/bold] activity JSONs in {acts_dir}")

    updated = skipped = unreadable = 0

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(), MofNCompleteColumn(), TimeElapsedColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Backfilling…", total=len(jsons))

        for path in jsons:
            progress.advance(task)
            try:
                # JSON is UTF-8 by specification; do not depend on the locale.
                detail = json.loads(path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                detail = None
            if not isinstance(detail, dict):
                # Unreadable, malformed, or not a JSON object: leave untouched.
                unreadable += 1
                skipped += 1
                continue

            best_efforts = detail.get("best_efforts")
            # Explicit grouping of the original `a and b or c` test: an empty
            # best_efforts list marks the file as processed even without mmp.
            # NOTE(review): _compute_best_efforts never returns [], so only
            # files written by other tooling can take the second branch.
            already_done = (
                detail.get("mmp") is not None and best_efforts is not None
            ) or best_efforts == []
            if already_done and not force:
                skipped += 1
                continue

            if _backfill_detail(detail, force):
                if not dry_run:
                    path.write_text(
                        json.dumps(detail, indent=2, ensure_ascii=False),
                        encoding="utf-8",
                    )
                updated += 1

    console.print(
        f"\n[green]Done.[/green] "
        f"Updated [bold]{updated}[/bold], skipped [bold]{skipped}[/bold]."
    )
    if unreadable:
        console.print(
            f"[yellow]{unreadable} of the skipped files could not be read as JSON objects.[/yellow]"
        )
    if dry_run:
        console.print("[yellow]Dry run — nothing written.[/yellow]")
        return

    # Patch index.json summaries
    console.print("Patching index.json summaries…")
    index_path = data / "index.json"
    try:
        index = json.loads(index_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        console.print(f"[red]Cannot read index: {index_path}[/red]")
        # The exception text may contain brackets; print it without markup.
        console.print(f"  {exc}", markup=False)
        sys.exit(1)

    lookup = _load_record_lookup(acts_dir)

    patched = 0
    for s in index.get("activities", []):
        row = lookup.get(s.get("id"))
        if not row:
            continue
        patched += _patch_summary(s, row, force)

    index_path.write_text(
        json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    console.print(f"  {patched} fields patched in index.json.")

    # Rebuild athlete.json
    console.print("Rebuilding athlete.json…")
    # Project import kept local, as in the original layout of this script.
    from bincio.extract.writer import write_athlete_json
    owner = index.get("owner") or {}
    # Drop unset (None) athlete settings before handing them to the writer.
    athlete_cfg = {k: v for k, v in (owner.get("athlete") or {}).items() if v is not None}
    write_athlete_json(index.get("activities", []), data, athlete_cfg)
    console.print("  athlete.json written.")

    # Re-merge
    console.print("Running merge_all…")
    from bincio.render.merge import merge_all
    n = merge_all(data)
    console.print(f"  merge_all done ({n} sidecars).")
|
|
|
|
|
|
# Script entry point: click parses sys.argv and invokes main() with the options.
if __name__ == "__main__":
    main()
|