Files
bincio-activity/tests/test_metrics.py
T
Davide Scaini 872651f471 metrics: replace naive elevation accumulation with hysteresis dead-band
GPS jitter and barometric quantization noise caused systematic overestimation
of elevation gain — in extreme cases 100% of reported gain was sub-1m noise.

Implements source-aware hysteresis: elevation is only committed when it
deviates from the last committed value by ≥5m (barometric) or ≥10m (GPS/GPX/TCX).

- ParsedActivity gains `altitude_source` field ("barometric"/"gps"/"unknown")
- FIT parser sets "barometric" when enhanced_altitude is present, else "gps"
- GPX and TCX parsers always set "gps"
- metrics._elevation() uses the threshold matching the source
- 5 new parametric tests covering flat GPS noise, threshold differences, and real climbs
2026-04-20 20:29:20 +02:00

383 lines
13 KiB
Python

"""Tests for bincio.extract.metrics."""
import math
from datetime import datetime, timezone
import pytest
from bincio.extract.metrics import (
MMP_DURATIONS_S,
_best_climb,
_elevation,
_fastest_time_for_distance,
_haversine_m,
compute,
compute_best_efforts,
compute_mmp,
)
from bincio.extract.models import DataPoint, ParsedActivity
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ts(offset_s: int) -> datetime:
from datetime import timedelta
return datetime(2024, 6, 1, 8, 0, 0, tzinfo=timezone.utc) + timedelta(seconds=offset_s)
def _pt(offset_s: int, **kw) -> DataPoint:
return DataPoint(timestamp=_ts(offset_s), **kw)
def _activity(points: list[DataPoint], sport: str = "cycling") -> ParsedActivity:
return ParsedActivity(
points=points,
sport=sport,
started_at=_ts(0),
source_file="test.fit",
source_hash="sha256:abc",
)
# ── haversine ─────────────────────────────────────────────────────────────────
def test_haversine_same_point():
assert _haversine_m(48.0, 11.0, 48.0, 11.0) == 0.0
def test_haversine_known_distance():
# London (51.5074, -0.1278) to Paris (48.8566, 2.3522) ≈ 343 km
d = _haversine_m(51.5074, -0.1278, 48.8566, 2.3522)
assert 340_000 < d < 347_000
def test_haversine_symmetry():
a = _haversine_m(48.0, 11.0, 48.1, 11.1)
b = _haversine_m(48.1, 11.1, 48.0, 11.0)
assert abs(a - b) < 1e-6
def test_haversine_short_segment():
# ~111 m per 0.001 degrees latitude at equator
d = _haversine_m(0.0, 0.0, 0.001, 0.0)
assert 110 < d < 112
# ── compute() ─────────────────────────────────────────────────────────────────
def test_compute_empty_activity():
m = compute(_activity([]))
assert m.distance_m is None
assert m.duration_s is None
assert m.elevation_gain_m is None
def test_compute_duration():
pts = [_pt(0, lat=48.0, lon=11.0), _pt(3600, lat=48.1, lon=11.1)]
m = compute(_activity(pts))
assert m.duration_s == 3600
def test_compute_gps_distance():
# Two points ~111 m apart (0.001° lat), 10 s apart
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
m = compute(_activity(pts))
assert m.distance_m is not None
assert 100 < m.distance_m < 120
def test_compute_device_distance_preferred():
# Device reports a different cumulative distance — it should be used.
pts = [
_pt(0, lat=48.0, lon=11.0, distance_m=0.0),
_pt(10, lat=48.001, lon=11.0, distance_m=500.0),
]
m = compute(_activity(pts))
assert m.distance_m == 500.0
def test_compute_moving_time_excludes_stops():
# Three segments: moving, stopped, moving
pts = [
_pt(0, lat=48.0, lon=11.0),
_pt(10, lat=48.001, lon=11.0), # ~111 m in 10 s → moving
_pt(70, lat=48.001, lon=11.0), # 0 m in 60 s → stopped
_pt(80, lat=48.002, lon=11.0), # ~111 m in 10 s → moving
]
m = compute(_activity(pts))
assert m.moving_time_s is not None
assert m.moving_time_s < m.duration_s # stopped time excluded
def test_compute_elevation_gain():
pts = [
_pt(0, lat=48.0, lon=11.0, elevation_m=100.0),
_pt(10, lat=48.001, lon=11.0, elevation_m=150.0),
_pt(20, lat=48.002, lon=11.0, elevation_m=120.0),
]
m = compute(_activity(pts))
assert m.elevation_gain_m == 50.0
assert m.elevation_loss_m == 30.0
def test_compute_no_elevation():
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
m = compute(_activity(pts))
assert m.elevation_gain_m is None
assert m.elevation_loss_m is None
# ── elevation hysteresis ──────────────────────────────────────────────────────
def _ele_pts(elevations: list[float]) -> list[DataPoint]:
return [_pt(i, elevation_m=e) for i, e in enumerate(elevations)]
def test_elevation_hysteresis_large_step_always_counted():
# A single 50m step is way above any threshold — both sources should count it.
pts = _ele_pts([100.0, 150.0])
gain_baro, _ = _elevation(pts, "barometric")
gain_gps, _ = _elevation(pts, "gps")
assert gain_baro == 50.0
assert gain_gps == 50.0
def test_elevation_hysteresis_flat_gps_noise_suppressed():
# Flat coastal route: 16m of GPS noise oscillating within ±8m.
# All steps are sub-1m — hysteresis should return ~0 gain.
import math
n = 1000
elevations = [100.0 + 3.0 * math.sin(i * 0.1) for i in range(n)]
pts = _ele_pts(elevations)
gain, loss = _elevation(pts, "gps")
# With threshold=10m no oscillation within ±3m should ever commit.
assert gain == 0.0
assert loss == 0.0
def test_elevation_hysteresis_barometric_threshold_lower():
# Steps of exactly 7m — above barometric (5m) but below GPS (10m) threshold.
elevations = [0.0, 7.0, 0.0, 7.0]
pts = _ele_pts(elevations)
gain_baro, _ = _elevation(pts, "barometric")
gain_gps, _ = _elevation(pts, "gps")
assert gain_baro == 14.0 # both 7m steps committed
assert gain_gps == 0.0 # 7m < 10m threshold → suppressed
def test_elevation_hysteresis_real_climb_approximated():
# Simulate a 200m climb with 0.2m barometric quantization noise.
# Build a staircase: 1000 steps, mostly 0.2m up/down noise, with a 200m net climb.
import random
random.seed(42)
elevations = [0.0]
for i in range(999):
# Mostly quantization noise, but drift upward at 0.2 m/step net
step = random.choice([-0.2, 0.0, 0.0, 0.2, 0.2, 0.4])
elevations.append(elevations[-1] + step)
# Force net gain ~200m by scaling
scale = 200.0 / (elevations[-1] - elevations[0]) if elevations[-1] != elevations[0] else 1
elevations = [e * scale for e in elevations]
pts = _ele_pts(elevations)
gain, _ = _elevation(pts, "barometric")
# Hysteresis should produce substantially less than naive accumulation
# and land reasonably close to the 200m net climb.
assert gain is not None
assert gain < 500.0 # not inflated like naive sum
assert gain > 100.0 # not zero either — real climbing exists
def test_elevation_hysteresis_unknown_treated_as_gps():
# "unknown" should apply the same 10m threshold as "gps"
elevations = [0.0, 7.0, 0.0, 7.0] # 7m steps
pts = _ele_pts(elevations)
gain_unknown, _ = _elevation(pts, "unknown")
gain_gps, _ = _elevation(pts, "gps")
assert gain_unknown == gain_gps
def test_compute_hr_stats():
pts = [
_pt(0, lat=48.0, lon=11.0, hr_bpm=120),
_pt(10, lat=48.001, lon=11.0, hr_bpm=160),
_pt(20, lat=48.002, lon=11.0, hr_bpm=140),
]
m = compute(_activity(pts))
assert m.avg_hr_bpm == 140
assert m.max_hr_bpm == 160
def test_compute_hr_null_points_ignored():
pts = [
_pt(0, lat=48.0, lon=11.0, hr_bpm=None),
_pt(10, lat=48.001, lon=11.0, hr_bpm=150),
]
m = compute(_activity(pts))
assert m.avg_hr_bpm == 150
assert m.max_hr_bpm == 150
def test_compute_no_hr():
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
m = compute(_activity(pts))
assert m.avg_hr_bpm is None
assert m.max_hr_bpm is None
def test_compute_bbox():
pts = [
_pt(0, lat=48.0, lon=11.0),
_pt(10, lat=48.5, lon=11.8),
_pt(20, lat=48.2, lon=11.3),
]
m = compute(_activity(pts))
assert m.bbox == (11.0, 48.0, 11.8, 48.5) # min_lon, min_lat, max_lon, max_lat
def test_compute_start_end_latlng():
pts = [
_pt(0, lat=48.0, lon=11.0),
_pt(10, lat=48.5, lon=11.8),
]
m = compute(_activity(pts))
assert m.start_latlng == (48.0, 11.0)
assert m.end_latlng == (48.5, 11.8)
def test_compute_power_stats():
pts = [
_pt(0, lat=48.0, lon=11.0, power_w=200),
_pt(1, lat=48.0, lon=11.0, power_w=300),
_pt(2, lat=48.0, lon=11.0, power_w=250),
]
m = compute(_activity(pts))
assert m.avg_power_w == 250
assert m.max_power_w == 300
# ── MMP ───────────────────────────────────────────────────────────────────────
def test_mmp_no_power():
pts = [_pt(i, lat=48.0, lon=11.0) for i in range(10)]
m = compute_mmp(pts, _ts(0))
assert m is None
def test_mmp_constant_power():
# 60 s at 200 W → 1 s MMP = 200, 5 s MMP = 200, 30 s MMP = 200, 60 s MMP = 200
pts = [_pt(i, power_w=200) for i in range(61)]
result = compute_mmp(pts, _ts(0))
assert result is not None
by_dur = {d: w for d, w in result}
assert by_dur[1] == 200
assert by_dur[5] == 200
assert by_dur[30] == 200
assert by_dur[60] == 200
def test_mmp_peak_window():
# 120 s total: first 60 s at 100 W, last 60 s at 300 W
pts = [_pt(i, power_w=100) for i in range(60)]
pts += [_pt(i, power_w=300) for i in range(60, 121)]
result = compute_mmp(pts, _ts(0))
assert result is not None
by_dur = {d: w for d, w in result}
# 1 s MMP should be 300 (last segment)
assert by_dur[1] == 300
# 60 s MMP: best 60-second window is the last 60 s at 300 W
assert by_dur[60] == 300
def test_mmp_activity_shorter_than_all_durations():
# Only 5 seconds of data
pts = [_pt(i, power_w=200) for i in range(6)]
result = compute_mmp(pts, _ts(0))
assert result is not None
durations = [d for d, _ in result]
# Should only include durations ≤ 5 s
assert all(d <= 5 for d in durations)
assert 60 not in durations
# ── best efforts ─────────────────────────────────────────────────────────────
def test_fastest_time_for_distance_exact():
# 36 km/h for 100 s = 1 km exactly (36/3600 * 100 = 1.0 with no fp issues)
speed_1hz = [36.0] * 100
t = _fastest_time_for_distance(speed_1hz, 1.0)
assert t is not None
assert t <= 100
def test_fastest_time_for_distance_target_not_reached():
# Only 0.5 km of data at 10 km/h
speed_1hz = [10.0] * 180
t = _fastest_time_for_distance(speed_1hz, 1.0)
assert t is None
def test_fastest_time_picks_fastest_window():
# First 200 s at 1 km/h (barely moving), then 100 s at 36 km/h (= 1 km)
speed_1hz = [1.0] * 200 + [36.0] * 100
t = _fastest_time_for_distance(speed_1hz, 1.0)
# The fast window can cover 1 km; the slow window alone cannot.
# Algorithm uses inclusive right-left+1 counting so result may be 100 or 101.
assert t is not None
assert t <= 101
def test_best_efforts_running():
# 15 km/h for 3600 s = 15 km — should cover 1 km, 5 km, 10 km targets
pts = [_pt(i, lat=48.0 + i * 0.0001, lon=11.0, speed_kmh=15.0) for i in range(3601)]
efforts, _ = compute_best_efforts(pts, _ts(0), "running")
assert efforts is not None
covered = [d for d, _ in efforts]
assert 1.0 in covered
assert 5.0 in covered
assert 10.0 in covered
# 42.195 km not reachable in 3600 s at 15 km/h
assert 42.195 not in covered
def test_best_efforts_no_targets_for_sport():
pts = [_pt(i, lat=48.0, lon=11.0) for i in range(100)]
efforts, _ = compute_best_efforts(pts, _ts(0), "hiking")
assert efforts is None
# ── best climb ────────────────────────────────────────────────────────────────
def test_best_climb_simple_ascent():
# 0 → 100 m with no gaps
ele = [float(i) for i in range(101)]
result = _best_climb(ele)
assert result == 100.0
def test_best_climb_with_descent():
# Up 50, down 20, up 80 → best contiguous window = 80
ele = list(range(0, 51)) + list(range(50, 30, -1)) + list(range(30, 111))
result = _best_climb(ele)
assert result is not None
assert result >= 80.0
def test_best_climb_none_gap_resets_window():
# 50 m up, then a GPS gap, then 30 m up — windows don't bridge the gap
ele: list = list(range(0, 51)) + [None] + list(range(0, 31))
result = _best_climb(ele)
assert result == 50.0
def test_best_climb_only_descent():
ele = [100.0, 80.0, 60.0, 40.0]
result = _best_climb(ele)
assert result is None
def test_best_climb_too_few_samples():
assert _best_climb([]) is None
assert _best_climb([100.0]) is None