athlete page first draft

This commit is contained in:
Davide Scaini
2026-03-30 09:05:18 +02:00
parent 2a1493a3e5
commit ec6175b143
8 changed files with 594 additions and 3 deletions
+16 -2
View File
@@ -94,6 +94,7 @@ def _process_file(path: Path) -> dict:
"started_at": activity.started_at.isoformat(),
"distance_m": metrics.distance_m,
"source": summary.get("source"),
"mmp": metrics.mmp,
}
@@ -210,12 +211,25 @@ def extract(
))
summaries.append(result["summary"])
from bincio.extract.writer import write_index
from bincio.extract.writer import write_athlete_json, write_index
existing = _load_existing_summaries(cfg.output_dir)
merged = {s["id"]: s for s in existing}
for s in summaries:
merged[s["id"]] = s
write_index(list(merged.values()), cfg.output_dir, owner)
all_summaries = list(merged.values())
write_index(all_summaries, cfg.output_dir, owner)
athlete_config: dict = {}
if cfg.athlete:
ath = cfg.athlete
athlete_config = {k: v for k, v in {
"max_hr": ath.max_hr,
"ftp_w": ath.ftp_w,
"hr_zones": ath.hr_zones,
"power_zones": ath.power_zones,
}.items() if v is not None}
write_athlete_json(all_summaries, cfg.output_dir, athlete_config)
dedup.save()
console.print(
+55
View File
@@ -6,10 +6,14 @@ Uses inline haversine rather than geopy.geodesic to keep the hot path fast.
import math
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from bincio.extract.models import DataPoint, ParsedActivity
# Standard MMP durations (seconds). Log-spaced so the curve looks good on a log-x axis.
MMP_DURATIONS_S = [1, 2, 5, 10, 15, 20, 30, 60, 120, 180, 300, 600, 1200, 1800, 3600]
# Speed below which we consider the athlete stopped (km/h)
_STOPPED_THRESHOLD_KMH = 1.0
_EARTH_R = 6_371_000.0 # metres
@@ -42,6 +46,7 @@ class ComputedMetrics:
bbox: Optional[tuple[float, float, float, float]] # min_lon, min_lat, max_lon, max_lat
start_latlng: Optional[tuple[float, float]]
end_latlng: Optional[tuple[float, float]]
mmp: Optional[list[list[int]]] # [[duration_s, avg_watts], ...] — None if no power data
def compute(activity: ParsedActivity) -> ComputedMetrics:
@@ -58,6 +63,7 @@ def compute(activity: ParsedActivity) -> ComputedMetrics:
max_pow = _max_nonnull([p.power_w for p in pts])
bbox = _bbox(pts)
start_ll, end_ll = _endpoints(pts)
mmp = compute_mmp(pts, activity.started_at)
return ComputedMetrics(
distance_m=distance_m,
@@ -75,9 +81,57 @@ def compute(activity: ParsedActivity) -> ComputedMetrics:
bbox=bbox,
start_latlng=start_ll,
end_latlng=end_ll,
mmp=mmp,
)
# ── mean maximal power ────────────────────────────────────────────────────────
def compute_mmp(pts: list[DataPoint], started_at: datetime) -> Optional[list[list[int]]]:
"""Compute Mean Maximal Power curve at the standard MMP_DURATIONS_S.
Builds a 1 Hz power series (same downsampling as timeseries.py), then uses
a O(n) sliding-window sum for each duration. Returns a list of
[duration_s, avg_watts] pairs (integers), or None when the activity has no
power data. Only durations shorter than the total activity are included.
"""
# 1 Hz downsample: at most one sample per second, skip sub-second duplicates.
# Seconds without a recorded sample are omitted (not zero-filled) so that
# paused-recording gaps don't silently lower power averages.
power_1hz: list[int] = []
last_t = -1
for p in pts:
t = int((p.timestamp - started_at).total_seconds())
if t < 0 or t == last_t:
continue
last_t = t
if p.power_w is not None:
power_1hz.append(p.power_w)
if len(power_1hz) < 2:
return None
n = len(power_1hz)
results: list[list[int]] = []
for d in MMP_DURATIONS_S:
if d > n:
break # activity shorter than this duration — stop (durations are sorted)
# Sliding window of exactly d samples = d seconds at 1 Hz.
window_sum = sum(power_1hz[:d])
best = window_sum
for i in range(1, n - d + 1):
window_sum += power_1hz[i + d - 1] - power_1hz[i - 1]
if window_sum > best:
best = window_sum
results.append([d, round(best / d)])
return results if results else None
# ── single-pass GPS stats ──────────────────────────────────────────────────────
# distance, moving time, avg speed, and max speed are all derived from the same
# per-segment loop, so we compute them in one pass instead of four.
@@ -209,4 +263,5 @@ def _empty() -> ComputedMetrics:
avg_hr_bpm=None, max_hr_bpm=None,
avg_cadence_rpm=None, avg_power_w=None, max_power_w=None,
bbox=None, start_latlng=None, end_latlng=None,
mmp=None,
)
+52
View File
@@ -68,6 +68,7 @@ def write_activity(
"bbox": list(metrics.bbox) if metrics.bbox else None,
"start_latlng": list(metrics.start_latlng) if metrics.start_latlng else None,
"end_latlng": list(metrics.end_latlng) if metrics.end_latlng else None,
"mmp": metrics.mmp,
"laps": [_serialise_lap(lap) for lap in activity.laps],
"timeseries": build_timeseries(activity.points, activity.started_at, privacy),
"source": source,
@@ -115,6 +116,7 @@ def build_summary(
"max_hr_bpm": metrics.max_hr_bpm,
"avg_cadence_rpm": metrics.avg_cadence_rpm,
"avg_power_w": metrics.avg_power_w,
"mmp": metrics.mmp,
"source": _infer_source(activity),
"privacy": privacy,
"detail_url": f"activities/{activity_id}.json",
@@ -124,6 +126,56 @@ def build_summary(
}
def write_athlete_json(summaries: list[dict], output_dir: Path, athlete_config: dict) -> None:
"""Aggregate per-activity MMP curves into athlete.json.
Computes element-wise max MMP for:
- all_time
- last_365d
- last_90d
The site reads this single file for the athlete/power-curve page.
Per-activity mmp is already in each summary for client-side season filtering.
"""
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
def _cutoff_iso(days: int) -> str:
from datetime import timedelta
return (now - timedelta(days=days)).isoformat()
cutoff_365 = _cutoff_iso(365)
cutoff_90 = _cutoff_iso(90)
def _merge_mmps(activity_mmps: list[list[list[int]]]) -> list[list[int]]:
"""Element-wise max across a list of mmp arrays."""
best: dict[int, int] = {}
for mmp in activity_mmps:
for d, w in mmp:
if d not in best or w > best[d]:
best[d] = w
return [[d, w] for d, w in sorted(best.items())]
all_mmps = [s["mmp"] for s in summaries if s.get("mmp")]
mmps_365 = [s["mmp"] for s in summaries if s.get("mmp") and s["started_at"] >= cutoff_365]
mmps_90 = [s["mmp"] for s in summaries if s.get("mmp") and s["started_at"] >= cutoff_90]
athlete = {
"bas_version": "1.0",
"generated_at": now.isoformat(),
"power_curve": {
"all_time": _merge_mmps(all_mmps) if all_mmps else None,
"last_365d": _merge_mmps(mmps_365) if mmps_365 else None,
"last_90d": _merge_mmps(mmps_90) if mmps_90 else None,
},
**athlete_config,
}
(output_dir / "athlete.json").write_text(
json.dumps(athlete, indent=2, ensure_ascii=False)
)
def write_index(summaries: list[dict], output_dir: Path, owner: dict) -> None:
"""Write index.json (sorted newest first)."""
sorted_summaries = sorted(