"""DEM (Digital Elevation Model) lookup and elevation recalculation. Queries any Open-Elevation-compatible HTTP API to replace noisy GPS altitude with terrain altitude, then re-applies hysteresis-based accumulation. Compatible APIs: - https://api.open-elevation.com (free, SRTM, accepts large batches) - https://api.opentopodata.org/v1/srtm30m (more reliable, max 100 pts/req) Pass the base URL (without path) to bincio serve/edit via --dem-url. """ from __future__ import annotations import json import urllib.error import urllib.request from pathlib import Path from typing import Optional # Sample one GPS point per N seconds when building the DEM query. # SRTM30 resolution is ~30 m; at 30 km/h cycling that's ~3 s per tile — # sampling every 10 s is more than enough. _DEFAULT_SAMPLE_INTERVAL_S = 10 # Maximum locations per API request. Open-Elevation supports ~512 per POST. _DEFAULT_BATCH_SIZE = 512 # Hysteresis threshold after DEM correction. DEM data is already smooth # (30 m horizontal resolution) so 5 m is a generous dead-band. _DEM_HYSTERESIS_M = 5.0 def lookup_elevations( latlons: list[tuple[float, float]], api_url: str, batch_size: int = _DEFAULT_BATCH_SIZE, timeout_s: int = 30, ) -> list[Optional[float]]: """Query a DEM API for terrain elevation at the given (lat, lon) pairs. Uses the Open-Elevation API format:: POST {api_url}/api/v1/lookup {"locations": [{"latitude": lat, "longitude": lon}, ...]} Returns a list the same length as *latlons*. Elements are ``None`` wherever the API returned no data (network error, ocean, etc.). """ if not latlons: return [] results: list[Optional[float]] = [None] * len(latlons) url = f"{api_url.rstrip('/')}/api/v1/lookup" for start in range(0, len(latlons), batch_size): batch = latlons[start : start + batch_size] payload = json.dumps( {"locations": [{"latitude": lat, "longitude": lon} for lat, lon in batch]} ).encode("utf-8") req = urllib.request.Request( url, data=payload, headers={"Content-Type": "application/json", "Accept": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: data = json.loads(resp.read().decode("utf-8")) for i, item in enumerate(data.get("results", [])): elev = item.get("elevation") if elev is not None: results[start + i] = float(elev) except (urllib.error.URLError, json.JSONDecodeError, KeyError, ValueError): pass # leave this batch as None; caller checks overall coverage return results def recalculate_elevation( user_dir: Path, activity_id: str, dem_url: str, sample_interval_s: int = _DEFAULT_SAMPLE_INTERVAL_S, ) -> dict: """Replace GPS elevation with DEM terrain elevation and recompute gain/loss. Algorithm --------- 1. Read the activity's 1 Hz timeseries for lat / lon / t arrays. 2. Subsample GPS points every *sample_interval_s* seconds. 3. Query the DEM API for those points (batched). 4. Linearly interpolate DEM elevation back to every GPS-valid second. 5. Apply :data:`_DEM_HYSTERESIS_M` hysteresis to compute gain / loss. 6. Write the updated ``elevation_m`` array to the timeseries JSON. 7. Patch ``elevation_gain_m`` / ``elevation_loss_m`` in the detail JSON. 8. Patch ``elevation_gain_m`` in ``index.json`` (summary entry for the feed). Returns ------- dict with keys ``elevation_gain_m``, ``elevation_loss_m``, ``points_queried`` (DEM responses received). Raises ------ FileNotFoundError Activity detail or timeseries file not found. ValueError Activity has no GPS coordinates, or the DEM API returned too few results. """ acts_dir = user_dir / "activities" json_path = acts_dir / f"{activity_id}.json" ts_path = acts_dir / f"{activity_id}.timeseries.json" if not json_path.exists(): raise FileNotFoundError(f"Activity not found: {activity_id}") if not ts_path.exists(): raise ValueError("Activity has no timeseries data") ts = json.loads(ts_path.read_text(encoding="utf-8")) lat_arr: list[Optional[float]] = ts.get("lat") or [] lon_arr: list[Optional[float]] = ts.get("lon") or [] t_arr: list[int] = ts.get("t") or [] if not lat_arr or not lon_arr: raise ValueError( "Activity has no GPS coordinates " "(privacy=no_gps or data recorded without GPS)" ) n = len(t_arr) # ── 1. Subsample GPS-valid indices ──────────────────────────────────────── gps_idx = [ i for i in range(n) if lat_arr[i] is not None and lon_arr[i] is not None ] if len(gps_idx) < 2: raise ValueError("Too few GPS points for DEM lookup") sampled_idx = gps_idx[::sample_interval_s] if gps_idx[-1] not in sampled_idx: sampled_idx.append(gps_idx[-1]) # always include the last point # ── 2. Query DEM ────────────────────────────────────────────────────────── latlons = [(float(lat_arr[i]), float(lon_arr[i])) for i in sampled_idx] # type: ignore[arg-type] dem_elev = lookup_elevations(latlons, dem_url) # Build (t, elevation) pairs for valid DEM responses only valid_pairs: list[tuple[int, float]] = [ (t_arr[sampled_idx[k]], dem_elev[k]) for k in range(len(sampled_idx)) if dem_elev[k] is not None ] n_queried = len(valid_pairs) if n_queried < 2: raise ValueError( f"DEM API returned too few results " f"({n_queried} of {len(sampled_idx)} points). " f"Check the DEM URL: {dem_url}" ) # ── 3. Linear interpolation → full 1 Hz series ─────────────────────────── new_ele: list[Optional[float]] = [None] * n j = 0 for i in gps_idx: t = t_arr[i] # Advance j so valid_pairs[j] is the left anchor for t while j + 1 < len(valid_pairs) - 1 and valid_pairs[j + 1][0] <= t: j += 1 t0, e0 = valid_pairs[j] if j + 1 < len(valid_pairs): t1, e1 = valid_pairs[j + 1] frac = max(0.0, min(1.0, (t - t0) / (t1 - t0))) if t1 > t0 else 0.0 new_ele[i] = round(e0 + frac * (e1 - e0), 1) else: new_ele[i] = round(e0, 1) # ── 4. Hysteresis accumulation ──────────────────────────────────────────── valid_eles = [e for e in new_ele if e is not None] gain = loss = 0.0 committed = valid_eles[0] for e in valid_eles[1:]: diff = e - committed if abs(diff) >= _DEM_HYSTERESIS_M: if diff > 0: gain += diff else: loss += diff committed = e gain_r = round(gain, 1) loss_r = round(abs(loss), 1) # ── 5. Write timeseries ─────────────────────────────────────────────────── ts["elevation_m"] = new_ele ts_path.write_text(json.dumps(ts, indent=2, ensure_ascii=False), encoding="utf-8") # ── 6. Patch activity detail JSON ───────────────────────────────────────── detail = json.loads(json_path.read_text(encoding="utf-8")) detail["elevation_gain_m"] = gain_r detail["elevation_loss_m"] = loss_r json_path.write_text(json.dumps(detail, indent=2, ensure_ascii=False), encoding="utf-8") # ── 7. Patch index.json summary (so merge_all picks up the new value) ───── index_path = user_dir / "index.json" if index_path.exists(): index = json.loads(index_path.read_text(encoding="utf-8")) for s in index.get("activities", []): if s.get("id") == activity_id: s["elevation_gain_m"] = gain_r break index_path.write_text( json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8" ) return { "elevation_gain_m": gain_r, "elevation_loss_m": loss_r, "points_queried": n_queried, }