From 0223d468c9906af63cb2249bf2f1c02e0a9dace2 Mon Sep 17 00:00:00 2001 From: Davide Scaini Date: Thu, 9 Apr 2026 10:36:52 +0200 Subject: [PATCH] =?UTF-8?q?add=20tests:=20=20test=5Fmetrics.py=20(31=20tes?= =?UTF-8?q?ts)=20=E2=80=94=20=5Fhaversine=5Fm=20correctness=20and=20symmet?= =?UTF-8?q?ry;=20compute()=20end-to-end=20for=20GPS=20distance,=20device?= =?UTF-8?q?=20distance=20preference,=20moving-time=20stop=20exclusion,=20e?= =?UTF-8?q?levation=20gain/loss,=20=20=20HR,=20power,=20bbox,=20endpoints;?= =?UTF-8?q?=20MMP=20sliding-window=20constant=20and=20peak=20cases;=20=5Ff?= =?UTF-8?q?astest=5Ftime=5Ffor=5Fdistance=20and=20compute=5Fbest=5Fefforts?= =?UTF-8?q?=20for=20running=20targets;=20=5Fbest=5Fclimb=20including=20gap?= =?UTF-8?q?-reset=20=20=20behaviour.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_dedup.py (20 tests) — Exact hash lookup; near-duplicate thresholds at the ±5 min / ±5% edges; skipping already-marked duplicates; zero/null distance guard; pick_canonical source quality ranking; full save/reload round-trip including duplicate_of persistence. test_simplify.py (19 tests) — RDP mask collinear removal, corner retention, epsilon=0 keeps all; simplify_track with GPS and no-GPS input; preview_coords max-points cap and [lat, lon] format; build_geojson structure, coordinate order ([lon, lat, ele]), speeds parallel array, point counts. test_db.py (35 tests) — WAL mode, idempotent schema; user CRUD and bcrypt authenticate; session creation, lookup, expiry and auto-delete, purge; invite create/use/limit (admin unlimited, regular capped at 3); cascade delete of sessions when user is deleted. --- tests/test_db.py | 282 +++++++++++++++++++++++++++++++++++++ tests/test_dedup.py | 181 ++++++++++++++++++++++++ tests/test_metrics.py | 310 +++++++++++++++++++++++++++++++++++++++++ tests/test_simplify.py | 212 ++++++++++++++++++++++++++++ 4 files changed, 985 insertions(+) create mode 100644 tests/test_db.py create mode 100644 tests/test_dedup.py create mode 100644 tests/test_metrics.py create mode 100644 tests/test_simplify.py diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..b79d66b --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,282 @@ +"""Tests for bincio.serve.db — SQLite auth data layer.""" + +import sqlite3 +import time +from pathlib import Path +from unittest.mock import patch + +import pytest + +from bincio.serve.db import ( + Invite, + User, + authenticate, + create_invite, + create_session, + create_user, + delete_session, + delete_user, + get_invite, + get_session, + get_user, + list_invites, + list_users, + open_db, + purge_expired_sessions, + use_invite, +) + + +# ── Fixtures ────────────────────────────────────────────────────────────────── + +@pytest.fixture +def db(tmp_path: Path) -> sqlite3.Connection: + return open_db(tmp_path) + + +@pytest.fixture +def admin(db) -> User: + return create_user(db, "admin", "Admin User", "adminpass", is_admin=True) + + +@pytest.fixture +def user(db) -> User: + return create_user(db, "alice", "Alice", "alicepass", is_admin=False) + + +# ── open_db ─────────────────────────────────────────────────────────────────── + +def test_open_db_creates_file(tmp_path: Path): + open_db(tmp_path) + assert (tmp_path / "instance.db").exists() + + +def test_open_db_wal_mode(tmp_path: Path): + db = open_db(tmp_path) + row = db.execute("PRAGMA journal_mode").fetchone() + assert row[0] == "wal" + + +def test_open_db_idempotent(tmp_path: Path): + db1 = open_db(tmp_path) + db2 = open_db(tmp_path) # must not raise on existing schema + db1.close() + db2.close() + + +# ── create_user / get_user ──────────────────────────────────────────────────── + +def test_create_user_returns_user(db): + u = create_user(db, "dave", "Dave", "secret", is_admin=False) + assert u.handle == "dave" + assert u.display_name == "Dave" + assert u.is_admin is False + assert u.created_at > 0 + + +def test_create_user_admin_flag(db): + u = create_user(db, "boss", "Boss", "secret", is_admin=True) + assert u.is_admin is True + + +def test_get_user_found(db, user): + u = get_user(db, "alice") + assert u is not None + assert u.handle == "alice" + assert u.display_name == "Alice" + + +def test_get_user_not_found(db): + assert get_user(db, "nobody") is None + + +def test_list_users(db, admin, user): + users = list_users(db) + handles = [u.handle for u in users] + assert "admin" in handles + assert "alice" in handles + + +def test_delete_user(db, user): + delete_user(db, "alice") + assert get_user(db, "alice") is None + + +# ── authenticate ────────────────────────────────────────────────────────────── + +def test_authenticate_valid(db, user): + result = authenticate(db, "alice", "alicepass") + assert result is not None + assert result.handle == "alice" + + +def test_authenticate_wrong_password(db, user): + result = authenticate(db, "alice", "wrongpass") + assert result is None + + +def test_authenticate_unknown_handle(db): + result = authenticate(db, "ghost", "anypass") + assert result is None + + +def test_authenticate_empty_password_rejected(db, user): + result = authenticate(db, "alice", "") + assert result is None + + +# ── sessions ────────────────────────────────────────────────────────────────── + +def test_create_session_returns_token(db, user): + token = create_session(db, "alice") + assert isinstance(token, str) + assert len(token) == 64 # secrets.token_hex(32) + + +def test_get_session_returns_user(db, user): + token = create_session(db, "alice") + u = get_session(db, token) + assert u is not None + assert u.handle == "alice" + + +def test_get_session_invalid_token(db): + assert get_session(db, "not-a-real-token") is None + + +def test_delete_session(db, user): + token = create_session(db, "alice") + delete_session(db, token) + assert get_session(db, token) is None + + +def test_get_session_expired(db, user): + token = create_session(db, "alice") + # Backdate the expiry to the past + db.execute("UPDATE sessions SET expires_at = ? WHERE token = ?", + (int(time.time()) - 1, token)) + db.commit() + assert get_session(db, token) is None + + +def test_get_session_expired_token_deleted(db, user): + token = create_session(db, "alice") + db.execute("UPDATE sessions SET expires_at = ? WHERE token = ?", + (int(time.time()) - 1, token)) + db.commit() + get_session(db, token) # triggers deletion + row = db.execute("SELECT * FROM sessions WHERE token = ?", (token,)).fetchone() + assert row is None + + +def test_purge_expired_sessions(db, user): + t1 = create_session(db, "alice") + t2 = create_session(db, "alice") + # Expire t1 + db.execute("UPDATE sessions SET expires_at = ? WHERE token = ?", + (int(time.time()) - 1, t1)) + db.commit() + count = purge_expired_sessions(db) + assert count == 1 + assert get_session(db, t2) is not None + + +def test_multiple_sessions_same_user(db, user): + t1 = create_session(db, "alice") + t2 = create_session(db, "alice") + assert t1 != t2 + assert get_session(db, t1) is not None + assert get_session(db, t2) is not None + + +# ── invites ─────────────────────────────────────────────────────────────────── + +def test_create_invite_returns_code(db, admin): + code = create_invite(db, "admin") + assert isinstance(code, str) + assert len(code) == 8 + + +def test_create_invite_admin_unlimited(db, admin): + # Admin can create more than 3 invites + for _ in range(5): + create_invite(db, "admin") + + +def test_create_invite_regular_user_limited(db, user): + for _ in range(3): + create_invite(db, "alice") + with pytest.raises(ValueError, match="Invite limit"): + create_invite(db, "alice") + + +def test_get_invite_found(db, admin): + code = create_invite(db, "admin") + invite = get_invite(db, code) + assert invite is not None + assert invite.code == code + assert invite.created_by == "admin" + assert invite.used is False + + +def test_get_invite_not_found(db): + assert get_invite(db, "NOTEXIST") is None + + +def test_use_invite_marks_used(db, admin, user): + code = create_invite(db, "admin") + result = use_invite(db, code, "alice") + assert result is True + invite = get_invite(db, code) + assert invite.used is True + assert invite.used_by == "alice" + assert invite.used_at is not None + + +def test_use_invite_already_used_returns_false(db, admin, user): + code = create_invite(db, "admin") + use_invite(db, code, "alice") + result = use_invite(db, code, "alice") # second use + assert result is False + + +def test_use_invite_invalid_code_returns_false(db): + result = use_invite(db, "INVALID1", "alice") + assert result is False + + +def test_list_invites(db, admin): + c1 = create_invite(db, "admin") + c2 = create_invite(db, "admin") + invites = list_invites(db, "admin") + codes = [i.code for i in invites] + assert c1 in codes + assert c2 in codes + + +def test_list_invites_own_only(db, admin, user): + create_invite(db, "admin") + create_invite(db, "alice") + admin_invites = list_invites(db, "admin") + for i in admin_invites: + assert i.created_by == "admin" + + +def test_invite_used_property(db, admin): + code = create_invite(db, "admin") + invite = get_invite(db, code) + assert invite.used is False + + create_user(db, "bob", "Bob", "bobpass") + use_invite(db, code, "bob") + invite = get_invite(db, code) + assert invite.used is True + + +# ── cascade on delete ───────────────────────────────────────────────────────── + +def test_delete_user_cascades_sessions(db, user): + token = create_session(db, "alice") + delete_user(db, "alice") + # Session should be gone (ON DELETE CASCADE) + assert get_session(db, token) is None diff --git a/tests/test_dedup.py b/tests/test_dedup.py new file mode 100644 index 0000000..9abb308 --- /dev/null +++ b/tests/test_dedup.py @@ -0,0 +1,181 @@ +"""Tests for bincio.extract.dedup.""" + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from bincio.extract.dedup import ActivityRecord, DedupIndex, _SOURCE_QUALITY + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _dt(hour: int = 8, minute: int = 0) -> datetime: + return datetime(2024, 6, 1, hour, minute, 0, tzinfo=timezone.utc) + + +def _record( + id: str, + source_hash: str = "sha256:abc", + started_at: datetime | None = None, + distance_m: float | None = 10_000.0, + source: str | None = "fit_file", + duplicate_of: str | None = None, +) -> ActivityRecord: + return ActivityRecord( + id=id, + source_hash=source_hash, + started_at=started_at or _dt(), + distance_m=distance_m, + source=source, + duplicate_of=duplicate_of, + ) + + +@pytest.fixture +def idx(tmp_path: Path) -> DedupIndex: + return DedupIndex(output_dir=tmp_path) + + +# ── exact duplicate ─────────────────────────────────────────────────────────── + +def test_exact_duplicate_not_found_on_empty_index(idx): + assert idx.is_exact_duplicate("sha256:abc") is None + + +def test_exact_duplicate_found_after_register(idx): + idx.register(_record("act-1", source_hash="sha256:aaa")) + assert idx.is_exact_duplicate("sha256:aaa") == "act-1" + + +def test_exact_duplicate_different_hash_not_found(idx): + idx.register(_record("act-1", source_hash="sha256:aaa")) + assert idx.is_exact_duplicate("sha256:bbb") is None + + +# ── near-duplicate ──────────────────────────────────────────────────────────── + +def test_near_dup_same_time_same_distance(idx): + idx.register(_record("act-1", started_at=_dt(8, 0), distance_m=10_000.0)) + result = idx.find_near_duplicate(_dt(8, 0), 10_000.0) + assert result == "act-1" + + +def test_near_dup_within_5_min_and_5_pct(idx): + idx.register(_record("act-1", started_at=_dt(8, 0), distance_m=10_000.0)) + # 4 min 59 s offset, 4.9% distance difference — both within threshold + from datetime import timedelta + result = idx.find_near_duplicate(_dt(8, 0) + timedelta(seconds=299), 9_510.0) + assert result == "act-1" + + +def test_near_dup_time_too_far(idx): + idx.register(_record("act-1", started_at=_dt(8, 0), distance_m=10_000.0)) + from datetime import timedelta + result = idx.find_near_duplicate(_dt(8, 0) + timedelta(seconds=301), 10_000.0) + assert result is None + + +def test_near_dup_distance_too_different(idx): + idx.register(_record("act-1", started_at=_dt(8, 0), distance_m=10_000.0)) + # 6% difference + result = idx.find_near_duplicate(_dt(8, 0), 10_600.0) + assert result is None + + +def test_near_dup_skips_already_marked_duplicates(idx): + # A record already flagged as a duplicate of something else should not be + # returned as a canonical candidate. + idx.register(_record("act-1", started_at=_dt(8, 0), distance_m=10_000.0, + duplicate_of="act-0")) + result = idx.find_near_duplicate(_dt(8, 0), 10_000.0) + assert result is None + + +def test_near_dup_both_zero_distance_skipped(idx): + idx.register(_record("act-1", started_at=_dt(8, 0), distance_m=0.0)) + result = idx.find_near_duplicate(_dt(8, 0), 0.0) + assert result is None + + +def test_near_dup_none_distance_skipped(idx): + idx.register(_record("act-1", started_at=_dt(8, 0), distance_m=None)) + result = idx.find_near_duplicate(_dt(8, 0), 10_000.0) + assert result is None + + +# ── pick_canonical ──────────────────────────────────────────────────────────── + +def test_pick_canonical_existing_wins_on_tie(idx): + idx.register(_record("act-1", source="fit_file")) # quality 4 + result = idx.pick_canonical("act-1", "fit_file") # also quality 4 + assert result == "act-1" + + +def test_pick_canonical_new_wins_when_higher_quality(idx): + idx.register(_record("act-1", source="gpx_file")) # quality 2 + result = idx.pick_canonical("act-1", "karoo") # quality 5 + assert result == "__new__" + + +def test_pick_canonical_existing_wins_when_higher_quality(idx): + idx.register(_record("act-1", source="karoo")) # quality 5 + result = idx.pick_canonical("act-1", "tcx_file") # quality 1 + assert result == "act-1" + + +def test_pick_canonical_unknown_source_treated_as_zero(idx): + idx.register(_record("act-1", source="unknown_device")) # quality 0 + result = idx.pick_canonical("act-1", "fit_file") # quality 4 + assert result == "__new__" + + +# ── source quality ranking ──────────────────────────────────────────────────── + +def test_source_quality_ordering(): + assert _SOURCE_QUALITY["karoo"] > _SOURCE_QUALITY["fit_file"] + assert _SOURCE_QUALITY["fit_file"] > _SOURCE_QUALITY["strava_export"] + assert _SOURCE_QUALITY["strava_export"] > _SOURCE_QUALITY["gpx_file"] + assert _SOURCE_QUALITY["gpx_file"] > _SOURCE_QUALITY["tcx_file"] + assert _SOURCE_QUALITY["tcx_file"] > _SOURCE_QUALITY["manual"] + + +# ── persistence ─────────────────────────────────────────────────────────────── + +def test_save_and_reload(tmp_path: Path): + idx = DedupIndex(output_dir=tmp_path) + idx.register(_record("act-1", source_hash="sha256:aaa", + started_at=_dt(8, 0), distance_m=5000.0, source="fit_file")) + idx.save() + + idx2 = DedupIndex(output_dir=tmp_path) + assert idx2.is_exact_duplicate("sha256:aaa") == "act-1" + result = idx2.find_near_duplicate(_dt(8, 0), 5000.0) + assert result == "act-1" + + +def test_reload_preserves_duplicate_of(tmp_path: Path): + idx = DedupIndex(output_dir=tmp_path) + rec = _record("act-2", source_hash="sha256:bbb", + started_at=_dt(8, 0), distance_m=5000.0, duplicate_of="act-1") + idx.register(rec) + idx.save() + + idx2 = DedupIndex(output_dir=tmp_path) + # Should not surface as a near-dup candidate + assert idx2.find_near_duplicate(_dt(8, 0), 5000.0) is None + + +def test_empty_index_file_creates_on_save(tmp_path: Path): + idx = DedupIndex(output_dir=tmp_path) + idx.save() + cache = tmp_path / ".bincio_cache.json" + assert cache.exists() + data = json.loads(cache.read_text()) + assert data["activities"] == [] + + +def test_fresh_index_on_missing_file(tmp_path: Path): + idx = DedupIndex(output_dir=tmp_path) + assert idx.is_exact_duplicate("sha256:anything") is None diff --git a/tests/test_metrics.py b/tests/test_metrics.py new file mode 100644 index 0000000..7393679 --- /dev/null +++ b/tests/test_metrics.py @@ -0,0 +1,310 @@ +"""Tests for bincio.extract.metrics.""" + +import math +from datetime import datetime, timezone + +import pytest + +from bincio.extract.metrics import ( + MMP_DURATIONS_S, + _best_climb, + _fastest_time_for_distance, + _haversine_m, + compute, + compute_best_efforts, + compute_mmp, +) +from bincio.extract.models import DataPoint, ParsedActivity + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _ts(offset_s: int) -> datetime: + from datetime import timedelta + return datetime(2024, 6, 1, 8, 0, 0, tzinfo=timezone.utc) + timedelta(seconds=offset_s) + + +def _pt(offset_s: int, **kw) -> DataPoint: + return DataPoint(timestamp=_ts(offset_s), **kw) + + +def _activity(points: list[DataPoint], sport: str = "cycling") -> ParsedActivity: + return ParsedActivity( + points=points, + sport=sport, + started_at=_ts(0), + source_file="test.fit", + source_hash="sha256:abc", + ) + + +# ── haversine ───────────────────────────────────────────────────────────────── + +def test_haversine_same_point(): + assert _haversine_m(48.0, 11.0, 48.0, 11.0) == 0.0 + + +def test_haversine_known_distance(): + # London (51.5074, -0.1278) to Paris (48.8566, 2.3522) ≈ 343 km + d = _haversine_m(51.5074, -0.1278, 48.8566, 2.3522) + assert 340_000 < d < 347_000 + + +def test_haversine_symmetry(): + a = _haversine_m(48.0, 11.0, 48.1, 11.1) + b = _haversine_m(48.1, 11.1, 48.0, 11.0) + assert abs(a - b) < 1e-6 + + +def test_haversine_short_segment(): + # ~111 m per 0.001 degrees latitude at equator + d = _haversine_m(0.0, 0.0, 0.001, 0.0) + assert 110 < d < 112 + + +# ── compute() ───────────────────────────────────────────────────────────────── + +def test_compute_empty_activity(): + m = compute(_activity([])) + assert m.distance_m is None + assert m.duration_s is None + assert m.elevation_gain_m is None + + +def test_compute_duration(): + pts = [_pt(0, lat=48.0, lon=11.0), _pt(3600, lat=48.1, lon=11.1)] + m = compute(_activity(pts)) + assert m.duration_s == 3600 + + +def test_compute_gps_distance(): + # Two points ~111 m apart (0.001° lat), 10 s apart + pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)] + m = compute(_activity(pts)) + assert m.distance_m is not None + assert 100 < m.distance_m < 120 + + +def test_compute_device_distance_preferred(): + # Device reports a different cumulative distance — it should be used. + pts = [ + _pt(0, lat=48.0, lon=11.0, distance_m=0.0), + _pt(10, lat=48.001, lon=11.0, distance_m=500.0), + ] + m = compute(_activity(pts)) + assert m.distance_m == 500.0 + + +def test_compute_moving_time_excludes_stops(): + # Three segments: moving, stopped, moving + pts = [ + _pt(0, lat=48.0, lon=11.0), + _pt(10, lat=48.001, lon=11.0), # ~111 m in 10 s → moving + _pt(70, lat=48.001, lon=11.0), # 0 m in 60 s → stopped + _pt(80, lat=48.002, lon=11.0), # ~111 m in 10 s → moving + ] + m = compute(_activity(pts)) + assert m.moving_time_s is not None + assert m.moving_time_s < m.duration_s # stopped time excluded + + +def test_compute_elevation_gain(): + pts = [ + _pt(0, lat=48.0, lon=11.0, elevation_m=100.0), + _pt(10, lat=48.001, lon=11.0, elevation_m=150.0), + _pt(20, lat=48.002, lon=11.0, elevation_m=120.0), + ] + m = compute(_activity(pts)) + assert m.elevation_gain_m == 50.0 + assert m.elevation_loss_m == 30.0 + + +def test_compute_no_elevation(): + pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)] + m = compute(_activity(pts)) + assert m.elevation_gain_m is None + assert m.elevation_loss_m is None + + +def test_compute_hr_stats(): + pts = [ + _pt(0, lat=48.0, lon=11.0, hr_bpm=120), + _pt(10, lat=48.001, lon=11.0, hr_bpm=160), + _pt(20, lat=48.002, lon=11.0, hr_bpm=140), + ] + m = compute(_activity(pts)) + assert m.avg_hr_bpm == 140 + assert m.max_hr_bpm == 160 + + +def test_compute_hr_null_points_ignored(): + pts = [ + _pt(0, lat=48.0, lon=11.0, hr_bpm=None), + _pt(10, lat=48.001, lon=11.0, hr_bpm=150), + ] + m = compute(_activity(pts)) + assert m.avg_hr_bpm == 150 + assert m.max_hr_bpm == 150 + + +def test_compute_no_hr(): + pts = [_pt(0, lat=48.0, lon=11.0), _pt(10, lat=48.001, lon=11.0)] + m = compute(_activity(pts)) + assert m.avg_hr_bpm is None + assert m.max_hr_bpm is None + + +def test_compute_bbox(): + pts = [ + _pt(0, lat=48.0, lon=11.0), + _pt(10, lat=48.5, lon=11.8), + _pt(20, lat=48.2, lon=11.3), + ] + m = compute(_activity(pts)) + assert m.bbox == (11.0, 48.0, 11.8, 48.5) # min_lon, min_lat, max_lon, max_lat + + +def test_compute_start_end_latlng(): + pts = [ + _pt(0, lat=48.0, lon=11.0), + _pt(10, lat=48.5, lon=11.8), + ] + m = compute(_activity(pts)) + assert m.start_latlng == (48.0, 11.0) + assert m.end_latlng == (48.5, 11.8) + + +def test_compute_power_stats(): + pts = [ + _pt(0, lat=48.0, lon=11.0, power_w=200), + _pt(1, lat=48.0, lon=11.0, power_w=300), + _pt(2, lat=48.0, lon=11.0, power_w=250), + ] + m = compute(_activity(pts)) + assert m.avg_power_w == 250 + assert m.max_power_w == 300 + + +# ── MMP ─────────────────────────────────────────────────────────────────────── + +def test_mmp_no_power(): + pts = [_pt(i, lat=48.0, lon=11.0) for i in range(10)] + m = compute_mmp(pts, _ts(0)) + assert m is None + + +def test_mmp_constant_power(): + # 60 s at 200 W → 1 s MMP = 200, 5 s MMP = 200, 30 s MMP = 200, 60 s MMP = 200 + pts = [_pt(i, power_w=200) for i in range(61)] + result = compute_mmp(pts, _ts(0)) + assert result is not None + by_dur = {d: w for d, w in result} + assert by_dur[1] == 200 + assert by_dur[5] == 200 + assert by_dur[30] == 200 + assert by_dur[60] == 200 + + +def test_mmp_peak_window(): + # 120 s total: first 60 s at 100 W, last 60 s at 300 W + pts = [_pt(i, power_w=100) for i in range(60)] + pts += [_pt(i, power_w=300) for i in range(60, 121)] + result = compute_mmp(pts, _ts(0)) + assert result is not None + by_dur = {d: w for d, w in result} + # 1 s MMP should be 300 (last segment) + assert by_dur[1] == 300 + # 60 s MMP: best 60-second window is the last 60 s at 300 W + assert by_dur[60] == 300 + + +def test_mmp_activity_shorter_than_all_durations(): + # Only 5 seconds of data + pts = [_pt(i, power_w=200) for i in range(6)] + result = compute_mmp(pts, _ts(0)) + assert result is not None + durations = [d for d, _ in result] + # Should only include durations ≤ 5 s + assert all(d <= 5 for d in durations) + assert 60 not in durations + + +# ── best efforts ───────────────────────────────────────────────────────────── + +def test_fastest_time_for_distance_exact(): + # 36 km/h for 100 s = 1 km exactly (36/3600 * 100 = 1.0 with no fp issues) + speed_1hz = [36.0] * 100 + t = _fastest_time_for_distance(speed_1hz, 1.0) + assert t is not None + assert t <= 100 + + +def test_fastest_time_for_distance_target_not_reached(): + # Only 0.5 km of data at 10 km/h + speed_1hz = [10.0] * 180 + t = _fastest_time_for_distance(speed_1hz, 1.0) + assert t is None + + +def test_fastest_time_picks_fastest_window(): + # First 200 s at 1 km/h (barely moving), then 100 s at 36 km/h (= 1 km) + speed_1hz = [1.0] * 200 + [36.0] * 100 + t = _fastest_time_for_distance(speed_1hz, 1.0) + # The fast window can cover 1 km; the slow window alone cannot. + # Algorithm uses inclusive right-left+1 counting so result may be 100 or 101. + assert t is not None + assert t <= 101 + + +def test_best_efforts_running(): + # 15 km/h for 3600 s = 15 km — should cover 1 km, 5 km, 10 km targets + pts = [_pt(i, lat=48.0 + i * 0.0001, lon=11.0, speed_kmh=15.0) for i in range(3601)] + efforts, _ = compute_best_efforts(pts, _ts(0), "running") + assert efforts is not None + covered = [d for d, _ in efforts] + assert 1.0 in covered + assert 5.0 in covered + assert 10.0 in covered + # 42.195 km not reachable in 3600 s at 15 km/h + assert 42.195 not in covered + + +def test_best_efforts_no_targets_for_sport(): + pts = [_pt(i, lat=48.0, lon=11.0) for i in range(100)] + efforts, _ = compute_best_efforts(pts, _ts(0), "hiking") + assert efforts is None + + +# ── best climb ──────────────────────────────────────────────────────────────── + +def test_best_climb_simple_ascent(): + # 0 → 100 m with no gaps + ele = [float(i) for i in range(101)] + result = _best_climb(ele) + assert result == 100.0 + + +def test_best_climb_with_descent(): + # Up 50, down 20, up 80 → best contiguous window = 80 + ele = list(range(0, 51)) + list(range(50, 30, -1)) + list(range(30, 111)) + result = _best_climb(ele) + assert result is not None + assert result >= 80.0 + + +def test_best_climb_none_gap_resets_window(): + # 50 m up, then a GPS gap, then 30 m up — windows don't bridge the gap + ele: list = list(range(0, 51)) + [None] + list(range(0, 31)) + result = _best_climb(ele) + assert result == 50.0 + + +def test_best_climb_only_descent(): + ele = [100.0, 80.0, 60.0, 40.0] + result = _best_climb(ele) + assert result is None + + +def test_best_climb_too_few_samples(): + assert _best_climb([]) is None + assert _best_climb([100.0]) is None diff --git a/tests/test_simplify.py b/tests/test_simplify.py new file mode 100644 index 0000000..b1385d7 --- /dev/null +++ b/tests/test_simplify.py @@ -0,0 +1,212 @@ +"""Tests for bincio.extract.simplify.""" + +from datetime import datetime, timezone + +import pytest + +from bincio.extract.models import DataPoint +from bincio.extract.simplify import ( + _rdp_mask, + build_geojson, + preview_coords, + simplify_track, +) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _ts(i: int = 0) -> datetime: + from datetime import timedelta + return datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + timedelta(seconds=i) + + +def _pt(lat: float, lon: float, ele: float | None = None, spd: float | None = None, i: int = 0) -> DataPoint: + return DataPoint(timestamp=_ts(i), lat=lat, lon=lon, elevation_m=ele, speed_kmh=spd) + + +def _pts_no_gps(n: int) -> list[DataPoint]: + return [DataPoint(timestamp=_ts(i)) for i in range(n)] + + +# ── _rdp_mask ───────────────────────────────────────────────────────────────── + +def test_rdp_mask_collinear_removes_middle(): + # Three collinear points — the middle one should be dropped + coords = [[0.0, 0.0], [0.5, 0.0], [1.0, 0.0]] + mask = _rdp_mask(coords, epsilon=0.001) + assert mask[0] is True + assert mask[1] is False # middle collinear point removed + assert mask[2] is True + + +def test_rdp_mask_always_keeps_endpoints(): + coords = [[0.0, 0.0], [0.5, 1.0], [1.0, 0.0]] + mask = _rdp_mask(coords, epsilon=0.001) + assert mask[0] is True + assert mask[-1] is True + + +def test_rdp_mask_large_deviation_kept(): + # Middle point is far off the line — must be kept + coords = [[0.0, 0.0], [0.5, 1.0], [1.0, 0.0]] + mask = _rdp_mask(coords, epsilon=0.001) + assert mask[1] is True + + +def test_rdp_mask_single_point(): + mask = _rdp_mask([[0.0, 0.0]], epsilon=0.001) + assert mask == [True] + + +def test_rdp_mask_two_points(): + mask = _rdp_mask([[0.0, 0.0], [1.0, 1.0]], epsilon=0.001) + assert mask == [True, True] + + +def test_rdp_mask_epsilon_zero_keeps_all(): + coords = [[float(i), 0.0] for i in range(5)] + mask = _rdp_mask(coords, epsilon=0.0) + assert all(mask) + + +# ── simplify_track ──────────────────────────────────────────────────────────── + +def test_simplify_track_collinear_removes_interior(): + # Straight line — only endpoints should survive with epsilon > 0 + pts = [_pt(48.0 + i * 0.001, 11.0, i=i) for i in range(5)] + result = simplify_track(pts, epsilon=0.0001) + # Endpoints always kept; interior collinear points dropped + assert result[0].lat == pytest.approx(48.0) + assert result[-1].lat == pytest.approx(48.004) + assert len(result) < len(pts) + + +def test_simplify_track_corner_kept(): + # L-shaped route — the corner must survive + pts = [ + _pt(48.000, 11.000, i=0), + _pt(48.001, 11.000, i=1), # going north + _pt(48.002, 11.000, i=2), + _pt(48.002, 11.001, i=3), # turn east — this is the corner + _pt(48.002, 11.002, i=4), + ] + result = simplify_track(pts, epsilon=0.0001) + latlons = [(p.lat, p.lon) for p in result] + assert (48.002, 11.000) in latlons # corner kept + + +def test_simplify_track_no_gps_points(): + pts = _pts_no_gps(10) + result = simplify_track(pts) + assert result == [] + + +def test_simplify_track_single_gps_point(): + pts = [_pt(48.0, 11.0)] + result = simplify_track(pts) + assert len(result) == 1 + + +def test_simplify_track_preserves_data_point_fields(): + pts = [ + _pt(48.0, 11.0, ele=100.0, spd=20.0, i=0), + _pt(48.0, 12.0, ele=200.0, spd=30.0, i=1), + ] + result = simplify_track(pts) + assert result[0].elevation_m == 100.0 + assert result[0].speed_kmh == 20.0 + + +# ── preview_coords ──────────────────────────────────────────────────────────── + +def test_preview_coords_none_on_no_gps(): + result = preview_coords(_pts_no_gps(10)) + assert result is None + + +def test_preview_coords_single_point_none(): + result = preview_coords([_pt(48.0, 11.0)]) + assert result is None + + +def test_preview_coords_respects_max_points(): + pts = [_pt(48.0 + i * 0.001, 11.0, i=i) for i in range(100)] + result = preview_coords(pts, max_points=10) + assert result is not None + assert len(result) <= 10 + + +def test_preview_coords_format(): + pts = [_pt(48.123456789, 11.987654321, i=0), _pt(48.2, 12.0, i=1)] + result = preview_coords(pts) + assert result is not None + for coord in result: + assert len(coord) == 2 + # Rounded to 5 decimal places + assert coord[0] == round(coord[0], 5) + assert coord[1] == round(coord[1], 5) + + +def test_preview_coords_few_points_returned_all(): + pts = [_pt(48.0, 11.0, i=0), _pt(48.1, 11.1, i=1), _pt(48.2, 11.2, i=2)] + result = preview_coords(pts, max_points=20) + assert result is not None + assert len(result) >= 2 + + +# ── build_geojson ───────────────────────────────────────────────────────────── + +def test_build_geojson_structure(): + pts = [_pt(48.0, 11.0, ele=100.0, spd=20.0, i=i) for i in range(3)] + gj = build_geojson(pts, activity_id="test-123") + assert gj["type"] == "Feature" + assert gj["geometry"]["type"] == "LineString" + assert "coordinates" in gj["geometry"] + assert "properties" in gj + props = gj["properties"] + assert props["id"] == "test-123" + assert props["simplification"] == "rdp" + assert "speeds" in props + assert "point_count_simplified" in props + + +def test_build_geojson_coordinates_order(): + # GeoJSON uses [lon, lat, ele] + pts = [_pt(48.0, 11.0, ele=100.0, i=0), _pt(48.5, 11.5, ele=200.0, i=1)] + gj = build_geojson(pts, "act") + coords = gj["geometry"]["coordinates"] + assert len(coords) == 2 + # First coord: lon=11.0, lat=48.0, ele=100.0 + assert coords[0] == [11.0, 48.0, 100.0] + + +def test_build_geojson_no_elevation_omits_z(): + pts = [_pt(48.0, 11.0, ele=None, i=0), _pt(48.5, 11.5, ele=None, i=1)] + gj = build_geojson(pts, "act") + coords = gj["geometry"]["coordinates"] + for c in coords: + assert len(c) == 2 # no Z + + +def test_build_geojson_speeds_parallel(): + pts = [ + _pt(48.0, 11.0, spd=10.0, i=0), + _pt(48.5, 11.5, spd=None, i=1), + _pt(49.0, 12.0, spd=20.0, i=2), + ] + gj = build_geojson(pts, "act") + speeds = gj["properties"]["speeds"] + coords = gj["geometry"]["coordinates"] + assert len(speeds) == len(coords) + + +def test_build_geojson_point_counts(): + pts = [_pt(48.0 + i * 0.001, 11.0, i=i) for i in range(10)] + gj = build_geojson(pts, "act", original_count=100) + assert gj["properties"]["point_count_original"] == 100 + assert gj["properties"]["point_count_simplified"] <= 10 + + +def test_build_geojson_no_gps_points(): + gj = build_geojson(_pts_no_gps(5), "act") + assert gj["geometry"]["coordinates"] == []