147 lines
4.8 KiB
Python
147 lines
4.8 KiB
Python
"""Backfill MMP curve into existing BAS activity JSONs and index.json.
|
|
|
|
Reads power_w from the already-extracted 1Hz timeseries — no need to re-parse
|
|
source FIT files. Run once after upgrading to the MMP-enabled extract pipeline.
|
|
|
|
Usage:
|
|
uv run python scripts/backfill_mmp.py [--data-dir ~/src/bincio_data]
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import click
|
|
from rich.console import Console
|
|
from rich.progress import BarColumn, MofNCompleteColumn, Progress, TextColumn, TimeElapsedColumn
|
|
|
|
console = Console()
|
|
|
|
MMP_DURATIONS_S = [1, 2, 5, 10, 15, 20, 30, 60, 120, 180, 300, 600, 1200, 1800, 3600]
|
|
|
|
|
|
def compute_mmp_from_timeseries(power_w: list) -> list[list[int]] | None:
|
|
"""Compute MMP from an already-extracted 1Hz power array (nulls dropped)."""
|
|
samples = [w for w in power_w if w is not None]
|
|
if len(samples) < 2:
|
|
return None
|
|
|
|
n = len(samples)
|
|
results = []
|
|
for d in MMP_DURATIONS_S:
|
|
if d > n:
|
|
break
|
|
window_sum = sum(samples[:d])
|
|
best = window_sum
|
|
for i in range(1, n - d + 1):
|
|
window_sum += samples[i + d - 1] - samples[i - 1]
|
|
if window_sum > best:
|
|
best = window_sum
|
|
results.append([d, round(best / d)])
|
|
|
|
return results if results else None
|
|
|
|
|
|
@click.command()
|
|
@click.option("--data-dir", default="~/src/bincio_data", show_default=True,
|
|
help="Path to the BAS data directory.")
|
|
@click.option("--dry-run", is_flag=True, help="Compute but don't write anything.")
|
|
def main(data_dir: str, dry_run: bool) -> None:
|
|
"""Backfill mmp field into existing activity JSONs and index.json."""
|
|
data = Path(data_dir).expanduser()
|
|
acts_dir = data / "activities"
|
|
|
|
if not acts_dir.exists():
|
|
console.print(f"[red]Activities dir not found: {acts_dir}[/red]")
|
|
sys.exit(1)
|
|
|
|
jsons = sorted(acts_dir.glob("*.json"))
|
|
console.print(f"Found [bold]{len(jsons)}[/bold] activity JSONs in {acts_dir}")
|
|
|
|
updated = skipped = no_power = 0
|
|
|
|
with Progress(
|
|
TextColumn("[progress.description]{task.description}"),
|
|
BarColumn(), MofNCompleteColumn(), TimeElapsedColumn(),
|
|
console=console,
|
|
) as progress:
|
|
task = progress.add_task("Backfilling MMP…", total=len(jsons))
|
|
|
|
for path in jsons:
|
|
progress.advance(task)
|
|
try:
|
|
detail = json.loads(path.read_text())
|
|
except Exception:
|
|
skipped += 1
|
|
continue
|
|
|
|
# Skip if already has mmp
|
|
if detail.get("mmp") is not None:
|
|
skipped += 1
|
|
continue
|
|
|
|
ts = detail.get("timeseries") or {}
|
|
power_w = ts.get("power_w") or []
|
|
mmp = compute_mmp_from_timeseries(power_w)
|
|
|
|
if not mmp:
|
|
no_power += 1
|
|
continue
|
|
|
|
detail["mmp"] = mmp
|
|
if not dry_run:
|
|
path.write_text(json.dumps(detail, indent=2, ensure_ascii=False))
|
|
updated += 1
|
|
|
|
console.print(
|
|
f"\n[green]Done.[/green] "
|
|
f"Updated [bold]{updated}[/bold], "
|
|
f"already had mmp [bold]{skipped}[/bold], "
|
|
f"no power data [bold]{no_power}[/bold]."
|
|
)
|
|
|
|
if dry_run:
|
|
console.print("[yellow]Dry run — nothing written.[/yellow]")
|
|
return
|
|
|
|
# Rebuild index.json summaries with mmp
|
|
console.print("Updating index.json summaries…")
|
|
index_path = data / "index.json"
|
|
if index_path.exists():
|
|
index = json.loads(index_path.read_text())
|
|
# Build a lookup from the now-updated detail JSONs
|
|
mmp_by_id: dict[str, list] = {}
|
|
for path in acts_dir.glob("*.json"):
|
|
try:
|
|
d = json.loads(path.read_text())
|
|
if d.get("mmp"):
|
|
mmp_by_id[d["id"]] = d["mmp"]
|
|
except Exception:
|
|
pass
|
|
|
|
for s in index.get("activities", []):
|
|
if s["id"] in mmp_by_id and not s.get("mmp"):
|
|
s["mmp"] = mmp_by_id[s["id"]]
|
|
|
|
index_path.write_text(json.dumps(index, indent=2, ensure_ascii=False))
|
|
console.print(f" Patched {len(mmp_by_id)} summaries in index.json.")
|
|
|
|
# Rebuild athlete.json
|
|
console.print("Rebuilding athlete.json…")
|
|
from bincio.extract.writer import write_athlete_json
|
|
index = json.loads(index_path.read_text())
|
|
owner = index.get("owner", {})
|
|
athlete_cfg = {k: v for k, v in (owner.get("athlete") or {}).items() if v is not None}
|
|
write_athlete_json(index.get("activities", []), data, athlete_cfg)
|
|
console.print(" athlete.json written.")
|
|
|
|
# Re-run merge_all so _merged picks up the changes
|
|
console.print("Running merge_all…")
|
|
from bincio.render.merge import merge_all
|
|
n = merge_all(data)
|
|
console.print(f" merge_all done ({n} sidecars).")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|