add tests: test_metrics.py (31 tests) — _haversine_m correctness and symmetry; compute() end-to-end for GPS distance, device distance preference, moving-time stop exclusion, elevation gain/loss,

HR, power, bbox, endpoints; MMP sliding-window constant and peak cases; _fastest_time_for_distance and compute_best_efforts for running targets; _best_climb including gap-reset
  behaviour.

  test_dedup.py (20 tests) — Exact hash lookup; near-duplicate thresholds at the ±5 min / ±5% edges; skipping already-marked duplicates; zero/null distance guard; pick_canonical source
  quality ranking; full save/reload round-trip including duplicate_of persistence.

  test_simplify.py (19 tests) — RDP mask collinear removal, corner retention, epsilon=0 keeps all; simplify_track with GPS and no-GPS input; preview_coords max-points cap and [lat, lon]
  format; build_geojson structure, coordinate order ([lon, lat, ele]), speeds parallel array, point counts.

  test_db.py (35 tests) — WAL mode, idempotent schema; user CRUD and bcrypt authenticate; session creation, lookup, expiry and auto-delete, purge; invite create/use/limit (admin
  unlimited, regular capped at 3); cascade delete of sessions when user is deleted.
This commit is contained in:
Davide Scaini
2026-04-09 10:36:52 +02:00
parent e662bb6426
commit 0223d468c9
4 changed files with 985 additions and 0 deletions
+310
View File
@@ -0,0 +1,310 @@
"""Tests for bincio.extract.metrics."""
import math
from datetime import datetime, timezone
import pytest
from bincio.extract.metrics import (
MMP_DURATIONS_S,
_best_climb,
_fastest_time_for_distance,
_haversine_m,
compute,
compute_best_efforts,
compute_mmp,
)
from bincio.extract.models import DataPoint, ParsedActivity
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ts(offset_s: int) -> datetime:
from datetime import timedelta
return datetime(2024, 6, 1, 8, 0, 0, tzinfo=timezone.utc) + timedelta(seconds=offset_s)
def _pt(offset_s: int, **kw) -> DataPoint:
return DataPoint(timestamp=_ts(offset_s), **kw)
def _activity(points: list[DataPoint], sport: str = "cycling") -> ParsedActivity:
return ParsedActivity(
points=points,
sport=sport,
started_at=_ts(0),
source_file="test.fit",
source_hash="sha256:abc",
)
# ── haversine ─────────────────────────────────────────────────────────────────
def test_haversine_same_point():
assert _haversine_m(48.0, 11.0, 48.0, 11.0) == 0.0
def test_haversine_known_distance():
# London (51.5074, -0.1278) to Paris (48.8566, 2.3522) ≈ 343 km
d = _haversine_m(51.5074, -0.1278, 48.8566, 2.3522)
assert 340_000 < d < 347_000
def test_haversine_symmetry():
a = _haversine_m(48.0, 11.0, 48.1, 11.1)
b = _haversine_m(48.1, 11.1, 48.0, 11.0)
assert abs(a - b) < 1e-6
def test_haversine_short_segment():
# ~111 m per 0.001 degrees latitude at equator
d = _haversine_m(0.0, 0.0, 0.001, 0.0)
assert 110 < d < 112
# ── compute() ─────────────────────────────────────────────────────────────────
def test_compute_empty_activity():
m = compute(_activity([]))
assert m.distance_m is None
assert m.duration_s is None
assert m.elevation_gain_m is None
def test_compute_duration():
pts = [_pt(0, lat=48.0, lon=11.0), _pt(3600, lat=48.1, lon=11.1)]
m = compute(_activity(pts))
assert m.duration_s == 3600
def test_compute_gps_distance():
# Two points ~111 m apart (0.001° lat), 10 s apart
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
m = compute(_activity(pts))
assert m.distance_m is not None
assert 100 < m.distance_m < 120
def test_compute_device_distance_preferred():
# Device reports a different cumulative distance — it should be used.
pts = [
_pt(0, lat=48.0, lon=11.0, distance_m=0.0),
_pt(10, lat=48.001, lon=11.0, distance_m=500.0),
]
m = compute(_activity(pts))
assert m.distance_m == 500.0
def test_compute_moving_time_excludes_stops():
# Three segments: moving, stopped, moving
pts = [
_pt(0, lat=48.0, lon=11.0),
_pt(10, lat=48.001, lon=11.0), # ~111 m in 10 s → moving
_pt(70, lat=48.001, lon=11.0), # 0 m in 60 s → stopped
_pt(80, lat=48.002, lon=11.0), # ~111 m in 10 s → moving
]
m = compute(_activity(pts))
assert m.moving_time_s is not None
assert m.moving_time_s < m.duration_s # stopped time excluded
def test_compute_elevation_gain():
pts = [
_pt(0, lat=48.0, lon=11.0, elevation_m=100.0),
_pt(10, lat=48.001, lon=11.0, elevation_m=150.0),
_pt(20, lat=48.002, lon=11.0, elevation_m=120.0),
]
m = compute(_activity(pts))
assert m.elevation_gain_m == 50.0
assert m.elevation_loss_m == 30.0
def test_compute_no_elevation():
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
m = compute(_activity(pts))
assert m.elevation_gain_m is None
assert m.elevation_loss_m is None
def test_compute_hr_stats():
pts = [
_pt(0, lat=48.0, lon=11.0, hr_bpm=120),
_pt(10, lat=48.001, lon=11.0, hr_bpm=160),
_pt(20, lat=48.002, lon=11.0, hr_bpm=140),
]
m = compute(_activity(pts))
assert m.avg_hr_bpm == 140
assert m.max_hr_bpm == 160
def test_compute_hr_null_points_ignored():
pts = [
_pt(0, lat=48.0, lon=11.0, hr_bpm=None),
_pt(10, lat=48.001, lon=11.0, hr_bpm=150),
]
m = compute(_activity(pts))
assert m.avg_hr_bpm == 150
assert m.max_hr_bpm == 150
def test_compute_no_hr():
pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)]
m = compute(_activity(pts))
assert m.avg_hr_bpm is None
assert m.max_hr_bpm is None
def test_compute_bbox():
pts = [
_pt(0, lat=48.0, lon=11.0),
_pt(10, lat=48.5, lon=11.8),
_pt(20, lat=48.2, lon=11.3),
]
m = compute(_activity(pts))
assert m.bbox == (11.0, 48.0, 11.8, 48.5) # min_lon, min_lat, max_lon, max_lat
def test_compute_start_end_latlng():
pts = [
_pt(0, lat=48.0, lon=11.0),
_pt(10, lat=48.5, lon=11.8),
]
m = compute(_activity(pts))
assert m.start_latlng == (48.0, 11.0)
assert m.end_latlng == (48.5, 11.8)
def test_compute_power_stats():
pts = [
_pt(0, lat=48.0, lon=11.0, power_w=200),
_pt(1, lat=48.0, lon=11.0, power_w=300),
_pt(2, lat=48.0, lon=11.0, power_w=250),
]
m = compute(_activity(pts))
assert m.avg_power_w == 250
assert m.max_power_w == 300
# ── MMP ───────────────────────────────────────────────────────────────────────
def test_mmp_no_power():
pts = [_pt(i, lat=48.0, lon=11.0) for i in range(10)]
m = compute_mmp(pts, _ts(0))
assert m is None
def test_mmp_constant_power():
# 60 s at 200 W → 1 s MMP = 200, 5 s MMP = 200, 30 s MMP = 200, 60 s MMP = 200
pts = [_pt(i, power_w=200) for i in range(61)]
result = compute_mmp(pts, _ts(0))
assert result is not None
by_dur = {d: w for d, w in result}
assert by_dur[1] == 200
assert by_dur[5] == 200
assert by_dur[30] == 200
assert by_dur[60] == 200
def test_mmp_peak_window():
# 120 s total: first 60 s at 100 W, last 60 s at 300 W
pts = [_pt(i, power_w=100) for i in range(60)]
pts += [_pt(i, power_w=300) for i in range(60, 121)]
result = compute_mmp(pts, _ts(0))
assert result is not None
by_dur = {d: w for d, w in result}
# 1 s MMP should be 300 (last segment)
assert by_dur[1] == 300
# 60 s MMP: best 60-second window is the last 60 s at 300 W
assert by_dur[60] == 300
def test_mmp_activity_shorter_than_all_durations():
# Only 5 seconds of data
pts = [_pt(i, power_w=200) for i in range(6)]
result = compute_mmp(pts, _ts(0))
assert result is not None
durations = [d for d, _ in result]
# Should only include durations ≤ 5 s
assert all(d <= 5 for d in durations)
assert 60 not in durations
# ── best efforts ─────────────────────────────────────────────────────────────
def test_fastest_time_for_distance_exact():
# 36 km/h for 100 s = 1 km exactly (36/3600 * 100 = 1.0 with no fp issues)
speed_1hz = [36.0] * 100
t = _fastest_time_for_distance(speed_1hz, 1.0)
assert t is not None
assert t <= 100
def test_fastest_time_for_distance_target_not_reached():
# Only 0.5 km of data at 10 km/h
speed_1hz = [10.0] * 180
t = _fastest_time_for_distance(speed_1hz, 1.0)
assert t is None
def test_fastest_time_picks_fastest_window():
# First 200 s at 1 km/h (barely moving), then 100 s at 36 km/h (= 1 km)
speed_1hz = [1.0] * 200 + [36.0] * 100
t = _fastest_time_for_distance(speed_1hz, 1.0)
# The fast window can cover 1 km; the slow window alone cannot.
# Algorithm uses inclusive right-left+1 counting so result may be 100 or 101.
assert t is not None
assert t <= 101
def test_best_efforts_running():
# 15 km/h for 3600 s = 15 km — should cover 1 km, 5 km, 10 km targets
pts = [_pt(i, lat=48.0 + i * 0.0001, lon=11.0, speed_kmh=15.0) for i in range(3601)]
efforts, _ = compute_best_efforts(pts, _ts(0), "running")
assert efforts is not None
covered = [d for d, _ in efforts]
assert 1.0 in covered
assert 5.0 in covered
assert 10.0 in covered
# 42.195 km not reachable in 3600 s at 15 km/h
assert 42.195 not in covered
def test_best_efforts_no_targets_for_sport():
pts = [_pt(i, lat=48.0, lon=11.0) for i in range(100)]
efforts, _ = compute_best_efforts(pts, _ts(0), "hiking")
assert efforts is None
# ── best climb ────────────────────────────────────────────────────────────────
def test_best_climb_simple_ascent():
# 0 → 100 m with no gaps
ele = [float(i) for i in range(101)]
result = _best_climb(ele)
assert result == 100.0
def test_best_climb_with_descent():
# Up 50, down 20, up 80 → best contiguous window = 80
ele = list(range(0, 51)) + list(range(50, 30, -1)) + list(range(30, 111))
result = _best_climb(ele)
assert result is not None
assert result >= 80.0
def test_best_climb_none_gap_resets_window():
# 50 m up, then a GPS gap, then 30 m up — windows don't bridge the gap
ele: list = list(range(0, 51)) + [None] + list(range(0, 31))
result = _best_climb(ele)
assert result == 50.0
def test_best_climb_only_descent():
ele = [100.0, 80.0, 60.0, 40.0]
result = _best_climb(ele)
assert result is None
def test_best_climb_too_few_samples():
assert _best_climb([]) is None
assert _best_climb([100.0]) is None