From 872651f4714b4754992beb75c8d528cc7b0da263 Mon Sep 17 00:00:00 2001 From: Davide Scaini Date: Mon, 20 Apr 2026 20:29:20 +0200 Subject: [PATCH] metrics: replace naive elevation accumulation with hysteresis dead-band MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GPS jitter and barometric quantization noise caused systematic overestimation of elevation gain — in extreme cases 100% of reported gain was sub-1m noise. Implements source-aware hysteresis: elevation is only committed when it deviates from the last committed value by ≥5m (barometric) or ≥10m (GPS/GPX/TCX). - ParsedActivity gains `altitude_source` field ("barometric"/"gps"/"unknown") - FIT parser sets "barometric" when enhanced_altitude is present, else "gps" - GPX and TCX parsers always set "gps" - metrics._elevation() uses the threshold matching the source - 5 new parametric tests covering flat GPS noise, threshold differences, and real climbs --- bincio/extract/metrics.py | 39 +++++++++++++++---- bincio/extract/models.py | 4 ++ bincio/extract/parsers/fit.py | 9 ++++- bincio/extract/parsers/gpx.py | 1 + bincio/extract/parsers/tcx.py | 1 + tests/test_metrics.py | 72 +++++++++++++++++++++++++++++++++++ 6 files changed, 117 insertions(+), 9 deletions(-) diff --git a/bincio/extract/metrics.py b/bincio/extract/metrics.py index d5bb97a..cc0072a 100644 --- a/bincio/extract/metrics.py +++ b/bincio/extract/metrics.py @@ -70,7 +70,7 @@ def compute(activity: ParsedActivity) -> ComputedMetrics: duration_s = _duration(pts) distance_m, moving_time_s, avg_speed_kmh, max_speed_kmh = _gps_stats(pts) - gain, loss = _elevation(pts) + gain, loss = _elevation(pts, activity.altitude_source) avg_hr, max_hr = _hr_stats(pts) avg_cad = _avg_nonnull([p.cadence_rpm for p in pts]) avg_pow = _avg_nonnull([p.power_w for p in pts]) @@ -347,17 +347,40 @@ def _duration(pts: list[DataPoint]) -> Optional[int]: return int((pts[-1].timestamp - pts[0].timestamp).total_seconds()) -def _elevation(pts: list[DataPoint]) -> tuple[Optional[float], Optional[float]]: +# Hysteresis thresholds per altitude source. +# Only commit a new elevation when it differs from the last committed value by +# at least this amount, filtering out GPS noise and barometric quantization steps. +_ELEVATION_THRESHOLD: dict[str, float] = { + "barometric": 5.0, # barometric altimeter: smaller steps are real + "gps": 10.0, # GPS altitude: noisier, needs wider dead-band + "unknown": 10.0, # treat unknown as GPS to be conservative +} + + +def _elevation( + pts: list[DataPoint], + altitude_source: str = "unknown", +) -> tuple[Optional[float], Optional[float]]: + """Hysteresis-based elevation accumulation. + + Only commits a new elevation when it differs from the last committed value + by at least the source-specific threshold, filtering GPS jitter and + barometric quantization noise that would otherwise inflate the gain figure. + """ elevations = [p.elevation_m for p in pts if p.elevation_m is not None] if len(elevations) < 2: return None, None + threshold = _ELEVATION_THRESHOLD.get(altitude_source, 10.0) gain = loss = 0.0 - for a, b in zip(elevations, elevations[1:]): - diff = b - a - if diff > 0: - gain += diff - else: - loss += diff + committed = elevations[0] + for e in elevations[1:]: + diff = e - committed + if abs(diff) >= threshold: + if diff > 0: + gain += diff + else: + loss += diff + committed = e return gain, loss diff --git a/bincio/extract/models.py b/bincio/extract/models.py index b5821b0..8ea3004 100644 --- a/bincio/extract/models.py +++ b/bincio/extract/models.py @@ -57,3 +57,7 @@ class ParsedActivity: strava_id: Optional[str] = None privacy: Optional[str] = None # "public", "private", or None (caller decides) laps: list[LapData] = field(default_factory=list) + # "barometric" = device has a barometric altimeter (FIT enhanced_altitude present) + # "gps" = altitude derived from GPS triangulation (GPX, TCX, FIT altitude-only) + # "unknown" = source not detected (treated as gps for threshold purposes) + altitude_source: str = "unknown" diff --git a/bincio/extract/parsers/fit.py b/bincio/extract/parsers/fit.py index 438fa21..c723d4a 100644 --- a/bincio/extract/parsers/fit.py +++ b/bincio/extract/parsers/fit.py @@ -20,6 +20,8 @@ class FitParser: sub_sport: str | None = None device: str | None = None + has_baro_alt = False # True if any record used enhanced_altitude + with fitdecode.FitReader(io.BytesIO(raw_bytes)) as fit: for frame in fit: if not isinstance(frame, fitdecode.FitDataMessage): @@ -60,7 +62,9 @@ class FitParser: # enhanced_altitude is written by barometric altimeters (most # modern Garmins). Fall back to GPS-derived altitude if absent. _alt = _get(frame, "enhanced_altitude") - if _alt is None: + if _alt is not None: + has_baro_alt = True + else: _alt = _get(frame, "altitude") dp = DataPoint( @@ -100,6 +104,8 @@ class FitParser: if not points: raise ValueError(f"No record messages found in {path.name}") + altitude_source = "barometric" if has_baro_alt else "gps" + return ParsedActivity( points=points, sport=sport, @@ -109,6 +115,7 @@ class FitParser: laps=laps, source_file=path.name, source_hash="", + altitude_source=altitude_source, ) diff --git a/bincio/extract/parsers/gpx.py b/bincio/extract/parsers/gpx.py index 9208e87..e17449c 100644 --- a/bincio/extract/parsers/gpx.py +++ b/bincio/extract/parsers/gpx.py @@ -53,6 +53,7 @@ class GpxParser(BaseParser): started_at=started_at, source_file=path.name, source_hash="", # set by factory + altitude_source="gps", ) diff --git a/bincio/extract/parsers/tcx.py b/bincio/extract/parsers/tcx.py index 5d52c2e..1c60f2b 100644 --- a/bincio/extract/parsers/tcx.py +++ b/bincio/extract/parsers/tcx.py @@ -83,6 +83,7 @@ class TcxParser: started_at=points[0].timestamp, source_file=path.name, source_hash="", + altitude_source="gps", ) diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 7393679..0a184bd 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -8,6 +8,7 @@ import pytest from bincio.extract.metrics import ( MMP_DURATIONS_S, _best_climb, + _elevation, _fastest_time_for_distance, _haversine_m, compute, @@ -126,6 +127,77 @@ def test_compute_no_elevation(): assert m.elevation_loss_m is None +# ── elevation hysteresis ────────────────────────────────────────────────────── + +def _ele_pts(elevations: list[float]) -> list[DataPoint]: + return [_pt(i, elevation_m=e) for i, e in enumerate(elevations)] + + +def test_elevation_hysteresis_large_step_always_counted(): + # A single 50m step is way above any threshold — both sources should count it. + pts = _ele_pts([100.0, 150.0]) + gain_baro, _ = _elevation(pts, "barometric") + gain_gps, _ = _elevation(pts, "gps") + assert gain_baro == 50.0 + assert gain_gps == 50.0 + + +def test_elevation_hysteresis_flat_gps_noise_suppressed(): + # Flat coastal route: 16m of GPS noise oscillating within ±8m. + # All steps are sub-1m — hysteresis should return ~0 gain. + import math + n = 1000 + elevations = [100.0 + 3.0 * math.sin(i * 0.1) for i in range(n)] + pts = _ele_pts(elevations) + gain, loss = _elevation(pts, "gps") + # With threshold=10m no oscillation within ±3m should ever commit. + assert gain == 0.0 + assert loss == 0.0 + + +def test_elevation_hysteresis_barometric_threshold_lower(): + # Steps of exactly 7m — above barometric (5m) but below GPS (10m) threshold. + elevations = [0.0, 7.0, 0.0, 7.0] + pts = _ele_pts(elevations) + gain_baro, _ = _elevation(pts, "barometric") + gain_gps, _ = _elevation(pts, "gps") + assert gain_baro == 14.0 # both 7m steps committed + assert gain_gps == 0.0 # 7m < 10m threshold → suppressed + + +def test_elevation_hysteresis_real_climb_approximated(): + # Simulate a 200m climb with 0.2m barometric quantization noise. + # Build a staircase: 1000 steps, mostly 0.2m up/down noise, with a 200m net climb. + import random + random.seed(42) + elevations = [0.0] + for i in range(999): + # Mostly quantization noise, but drift upward at 0.2 m/step net + step = random.choice([-0.2, 0.0, 0.0, 0.2, 0.2, 0.4]) + elevations.append(elevations[-1] + step) + + # Force net gain ~200m by scaling + scale = 200.0 / (elevations[-1] - elevations[0]) if elevations[-1] != elevations[0] else 1 + elevations = [e * scale for e in elevations] + + pts = _ele_pts(elevations) + gain, _ = _elevation(pts, "barometric") + # Hysteresis should produce substantially less than naive accumulation + # and land reasonably close to the 200m net climb. + assert gain is not None + assert gain < 500.0 # not inflated like naive sum + assert gain > 100.0 # not zero either — real climbing exists + + +def test_elevation_hysteresis_unknown_treated_as_gps(): + # "unknown" should apply the same 10m threshold as "gps" + elevations = [0.0, 7.0, 0.0, 7.0] # 7m steps + pts = _ele_pts(elevations) + gain_unknown, _ = _elevation(pts, "unknown") + gain_gps, _ = _elevation(pts, "gps") + assert gain_unknown == gain_gps + + def test_compute_hr_stats(): pts = [ _pt(0, lat=48.0, lon=11.0, hr_bpm=120),