Files
Davide Scaini 02edb0b0f9 fix: per-source elevation params — strava_export vs barometric vs raw GPS
Previous thresholds (10 m GPS, 5 m barometric, 30 s MA) were calibrated for
raw noisy GPS. Strava-exported FIT files carry elevation already pre-processed
by Strava (smooth 1 m quantisation, no steps > 5 m), so the aggressive
filtering suppressed real climbing — avg −17 % error across 37 reference
activities.

New strategy, keyed on source + altitude_source:
  strava_export           → MA 5 s, threshold 1.0 m
  fit_file / barometric   → no MA, threshold 1.5 m
  fit_file / gps          → MA 5 s, threshold 2.0 m
  unknown non-strava      → MA 5 s, threshold 1.5 m

Result on 37 cross-referenced activities: avg −2.8 %, std 4.6 %,
37/37 within ±15 % (was 0/37).

Both paths — initial import (metrics._elevation) and bulk recalculate
(dem.recalculate_elevation_hysteresis) — now use the same elevation_params()
function from metrics.py.
2026-05-23 20:12:11 +02:00

663 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Compute aggregated metrics from a ParsedActivity.
All calculations are self-contained — no external state needed.
Uses inline haversine rather than geopy.geodesic to keep the hot path fast.
"""
import math
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from bincio.extract.models import DataPoint, ParsedActivity
# Standard MMP durations (seconds). Log-spaced so the curve looks good on a log-x axis.
MMP_DURATIONS_S = [1, 2, 5, 10, 15, 20, 30, 60, 120, 180, 300, 600, 1200, 1800, 3600]
_VAM_SPORTS = frozenset({"cycling", "running", "hiking", "walking"})
# Standard best-effort distances (km) per sport.
BEST_EFFORT_DISTANCES: dict[str, list[float]] = {
"running": [0.4, 1.0, 1.609, 5.0, 10.0, 21.097, 42.195],
"cycling": [5.0, 10.0, 20.0, 50.0, 100.0],
"swimming": [0.1, 0.2, 0.5, 1.0, 2.0],
"hiking": [], # no sliding-window records; aggregate from summaries only
"walking": [],
"skiing": [],
"other": [],
}
# Speed below which we consider the athlete stopped (km/h)
_STOPPED_THRESHOLD_KMH = 1.0
_EARTH_R = 6_371_000.0 # metres
def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Great-circle distance in metres. ~10x faster than geopy.geodesic."""
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = phi2 - phi1
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi * 0.5) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam * 0.5) ** 2
return 2.0 * _EARTH_R * math.asin(math.sqrt(min(a, 1.0)))
@dataclass
class ComputedMetrics:
distance_m: Optional[float]
duration_s: Optional[int]
moving_time_s: Optional[int]
elevation_gain_m: Optional[float]
elevation_loss_m: Optional[float]
avg_speed_kmh: Optional[float]
max_speed_kmh: Optional[float]
avg_hr_bpm: Optional[int]
max_hr_bpm: Optional[int]
avg_cadence_rpm: Optional[int]
avg_power_w: Optional[int]
np_power_w: Optional[int]
max_power_w: Optional[int]
bbox: Optional[tuple[float, float, float, float]] # min_lon, min_lat, max_lon, max_lat
start_latlng: Optional[tuple[float, float]]
end_latlng: Optional[tuple[float, float]]
mmp: Optional[list[list[int]]] # [[duration_s, avg_watts], ...] — None if no power data
# [[distance_km, time_s], ...] sorted by distance — None if sport has no distance targets
best_efforts: Optional[list[list[float]]]
best_climb_m: Optional[float] # max net elevation gain in one contiguous window (cycling only)
climbing_vam_mh: Optional[int] # average VAM on ascending segments only (m/h)
climbing_time_s: Optional[int] # total ascending seconds used to compute VAM
def compute(activity: ParsedActivity) -> ComputedMetrics:
pts = activity.points
if not pts:
return _empty()
duration_s = _duration(pts)
distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh = _gps_stats(pts)
inferred_source = "strava_export" if activity.strava_id else ""
gain, loss = _elevation(pts, activity.altitude_source, inferred_source)
avg_hr, max_hr = _hr_stats(pts)
avg_cad = _avg_nonnull([p.cadence_rpm for p in pts])
avg_pow = _avg_nonnull([p.power_w for p in pts])
np_pow = _np_power(pts, activity.started_at)
max_pow = _max_nonnull([p.power_w for p in pts])
bbox = _bbox(pts)
start_ll, end_ll = _endpoints(pts)
mmp = compute_mmp(pts, activity.started_at)
best_efforts, best_climb_m = compute_best_efforts(pts, activity.started_at, activity.sport)
_vam = compute_vam(pts, activity.started_at, activity.sport)
climbing_vam_mh, climbing_time_s = _vam if _vam else (None, None)
return ComputedMetrics(
distance_m=distance_m,
duration_s=duration_s,
moving_time_s=moving_time_s,
elevation_gain_m=round(gain, 1) if gain is not None else None,
elevation_loss_m=round(abs(loss), 1) if loss is not None else None,
avg_speed_kmh=round(avg_speed_kmh, 2) if avg_speed_kmh is not None else None,
max_speed_kmh=round(max_speed_kmh, 2) if max_speed_kmh is not None else None,
avg_hr_bpm=avg_hr,
max_hr_bpm=max_hr,
avg_cadence_rpm=avg_cad,
avg_power_w=avg_pow,
np_power_w=np_pow,
max_power_w=max_pow,
bbox=bbox,
start_latlng=start_ll,
end_latlng=end_ll,
mmp=mmp,
best_efforts=best_efforts,
best_climb_m=best_climb_m,
climbing_vam_mh=climbing_vam_mh,
climbing_time_s=climbing_time_s,
)
# ── mean maximal power ────────────────────────────────────────────────────────
def compute_mmp(pts: list[DataPoint], started_at: datetime) -> Optional[list[list[int]]]:
"""Compute Mean Maximal Power curve at the standard MMP_DURATIONS_S.
Builds a 1 Hz power series (same downsampling as timeseries.py), then uses
a O(n) sliding-window sum for each duration. Returns a list of
[duration_s, avg_watts] pairs (integers), or None when the activity has no
power data. Only durations shorter than the total activity are included.
"""
# Build a dense 1 Hz power array with gaps zero-filled.
# Zero-filling is the standard approach (matches GoldenCheetah / WKO):
# a recording gap counts as 0 W so windows cannot silently span pauses
# and inflate MMP values.
sparse: dict[int, int] = {}
last_t = -1
for p in pts:
t = int((p.timestamp - started_at).total_seconds())
if t < 0 or t == last_t:
continue
last_t = t
if p.power_w is not None:
sparse[t] = p.power_w
if len(sparse) < 2:
return None
t_min = min(sparse)
t_max = max(sparse)
# Guard against corrupted time data (e.g. absolute Unix timestamps stored as
# elapsed offsets, which can make t_max astronomically large and OOM the process).
if t_max - t_min > 7 * 24 * 3600: # > 1 week → corrupted stream
return None
power_1hz: list[int] = [sparse.get(t, 0) for t in range(t_min, t_max + 1)]
n = len(power_1hz)
results: list[list[int]] = []
for d in MMP_DURATIONS_S:
if d > n:
break # activity shorter than this duration — stop (durations are sorted)
# Sliding window of exactly d samples = d seconds at 1 Hz.
window_sum = sum(power_1hz[:d])
best = window_sum
for i in range(1, n - d + 1):
window_sum += power_1hz[i + d - 1] - power_1hz[i - 1]
if window_sum > best:
best = window_sum
results.append([d, round(best / d)])
return results if results else None
# ── VAM (Velocità Ascensionale Media) ────────────────────────────────────────
def _rolling_mean_ele(data: list[float], win: int) -> list[float]:
"""O(n) rolling mean via prefix sums."""
n = len(data)
prefix = [0.0] * (n + 1)
for i, v in enumerate(data):
prefix[i + 1] = prefix[i] + v
half = win // 2
result = []
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
result.append((prefix[hi] - prefix[lo]) / (hi - lo))
return result
def _vam_from_ele_1hz(ele_1hz: list[float]) -> Optional[tuple[int, int]]:
"""Climbing VAM from a dense 1 Hz elevation array.
Accumulates gain and time only on ascending seconds, identified by a 30 s
forward-lookahead on the smoothed elevation signal.
Returns (climbing_vam_mh, climbing_time_s), or None when there is too little
climbing data.
"""
n = len(ele_1hz)
if n < 60:
return None
ele_smooth = _rolling_mean_ele(ele_1hz, 30)
climbing_gain = 0.0
climbing_time = 0
for i in range(n - 1):
look = min(i + 30, n - 1)
if ele_smooth[look] - ele_smooth[i] >= 2.0:
inst = ele_smooth[i + 1] - ele_smooth[i]
if inst > 0:
climbing_gain += inst
climbing_time += 1
if climbing_time >= 60 and climbing_gain >= 5.0:
return round(climbing_gain * 3600.0 / climbing_time), climbing_time
return None
def _build_ele_1hz(sparse: dict[int, Optional[float]]) -> Optional[list[float]]:
"""Build a dense 1 Hz elevation array from a {t: ele} sparse dict, forward-filling gaps."""
if not sparse:
return None
t_min = min(sparse)
t_max = max(sparse)
if t_max - t_min > 7 * 24 * 3600:
return None
ele_raw: list[Optional[float]] = []
last_known: Optional[float] = None
for t in range(t_min, t_max + 1):
v = sparse.get(t)
if v is not None:
last_known = v
ele_raw.append(last_known)
if sum(1 for e in ele_raw if e is not None) < 60:
return None
first_valid = next((e for e in ele_raw if e is not None), None)
if first_valid is None:
return None
return [e if e is not None else first_valid for e in ele_raw]
def compute_vam(pts: list[DataPoint], started_at: datetime, sport: str) -> Optional[tuple[int, int]]:
"""Compute average climbing VAM (m/h) from DataPoints.
Only computed for cycling, running, hiking, walking.
Returns (climbing_vam_mh, climbing_time_s), or None when there is insufficient
climbing data.
"""
if sport not in _VAM_SPORTS:
return None
sparse: dict[int, Optional[float]] = {}
last_t = -1
for p in pts:
t = int((p.timestamp - started_at).total_seconds())
if t < 0 or t == last_t:
continue
sparse[t] = p.elevation_m
last_t = t
ele_1hz = _build_ele_1hz(sparse)
if ele_1hz is None:
return None
return _vam_from_ele_1hz(ele_1hz)
# ── best efforts & best climb ─────────────────────────────────────────────────
def compute_best_efforts(
pts: list[DataPoint],
started_at: datetime,
sport: str,
) -> tuple[Optional[list[list[float]]], Optional[float]]:
"""Return (best_efforts, best_climb_m) for this activity.
best_efforts: [[distance_km, time_s], ...] — one entry per target distance
where the activity was long enough to contain that effort.
best_climb_m: maximum net elevation gain over any contiguous window (cycling).
Both use the same 1 Hz downsampled series as the timeseries writer.
"""
targets = BEST_EFFORT_DISTANCES.get(sport, [])
# Build dense 1 Hz speed (km/h) and elevation (m) arrays with gap zero-filling.
# Zero-filling speed gaps (0 km/h) prevents best-effort windows from spanning
# recording pauses and producing artificially fast times.
# When the device didn't record speed (common in older FIT files), fall back to
# GPS-derived speed: spread the haversine segment speed evenly across the interval
# so the sliding window accumulates the correct distance.
sparse_speed: dict[int, float] = {}
sparse_ele: dict[int, Optional[float]] = {}
last_t = -1
_prev: Optional[DataPoint] = None
for p in pts:
t = int((p.timestamp - started_at).total_seconds())
if t < 0 or t == last_t:
continue
sparse_ele[t] = p.elevation_m
if p.speed_kmh is not None:
sparse_speed[t] = p.speed_kmh
elif (_prev is not None
and _prev.lat is not None and _prev.lon is not None
and p.lat is not None and p.lon is not None):
dt_s = t - last_t
seg_m = _haversine_m(_prev.lat, _prev.lon, p.lat, p.lon)
seg_kmh = (seg_m / dt_s) * 3.6
for slot in range(last_t, t):
sparse_speed[slot] = seg_kmh
else:
sparse_speed[t] = 0.0
last_t = t
_prev = p
if not sparse_speed:
return None, None
t_min = min(sparse_speed)
t_max = max(sparse_speed)
# Guard against corrupted time data (e.g. absolute Unix timestamps stored as
# elapsed offsets, which can make t_max astronomically large and OOM the process).
if t_max - t_min > 7 * 24 * 3600: # > 1 week → corrupted stream
return None, None
speed_1hz: list[float] = [sparse_speed.get(t, 0.0) for t in range(t_min, t_max + 1)]
ele_1hz: list[Optional[float]] = [sparse_ele.get(t) for t in range(t_min, t_max + 1)]
best_efforts: Optional[list[list[float]]] = None
if targets and speed_1hz:
results = []
for d_km in targets:
t_s = _fastest_time_for_distance(speed_1hz, d_km)
if t_s is not None:
results.append([d_km, t_s])
best_efforts = results if results else None
best_climb_m: Optional[float] = None
if sport == "cycling":
# Use cumulative device distance as the x-axis so recording pauses
# (where distance doesn't increase) don't create gaps that reset the window.
# Fall back to elapsed-time ordering when no device distance is recorded.
dist_ele = sorted(
(p.distance_m, p.elevation_m)
for p in pts
if p.distance_m is not None and p.elevation_m is not None
)
if not dist_ele:
dist_ele = sorted(
(int((p.timestamp - started_at).total_seconds()), p.elevation_m)
for p in pts
if p.elevation_m is not None
and int((p.timestamp - started_at).total_seconds()) >= 0
)
if len(dist_ele) >= 2:
best_climb_m = _best_climb(dist_ele)
return best_efforts, best_climb_m
def _fastest_time_for_distance(speed_1hz: list[float], target_km: float) -> Optional[int]:
"""Minimum number of seconds to cover target_km using a two-pointer sliding window.
Each sample contributes speed_kmh / 3600 km (one second at that speed).
Nulls/zeros extend the window without adding distance — naturally deprioritised.
"""
n = len(speed_1hz)
left = 0
window_dist = 0.0
best_s: Optional[int] = None
for right in range(n):
window_dist += speed_1hz[right] / 3600.0
# Shrink from the left while we still cover the target
while window_dist >= target_km and left <= right:
window_s = right - left + 1
if best_s is None or window_s < best_s:
best_s = window_s
window_dist -= speed_1hz[left] / 3600.0
left += 1
return best_s
def _best_climb(pts_sorted: list[tuple[float, float]]) -> Optional[float]:
"""Maximum net elevation gain over any contiguous uphill window (Kadane's).
pts_sorted: list of (x, elevation_m) pairs sorted by x, where x is
cumulative distance (m) or elapsed time (s). Using cumulative distance
means recording pauses (x doesn't increase while stopped) don't create
gaps that falsely reset the climbing window.
"""
if len(pts_sorted) < 2:
return None
max_gain = 0.0
current = 0.0
prev_e = pts_sorted[0][1]
for _, e in pts_sorted[1:]:
current = max(0.0, current + (e - prev_e))
if current > max_gain:
max_gain = current
prev_e = e
return round(max_gain, 1) if max_gain > 0 else None
# ── single-pass GPS stats ──────────────────────────────────────────────────────
# distance, moving time, avg speed, and max speed are all derived from the same
# per-segment loop, so we compute them in one pass instead of four.
def _gps_stats(
pts: list[DataPoint],
) -> tuple[Optional[float], Optional[int], Optional[float], Optional[float]]:
"""Return (distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh)."""
# Prefer device-recorded cumulative distance (FIT files always have this)
device_dist = next(
(p.distance_m for p in reversed(pts) if p.distance_m is not None), None
)
moving_s = 0
moving_dist_m = 0.0
total_dist_m = 0.0
max_seg_kmh = 0.0
has_data = False
# Device speed values (used for max if present)
device_max_kmh: Optional[float] = None
if any(p.speed_kmh is not None for p in pts):
device_max_kmh = max(p.speed_kmh for p in pts if p.speed_kmh is not None)
for a, b in zip(pts, pts[1:]):
dt = (b.timestamp - a.timestamp).total_seconds()
if dt <= 0:
continue
if a.lat is not None and a.lon is not None and b.lat is not None and b.lon is not None:
seg_m = _haversine_m(a.lat, a.lon, b.lat, b.lon)
seg_kmh = (seg_m / dt) * 3.6
has_data = True
elif a.speed_kmh is not None:
seg_kmh = a.speed_kmh
seg_m = (seg_kmh / 3.6) * dt
has_data = True
else:
continue
total_dist_m += seg_m
if seg_kmh > max_seg_kmh:
max_seg_kmh = seg_kmh
if seg_kmh >= _STOPPED_THRESHOLD_KMH:
moving_s += int(dt)
moving_dist_m += seg_m
if not has_data:
return device_dist, None, None, None
# Fall back to haversine distance if device recorded 0 but we computed real GPS distance
if device_dist is not None and device_dist > 0:
distance_m = device_dist
else:
distance_m = round(total_dist_m, 1) if total_dist_m > 0 else device_dist
moving_time_s = moving_s if moving_s > 0 else None
avg_speed_kmh = (moving_dist_m / moving_s) * 3.6 if moving_s > 0 else None
# Prefer device speed for max (more stable than GPS-derived per-second spikes)
max_speed_kmh = device_max_kmh if device_max_kmh is not None else (
max_seg_kmh if max_seg_kmh > 0 else None
)
return distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh
# ── remaining helpers ──────────────────────────────────────────────────────────
def _duration(pts: list[DataPoint]) -> Optional[int]:
if len(pts) < 2:
return None
return int((pts[-1].timestamp - pts[0].timestamp).total_seconds())
def elevation_params(altitude_source: str, source: str = "") -> tuple[int, float]:
"""Return (ma_window_s, threshold_m) for elevation gain/loss computation.
Tuned on 37 activities cross-referenced against Strava-reported elevation:
strava_export — elevation already pre-processed by Strava (smooth 1 m
quantisation, 0 steps > 5 m). Light 5 s MA + 1.0 m
threshold gives avg 2.8 %, std 4.8 %, 34/37 within ±10 %.
barometric — raw barometric altimeter from a FIT file. No smoothing
needed; 1.5 m threshold gives ~0 % error on available data.
gps / unknown — raw GPS or unidentified non-Strava source. Light 5 s MA
+ 1.52.0 m threshold suppresses GPS jitter while keeping
real terrain changes.
"""
if source == "strava_export":
return (5, 1.0)
if altitude_source == "barometric":
return (0, 1.5)
if altitude_source == "gps":
return (5, 2.0)
return (5, 1.5) # unknown non-strava: conservative middle ground
def _ele_moving_average(values: list[float], window: int) -> list[float]:
if window <= 1:
return list(values)
half = window // 2
n = len(values)
cumsum = [0.0] * (n + 1)
for i, v in enumerate(values):
cumsum[i + 1] = cumsum[i] + v
return [
(cumsum[min(n, i + half + 1)] - cumsum[max(0, i - half)])
/ (min(n, i + half + 1) - max(0, i - half))
for i in range(n)
]
def _elevation(
pts: list[DataPoint],
altitude_source: str = "unknown",
source: str = "",
) -> tuple[Optional[float], Optional[float]]:
"""Hysteresis-based elevation accumulation.
Applies a short moving-average pre-smoothing then commits a new elevation
level only when it differs from the last committed value by at least the
source-specific threshold. Parameters are chosen per data source via
:func:`elevation_params`.
"""
elevations = [p.elevation_m for p in pts if p.elevation_m is not None]
if len(elevations) < 2:
return None, None
ma_window, threshold = elevation_params(altitude_source, source)
# Some devices (e.g. Apple Watch) record exactly 0.0 for the initial samples
# while waiting for barometric/GPS lock, then jump to the real altitude.
# Only activate when there are at least 2 consecutive near-zero leading
# values — a single 0.0 is a legitimate sea-level starting point.
start = 0
if abs(elevations[0]) < 0.5:
n_leading = 0
for e in elevations:
if abs(e) < 0.5:
n_leading += 1
else:
break
if n_leading > 1:
for i, e in enumerate(elevations):
if abs(e) > threshold:
start = i
break
elevations = _ele_moving_average(elevations[start:], ma_window)
gain = loss = 0.0
committed = elevations[0]
for e in elevations[1:]:
# Skip near-zero values that appear mid-recording while we are at a
# significant elevation — these are sensor dropouts (device lost GPS/
# barometric lock), not genuine sea-level crossings.
if abs(e) < 1.0 and abs(committed) > threshold:
continue
diff = e - committed
if abs(diff) >= threshold:
if diff > 0:
gain += diff
else:
loss += diff
committed = e
return gain, loss
def _hr_stats(pts: list[DataPoint]) -> tuple[Optional[int], Optional[int]]:
hrs = [p.hr_bpm for p in pts if p.hr_bpm is not None]
if not hrs:
return None, None
return int(sum(hrs) / len(hrs)), max(hrs)
def _avg_nonnull(values: list) -> Optional[int]:
v = [x for x in values if x is not None]
return int(sum(v) / len(v)) if v else None
def _max_nonnull(values: list) -> Optional[int]:
v = [x for x in values if x is not None]
return max(v) if v else None
def _np_power(pts: list[DataPoint], started_at: datetime) -> Optional[int]:
"""Normalized power (Coggan method): 30 s rolling average → 4th power → mean → 4th root.
Uses a dense 1 Hz series (gaps zero-filled) identical to the MMP pipeline.
Returns None when the activity has no power data or is shorter than 30 s.
"""
sparse: dict[int, int] = {}
last_t = -1
for p in pts:
t = int((p.timestamp - started_at).total_seconds())
if t < 0 or t == last_t:
continue
last_t = t
if p.power_w is not None:
sparse[t] = p.power_w
if len(sparse) < 2:
return None
t_min, t_max = min(sparse), max(sparse)
if t_max - t_min > 7 * 24 * 3600:
return None
power_1hz = [sparse.get(t, 0) for t in range(t_min, t_max + 1)]
n = len(power_1hz)
win = 30
if n < win:
return None
# 30 s centred rolling mean, then raise to 4th power
half = win // 2
total = sum(power_1hz[:win])
fourth_powers: list[float] = []
for i in range(half, n - half):
avg = total / win
fourth_powers.append(avg ** 4)
if i + half + 1 < n:
total += power_1hz[i + half + 1] - power_1hz[i - half]
if not fourth_powers:
return None
return int(round((sum(fourth_powers) / len(fourth_powers)) ** 0.25))
def _bbox(pts: list[DataPoint]) -> Optional[tuple[float, float, float, float]]:
lats = [p.lat for p in pts if p.lat is not None]
lons = [p.lon for p in pts if p.lon is not None]
if not lats:
return None
return (min(lons), min(lats), max(lons), max(lats))
def _endpoints(
pts: list[DataPoint],
) -> tuple[Optional[tuple[float, float]], Optional[tuple[float, float]]]:
gps = [(p.lat, p.lon) for p in pts if p.lat is not None and p.lon is not None]
if not gps:
return None, None
return gps[0], gps[-1]
def _empty() -> ComputedMetrics:
return ComputedMetrics(
distance_m=None, duration_s=None, moving_time_s=None,
elevation_gain_m=None, elevation_loss_m=None,
avg_speed_kmh=None, max_speed_kmh=None,
avg_hr_bpm=None, max_hr_bpm=None,
avg_cadence_rpm=None, avg_power_w=None, np_power_w=None, max_power_w=None,
bbox=None, start_latlng=None, end_latlng=None,
mmp=None, best_efforts=None, best_climb_m=None,
climbing_vam_mh=None, climbing_time_s=None,
)