872651f471
GPS jitter and barometric quantization noise caused systematic overestimation
of elevation gain — in extreme cases 100% of reported gain was sub-1m noise.
Implements source-aware hysteresis: elevation is only committed when it
deviates from the last committed value by ≥5m (barometric) or ≥10m (GPS/GPX/TCX).
- ParsedActivity gains `altitude_source` field ("barometric"/"gps"/"unknown")
- FIT parser sets "barometric" when enhanced_altitude is present, else "gps"
- GPX and TCX parsers always set "gps"
- metrics._elevation() uses the threshold matching the source
- 5 new parametric tests covering flat GPS noise, threshold differences, and real climbs
383 lines
13 KiB
Python
383 lines
13 KiB
Python
"""Tests for bincio.extract.metrics."""
|
|
|
|
import math
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
|
|
from bincio.extract.metrics import (
|
|
MMP_DURATIONS_S,
|
|
_best_climb,
|
|
_elevation,
|
|
_fastest_time_for_distance,
|
|
_haversine_m,
|
|
compute,
|
|
compute_best_efforts,
|
|
compute_mmp,
|
|
)
|
|
from bincio.extract.models import DataPoint, ParsedActivity
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _ts(offset_s: int) -> datetime:
|
|
from datetime import timedelta
|
|
return datetime(2024, 6, 1, 8, 0, 0, tzinfo=timezone.utc) + timedelta(seconds=offset_s)
|
|
|
|
|
|
def _pt(offset_s: int, **kw) -> DataPoint:
|
|
return DataPoint(timestamp=_ts(offset_s), **kw)
|
|
|
|
|
|
def _activity(points: list[DataPoint], sport: str = "cycling") -> ParsedActivity:
|
|
return ParsedActivity(
|
|
points=points,
|
|
sport=sport,
|
|
started_at=_ts(0),
|
|
source_file="test.fit",
|
|
source_hash="sha256:abc",
|
|
)
|
|
|
|
|
|
# ── haversine ─────────────────────────────────────────────────────────────────
|
|
|
|
def test_haversine_same_point():
|
|
assert _haversine_m(48.0, 11.0, 48.0, 11.0) == 0.0
|
|
|
|
|
|
def test_haversine_known_distance():
|
|
# London (51.5074, -0.1278) to Paris (48.8566, 2.3522) ≈ 343 km
|
|
d = _haversine_m(51.5074, -0.1278, 48.8566, 2.3522)
|
|
assert 340_000 < d < 347_000
|
|
|
|
|
|
def test_haversine_symmetry():
|
|
a = _haversine_m(48.0, 11.0, 48.1, 11.1)
|
|
b = _haversine_m(48.1, 11.1, 48.0, 11.0)
|
|
assert abs(a - b) < 1e-6
|
|
|
|
|
|
def test_haversine_short_segment():
|
|
# ~111 m per 0.001 degrees latitude at equator
|
|
d = _haversine_m(0.0, 0.0, 0.001, 0.0)
|
|
assert 110 < d < 112
|
|
|
|
|
|
# ── compute() ─────────────────────────────────────────────────────────────────
|
|
|
|
def test_compute_empty_activity():
|
|
m = compute(_activity([]))
|
|
assert m.distance_m is None
|
|
assert m.duration_s is None
|
|
assert m.elevation_gain_m is None
|
|
|
|
|
|
def test_compute_duration():
|
|
pts = [_pt(0, lat=48.0, lon=11.0), _pt(3600, lat=48.1, lon=11.1)]
|
|
m = compute(_activity(pts))
|
|
assert m.duration_s == 3600
|
|
|
|
|
|
def test_compute_gps_distance():
|
|
# Two points ~111 m apart (0.001° lat), 10 s apart
|
|
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
|
|
m = compute(_activity(pts))
|
|
assert m.distance_m is not None
|
|
assert 100 < m.distance_m < 120
|
|
|
|
|
|
def test_compute_device_distance_preferred():
|
|
# Device reports a different cumulative distance — it should be used.
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0, distance_m=0.0),
|
|
_pt(10, lat=48.001, lon=11.0, distance_m=500.0),
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.distance_m == 500.0
|
|
|
|
|
|
def test_compute_moving_time_excludes_stops():
|
|
# Three segments: moving, stopped, moving
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0),
|
|
_pt(10, lat=48.001, lon=11.0), # ~111 m in 10 s → moving
|
|
_pt(70, lat=48.001, lon=11.0), # 0 m in 60 s → stopped
|
|
_pt(80, lat=48.002, lon=11.0), # ~111 m in 10 s → moving
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.moving_time_s is not None
|
|
assert m.moving_time_s < m.duration_s # stopped time excluded
|
|
|
|
|
|
def test_compute_elevation_gain():
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0, elevation_m=100.0),
|
|
_pt(10, lat=48.001, lon=11.0, elevation_m=150.0),
|
|
_pt(20, lat=48.002, lon=11.0, elevation_m=120.0),
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.elevation_gain_m == 50.0
|
|
assert m.elevation_loss_m == 30.0
|
|
|
|
|
|
def test_compute_no_elevation():
|
|
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
|
|
m = compute(_activity(pts))
|
|
assert m.elevation_gain_m is None
|
|
assert m.elevation_loss_m is None
|
|
|
|
|
|
# ── elevation hysteresis ──────────────────────────────────────────────────────
|
|
|
|
def _ele_pts(elevations: list[float]) -> list[DataPoint]:
|
|
return [_pt(i, elevation_m=e) for i, e in enumerate(elevations)]
|
|
|
|
|
|
def test_elevation_hysteresis_large_step_always_counted():
|
|
# A single 50m step is way above any threshold — both sources should count it.
|
|
pts = _ele_pts([100.0, 150.0])
|
|
gain_baro, _ = _elevation(pts, "barometric")
|
|
gain_gps, _ = _elevation(pts, "gps")
|
|
assert gain_baro == 50.0
|
|
assert gain_gps == 50.0
|
|
|
|
|
|
def test_elevation_hysteresis_flat_gps_noise_suppressed():
|
|
# Flat coastal route: 16m of GPS noise oscillating within ±8m.
|
|
# All steps are sub-1m — hysteresis should return ~0 gain.
|
|
import math
|
|
n = 1000
|
|
elevations = [100.0 + 3.0 * math.sin(i * 0.1) for i in range(n)]
|
|
pts = _ele_pts(elevations)
|
|
gain, loss = _elevation(pts, "gps")
|
|
# With threshold=10m no oscillation within ±3m should ever commit.
|
|
assert gain == 0.0
|
|
assert loss == 0.0
|
|
|
|
|
|
def test_elevation_hysteresis_barometric_threshold_lower():
|
|
# Steps of exactly 7m — above barometric (5m) but below GPS (10m) threshold.
|
|
elevations = [0.0, 7.0, 0.0, 7.0]
|
|
pts = _ele_pts(elevations)
|
|
gain_baro, _ = _elevation(pts, "barometric")
|
|
gain_gps, _ = _elevation(pts, "gps")
|
|
assert gain_baro == 14.0 # both 7m steps committed
|
|
assert gain_gps == 0.0 # 7m < 10m threshold → suppressed
|
|
|
|
|
|
def test_elevation_hysteresis_real_climb_approximated():
|
|
# Simulate a 200m climb with 0.2m barometric quantization noise.
|
|
# Build a staircase: 1000 steps, mostly 0.2m up/down noise, with a 200m net climb.
|
|
import random
|
|
random.seed(42)
|
|
elevations = [0.0]
|
|
for i in range(999):
|
|
# Mostly quantization noise, but drift upward at 0.2 m/step net
|
|
step = random.choice([-0.2, 0.0, 0.0, 0.2, 0.2, 0.4])
|
|
elevations.append(elevations[-1] + step)
|
|
|
|
# Force net gain ~200m by scaling
|
|
scale = 200.0 / (elevations[-1] - elevations[0]) if elevations[-1] != elevations[0] else 1
|
|
elevations = [e * scale for e in elevations]
|
|
|
|
pts = _ele_pts(elevations)
|
|
gain, _ = _elevation(pts, "barometric")
|
|
# Hysteresis should produce substantially less than naive accumulation
|
|
# and land reasonably close to the 200m net climb.
|
|
assert gain is not None
|
|
assert gain < 500.0 # not inflated like naive sum
|
|
assert gain > 100.0 # not zero either — real climbing exists
|
|
|
|
|
|
def test_elevation_hysteresis_unknown_treated_as_gps():
|
|
# "unknown" should apply the same 10m threshold as "gps"
|
|
elevations = [0.0, 7.0, 0.0, 7.0] # 7m steps
|
|
pts = _ele_pts(elevations)
|
|
gain_unknown, _ = _elevation(pts, "unknown")
|
|
gain_gps, _ = _elevation(pts, "gps")
|
|
assert gain_unknown == gain_gps
|
|
|
|
|
|
def test_compute_hr_stats():
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0, hr_bpm=120),
|
|
_pt(10, lat=48.001, lon=11.0, hr_bpm=160),
|
|
_pt(20, lat=48.002, lon=11.0, hr_bpm=140),
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.avg_hr_bpm == 140
|
|
assert m.max_hr_bpm == 160
|
|
|
|
|
|
def test_compute_hr_null_points_ignored():
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0, hr_bpm=None),
|
|
_pt(10, lat=48.001, lon=11.0, hr_bpm=150),
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.avg_hr_bpm == 150
|
|
assert m.max_hr_bpm == 150
|
|
|
|
|
|
def test_compute_no_hr():
|
|
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
|
|
m = compute(_activity(pts))
|
|
assert m.avg_hr_bpm is None
|
|
assert m.max_hr_bpm is None
|
|
|
|
|
|
def test_compute_bbox():
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0),
|
|
_pt(10, lat=48.5, lon=11.8),
|
|
_pt(20, lat=48.2, lon=11.3),
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.bbox == (11.0, 48.0, 11.8, 48.5) # min_lon, min_lat, max_lon, max_lat
|
|
|
|
|
|
def test_compute_start_end_latlng():
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0),
|
|
_pt(10, lat=48.5, lon=11.8),
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.start_latlng == (48.0, 11.0)
|
|
assert m.end_latlng == (48.5, 11.8)
|
|
|
|
|
|
def test_compute_power_stats():
|
|
pts = [
|
|
_pt(0, lat=48.0, lon=11.0, power_w=200),
|
|
_pt(1, lat=48.0, lon=11.0, power_w=300),
|
|
_pt(2, lat=48.0, lon=11.0, power_w=250),
|
|
]
|
|
m = compute(_activity(pts))
|
|
assert m.avg_power_w == 250
|
|
assert m.max_power_w == 300
|
|
|
|
|
|
# ── MMP ───────────────────────────────────────────────────────────────────────
|
|
|
|
def test_mmp_no_power():
|
|
pts = [_pt(i, lat=48.0, lon=11.0) for i in range(10)]
|
|
m = compute_mmp(pts, _ts(0))
|
|
assert m is None
|
|
|
|
|
|
def test_mmp_constant_power():
|
|
# 60 s at 200 W → 1 s MMP = 200, 5 s MMP = 200, 30 s MMP = 200, 60 s MMP = 200
|
|
pts = [_pt(i, power_w=200) for i in range(61)]
|
|
result = compute_mmp(pts, _ts(0))
|
|
assert result is not None
|
|
by_dur = {d: w for d, w in result}
|
|
assert by_dur[1] == 200
|
|
assert by_dur[5] == 200
|
|
assert by_dur[30] == 200
|
|
assert by_dur[60] == 200
|
|
|
|
|
|
def test_mmp_peak_window():
|
|
# 120 s total: first 60 s at 100 W, last 60 s at 300 W
|
|
pts = [_pt(i, power_w=100) for i in range(60)]
|
|
pts += [_pt(i, power_w=300) for i in range(60, 121)]
|
|
result = compute_mmp(pts, _ts(0))
|
|
assert result is not None
|
|
by_dur = {d: w for d, w in result}
|
|
# 1 s MMP should be 300 (last segment)
|
|
assert by_dur[1] == 300
|
|
# 60 s MMP: best 60-second window is the last 60 s at 300 W
|
|
assert by_dur[60] == 300
|
|
|
|
|
|
def test_mmp_activity_shorter_than_all_durations():
|
|
# Only 5 seconds of data
|
|
pts = [_pt(i, power_w=200) for i in range(6)]
|
|
result = compute_mmp(pts, _ts(0))
|
|
assert result is not None
|
|
durations = [d for d, _ in result]
|
|
# Should only include durations ≤ 5 s
|
|
assert all(d <= 5 for d in durations)
|
|
assert 60 not in durations
|
|
|
|
|
|
# ── best efforts ─────────────────────────────────────────────────────────────
|
|
|
|
def test_fastest_time_for_distance_exact():
|
|
# 36 km/h for 100 s = 1 km exactly (36/3600 * 100 = 1.0 with no fp issues)
|
|
speed_1hz = [36.0] * 100
|
|
t = _fastest_time_for_distance(speed_1hz, 1.0)
|
|
assert t is not None
|
|
assert t <= 100
|
|
|
|
|
|
def test_fastest_time_for_distance_target_not_reached():
|
|
# Only 0.5 km of data at 10 km/h
|
|
speed_1hz = [10.0] * 180
|
|
t = _fastest_time_for_distance(speed_1hz, 1.0)
|
|
assert t is None
|
|
|
|
|
|
def test_fastest_time_picks_fastest_window():
|
|
# First 200 s at 1 km/h (barely moving), then 100 s at 36 km/h (= 1 km)
|
|
speed_1hz = [1.0] * 200 + [36.0] * 100
|
|
t = _fastest_time_for_distance(speed_1hz, 1.0)
|
|
# The fast window can cover 1 km; the slow window alone cannot.
|
|
# Algorithm uses inclusive right-left+1 counting so result may be 100 or 101.
|
|
assert t is not None
|
|
assert t <= 101
|
|
|
|
|
|
def test_best_efforts_running():
|
|
# 15 km/h for 3600 s = 15 km — should cover 1 km, 5 km, 10 km targets
|
|
pts = [_pt(i, lat=48.0 + i * 0.0001, lon=11.0, speed_kmh=15.0) for i in range(3601)]
|
|
efforts, _ = compute_best_efforts(pts, _ts(0), "running")
|
|
assert efforts is not None
|
|
covered = [d for d, _ in efforts]
|
|
assert 1.0 in covered
|
|
assert 5.0 in covered
|
|
assert 10.0 in covered
|
|
# 42.195 km not reachable in 3600 s at 15 km/h
|
|
assert 42.195 not in covered
|
|
|
|
|
|
def test_best_efforts_no_targets_for_sport():
|
|
pts = [_pt(i, lat=48.0, lon=11.0) for i in range(100)]
|
|
efforts, _ = compute_best_efforts(pts, _ts(0), "hiking")
|
|
assert efforts is None
|
|
|
|
|
|
# ── best climb ────────────────────────────────────────────────────────────────
|
|
|
|
def test_best_climb_simple_ascent():
|
|
# 0 → 100 m with no gaps
|
|
ele = [float(i) for i in range(101)]
|
|
result = _best_climb(ele)
|
|
assert result == 100.0
|
|
|
|
|
|
def test_best_climb_with_descent():
|
|
# Up 50, down 20, up 80 → best contiguous window = 80
|
|
ele = list(range(0, 51)) + list(range(50, 30, -1)) + list(range(30, 111))
|
|
result = _best_climb(ele)
|
|
assert result is not None
|
|
assert result >= 80.0
|
|
|
|
|
|
def test_best_climb_none_gap_resets_window():
|
|
# 50 m up, then a GPS gap, then 30 m up — windows don't bridge the gap
|
|
ele: list = list(range(0, 51)) + [None] + list(range(0, 31))
|
|
result = _best_climb(ele)
|
|
assert result == 50.0
|
|
|
|
|
|
def test_best_climb_only_descent():
|
|
ele = [100.0, 80.0, 60.0, 40.0]
|
|
result = _best_climb(ele)
|
|
assert result is None
|
|
|
|
|
|
def test_best_climb_too_few_samples():
|
|
assert _best_climb([]) is None
|
|
assert _best_climb([100.0]) is None
|