metrics: replace naive elevation accumulation with hysteresis dead-band
GPS jitter and barometric quantization noise caused systematic overestimation
of elevation gain — in extreme cases 100% of reported gain was sub-1m noise.
Implements source-aware hysteresis: elevation is only committed when it
deviates from the last committed value by ≥5m (barometric) or ≥10m (GPS/GPX/TCX).
- ParsedActivity gains `altitude_source` field ("barometric"/"gps"/"unknown")
- FIT parser sets "barometric" when enhanced_altitude is present, else "gps"
- GPX and TCX parsers always set "gps"
- metrics._elevation() uses the threshold matching the source
- 5 new parametric tests covering flat GPS noise, threshold differences, and real climbs
This commit is contained in:
@@ -70,7 +70,7 @@ def compute(activity: ParsedActivity) -> ComputedMetrics:
|
||||
|
||||
duration_s = _duration(pts)
|
||||
distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh = _gps_stats(pts)
|
||||
gain, loss = _elevation(pts)
|
||||
gain, loss = _elevation(pts, activity.altitude_source)
|
||||
avg_hr, max_hr = _hr_stats(pts)
|
||||
avg_cad = _avg_nonnull([p.cadence_rpm for p in pts])
|
||||
avg_pow = _avg_nonnull([p.power_w for p in pts])
|
||||
@@ -347,17 +347,40 @@ def _duration(pts: list[DataPoint]) -> Optional[int]:
|
||||
return int((pts[-1].timestamp - pts[0].timestamp).total_seconds())
|
||||
|
||||
|
||||
def _elevation(pts: list[DataPoint]) -> tuple[Optional[float], Optional[float]]:
|
||||
# Hysteresis thresholds per altitude source.
|
||||
# Only commit a new elevation when it differs from the last committed value by
|
||||
# at least this amount, filtering out GPS noise and barometric quantization steps.
|
||||
_ELEVATION_THRESHOLD: dict[str, float] = {
|
||||
"barometric": 5.0, # barometric altimeter: smaller steps are real
|
||||
"gps": 10.0, # GPS altitude: noisier, needs wider dead-band
|
||||
"unknown": 10.0, # treat unknown as GPS to be conservative
|
||||
}
|
||||
|
||||
|
||||
def _elevation(
|
||||
pts: list[DataPoint],
|
||||
altitude_source: str = "unknown",
|
||||
) -> tuple[Optional[float], Optional[float]]:
|
||||
"""Hysteresis-based elevation accumulation.
|
||||
|
||||
Only commits a new elevation when it differs from the last committed value
|
||||
by at least the source-specific threshold, filtering GPS jitter and
|
||||
barometric quantization noise that would otherwise inflate the gain figure.
|
||||
"""
|
||||
elevations = [p.elevation_m for p in pts if p.elevation_m is not None]
|
||||
if len(elevations) < 2:
|
||||
return None, None
|
||||
threshold = _ELEVATION_THRESHOLD.get(altitude_source, 10.0)
|
||||
gain = loss = 0.0
|
||||
for a, b in zip(elevations, elevations[1:]):
|
||||
diff = b - a
|
||||
if diff > 0:
|
||||
gain += diff
|
||||
else:
|
||||
loss += diff
|
||||
committed = elevations[0]
|
||||
for e in elevations[1:]:
|
||||
diff = e - committed
|
||||
if abs(diff) >= threshold:
|
||||
if diff > 0:
|
||||
gain += diff
|
||||
else:
|
||||
loss += diff
|
||||
committed = e
|
||||
return gain, loss
|
||||
|
||||
|
||||
|
||||
@@ -57,3 +57,7 @@ class ParsedActivity:
|
||||
strava_id: Optional[str] = None
|
||||
privacy: Optional[str] = None # "public", "private", or None (caller decides)
|
||||
laps: list[LapData] = field(default_factory=list)
|
||||
# "barometric" = device has a barometric altimeter (FIT enhanced_altitude present)
|
||||
# "gps" = altitude derived from GPS triangulation (GPX, TCX, FIT altitude-only)
|
||||
# "unknown" = source not detected (treated as gps for threshold purposes)
|
||||
altitude_source: str = "unknown"
|
||||
|
||||
@@ -20,6 +20,8 @@ class FitParser:
|
||||
sub_sport: str | None = None
|
||||
device: str | None = None
|
||||
|
||||
has_baro_alt = False # True if any record used enhanced_altitude
|
||||
|
||||
with fitdecode.FitReader(io.BytesIO(raw_bytes)) as fit:
|
||||
for frame in fit:
|
||||
if not isinstance(frame, fitdecode.FitDataMessage):
|
||||
@@ -60,7 +62,9 @@ class FitParser:
|
||||
# enhanced_altitude is written by barometric altimeters (most
|
||||
# modern Garmins). Fall back to GPS-derived altitude if absent.
|
||||
_alt = _get(frame, "enhanced_altitude")
|
||||
if _alt is None:
|
||||
if _alt is not None:
|
||||
has_baro_alt = True
|
||||
else:
|
||||
_alt = _get(frame, "altitude")
|
||||
|
||||
dp = DataPoint(
|
||||
@@ -100,6 +104,8 @@ class FitParser:
|
||||
if not points:
|
||||
raise ValueError(f"No record messages found in {path.name}")
|
||||
|
||||
altitude_source = "barometric" if has_baro_alt else "gps"
|
||||
|
||||
return ParsedActivity(
|
||||
points=points,
|
||||
sport=sport,
|
||||
@@ -109,6 +115,7 @@ class FitParser:
|
||||
laps=laps,
|
||||
source_file=path.name,
|
||||
source_hash="",
|
||||
altitude_source=altitude_source,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -53,6 +53,7 @@ class GpxParser(BaseParser):
|
||||
started_at=started_at,
|
||||
source_file=path.name,
|
||||
source_hash="", # set by factory
|
||||
altitude_source="gps",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -83,6 +83,7 @@ class TcxParser:
|
||||
started_at=points[0].timestamp,
|
||||
source_file=path.name,
|
||||
source_hash="",
|
||||
altitude_source="gps",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import pytest
|
||||
from bincio.extract.metrics import (
|
||||
MMP_DURATIONS_S,
|
||||
_best_climb,
|
||||
_elevation,
|
||||
_fastest_time_for_distance,
|
||||
_haversine_m,
|
||||
compute,
|
||||
@@ -126,6 +127,77 @@ def test_compute_no_elevation():
|
||||
assert m.elevation_loss_m is None
|
||||
|
||||
|
||||
# ── elevation hysteresis ──────────────────────────────────────────────────────
|
||||
|
||||
def _ele_pts(elevations: list[float]) -> list[DataPoint]:
|
||||
return [_pt(i, elevation_m=e) for i, e in enumerate(elevations)]
|
||||
|
||||
|
||||
def test_elevation_hysteresis_large_step_always_counted():
|
||||
# A single 50m step is way above any threshold — both sources should count it.
|
||||
pts = _ele_pts([100.0, 150.0])
|
||||
gain_baro, _ = _elevation(pts, "barometric")
|
||||
gain_gps, _ = _elevation(pts, "gps")
|
||||
assert gain_baro == 50.0
|
||||
assert gain_gps == 50.0
|
||||
|
||||
|
||||
def test_elevation_hysteresis_flat_gps_noise_suppressed():
|
||||
# Flat coastal route: 16m of GPS noise oscillating within ±8m.
|
||||
# All steps are sub-1m — hysteresis should return ~0 gain.
|
||||
import math
|
||||
n = 1000
|
||||
elevations = [100.0 + 3.0 * math.sin(i * 0.1) for i in range(n)]
|
||||
pts = _ele_pts(elevations)
|
||||
gain, loss = _elevation(pts, "gps")
|
||||
# With threshold=10m no oscillation within ±3m should ever commit.
|
||||
assert gain == 0.0
|
||||
assert loss == 0.0
|
||||
|
||||
|
||||
def test_elevation_hysteresis_barometric_threshold_lower():
|
||||
# Steps of exactly 7m — above barometric (5m) but below GPS (10m) threshold.
|
||||
elevations = [0.0, 7.0, 0.0, 7.0]
|
||||
pts = _ele_pts(elevations)
|
||||
gain_baro, _ = _elevation(pts, "barometric")
|
||||
gain_gps, _ = _elevation(pts, "gps")
|
||||
assert gain_baro == 14.0 # both 7m steps committed
|
||||
assert gain_gps == 0.0 # 7m < 10m threshold → suppressed
|
||||
|
||||
|
||||
def test_elevation_hysteresis_real_climb_approximated():
|
||||
# Simulate a 200m climb with 0.2m barometric quantization noise.
|
||||
# Build a staircase: 1000 steps, mostly 0.2m up/down noise, with a 200m net climb.
|
||||
import random
|
||||
random.seed(42)
|
||||
elevations = [0.0]
|
||||
for i in range(999):
|
||||
# Mostly quantization noise, but drift upward at 0.2 m/step net
|
||||
step = random.choice([-0.2, 0.0, 0.0, 0.2, 0.2, 0.4])
|
||||
elevations.append(elevations[-1] + step)
|
||||
|
||||
# Force net gain ~200m by scaling
|
||||
scale = 200.0 / (elevations[-1] - elevations[0]) if elevations[-1] != elevations[0] else 1
|
||||
elevations = [e * scale for e in elevations]
|
||||
|
||||
pts = _ele_pts(elevations)
|
||||
gain, _ = _elevation(pts, "barometric")
|
||||
# Hysteresis should produce substantially less than naive accumulation
|
||||
# and land reasonably close to the 200m net climb.
|
||||
assert gain is not None
|
||||
assert gain < 500.0 # not inflated like naive sum
|
||||
assert gain > 100.0 # not zero either — real climbing exists
|
||||
|
||||
|
||||
def test_elevation_hysteresis_unknown_treated_as_gps():
|
||||
# "unknown" should apply the same 10m threshold as "gps"
|
||||
elevations = [0.0, 7.0, 0.0, 7.0] # 7m steps
|
||||
pts = _ele_pts(elevations)
|
||||
gain_unknown, _ = _elevation(pts, "unknown")
|
||||
gain_gps, _ = _elevation(pts, "gps")
|
||||
assert gain_unknown == gain_gps
|
||||
|
||||
|
||||
def test_compute_hr_stats():
|
||||
pts = [
|
||||
_pt(0, lat=48.0, lon=11.0, hr_bpm=120),
|
||||
|
||||
Reference in New Issue
Block a user