"""Downsample a list of DataPoints to at most 1 sample/second and build the BAS timeseries object (parallel arrays).""" from datetime import datetime from math import atan2, cos, radians, sin, sqrt from typing import Optional from bincio.extract.models import DataPoint def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Great-circle distance in metres between two GPS points.""" dlat = radians(lat2 - lat1) dlon = radians(lon2 - lon1) a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2 return 2 * 6_371_000.0 * atan2(sqrt(a), sqrt(1 - a)) _SPATIAL_RESOLUTION_M = 10.0 def _spatial_downsample( sampled: list[DataPoint], resolution_m: float = _SPATIAL_RESOLUTION_M, ) -> list[DataPoint]: """Keep one sample per `resolution_m` of cumulative distance traveled. Distance source priority: 1. GPS haversine (lat/lon present on both consecutive points) 2. speed_kmh × Δt (fallback when GPS absent or gapped) If neither source is available (indoor, no speed data), returns `sampled` unchanged. Always retains the first and last points. """ if len(sampled) < 2: return sampled has_gps = any(p.lat is not None and p.lon is not None for p in sampled) has_speed = any(p.speed_kmh is not None for p in sampled) if not has_gps and not has_speed: return sampled result: list[DataPoint] = [sampled[0]] cum_dist = 0.0 last_kept = 0.0 prev_speed = 0.0 for i in range(1, len(sampled)): prev, cur = sampled[i - 1], sampled[i] dt = (cur.timestamp - prev.timestamp).total_seconds() if (has_gps and prev.lat is not None and prev.lon is not None and cur.lat is not None and cur.lon is not None): dist_m = _haversine_m(prev.lat, prev.lon, cur.lat, cur.lon) else: spd = cur.speed_kmh if cur.speed_kmh is not None else prev_speed dist_m = (spd / 3.6) * max(dt, 0) if cur.speed_kmh is not None: prev_speed = cur.speed_kmh cum_dist += dist_m if cum_dist - last_kept >= resolution_m: result.append(cur) last_kept = cum_dist if result[-1] is not sampled[-1]: result.append(sampled[-1]) return result def _gps_speed_kmh( lat_vals: list[Optional[float]], lon_vals: list[Optional[float]], ts_vals: list[int], ) -> list[Optional[float]]: """Compute speed (km/h) from consecutive GPS coordinates via haversine. Applies a 5-point centred moving-average to reduce GPS noise. """ n = len(ts_vals) raw: list[Optional[float]] = [None] * n for i in range(1, n): la0, lo0 = lat_vals[i - 1], lon_vals[i - 1] la1, lo1 = lat_vals[i], lon_vals[i] dt = ts_vals[i] - ts_vals[i - 1] if la0 is None or lo0 is None or la1 is None or lo1 is None or dt <= 0: continue d_km = _haversine_m(la0, lo0, la1, lo1) / 1000.0 raw[i] = d_km / dt * 3600.0 # 5-point centred moving average (skip None anchors) half = 2 smoothed: list[Optional[float]] = [None] * n for i in range(n): vals = [raw[j] for j in range(max(0, i - half), min(n, i + half + 1)) if raw[j] is not None] if vals: smoothed[i] = round(sum(vals) / len(vals), 2) return smoothed def build_timeseries( points: list[DataPoint], started_at: datetime, privacy: str = "public", ) -> dict: """Return the BAS `timeseries` object. privacy='no_gps' → lat/lon set to null. All other privacy levels (including 'unlisted') retain GPS in the timeseries. Downsamples so at most one point per second is emitted. """ if not points: return {"t": []} include_gps = privacy not in ("no_gps", "private") # "private" = legacy alias for "unlisted" # Downsample: keep at most one point per second sampled: list[DataPoint] = [] last_t: Optional[int] = None for p in points: t = int((p.timestamp - started_at).total_seconds()) if t < 0: continue if last_t is not None and t <= last_t: continue # skip sub-second duplicates and non-monotonic points sampled.append(p) last_t = t sampled = _spatial_downsample(sampled) ts_vals = [int((p.timestamp - started_at).total_seconds()) for p in sampled] lat_vals = [round(p.lat, 7) if p.lat is not None else None for p in sampled] if include_gps else None lon_vals = [round(p.lon, 7) if p.lon is not None else None for p in sampled] if include_gps else None ele_vals = [round(p.elevation_m, 1) if p.elevation_m is not None else None for p in sampled] spd_vals = [round(p.speed_kmh, 2) if p.speed_kmh is not None else None for p in sampled] # Derive speed from GPS when the device didn't record per-second speed. if include_gps and lat_vals and lon_vals and all(v is None for v in spd_vals): spd_vals = _gps_speed_kmh(lat_vals, lon_vals, ts_vals) hr_vals = [p.hr_bpm for p in sampled] cad_vals = [p.cadence_rpm for p in sampled] pwr_vals = [p.power_w for p in sampled] tmp_vals = [round(p.temperature_c, 1) if p.temperature_c is not None else None for p in sampled] result: dict = { "t": ts_vals, "lat": lat_vals, "lon": lon_vals, "elevation_m": ele_vals, "speed_kmh": spd_vals, "hr_bpm": hr_vals, "cadence_rpm": cad_vals, "power_w": pwr_vals, "temperature_c": tmp_vals, } return result