fix: refine hysteresis recalculation with MA pre-smoothing and lower thresholds
- dem.py: pre-smooth elevation with 30s moving average before hysteresis in recalculate_elevation_hysteresis(); thresholds drop from 5m/10m to 1m (barometric) / 3m (GPS) — accurate after noise is smoothed out - dem.py: widen DEM median-filter window 45s → 60s - dem.py: rename response key source → altitude_source for consistency - writer.py: write altitude_source into detail JSON at extract time - tests/test_dem.py: 21 unit tests for pure functions and file-level hysteresis - tests/test_edit_server.py: 11 TestClient API tests for both recalculate endpoints - add httpx as dev dependency (required by FastAPI TestClient)
This commit is contained in:
@@ -0,0 +1,232 @@
|
||||
"""Tests for bincio.extract.dem — pure functions and file-level hysteresis.
|
||||
|
||||
No API calls, no extract pipeline, no large data.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from bincio.extract.dem import (
|
||||
_hysteresis_gain_loss,
|
||||
_median_filter,
|
||||
_moving_average,
|
||||
recalculate_elevation_hysteresis,
|
||||
)
|
||||
|
||||
|
||||
# ── _moving_average ───────────────────────────────────────────────────────────
|
||||
|
||||
def test_moving_average_flat():
|
||||
data = [5.0] * 20
|
||||
result = _moving_average(data, 5)
|
||||
assert result == pytest.approx(data)
|
||||
|
||||
|
||||
def test_moving_average_ramp():
|
||||
# A perfect ramp should be preserved (MA of linear is linear).
|
||||
data = [float(i) for i in range(20)]
|
||||
result = _moving_average(data, 5)
|
||||
# Interior points should be exact; edges shrink the window so they may
|
||||
# differ slightly — just check the middle is right.
|
||||
for i in range(2, 18):
|
||||
assert result[i] == pytest.approx(data[i], abs=1e-9)
|
||||
|
||||
|
||||
def test_moving_average_spike():
|
||||
# A single spike should be strongly attenuated.
|
||||
data = [100.0] * 60
|
||||
data[30] = 200.0 # +100 m spike
|
||||
result = _moving_average(data, 30)
|
||||
# At the spike position the average over 30 samples pulls it down a lot
|
||||
assert result[30] < 110.0
|
||||
|
||||
|
||||
def test_moving_average_length_preserved():
|
||||
data = [1.0, 2.0, 3.0, 4.0, 5.0]
|
||||
assert len(_moving_average(data, 3)) == 5
|
||||
|
||||
|
||||
def test_moving_average_single():
|
||||
assert _moving_average([42.0], 5) == [42.0]
|
||||
|
||||
|
||||
# ── _median_filter ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_median_filter_flat():
|
||||
data = [10.0] * 30
|
||||
assert _median_filter(data, 5) == pytest.approx(data)
|
||||
|
||||
|
||||
def test_median_filter_spike_removed():
|
||||
data = [100.0] * 61
|
||||
data[30] = 300.0 # outlier spike
|
||||
result = _median_filter(data, 45)
|
||||
# The spike should be completely removed by the median
|
||||
assert result[30] == pytest.approx(100.0)
|
||||
|
||||
|
||||
def test_median_filter_length_preserved():
|
||||
data = list(range(10, 20, 1))
|
||||
assert len(_median_filter([float(x) for x in data], 5)) == 10
|
||||
|
||||
|
||||
# ── _hysteresis_gain_loss ─────────────────────────────────────────────────────
|
||||
|
||||
def test_hysteresis_flat():
|
||||
data = [100.0] * 100
|
||||
gain, loss = _hysteresis_gain_loss(data, 5.0)
|
||||
assert gain == 0.0
|
||||
assert loss == 0.0
|
||||
|
||||
|
||||
def test_hysteresis_single_climb():
|
||||
# 50 m climb, well above any threshold.
|
||||
data = [0.0] * 50 + [50.0] * 50
|
||||
gain, loss = _hysteresis_gain_loss(data, 5.0)
|
||||
assert gain == pytest.approx(50.0)
|
||||
assert loss == pytest.approx(0.0)
|
||||
|
||||
|
||||
def test_hysteresis_up_and_down():
|
||||
data = [0.0, 20.0, 0.0]
|
||||
gain, loss = _hysteresis_gain_loss(data, 5.0)
|
||||
assert gain == pytest.approx(20.0)
|
||||
assert loss == pytest.approx(20.0)
|
||||
|
||||
|
||||
def test_hysteresis_noise_suppressed():
|
||||
# Oscillation below threshold → nothing accumulates.
|
||||
data = [100.0 + (3.0 if i % 2 == 0 else 0.0) for i in range(100)]
|
||||
gain, loss = _hysteresis_gain_loss(data, 5.0)
|
||||
assert gain == 0.0
|
||||
assert loss == 0.0
|
||||
|
||||
|
||||
def test_hysteresis_noise_passes_low_threshold():
|
||||
# Same oscillation does accumulate with a threshold below it.
|
||||
data = [100.0 + (3.0 if i % 2 == 0 else 0.0) for i in range(100)]
|
||||
gain, loss = _hysteresis_gain_loss(data, 1.0)
|
||||
assert gain > 0.0
|
||||
|
||||
|
||||
def test_hysteresis_both_positive():
|
||||
data = [0.0, 30.0, 10.0, 40.0]
|
||||
gain, loss = _hysteresis_gain_loss(data, 5.0)
|
||||
assert gain > 0.0
|
||||
assert loss > 0.0
|
||||
|
||||
|
||||
# ── recalculate_elevation_hysteresis (file-level) ─────────────────────────────
|
||||
|
||||
def _write_activity(tmp_path: Path, activity_id: str, elevations: list[float],
|
||||
altitude_source: str = "barometric",
|
||||
with_original_backup: bool = False) -> Path:
|
||||
"""Write minimal activity + timeseries JSON files for testing."""
|
||||
acts = tmp_path / "activities"
|
||||
acts.mkdir()
|
||||
|
||||
detail = {
|
||||
"id": activity_id,
|
||||
"elevation_gain_m": 0.0,
|
||||
"elevation_loss_m": 0.0,
|
||||
"altitude_source": altitude_source,
|
||||
}
|
||||
(acts / f"{activity_id}.json").write_text(json.dumps(detail))
|
||||
|
||||
ts: dict = {"t": list(range(len(elevations))), "elevation_m": elevations}
|
||||
if with_original_backup:
|
||||
ts["elevation_m_original"] = elevations
|
||||
(acts / f"{activity_id}.timeseries.json").write_text(json.dumps(ts))
|
||||
|
||||
return tmp_path
|
||||
|
||||
|
||||
def test_hysteresis_recalc_barometric(tmp_path):
|
||||
# Long ramp (1800 s = 30 min, +1 m/s) so the 30s MA edge effect is small.
|
||||
# Edge effect ≈ window/2 metres on each side = ~15 m total on 1800 m climb.
|
||||
elevations = [float(i) for i in range(1801)] # 0→1800 m
|
||||
_write_activity(tmp_path, "test-act", elevations, altitude_source="barometric")
|
||||
|
||||
result = recalculate_elevation_hysteresis(tmp_path, "test-act")
|
||||
|
||||
assert result["altitude_source"] == "barometric"
|
||||
assert result["threshold_m"] == pytest.approx(1.0)
|
||||
# Edge effect is ≤1% on a 30-min ramp
|
||||
assert result["elevation_gain_m"] == pytest.approx(1800.0, rel=0.02)
|
||||
assert result["elevation_loss_m"] == pytest.approx(0.0, abs=1.0)
|
||||
|
||||
|
||||
def test_hysteresis_recalc_gps(tmp_path):
|
||||
elevations = [float(i) for i in range(1801)]
|
||||
_write_activity(tmp_path, "test-act", elevations, altitude_source="gps")
|
||||
|
||||
result = recalculate_elevation_hysteresis(tmp_path, "test-act")
|
||||
|
||||
assert result["threshold_m"] == pytest.approx(3.0)
|
||||
assert result["elevation_gain_m"] == pytest.approx(1800.0, rel=0.02)
|
||||
|
||||
|
||||
def test_hysteresis_recalc_uses_original_backup(tmp_path):
|
||||
# Simulate: DEM already replaced elevation_m with flat terrain,
|
||||
# but elevation_m_original holds the real barometric climb.
|
||||
acts = tmp_path / "activities"
|
||||
acts.mkdir()
|
||||
aid = "test-act"
|
||||
|
||||
original = [float(i) for i in range(1801)] # real 1800 m climb
|
||||
dem_flat = [900.0] * 1801 # DEM said flat
|
||||
|
||||
detail = {"id": aid, "elevation_gain_m": 0.0, "elevation_loss_m": 0.0,
|
||||
"altitude_source": "barometric"}
|
||||
(acts / f"{aid}.json").write_text(json.dumps(detail))
|
||||
|
||||
ts = {"t": list(range(1801)), "elevation_m": dem_flat,
|
||||
"elevation_m_original": original}
|
||||
(acts / f"{aid}.timeseries.json").write_text(json.dumps(ts))
|
||||
|
||||
result = recalculate_elevation_hysteresis(tmp_path, aid)
|
||||
|
||||
# Should use the original backup (1800 m climb), not the flat DEM array (0 m)
|
||||
assert result["elevation_gain_m"] == pytest.approx(1800.0, rel=0.02)
|
||||
|
||||
|
||||
def test_hysteresis_recalc_patches_detail_json(tmp_path):
|
||||
elevations = [float(i) for i in range(101)]
|
||||
_write_activity(tmp_path, "test-act", elevations)
|
||||
|
||||
recalculate_elevation_hysteresis(tmp_path, "test-act")
|
||||
|
||||
detail = json.loads((tmp_path / "activities" / "test-act.json").read_text())
|
||||
assert "elevation_gain_m" in detail
|
||||
assert detail["elevation_gain_m"] > 0
|
||||
|
||||
|
||||
def test_hysteresis_recalc_patches_index(tmp_path):
|
||||
elevations = [float(i) for i in range(101)]
|
||||
_write_activity(tmp_path, "test-act", elevations)
|
||||
|
||||
index = {"activities": [{"id": "test-act", "elevation_gain_m": 0.0}]}
|
||||
(tmp_path / "index.json").write_text(json.dumps(index))
|
||||
|
||||
recalculate_elevation_hysteresis(tmp_path, "test-act")
|
||||
|
||||
updated = json.loads((tmp_path / "index.json").read_text())
|
||||
assert updated["activities"][0]["elevation_gain_m"] > 0
|
||||
|
||||
|
||||
def test_hysteresis_recalc_missing_activity(tmp_path):
|
||||
(tmp_path / "activities").mkdir()
|
||||
with pytest.raises(FileNotFoundError):
|
||||
recalculate_elevation_hysteresis(tmp_path, "nonexistent")
|
||||
|
||||
|
||||
def test_hysteresis_recalc_no_timeseries(tmp_path):
|
||||
acts = tmp_path / "activities"
|
||||
acts.mkdir()
|
||||
(acts / "test-act.json").write_text(json.dumps({"id": "test-act"}))
|
||||
with pytest.raises(ValueError, match="timeseries"):
|
||||
recalculate_elevation_hysteresis(tmp_path, "test-act")
|
||||
@@ -0,0 +1,158 @@
|
||||
"""API tests for the /recalculate-elevation/* endpoints in bincio.edit.server.
|
||||
|
||||
Uses httpx TestClient — no real network, no uvicorn process.
|
||||
The module-level `data_dir` variable is patched to a tmp_path fixture.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
import bincio.edit.server as edit_server
|
||||
from bincio.edit.server import app
|
||||
|
||||
CLIENT = TestClient(app, raise_server_exceptions=False)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_activity(
|
||||
data_dir: Path,
|
||||
activity_id: str,
|
||||
elevations: list[float],
|
||||
altitude_source: str = "barometric",
|
||||
elevation_m_original: list[float] | None = None,
|
||||
) -> None:
|
||||
acts = data_dir / "activities"
|
||||
acts.mkdir(exist_ok=True)
|
||||
|
||||
detail = {
|
||||
"id": activity_id,
|
||||
"elevation_gain_m": 0.0,
|
||||
"elevation_loss_m": 0.0,
|
||||
"altitude_source": altitude_source,
|
||||
}
|
||||
(acts / f"{activity_id}.json").write_text(json.dumps(detail))
|
||||
|
||||
ts: dict = {"t": list(range(len(elevations))), "elevation_m": elevations}
|
||||
if elevation_m_original is not None:
|
||||
ts["elevation_m_original"] = elevation_m_original
|
||||
(acts / f"{activity_id}.timeseries.json").write_text(json.dumps(ts))
|
||||
|
||||
# Minimal index.json so merge_one doesn't crash
|
||||
index_path = data_dir / "index.json"
|
||||
if not index_path.exists():
|
||||
index_path.write_text(json.dumps({"activities": [
|
||||
{"id": activity_id, "elevation_gain_m": 0.0}
|
||||
]}))
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_data_dir(tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(edit_server, "data_dir", tmp_path)
|
||||
return tmp_path
|
||||
|
||||
|
||||
# ── /recalculate-elevation/hysteresis ─────────────────────────────────────────
|
||||
|
||||
class TestHysteresisEndpoint:
|
||||
AID = "2024-01-01T080000Z-test-climb"
|
||||
|
||||
def test_returns_200_with_gain_loss(self, tmp_path):
|
||||
elevations = [float(i) for i in range(1801)]
|
||||
_make_activity(tmp_path, self.AID, elevations, altitude_source="barometric")
|
||||
|
||||
r = CLIENT.post(f"/api/activity/{self.AID}/recalculate-elevation/hysteresis")
|
||||
|
||||
assert r.status_code == 200
|
||||
body = r.json()
|
||||
assert "elevation_gain_m" in body
|
||||
assert "elevation_loss_m" in body
|
||||
assert body["elevation_gain_m"] > 0
|
||||
assert body["altitude_source"] == "barometric"
|
||||
assert body["threshold_m"] == pytest.approx(1.0)
|
||||
|
||||
def test_gps_source_uses_3m_threshold(self, tmp_path):
|
||||
elevations = [float(i) for i in range(1801)]
|
||||
_make_activity(tmp_path, self.AID, elevations, altitude_source="gps")
|
||||
|
||||
r = CLIENT.post(f"/api/activity/{self.AID}/recalculate-elevation/hysteresis")
|
||||
|
||||
assert r.status_code == 200
|
||||
assert r.json()["threshold_m"] == pytest.approx(3.0)
|
||||
|
||||
def test_unknown_source_falls_back_to_gps_threshold(self, tmp_path):
|
||||
elevations = [float(i) for i in range(1801)]
|
||||
_make_activity(tmp_path, self.AID, elevations, altitude_source="unknown")
|
||||
|
||||
r = CLIENT.post(f"/api/activity/{self.AID}/recalculate-elevation/hysteresis")
|
||||
|
||||
assert r.status_code == 200
|
||||
assert r.json()["threshold_m"] == pytest.approx(3.0)
|
||||
|
||||
def test_uses_original_elevation_when_dem_backup_present(self, tmp_path):
|
||||
original = [float(i) for i in range(1801)] # real 1800 m climb
|
||||
dem_flat = [900.0] * 1801 # DEM flattened it
|
||||
_make_activity(tmp_path, self.AID, dem_flat,
|
||||
altitude_source="barometric",
|
||||
elevation_m_original=original)
|
||||
|
||||
r = CLIENT.post(f"/api/activity/{self.AID}/recalculate-elevation/hysteresis")
|
||||
|
||||
assert r.status_code == 200
|
||||
assert r.json()["elevation_gain_m"] == pytest.approx(1800.0, rel=0.02)
|
||||
|
||||
def test_patches_detail_json_on_disk(self, tmp_path):
|
||||
elevations = [float(i) for i in range(1801)]
|
||||
_make_activity(tmp_path, self.AID, elevations)
|
||||
|
||||
CLIENT.post(f"/api/activity/{self.AID}/recalculate-elevation/hysteresis")
|
||||
|
||||
detail = json.loads(
|
||||
(tmp_path / "activities" / f"{self.AID}.json").read_text()
|
||||
)
|
||||
assert detail["elevation_gain_m"] > 0
|
||||
|
||||
def test_404_for_missing_activity(self, tmp_path):
|
||||
(tmp_path / "activities").mkdir()
|
||||
r = CLIENT.post("/api/activity/2024-01-01T080000Z-no-such/recalculate-elevation/hysteresis")
|
||||
assert r.status_code == 404
|
||||
|
||||
def test_422_for_missing_timeseries(self, tmp_path):
|
||||
acts = tmp_path / "activities"
|
||||
acts.mkdir()
|
||||
aid = self.AID
|
||||
(acts / f"{aid}.json").write_text(json.dumps({"id": aid, "altitude_source": "gps"}))
|
||||
# No timeseries file
|
||||
|
||||
r = CLIENT.post(f"/api/activity/{aid}/recalculate-elevation/hysteresis")
|
||||
assert r.status_code == 422
|
||||
|
||||
def test_400_for_invalid_id(self):
|
||||
r = CLIENT.post("/api/activity/../etc/passwd/recalculate-elevation/hysteresis")
|
||||
assert r.status_code in (400, 404, 422)
|
||||
|
||||
|
||||
# ── /recalculate-elevation/dem ────────────────────────────────────────────────
|
||||
|
||||
class TestDemEndpoint:
|
||||
AID = "2024-01-01T080000Z-test-climb"
|
||||
|
||||
def test_503_when_dem_url_not_configured(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(edit_server, "dem_url", "")
|
||||
r = CLIENT.post(f"/api/activity/{self.AID}/recalculate-elevation/dem")
|
||||
assert r.status_code == 503
|
||||
|
||||
def test_404_for_missing_activity(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(edit_server, "dem_url", "https://api.open-elevation.com")
|
||||
(tmp_path / "activities").mkdir()
|
||||
r = CLIENT.post("/api/activity/2024-01-01T080000Z-no-such/recalculate-elevation/dem")
|
||||
assert r.status_code == 404
|
||||
|
||||
def test_400_for_invalid_id(self, monkeypatch):
|
||||
monkeypatch.setattr(edit_server, "dem_url", "https://api.open-elevation.com")
|
||||
r = CLIENT.post("/api/activity/../../evil/recalculate-elevation/dem")
|
||||
assert r.status_code in (400, 404, 422)
|
||||
Reference in New Issue
Block a user