Files
Davide Scaini df496a017f fix: refine hysteresis recalculation with MA pre-smoothing and lower thresholds
- dem.py: pre-smooth elevation with 30s moving average before hysteresis
  in recalculate_elevation_hysteresis(); thresholds drop from 5m/10m to
  1m (barometric) / 3m (GPS) — accurate after noise is smoothed out
- dem.py: widen DEM median-filter window 45s → 60s
- dem.py: rename response key source → altitude_source for consistency
- writer.py: write altitude_source into detail JSON at extract time
- tests/test_dem.py: 21 unit tests for pure functions and file-level hysteresis
- tests/test_edit_server.py: 11 TestClient API tests for both recalculate endpoints
- add httpx as dev dependency (required by FastAPI TestClient)
2026-04-22 10:57:28 +02:00

383 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""DEM (Digital Elevation Model) lookup and elevation recalculation.
Queries any Open-Elevation-compatible HTTP API to replace noisy GPS altitude
with terrain altitude, then re-applies hysteresis-based accumulation.
Compatible APIs:
- https://api.open-elevation.com (free, SRTM, accepts large batches)
- https://api.opentopodata.org/v1/srtm30m (more reliable, max 100 pts/req)
Pass the base URL (without path) to bincio serve/edit via --dem-url.
"""
from __future__ import annotations
import json
import statistics
import urllib.error
import urllib.request
from pathlib import Path
from typing import Optional
# Sample one GPS point per N seconds when building the DEM query.
# SRTM30 resolution is ~30 m; at 30 km/h cycling that's ~3 s per tile —
# sampling every 10 s is more than enough.
_DEFAULT_SAMPLE_INTERVAL_S = 10
# Maximum locations per API request. Open-Elevation supports ~512 per POST.
_DEFAULT_BATCH_SIZE = 512
# Hysteresis threshold after DEM correction.
# SRTM30 at 1 Hz produces tile-boundary steps of ~13 m every few seconds.
# A 5 m threshold barely filters them; 10 m suppresses them reliably.
_DEM_HYSTERESIS_M = 10.0
# Median filter window (seconds / samples at 1 Hz) applied to DEM-interpolated
# series before hysteresis. 45 s smooths SRTM tile steps while keeping real
# climbs (typical cycling ramp > 100 m over > 2 min).
_MEDIAN_WINDOW_S = 60
# Moving-average window (seconds) applied to the 1 Hz elevation series before
# hysteresis in the on-demand recalculation. Pre-smoothing lets us use a
# much lower dead-band (capturing real small climbs) while still suppressing
# GPS jitter and barometric quantization noise.
_MA_WINDOW_S = 30
def _moving_average(values: list[float], window: int) -> list[float]:
"""Apply a centred sliding-window moving average to *values*.
Edge handling: window shrinks symmetrically at both ends (same effective
behaviour as scipy's 'nearest' / numpy's 'reflect' mode).
"""
half = window // 2
n = len(values)
out: list[float] = []
cumsum = [0.0] * (n + 1)
for i, v in enumerate(values):
cumsum[i + 1] = cumsum[i] + v
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
out.append((cumsum[hi] - cumsum[lo]) / (hi - lo))
return out
def _median_filter(values: list[float], window: int) -> list[float]:
"""Apply a sliding-window median filter to *values*.
The window is centred on each sample; edges are handled by shrinking the
window symmetrically (same as scipy's 'reflect' / 'nearest' default).
"""
half = window // 2
n = len(values)
out: list[float] = []
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
out.append(statistics.median(values[lo:hi]))
return out
def lookup_elevations(
latlons: list[tuple[float, float]],
api_url: str,
batch_size: int = _DEFAULT_BATCH_SIZE,
timeout_s: int = 30,
) -> list[Optional[float]]:
"""Query a DEM API for terrain elevation at the given (lat, lon) pairs.
Uses the Open-Elevation API format::
POST {api_url}/api/v1/lookup
{"locations": [{"latitude": lat, "longitude": lon}, ...]}
Returns a list the same length as *latlons*. Elements are ``None``
wherever the API returned no data (network error, ocean, etc.).
"""
if not latlons:
return []
results: list[Optional[float]] = [None] * len(latlons)
url = f"{api_url.rstrip('/')}/api/v1/lookup"
for start in range(0, len(latlons), batch_size):
batch = latlons[start : start + batch_size]
payload = json.dumps(
{"locations": [{"latitude": lat, "longitude": lon} for lat, lon in batch]}
).encode("utf-8")
req = urllib.request.Request(
url,
data=payload,
headers={"Content-Type": "application/json", "Accept": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
data = json.loads(resp.read().decode("utf-8"))
for i, item in enumerate(data.get("results", [])):
elev = item.get("elevation")
if elev is not None:
results[start + i] = float(elev)
except (urllib.error.URLError, json.JSONDecodeError, KeyError, ValueError):
pass # leave this batch as None; caller checks overall coverage
return results
def _hysteresis_gain_loss(
elevations: list[float], threshold_m: float
) -> tuple[float, float]:
"""Compute elevation gain and loss using a hysteresis dead-band.
Only commits a new elevation level when it differs from the last committed
value by at least *threshold_m*. Returns (gain, loss) in metres, both
as positive numbers.
"""
gain = loss = 0.0
committed = elevations[0]
for e in elevations[1:]:
diff = e - committed
if abs(diff) >= threshold_m:
if diff > 0:
gain += diff
else:
loss += abs(diff)
committed = e
return gain, loss
def recalculate_elevation(
user_dir: Path,
activity_id: str,
dem_url: str,
sample_interval_s: int = _DEFAULT_SAMPLE_INTERVAL_S,
) -> dict:
"""Replace GPS elevation with DEM terrain elevation and recompute gain/loss.
Algorithm
---------
1. Read the activity's 1 Hz timeseries for lat / lon / t arrays.
2. Subsample GPS points every *sample_interval_s* seconds.
3. Query the DEM API for those points (batched).
4. Linearly interpolate DEM elevation back to every GPS-valid second.
5. Apply a 45 s median filter to smooth SRTM tile-boundary noise.
6. Apply :data:`_DEM_HYSTERESIS_M` (10 m) hysteresis to compute gain/loss.
7. Preserve the original elevation as ``elevation_m_original`` in the
timeseries (only on the first DEM run — never overwrites a prior backup).
8. Write the smoothed DEM elevation array as ``elevation_m``.
9. Patch ``elevation_gain_m`` / ``elevation_loss_m`` in the detail JSON.
10. Patch ``elevation_gain_m`` in ``index.json`` (summary entry for feed).
Returns
-------
dict with keys ``elevation_gain_m``, ``elevation_loss_m``,
``points_queried`` (DEM responses received).
Raises
------
FileNotFoundError
Activity detail or timeseries file not found.
ValueError
Activity has no GPS coordinates, or the DEM API returned too few results.
"""
acts_dir = user_dir / "activities"
json_path = acts_dir / f"{activity_id}.json"
ts_path = acts_dir / f"{activity_id}.timeseries.json"
if not json_path.exists():
raise FileNotFoundError(f"Activity not found: {activity_id}")
if not ts_path.exists():
raise ValueError("Activity has no timeseries data")
ts = json.loads(ts_path.read_text(encoding="utf-8"))
lat_arr: list[Optional[float]] = ts.get("lat") or []
lon_arr: list[Optional[float]] = ts.get("lon") or []
t_arr: list[int] = ts.get("t") or []
if not lat_arr or not lon_arr:
raise ValueError(
"Activity has no GPS coordinates "
"(privacy=no_gps or data recorded without GPS)"
)
n = len(t_arr)
# ── 1. Subsample GPS-valid indices ────────────────────────────────────────
gps_idx = [
i for i in range(n)
if lat_arr[i] is not None and lon_arr[i] is not None
]
if len(gps_idx) < 2:
raise ValueError("Too few GPS points for DEM lookup")
sampled_idx = gps_idx[::sample_interval_s]
if gps_idx[-1] not in sampled_idx:
sampled_idx.append(gps_idx[-1]) # always include the last point
# ── 2. Query DEM ──────────────────────────────────────────────────────────
latlons = [(float(lat_arr[i]), float(lon_arr[i])) for i in sampled_idx] # type: ignore[arg-type]
dem_elev = lookup_elevations(latlons, dem_url)
# Build (t, elevation) pairs for valid DEM responses only
valid_pairs: list[tuple[int, float]] = [
(t_arr[sampled_idx[k]], dem_elev[k])
for k in range(len(sampled_idx))
if dem_elev[k] is not None
]
n_queried = len(valid_pairs)
if n_queried < 2:
raise ValueError(
f"DEM API returned too few results "
f"({n_queried} of {len(sampled_idx)} points). "
f"Check the DEM URL: {dem_url}"
)
# ── 3. Linear interpolation → full 1 Hz series ───────────────────────────
new_ele: list[Optional[float]] = [None] * n
j = 0
for i in gps_idx:
t = t_arr[i]
while j + 1 < len(valid_pairs) - 1 and valid_pairs[j + 1][0] <= t:
j += 1
t0, e0 = valid_pairs[j]
if j + 1 < len(valid_pairs):
t1, e1 = valid_pairs[j + 1]
frac = max(0.0, min(1.0, (t - t0) / (t1 - t0))) if t1 > t0 else 0.0
new_ele[i] = round(e0 + frac * (e1 - e0), 1)
else:
new_ele[i] = round(e0, 1)
# ── 4. Median filter to suppress SRTM tile-boundary steps ────────────────
valid_indices = [i for i, e in enumerate(new_ele) if e is not None]
valid_eles_raw = [new_ele[i] for i in valid_indices] # type: ignore[misc]
smoothed = _median_filter(valid_eles_raw, _MEDIAN_WINDOW_S) # type: ignore[arg-type]
# Write smoothed values back into new_ele
for idx, e in zip(valid_indices, smoothed):
new_ele[idx] = round(e, 1)
# ── 5. Hysteresis accumulation on smoothed series ─────────────────────────
smoothed_valid = [e for e in new_ele if e is not None]
gain, loss = _hysteresis_gain_loss(smoothed_valid, _DEM_HYSTERESIS_M) # type: ignore[arg-type]
gain_r = round(gain, 1)
loss_r = round(loss, 1)
# ── 6. Preserve original elevation (only if not already backed up) ────────
if "elevation_m_original" not in ts:
ts["elevation_m_original"] = ts.get("elevation_m")
# ── 7. Write timeseries ───────────────────────────────────────────────────
ts["elevation_m"] = new_ele
ts_path.write_text(json.dumps(ts, indent=2, ensure_ascii=False), encoding="utf-8")
# ── 8. Patch activity detail JSON ─────────────────────────────────────────
detail = json.loads(json_path.read_text(encoding="utf-8"))
detail["elevation_gain_m"] = gain_r
detail["elevation_loss_m"] = loss_r
json_path.write_text(json.dumps(detail, indent=2, ensure_ascii=False), encoding="utf-8")
# ── 9. Patch index.json summary ───────────────────────────────────────────
index_path = user_dir / "index.json"
if index_path.exists():
index = json.loads(index_path.read_text(encoding="utf-8"))
for s in index.get("activities", []):
if s.get("id") == activity_id:
s["elevation_gain_m"] = gain_r
break
index_path.write_text(
json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8"
)
return {
"elevation_gain_m": gain_r,
"elevation_loss_m": loss_r,
"points_queried": n_queried,
}
def recalculate_elevation_hysteresis(user_dir: Path, activity_id: str) -> dict:
"""Recompute elevation gain/loss from the original recorded elevation data.
Algorithm
---------
1. Read ``elevation_m_original`` (backup from a prior DEM run) if present,
otherwise read ``elevation_m`` from the timeseries.
2. Apply a :data:`_MA_WINDOW_S` (30 s) moving average to smooth out
barometric quantization steps and GPS jitter.
3. Apply a low dead-band threshold to the smoothed series:
- **1 m** for barometric altimeters (FIT files with ``enhanced_altitude``)
- **3 m** for GPS-derived altitude (GPX, TCX, FIT without enhanced_altitude)
The 30 s pre-smoothing makes the low thresholds safe: after averaging,
0.2 m barometric quantization noise and short-period GPS jitter are
suppressed below the threshold, while real terrain changes (which persist
across the window) are preserved.
The elevation array in the timeseries is **not** modified — only the
summary stats in the detail JSON and ``index.json`` are patched.
``altitude_source`` is read from the detail JSON (written by the extractor
for activities recorded after this field was added). For older activities
it falls back to ``"unknown"`` → 3 m GPS threshold.
Returns
-------
dict with keys ``elevation_gain_m``, ``elevation_loss_m``,
``threshold_m``, ``altitude_source``.
"""
acts_dir = user_dir / "activities"
json_path = acts_dir / f"{activity_id}.json"
ts_path = acts_dir / f"{activity_id}.timeseries.json"
if not json_path.exists():
raise FileNotFoundError(f"Activity not found: {activity_id}")
if not ts_path.exists():
raise ValueError("Activity has no timeseries data")
ts = json.loads(ts_path.read_text(encoding="utf-8"))
# Prefer the pre-DEM backup; fall back to the current elevation array
ele_arr: list[Optional[float]] = (
ts.get("elevation_m_original") or ts.get("elevation_m") or []
)
elevations = [e for e in ele_arr if e is not None]
if len(elevations) < 2:
raise ValueError("Not enough elevation data to compute gain/loss")
# Determine source-aware threshold
detail = json.loads(json_path.read_text(encoding="utf-8"))
altitude_source = detail.get("altitude_source", "unknown")
threshold = 1.0 if altitude_source == "barometric" else 3.0
# Pre-smooth to suppress noise, then accumulate with low dead-band
smoothed = _moving_average(elevations, _MA_WINDOW_S)
gain, loss = _hysteresis_gain_loss(smoothed, threshold)
gain_r = round(gain, 1)
loss_r = round(loss, 1)
# Patch detail JSON
detail["elevation_gain_m"] = gain_r
detail["elevation_loss_m"] = loss_r
json_path.write_text(json.dumps(detail, indent=2, ensure_ascii=False), encoding="utf-8")
# Patch index.json summary
index_path = user_dir / "index.json"
if index_path.exists():
index = json.loads(index_path.read_text(encoding="utf-8"))
for s in index.get("activities", []):
if s.get("id") == activity_id:
s["elevation_gain_m"] = gain_r
break
index_path.write_text(
json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8"
)
return {
"elevation_gain_m": gain_r,
"elevation_loss_m": loss_r,
"threshold_m": threshold,
"altitude_source": altitude_source,
}