Files
bincio-activity/bincio/render/cli.py
T
Davide Scaini c0f6c4da6d render: add --recompute-vam to backfill climbing_time_s into existing activities
Reads each activity's timeseries, re-runs the VAM algorithm (which now returns
both climbing_vam_mh and climbing_time_s), and patches activities/*.json and
index.json in-place. Run once after upgrading to the new schema so NerdCorner
can filter and opacity-encode existing data.
2026-05-17 10:15:29 +02:00

653 lines
25 KiB
Python

"""bincio render — build or serve the Astro static site."""
import os
import subprocess
import sys
from pathlib import Path
from typing import Optional
import click
from rich.console import Console
console = Console()
def _find_site_dir(explicit: Optional[str]) -> Path:
    """Return the Astro project directory (the one holding ``package.json``).

    An explicit ``--site-dir`` takes precedence and must contain a
    ``package.json``. Without it, ``./site`` and then ``../site`` (relative to
    the current working directory) are tried; the second covers running from
    inside ``bincio_data/``.

    Raises:
        click.UsageError: the explicit directory has no ``package.json``, or
            neither candidate directory has one.
    """
    if explicit:
        p = Path(explicit).expanduser().resolve()
        if (p / "package.json").exists():
            return p
        raise click.UsageError(f"No package.json found in --site-dir {p}")

    search_order = (Path.cwd() / "site", Path.cwd().parent / "site")
    found = next(
        (site for site in search_order if (site / "package.json").exists()),
        None,
    )
    if found is None:
        raise click.UsageError(
            "Could not find the Astro site directory. "
            "Run from the project root or pass --site-dir."
        )
    return found
def _output_dir_from_config(config_file: Path) -> Optional[Path]:
    """Return the resolved ``output.dir`` from an extract config, or None if unset."""
    import yaml

    raw = yaml.safe_load(config_file.read_text(encoding="utf-8"))
    # An empty file, a non-mapping document, or a bare `output:` key all parse
    # to something without a usable "dir"; treat each as "not configured"
    # instead of failing with AttributeError.
    output = raw.get("output") if isinstance(raw, dict) else None
    out = output.get("dir") if isinstance(output, dict) else None
    return Path(out).expanduser().resolve() if out else None


def _find_data_dir(explicit: Optional[str], config_path: Optional[str]) -> Path:
    """Resolve the BAS data directory.

    Resolution order:
      1. ``explicit`` (``--data-dir``), returned resolved without further checks;
      2. ``output.dir`` from ``config_path``, if that file exists and sets it;
      3. ``output.dir`` from ``./extract_config.yaml``, if present and set;
      4. ``./bincio_data``, if it exists.

    Raises:
        click.UsageError: none of the above yields a directory.
    """
    if explicit:
        return Path(explicit).expanduser().resolve()

    config_candidates: list[Path] = []
    if config_path:
        config_candidates.append(Path(config_path))
    # Auto-detect: try extract_config.yaml in cwd
    config_candidates.append(Path.cwd() / "extract_config.yaml")

    for config_file in config_candidates:
        if not config_file.exists():
            continue
        configured = _output_dir_from_config(config_file)
        if configured is not None:
            return configured

    # Default: ./bincio_data next to cwd
    default = Path.cwd() / "bincio_data"
    if default.exists():
        return default
    raise click.UsageError(
        "Could not find the BAS data directory. "
        "Run `bincio extract` first, or pass --data-dir."
    )
def _ensure_npm(site: Path) -> None:
    """Install the site's npm dependencies when ``node_modules`` is absent.

    Only the presence of the directory is checked; an existing ``node_modules``
    is left untouched. ``npm install`` runs with ``check=True``, so a failing
    install raises ``subprocess.CalledProcessError``.
    """
    if (site / "node_modules").exists():
        return
    console.print("Running [cyan]npm install[/cyan]…")
    subprocess.run(["npm", "install"], cwd=site, check=True)
def _user_dirs(data: Path) -> list[Path]:
    """List the per-user directories directly under *data*, sorted by path.

    A child qualifies when it is a directory and contains an ``activities``
    entry.
    """
    shards = [
        child
        for child in data.iterdir()
        if child.is_dir() and (child / "activities").exists()
    ]
    shards.sort()
    return shards
def _merge_edits(data: Path, handle: str | None = None) -> None:
    """Run the sidecar merge for one user (*handle*) or for every user.

    Prints the number of merged sidecars per user, and a notice when the
    total across all processed users is zero.
    """
    from bincio.render.merge import merge_all

    if handle:
        targets = [data / handle]
    else:
        targets = _user_dirs(data)

    total = 0
    for user_dir in targets:
        n = merge_all(user_dir)
        total += n
        console.print(f" [cyan]{user_dir.name}[/cyan]: {n} sidecar(s) merged")

    if total:
        return
    console.print("No sidecars found — _merged/ dirs mirror extracted data.")
def _bake_tracks(data: Path, handle: str | None = None) -> None:
    """Bake tracks.json for one user (*handle*) or for every user.

    Best effort: a failure for one user is printed as a warning and the
    remaining users are still processed.
    """
    from bincio.explore import bake_tracks

    if handle:
        targets = [data / handle]
    else:
        targets = _user_dirs(data)

    for user_dir in targets:
        try:
            n = bake_tracks(user_dir.name, data)
            console.print(f" [cyan]{user_dir.name}[/cyan]: {n} track(s) baked")
        except Exception as exc:
            console.print(f" [yellow]{user_dir.name}[/yellow]: bake_tracks failed: {exc}")
def _rebuild_athlete_json(data: Path, handle: str | None = None) -> None:
    """Rebuild athlete.json for one user or all users.

    Reads raw index.json summaries, applies any sidecar edits in-memory (so
    overrides like sub_sport: indoor are respected), then calls
    write_athlete_json.

    Best effort per user: any failure is printed as a warning and the next
    user is processed. Unreadable sidecars and an unreadable existing
    athlete.json are reported instead of being skipped silently, because the
    latter means previously stored settings are not carried over.
    """
    import json

    from bincio.extract.writer import write_athlete_json
    from bincio.render.merge import _apply_sidecar_summary, parse_sidecar

    # Keys filtered out of an existing athlete.json before it is passed back as
    # configuration (presumably regenerated by write_athlete_json — confirm there).
    computed_keys = {"bas_version", "generated_at", "power_curve", "records", "best_climbs"}

    targets = [data / handle] if handle else _user_dirs(data)
    for user_dir in targets:
        index_path = user_dir / "index.json"
        if not index_path.exists():
            continue
        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
            summaries = index_data.get("activities", [])
            if not summaries:
                continue

            # Apply sidecar edits so overrides (e.g. sub_sport: indoor) are visible
            # to write_athlete_json without stripping best_efforts/best_climb_m.
            edits_dir = user_dir / "edits"
            if edits_dir.exists():
                sidecars: dict[str, dict] = {}
                for sc_path in edits_dir.glob("*.md"):
                    try:
                        fm, _ = parse_sidecar(sc_path)
                    except Exception as exc:
                        console.print(
                            f" [yellow]{user_dir.name}[/yellow]: "
                            f"skipping unreadable sidecar {sc_path.name}: {exc}"
                        )
                        continue
                    sidecars[sc_path.stem] = fm
                if sidecars:
                    summaries = [
                        _apply_sidecar_summary(s, sidecars[s["id"]])
                        if s.get("id") in sidecars else s
                        for s in summaries
                    ]

            athlete_config: dict = {}
            athlete_path = user_dir / "athlete.json"
            if athlete_path.exists():
                try:
                    existing = json.loads(athlete_path.read_text(encoding="utf-8"))
                    athlete_config = {
                        k: v for k, v in existing.items() if k not in computed_keys
                    }
                except Exception as exc:
                    console.print(
                        f" [yellow]{user_dir.name}[/yellow]: could not read existing "
                        f"athlete.json, rebuilding without its settings: {exc}"
                    )

            write_athlete_json(summaries, user_dir, athlete_config)
        except Exception as exc:
            console.print(f" [yellow]{user_dir.name}[/yellow]: rebuild_athlete failed: {exc}")
def _recompute_best_climbs(data: Path, handle: str | None = None) -> None:
    """Recompute best_climb_m for all cycling activities from their stored timeseries.

    Updates activities/*.json and index.json in-place. Run this once after
    upgrading the climb algorithm to fix values computed by the old code.

    Best effort: an unreadable index.json skips that user, and a failing
    activity is skipped; both are reported as warnings.
    """
    import json

    from bincio.extract.metrics import _best_climb

    targets = [data / handle] if handle else _user_dirs(data)
    for user_dir in targets:
        acts_dir = user_dir / "activities"
        index_path = user_dir / "index.json"
        if not acts_dir.exists() or not index_path.exists():
            continue
        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
            # id -> summary, built once so each activity is a dict lookup rather
            # than a scan of the whole index. The first occurrence of an id wins.
            index_by_id: dict[str, dict] = {}
            for s in index_data.get("activities", []):
                if isinstance(s, dict) and isinstance(s.get("id"), str):
                    index_by_id.setdefault(s["id"], s)
        except Exception as exc:
            console.print(f" [yellow]{user_dir.name}[/yellow]: cannot read index.json: {exc}")
            continue

        updated = 0
        for act_path in sorted(acts_dir.glob("*.json")):
            # "<id>.timeseries.json" / "<id>.geojson.json" are companions, not details.
            if act_path.stem.endswith((".timeseries", ".geojson")):
                continue
            ts_path = acts_dir / f"{act_path.stem}.timeseries.json"
            if not ts_path.exists():
                continue
            try:
                detail = json.loads(act_path.read_text(encoding="utf-8"))
                if detail.get("sport") != "cycling":
                    continue
                ts = json.loads(ts_path.read_text(encoding="utf-8"))
                # (time, elevation) samples with missing elevations dropped,
                # ordered by time.
                pairs = sorted(
                    (t, e)
                    for t, e in zip(ts.get("t", []), ts.get("elevation_m", []))
                    if e is not None
                )
                if len(pairs) < 2:
                    continue
                new_val = _best_climb(pairs)
                if new_val == detail.get("best_climb_m"):
                    continue
                detail["best_climb_m"] = new_val
                act_path.write_text(
                    json.dumps(detail, indent=2, ensure_ascii=False), encoding="utf-8"
                )
                summary = index_by_id.get(act_path.stem)
                if summary is not None:
                    summary["best_climb_m"] = new_val
                updated += 1
            except Exception as exc:
                console.print(
                    f" [yellow]{user_dir.name}[/yellow]: {act_path.name} skipped: {exc}"
                )

        if updated:
            index_path.write_text(
                json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8"
            )
        console.print(f" [cyan]{user_dir.name}[/cyan]: {updated} climb(s) recomputed")
def _recompute_elevation(data: Path, handle: str | None = None) -> None:
    """Recompute elevation_gain_m / elevation_loss_m for all activities.

    Applies the dropout-skip fix (near-zero values mid-recording) so stored
    values computed by older code are corrected. Updates activities/*.json
    and index.json in-place.

    Best effort: an unreadable index.json skips that user, and a failing
    activity is skipped; both are reported as warnings.
    """
    import json

    from bincio.extract.metrics import _ELEVATION_THRESHOLD

    def _accumulate(elevations: list[float], altitude_source: str) -> tuple[float, float]:
        """Return (gain, loss) for *elevations*; loss is zero or negative."""
        if len(elevations) < 2:
            return 0.0, 0.0
        threshold = _ELEVATION_THRESHOLD.get(altitude_source, 10.0)

        # Skip leading near-zeros (device acquiring lock): when the recording
        # starts near zero and has more than one near-zero sample, start from
        # the first sample whose magnitude exceeds the threshold.
        start = 0
        if abs(elevations[0]) < 0.5:
            n_near_zero = sum(1 for e in elevations if abs(e) < 0.5)
            if n_near_zero > 1:
                for i, e in enumerate(elevations):
                    if abs(e) > threshold:
                        start = i
                        break

        gain = loss = 0.0
        committed = elevations[start]
        for e in elevations[start + 1:]:
            # Dropout: a near-zero reading while the committed level is well
            # away from zero is ignored rather than counted as a descent.
            if abs(e) < 1.0 and abs(committed) > threshold:
                continue
            diff = e - committed
            # Only commit a change once it reaches the threshold.
            if abs(diff) >= threshold:
                if diff > 0:
                    gain += diff
                else:
                    loss += diff
                committed = e
        return gain, loss

    targets = [data / handle] if handle else _user_dirs(data)
    for user_dir in targets:
        acts_dir = user_dir / "activities"
        index_path = user_dir / "index.json"
        if not acts_dir.exists() or not index_path.exists():
            continue
        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
            # id -> summary, built once so each activity is a dict lookup rather
            # than a scan of the whole index. The first occurrence of an id wins.
            index_by_id: dict[str, dict] = {}
            for s in index_data.get("activities", []):
                if isinstance(s, dict) and isinstance(s.get("id"), str):
                    index_by_id.setdefault(s["id"], s)
        except Exception as exc:
            console.print(f" [yellow]{user_dir.name}[/yellow]: cannot read index.json: {exc}")
            continue

        updated = 0
        for act_path in sorted(acts_dir.glob("*.json")):
            # "<id>.timeseries.json" / "<id>.geojson.json" are companions, not details.
            if act_path.stem.endswith((".timeseries", ".geojson")):
                continue
            ts_path = acts_dir / f"{act_path.stem}.timeseries.json"
            if not ts_path.exists():
                continue
            try:
                detail = json.loads(act_path.read_text(encoding="utf-8"))
                ts = json.loads(ts_path.read_text(encoding="utf-8"))
                elevations = [e for e in ts.get("elevation_m", []) if e is not None]
                if len(elevations) < 2:
                    continue
                alt_src = detail.get("altitude_source", "unknown")
                new_gain, new_loss = _accumulate(elevations, alt_src)
                # A zero total is stored as None; loss is stored as a positive number.
                new_gain_r = round(new_gain, 1) if new_gain else None
                new_loss_r = round(abs(new_loss), 1) if new_loss else None
                if (new_gain_r == detail.get("elevation_gain_m") and
                        new_loss_r == detail.get("elevation_loss_m")):
                    continue
                detail["elevation_gain_m"] = new_gain_r
                detail["elevation_loss_m"] = new_loss_r
                act_path.write_text(
                    json.dumps(detail, indent=2, ensure_ascii=False), encoding="utf-8"
                )
                summary = index_by_id.get(act_path.stem)
                if summary is not None:
                    summary["elevation_gain_m"] = new_gain_r
                    summary["elevation_loss_m"] = new_loss_r
                updated += 1
            except Exception as exc:
                console.print(
                    f" [yellow]{user_dir.name}[/yellow]: {act_path.name} skipped: {exc}"
                )

        if updated:
            index_path.write_text(
                json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8"
            )
        console.print(f" [cyan]{user_dir.name}[/cyan]: {updated} elevation(s) recomputed")
def _write_root_manifest(data: Path) -> None:
    """Rewrite the root index.json shard manifest from current user dirs.

    The ``instance`` block of an existing manifest is preserved. Its
    ``private`` flag is forced to False when no ``instance.db`` exists, and
    defaults to True when ``instance.db`` exists and the flag is not yet set.
    When more than one user shard exists the combined feed is rewritten too.
    """
    import json
    from datetime import datetime, timezone

    users = _user_dirs(data)

    # Read existing manifest to preserve instance metadata. The file is read
    # as UTF-8 explicitly: with the platform default encoding a decode error
    # would be caught below and the instance metadata dropped.
    root = data / "index.json"
    existing: dict = {}
    if root.exists():
        try:
            loaded = json.loads(root.read_text(encoding="utf-8"))
            if isinstance(loaded, dict):
                existing = loaded
        except Exception as exc:
            console.print(
                f"[yellow]Warning:[/yellow] could not read {root}; "
                f"instance metadata will be reset: {exc}"
            )

    has_auth = (data / "instance.db").exists()
    existing_instance = existing.get("instance", {"name": "BincioActivity"})
    if not isinstance(existing_instance, dict):
        # A malformed "instance" value cannot be merged into; start from the default.
        existing_instance = {"name": "BincioActivity"}
    if not has_auth:
        # Single-user: no auth server, force private off regardless of what was written before.
        existing_instance = {**existing_instance, "private": False}
    elif "private" not in existing_instance:
        # Multi-user first run: default to private.
        existing_instance = {**existing_instance, "private": True}

    manifest = {
        "bas_version": "1.0",
        "instance": existing_instance,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "shards": [
            {
                "handle": u.name,
                # Prefer the merged shard index when it exists.
                "url": f"{u.name}/_merged/index.json"
                if (u / "_merged" / "index.json").exists()
                else f"{u.name}/index.json",
            }
            for u in users
        ],
        "activities": [],
    }
    root.write_text(json.dumps(manifest, indent=2), encoding="utf-8")
    console.print(f"Root manifest updated: [cyan]{len(users)}[/cyan] user shard(s)")

    if len(users) > 1:
        from bincio.render.merge import write_combined_feed

        n = write_combined_feed(data)
        console.print(f"Combined feed: [cyan]{n}[/cyan] activities across all users")
def _link_data(site: Path, data: Path) -> None:
    """Point ``site/public/data`` at the data root via a symlink.

    An existing symlink that already resolves to *data* is left alone; one
    pointing elsewhere is replaced. A real file or directory at that path is
    never touched: a warning is printed and nothing is linked.
    """
    target = data
    public_data = site / "public" / "data"
    public_data.parent.mkdir(parents=True, exist_ok=True)

    if not public_data.is_symlink():
        if public_data.exists():
            console.print(
                f"[yellow]Warning:[/yellow] {public_data} exists and is not a symlink — "
                "remove it manually if you want bincio to manage it."
            )
            return
    elif public_data.resolve() == target.resolve():
        # Link is already correct.
        return
    else:
        public_data.unlink()

    public_data.symlink_to(target)
    console.print(f"Linked data: [cyan]{target}[/cyan] → [cyan]{public_data}[/cyan]")
def _recompute_vam(data: Path, handle: str | None = None) -> None:
    """Recompute climbing_vam_mh and climbing_time_s for all activities.

    Reads the stored timeseries, re-runs the VAM algorithm, and patches both
    activities/*.json and index.json in-place. Run once after adding
    climbing_time_s to the schema so the NerdCorner VAM chart can filter short
    climbs and opacity-encode confidence.

    Only activities whose sport is in ``_VAM_SPORTS`` are touched. Best
    effort: an unreadable index.json skips that user, and a failing activity
    is skipped; both are reported as warnings.
    """
    import json

    from bincio.extract.metrics import _VAM_SPORTS, _build_ele_1hz, _vam_from_ele_1hz

    targets = [data / handle] if handle else _user_dirs(data)
    for user_dir in targets:
        acts_dir = user_dir / "activities"
        index_path = user_dir / "index.json"
        if not acts_dir.exists() or not index_path.exists():
            continue
        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
            # Summaries without a usable id are ignored here instead of raising
            # KeyError and aborting the whole command.
            index_by_id = {
                s["id"]: s
                for s in index_data.get("activities", [])
                if isinstance(s, dict) and isinstance(s.get("id"), str)
            }
        except Exception as exc:
            console.print(f" [yellow]{user_dir.name}[/yellow]: cannot read index.json: {exc}")
            continue

        updated = 0
        for act_path in sorted(acts_dir.glob("*.json")):
            # "<id>.timeseries.json" / "<id>.geojson.json" are companions, not details.
            if act_path.stem.endswith((".timeseries", ".geojson")):
                continue
            ts_path = acts_dir / f"{act_path.stem}.timeseries.json"
            if not ts_path.exists():
                continue
            try:
                detail = json.loads(act_path.read_text(encoding="utf-8"))
                if detail.get("sport") not in _VAM_SPORTS:
                    continue
                ts = json.loads(ts_path.read_text(encoding="utf-8"))
                t_vals = ts.get("t", [])
                e_vals = ts.get("elevation_m", [])
                # Integer time -> elevation (possibly None), the input of _build_ele_1hz.
                sparse: dict[int, float | None] = {int(t): e for t, e in zip(t_vals, e_vals)}
                ele_1hz = _build_ele_1hz(sparse)
                result = _vam_from_ele_1hz(ele_1hz) if ele_1hz else None
                # No usable result clears both fields.
                new_vam, new_climb_t = result if result else (None, None)
                if (new_vam == detail.get("climbing_vam_mh") and
                        new_climb_t == detail.get("climbing_time_s")):
                    continue
                detail["climbing_vam_mh"] = new_vam
                detail["climbing_time_s"] = new_climb_t
                act_path.write_text(
                    json.dumps(detail, indent=2, ensure_ascii=False), encoding="utf-8"
                )
                summary = index_by_id.get(act_path.stem)
                if summary is not None:
                    summary["climbing_vam_mh"] = new_vam
                    summary["climbing_time_s"] = new_climb_t
                updated += 1
            except Exception as exc:
                console.print(
                    f" [yellow]{user_dir.name}[/yellow]: {act_path.name} skipped: {exc}"
                )

        if updated:
            index_path.write_text(
                json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8"
            )
        console.print(f" [cyan]{user_dir.name}[/cyan]: {updated} VAM(s) recomputed")
def _backfill_vam_summary(data: Path, handle: str | None = None) -> None:
    """Copy climbing_vam_mh from detail JSONs into index.json summaries.

    Needed once after the vam_curve→climbing_vam_mh-in-summary migration.

    Summaries that already carry the key are left alone, as are summaries
    without an id or without a detail file. Best effort: unreadable files are
    reported as warnings and skipped.
    """
    import json

    targets = [data / handle] if handle else _user_dirs(data)
    for user_dir in targets:
        acts_dir = user_dir / "activities"
        index_path = user_dir / "index.json"
        if not acts_dir.exists() or not index_path.exists():
            continue
        try:
            index_data = json.loads(index_path.read_text(encoding="utf-8"))
            summaries = index_data.get("activities", [])
        except Exception as exc:
            console.print(f" [yellow]{user_dir.name}[/yellow]: cannot read index.json: {exc}")
            continue

        updated = 0
        for s in summaries:
            if not isinstance(s, dict) or "climbing_vam_mh" in s:
                continue  # malformed entry, or already backfilled
            act_id = s.get("id")
            if not act_id:
                # Without an id there is no detail file to look up; skip it
                # instead of raising KeyError and aborting the whole command.
                continue
            act_path = acts_dir / f"{act_id}.json"
            if not act_path.exists():
                continue
            try:
                detail = json.loads(act_path.read_text(encoding="utf-8"))
                vam = detail.get("climbing_vam_mh")
                if vam is not None:
                    s["climbing_vam_mh"] = vam
                    updated += 1
            except Exception as exc:
                console.print(
                    f" [yellow]{user_dir.name}[/yellow]: {act_path.name} skipped: {exc}"
                )

        if updated:
            index_path.write_text(
                json.dumps(index_data, indent=2, ensure_ascii=False), encoding="utf-8"
            )
        console.print(f" [cyan]{user_dir.name}[/cyan]: {updated} summary(ies) updated")
def _backfill_speed(data: Path, handle: str | None = None) -> None:
    """Fill ``speed_kmh`` from GPS for timeseries whose speed is entirely null.

    Every ``*.timeseries.json`` with a non-empty ``speed_kmh`` list containing
    only nulls, and with non-empty ``lat``, ``lon`` and ``t``, gets its
    ``speed_kmh`` replaced by the output of ``_gps_speed_kmh`` and is written
    back. Files that cannot be parsed are skipped.
    """
    import json

    from bincio.extract.timeseries import _gps_speed_kmh

    if handle:
        targets = [data / handle]
    else:
        targets = _user_dirs(data)

    for user_dir in targets:
        acts_dir = user_dir / "activities"
        if not acts_dir.exists():
            continue

        updated = 0
        for ts_path in sorted(acts_dir.glob("*.timeseries.json")):
            try:
                ts = json.loads(ts_path.read_text(encoding="utf-8"))
            except Exception:
                continue

            recorded = ts.get("speed_kmh", [])
            needs_backfill = bool(recorded) and all(v is None for v in recorded)
            if not needs_backfill:
                continue  # no speed channel at all, or it already has data

            lats, lons, times = (ts.get(key) or [] for key in ("lat", "lon", "t"))
            if not (lats and lons and times):
                continue

            ts["speed_kmh"] = _gps_speed_kmh(lats, lons, times)
            serialized = json.dumps(ts, indent=2, ensure_ascii=False)
            ts_path.write_text(serialized, encoding="utf-8")
            updated += 1

        console.print(f" [cyan]{user_dir.name}[/cyan]: {updated} timeseries updated with GPS speed")
@click.command()
@click.option("--config", "config_path", default=None,
              help="Path to extract_config.yaml (reads output.dir from it).")
@click.option("--data-dir", default=None,
              help="BAS data store directory (output of bincio extract).")
@click.option("--site-dir", default=None,
              help="Astro project directory (default: ./site).")
@click.option("--out", "out_dir", default=None,
              help="Build output directory (default: site/dist).")
@click.option("--serve", is_flag=True,
              help="Start dev server with hot reload instead of building.")
@click.option("--deploy", default=None, metavar="TARGET",
              help="Deploy after build. Currently supports: github.")
@click.option("--handle", default=None,
              help="(Multi-user) Incrementally re-merge one user's shard only.")
@click.option("--no-build", "no_build", is_flag=True,
              help="Skip the Astro build step (just merge sidecars and update manifests).")
@click.option("--recompute-climbs", "recompute_climbs", is_flag=True,
              help="Recompute best_climb_m for all cycling activities from stored timeseries "
                   "(run once after upgrading the climb algorithm).")
@click.option("--recompute-elevation", "recompute_elevation", is_flag=True,
              help="Recompute elevation_gain_m/loss_m for all activities from stored timeseries "
                   "(run once after upgrading the dropout-skip fix).")
@click.option("--recompute-vam", "recompute_vam", is_flag=True,
              help="Recompute climbing_vam_mh and climbing_time_s for all activities from stored "
                   "timeseries (run once after adding climbing_time_s to the schema).")
@click.option("--backfill-vam-summary", "backfill_vam_summary", is_flag=True,
              help="Copy climbing_vam_mh from detail JSONs into index.json summaries "
                   "(run once after the VAM curve → summary migration).")
@click.option("--backfill-speed", "backfill_speed", is_flag=True,
              help="Compute GPS-derived speed for timeseries where the device didn't record "
                   "per-second speed (run once to enable speed map coloring on older activities).")
def render(
    config_path: Optional[str],
    data_dir: Optional[str],
    site_dir: Optional[str],
    out_dir: Optional[str],
    serve: bool,
    deploy: Optional[str],
    handle: Optional[str],
    no_build: bool,
    recompute_climbs: bool,
    recompute_elevation: bool,
    recompute_vam: bool,
    backfill_vam_summary: bool,
    backfill_speed: bool,
) -> None:
    """Build (or serve) the BincioActivity static site from a BAS data store."""
    site = _find_site_dir(site_dir)
    data = _find_data_dir(data_dir, config_path)
    console.print(f"Site: [cyan]{site}[/cyan]")
    console.print(f"Data: [cyan]{data}[/cyan]")

    # Optional one-off maintenance passes, run in this fixed order before the
    # regular data refresh.
    maintenance = (
        (recompute_climbs, "Recomputing best climbs from timeseries…", _recompute_best_climbs),
        (recompute_elevation, "Recomputing elevation gain/loss from timeseries…", _recompute_elevation),
        (recompute_vam, "Recomputing VAM and climbing time from timeseries…", _recompute_vam),
        (backfill_vam_summary, "Backfilling climbing_vam_mh into summaries…", _backfill_vam_summary),
        (backfill_speed, "Backfilling GPS-derived speed into timeseries…", _backfill_speed),
    )
    for requested, banner, step in maintenance:
        if requested:
            console.print(banner)
            step(data, handle=handle)

    # Regular data refresh: always runs, per user (or only for --handle).
    for step in (_merge_edits, _rebuild_athlete_json, _bake_tracks):
        step(data, handle=handle)
    _write_root_manifest(data)

    if no_build:
        console.print("[green]Data updated.[/green] Skipping Astro build (--no-build).")
        return

    _ensure_npm(site)
    env = dict(os.environ, BINCIO_DATA_DIR=str(data))

    if serve:
        # Dev server needs to serve /data/ files at runtime from public/
        _link_data(site, data)
        console.print("Starting [cyan]astro dev[/cyan]…")
        subprocess.run(["npm", "run", "dev"], cwd=site, env=env)
        return

    # Production build: BINCIO_DATA_DIR is already set so manifest.ts reads
    # data directly; remove any leftover public/data symlink so Astro doesn't
    # copy the full data directory (9+ GB) into dist/.
    leftover_link = site / "public" / "data"
    if leftover_link.is_symlink():
        leftover_link.unlink()

    if out_dir:
        # Pass outDir via Astro CLI flag
        cmd = ["npx", "astro", "build", "--outDir", str(Path(out_dir).resolve())]
    else:
        cmd = ["npm", "run", "build"]

    console.print("Running [cyan]astro build[/cyan]…")
    result = subprocess.run(cmd, cwd=site, env=env)
    if result.returncode != 0:
        console.print("[red]Build failed.[/red]")
        sys.exit(result.returncode)

    if out_dir:
        dist = Path(out_dir).resolve()
    else:
        dist = site / "dist"
    console.print(f"\n[green]Build complete.[/green] Output: [cyan]{dist}[/cyan]")

    if deploy == "github":
        _deploy_github(site, dist)
def _deploy_github(site: Path, dist: Path) -> None:
    """Publish *dist* by running ``npx gh-pages -d <dist>`` from *site*.

    On failure an installation tip is printed and the process exits with the
    deploy command's return code, so that scripts and CI do not mistake a
    failed deploy for a successful one (same convention as a failed build).
    """
    console.print("Deploying to [cyan]GitHub Pages[/cyan]…")
    # Requires npx gh-pages or git subtree push
    result = subprocess.run(
        ["npx", "gh-pages", "-d", str(dist)],
        cwd=site,
    )
    if result.returncode != 0:
        console.print(
            "[yellow]Tip:[/yellow] install gh-pages with `npm install -g gh-pages`"
        )
        sys.exit(result.returncode)