Files
Davide Scaini df496a017f fix: refine hysteresis recalculation with MA pre-smoothing and lower thresholds
- dem.py: pre-smooth elevation with 30s moving average before hysteresis
  in recalculate_elevation_hysteresis(); thresholds drop from 5m/10m to
  1m (barometric) / 3m (GPS) — accurate after noise is smoothed out
- dem.py: widen DEM median-filter window 45s → 60s
- dem.py: rename response key source → altitude_source for consistency
- writer.py: write altitude_source into detail JSON at extract time
- tests/test_dem.py: 21 unit tests for pure functions and file-level hysteresis
- tests/test_edit_server.py: 11 TestClient API tests for both recalculate endpoints
- add httpx as dev dependency (required by FastAPI TestClient)
2026-04-22 10:57:28 +02:00

233 lines
8.0 KiB
Python

"""Tests for bincio.extract.dem — pure functions and file-level hysteresis.
No API calls, no extract pipeline, no large data.
"""
from __future__ import annotations
import json
import math
from pathlib import Path
import pytest
from bincio.extract.dem import (
_hysteresis_gain_loss,
_median_filter,
_moving_average,
recalculate_elevation_hysteresis,
)
# ── _moving_average ───────────────────────────────────────────────────────────
def test_moving_average_flat():
    """A constant signal must come out of the moving average unchanged."""
    flat = [5.0 for _ in range(20)]
    smoothed = _moving_average(flat, 5)
    assert smoothed == pytest.approx(flat)
def test_moving_average_ramp():
    """A linear ramp is reproduced exactly at the interior samples."""
    ramp = list(map(float, range(20)))
    smoothed = _moving_average(ramp, 5)
    # Only indices 2..17 are compared: the first and last two samples are
    # skipped because the averaging window may be truncated at the edges
    # and need not match the input there.
    for idx, expected in enumerate(ramp[2:18], start=2):
        assert smoothed[idx] == pytest.approx(expected, abs=1e-9)
def test_moving_average_spike():
    """An isolated +100 m spike is strongly damped by a 30-sample window."""
    spike_at = 30
    samples = [100.0 for _ in range(60)]
    samples[spike_at] = 200.0  # single outlier, 100 m above the baseline
    smoothed = _moving_average(samples, 30)
    # The smoothed value at the spike must stay within 10 m of the baseline.
    assert smoothed[spike_at] < 110.0
def test_moving_average_length_preserved():
    """The output has one sample per input sample."""
    samples = [1.0, 2.0, 3.0, 4.0, 5.0]
    smoothed = _moving_average(samples, 3)
    assert len(smoothed) == 5
def test_moving_average_single():
    """A one-element input with a wider window is returned as-is."""
    smoothed = _moving_average([42.0], 5)
    assert smoothed == [42.0]
# ── _median_filter ────────────────────────────────────────────────────────────
def test_median_filter_flat():
    """A constant signal must come out of the median filter unchanged."""
    flat = [10.0 for _ in range(30)]
    filtered = _median_filter(flat, 5)
    assert filtered == pytest.approx(flat)
def test_median_filter_spike_removed():
    """A single outlier is fully rejected by a 45-sample median window."""
    spike_at = 30
    samples = [100.0 for _ in range(61)]
    samples[spike_at] = 300.0  # lone outlier in an otherwise flat signal
    filtered = _median_filter(samples, 45)
    # The filtered value at the spike must equal the baseline.
    assert filtered[spike_at] == pytest.approx(100.0)
def test_median_filter_length_preserved():
    """The output has one sample per input sample."""
    samples = [float(value) for value in range(10, 20)]
    filtered = _median_filter(samples, 5)
    assert len(filtered) == 10
# ── _hysteresis_gain_loss ─────────────────────────────────────────────────────
def test_hysteresis_flat():
    """A perfectly flat profile yields zero gain and zero loss."""
    flat = [100.0 for _ in range(100)]
    climbed, descended = _hysteresis_gain_loss(flat, 5.0)
    assert climbed == 0.0
    assert descended == 0.0
def test_hysteresis_single_climb():
    """One 50 m step (ten times the threshold) is counted in full as gain."""
    low_section = [0.0 for _ in range(50)]
    high_section = [50.0 for _ in range(50)]
    climbed, descended = _hysteresis_gain_loss(low_section + high_section, 5.0)
    assert climbed == pytest.approx(50.0)
    assert descended == pytest.approx(0.0)
def test_hysteresis_up_and_down():
    """A 20 m rise followed by a 20 m drop gives 20 m gain and 20 m loss."""
    profile = [0.0, 20.0, 0.0]
    climbed, descended = _hysteresis_gain_loss(profile, 5.0)
    assert climbed == pytest.approx(20.0)
    assert descended == pytest.approx(20.0)
def test_hysteresis_noise_suppressed():
    """A 3 m oscillation under a 5 m threshold accumulates nothing."""
    # Alternates 103 m / 100 m for 100 samples, starting on the high value.
    noisy = [103.0, 100.0] * 50
    climbed, descended = _hysteresis_gain_loss(noisy, 5.0)
    assert climbed == 0.0
    assert descended == 0.0
def test_hysteresis_noise_passes_low_threshold():
    """The same 3 m oscillation does register gain with a 1 m threshold."""
    # Alternates 103 m / 100 m for 100 samples, starting on the high value.
    noisy = [103.0, 100.0] * 50
    climbed, _descended = _hysteresis_gain_loss(noisy, 1.0)
    # Only gain is checked here; the loss value is intentionally ignored.
    assert climbed > 0.0
def test_hysteresis_both_positive():
    """A profile with rises and a drop reports both gain and loss."""
    # Up 30 m, down 20 m, up 30 m: every move is above the 5 m threshold.
    profile = [0.0, 30.0, 10.0, 40.0]
    climbed, descended = _hysteresis_gain_loss(profile, 5.0)
    assert climbed > 0.0
    assert descended > 0.0
# ── recalculate_elevation_hysteresis (file-level) ─────────────────────────────
def _write_activity(tmp_path: Path, activity_id: str, elevations: list[float],
                    altitude_source: str = "barometric",
                    with_original_backup: bool = False) -> Path:
    """Write minimal activity + timeseries JSON files for testing.

    Two files are created under ``<tmp_path>/activities``:

    * ``<activity_id>.json`` — the detail record, with ``elevation_gain_m``
      and ``elevation_loss_m`` both set to ``0.0`` and the given
      ``altitude_source``.
    * ``<activity_id>.timeseries.json`` — ``t`` (``0 .. len(elevations) - 1``)
      and ``elevation_m``.

    Args:
        tmp_path: Root directory to write into.
        activity_id: Identifier used for the ``id`` field and both file names.
        elevations: Elevation samples stored under ``elevation_m``.
        altitude_source: Value stored in the detail record's
            ``altitude_source`` field.
        with_original_backup: When true, the same samples are also stored
            under ``elevation_m_original`` in the timeseries file.

    Returns:
        ``tmp_path``, unchanged, so the call can be used inline.
    """
    acts = tmp_path / "activities"
    # Tolerate an existing directory so the helper can be called more than
    # once for the same root (several activities in one fixture), or after a
    # test has already created ``activities`` itself.
    acts.mkdir(parents=True, exist_ok=True)

    detail = {
        "id": activity_id,
        "elevation_gain_m": 0.0,
        "elevation_loss_m": 0.0,
        "altitude_source": altitude_source,
    }
    (acts / f"{activity_id}.json").write_text(json.dumps(detail))

    ts: dict = {"t": list(range(len(elevations))), "elevation_m": elevations}
    if with_original_backup:
        ts["elevation_m_original"] = elevations
    (acts / f"{activity_id}.timeseries.json").write_text(json.dumps(ts))
    return tmp_path
def test_hysteresis_recalc_barometric(tmp_path):
    """Barometric activity: 1 m threshold and an 1800 m ramp recovered.

    The fixture is a long, steady ramp (1801 samples, +1 m per sample) so
    that whatever smoothing loss occurs at the two ends is small relative to
    the total climb; the gain is accepted within 2 % of 1800 m.
    """
    climb = list(map(float, range(1801)))  # 0 → 1800 m
    _write_activity(tmp_path, "test-act", climb, altitude_source="barometric")

    outcome = recalculate_elevation_hysteresis(tmp_path, "test-act")

    assert outcome["altitude_source"] == "barometric"
    assert outcome["threshold_m"] == pytest.approx(1.0)
    # Gain within 2 % of the nominal climb; loss no more than 1 m.
    assert outcome["elevation_gain_m"] == pytest.approx(1800.0, rel=0.02)
    assert outcome["elevation_loss_m"] == pytest.approx(0.0, abs=1.0)
def test_hysteresis_recalc_gps(tmp_path):
    """GPS activity: 3 m threshold and an 1800 m ramp recovered within 2 %."""
    climb = list(map(float, range(1801)))  # 0 → 1800 m
    _write_activity(tmp_path, "test-act", climb, altitude_source="gps")

    outcome = recalculate_elevation_hysteresis(tmp_path, "test-act")

    assert outcome["threshold_m"] == pytest.approx(3.0)
    assert outcome["elevation_gain_m"] == pytest.approx(1800.0, rel=0.02)
def test_hysteresis_recalc_uses_original_backup(tmp_path):
    """The recalculation must read ``elevation_m_original`` when present.

    The fixture stores a flat 900 m profile in ``elevation_m`` (standing in
    for a track whose elevation was already replaced by DEM data) and a
    0 → 1800 m ramp in ``elevation_m_original``. A gain close to 1800 m shows
    the backup array was used rather than the flat one.
    """
    activity_id = "test-act"
    n_samples = 1801
    activities_dir = tmp_path / "activities"
    activities_dir.mkdir()

    detail_payload = {
        "id": activity_id,
        "elevation_gain_m": 0.0,
        "elevation_loss_m": 0.0,
        "altitude_source": "barometric",
    }
    timeseries_payload = {
        "t": list(range(n_samples)),
        "elevation_m": [900.0] * n_samples,  # flat replacement profile
        "elevation_m_original": [float(i) for i in range(n_samples)],  # real climb
    }
    (activities_dir / f"{activity_id}.json").write_text(json.dumps(detail_payload))
    (activities_dir / f"{activity_id}.timeseries.json").write_text(
        json.dumps(timeseries_payload)
    )

    outcome = recalculate_elevation_hysteresis(tmp_path, activity_id)

    # ~1800 m means the backup was used; the flat array would give ~0 m.
    assert outcome["elevation_gain_m"] == pytest.approx(1800.0, rel=0.02)
def test_hysteresis_recalc_patches_detail_json(tmp_path):
    """After recalculation the detail file on disk carries a positive gain."""
    climb = list(map(float, range(101)))  # 0 → 100 m
    _write_activity(tmp_path, "test-act", climb)

    recalculate_elevation_hysteresis(tmp_path, "test-act")

    detail_path = tmp_path / "activities" / "test-act.json"
    stored = json.loads(detail_path.read_text())
    # The helper wrote 0.0, so a positive value means the file was rewritten.
    assert "elevation_gain_m" in stored
    assert stored["elevation_gain_m"] > 0
def test_hysteresis_recalc_patches_index(tmp_path):
    """After recalculation the matching ``index.json`` entry has a positive gain."""
    climb = list(map(float, range(101)))  # 0 → 100 m
    _write_activity(tmp_path, "test-act", climb)

    index_path = tmp_path / "index.json"
    seed_index = {"activities": [{"id": "test-act", "elevation_gain_m": 0.0}]}
    index_path.write_text(json.dumps(seed_index))

    recalculate_elevation_hysteresis(tmp_path, "test-act")

    stored = json.loads(index_path.read_text())
    # The entry was seeded with 0.0, so a positive value means it was updated.
    assert stored["activities"][0]["elevation_gain_m"] > 0
def test_hysteresis_recalc_missing_activity(tmp_path):
    """An unknown activity id in an empty activities dir raises FileNotFoundError."""
    activities_dir = tmp_path / "activities"
    activities_dir.mkdir()
    with pytest.raises(FileNotFoundError):
        recalculate_elevation_hysteresis(tmp_path, "nonexistent")
def test_hysteresis_recalc_no_timeseries(tmp_path):
    """A detail file without a timeseries file raises ValueError mentioning it."""
    activities_dir = tmp_path / "activities"
    activities_dir.mkdir()
    # Only the detail record is written; the timeseries file is left out on purpose.
    detail_path = activities_dir / "test-act.json"
    detail_path.write_text(json.dumps({"id": "test-act"}))
    with pytest.raises(ValueError, match="timeseries"):
        recalculate_elevation_hysteresis(tmp_path, "test-act")