Files
Davide Scaini 84eff1f3b0 perf: spatial 10 m downsampling for timeseries
Extract _haversine_m from the inline block in _gps_speed_kmh, add
_spatial_downsample (keep one sample per 10 m traveled, GPS haversine
primary / speed×Δt fallback, indoor activities unchanged), and wire it
into build_timeseries() after the 1 s dedup loop.

Add --downsample-timeseries migration flag to bincio render that applies
the same downsampling to existing stored timeseries files without
re-extracting from original FIT/GPX files.
2026-05-19 20:11:00 +02:00

160 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Downsample a list of DataPoints to at most 1 sample/second and build
the BAS timeseries object (parallel arrays). Samples are additionally thinned
to roughly one per 10 m of distance travelled."""
from datetime import datetime
from math import atan2, cos, radians, sin, sqrt
from typing import Optional
from bincio.extract.models import DataPoint
def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in metres between two GPS points.

    Uses the haversine formula on a sphere of radius 6 371 000 m.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.

    Returns:
        The distance along the sphere's surface, in metres.
    """
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` a hair above 1 for (near-)antipodal
    # points, and out-of-range latitudes can make it negative; either would
    # make a sqrt() below raise ValueError. Clamp with comparisons (rather than
    # min/max) so that a NaN input still propagates as NaN.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0
    return 2 * 6_371_000.0 * atan2(sqrt(a), sqrt(1 - a))
# Default spacing, in metres of distance travelled, between the samples that
# _spatial_downsample keeps (used as its `resolution_m` default).
_SPATIAL_RESOLUTION_M = 10.0
def _spatial_downsample(
    sampled: list[DataPoint],
    resolution_m: float = _SPATIAL_RESOLUTION_M,
) -> list[DataPoint]:
    """Thin `sampled` to roughly one point per `resolution_m` metres travelled.

    The distance covered between two consecutive points comes from:

    1. the GPS haversine distance, when both points carry lat and lon;
    2. otherwise speed_kmh × Δt, using the current point's speed or, when it
       has none, the speed carried forward from an earlier iteration (0.0
       initially; the first point's own speed is not used as a seed).

    A point is kept whenever the distance accumulated since the previously
    kept point reaches `resolution_m`. The first and last points are always
    retained.

    Inputs with fewer than two points, or in which no point has lat/lon and
    no point has a speed (e.g. indoor activities without speed data), are
    returned unchanged.
    """
    if len(sampled) < 2:
        return sampled

    any_fix = any(p.lat is not None and p.lon is not None for p in sampled)
    any_speed = any(p.speed_kmh is not None for p in sampled)
    if not (any_fix or any_speed):
        return sampled

    kept: list[DataPoint] = [sampled[0]]
    travelled_m = 0.0
    travelled_at_last_keep = 0.0
    carried_speed_kmh = 0.0

    for before, after in zip(sampled, sampled[1:]):
        elapsed_s = (after.timestamp - before.timestamp).total_seconds()
        both_located = all(
            coord is not None
            for coord in (before.lat, before.lon, after.lat, after.lon)
        )

        if both_located:
            step_m = _haversine_m(before.lat, before.lon, after.lat, after.lon)
        else:
            # No usable GPS pair: estimate the step from speed; a negative
            # time step contributes no distance.
            speed_kmh = carried_speed_kmh if after.speed_kmh is None else after.speed_kmh
            step_m = (speed_kmh / 3.6) * max(elapsed_s, 0)

        # Remember the latest recorded speed so a later gap can fall back to it.
        if after.speed_kmh is not None:
            carried_speed_kmh = after.speed_kmh

        travelled_m += step_m
        if travelled_m - travelled_at_last_keep >= resolution_m:
            kept.append(after)
            travelled_at_last_keep = travelled_m

    # Make sure the track still ends on the final sample.
    if kept[-1] is not sampled[-1]:
        kept.append(sampled[-1])
    return kept
def _gps_speed_kmh(
    lat_vals: list[Optional[float]],
    lon_vals: list[Optional[float]],
    ts_vals: list[int],
) -> list[Optional[float]]:
    """Derive per-sample speed (km/h) from consecutive GPS coordinates.

    The raw speed at index i is the haversine distance from sample i-1 to
    sample i divided by the difference of their `ts_vals` entries. It stays
    None when either sample lacks a coordinate or the timestamps do not
    increase; index 0 never gets a raw value.

    The raw series is then smoothed with a 5-point centred moving average
    that ignores None entries. Each average is rounded to 2 decimals, and a
    position whose whole window is None remains None.
    """
    count = len(ts_vals)

    instantaneous: list[Optional[float]] = [None] * count
    for idx in range(1, count):
        ends = (lat_vals[idx - 1], lon_vals[idx - 1], lat_vals[idx], lon_vals[idx])
        elapsed = ts_vals[idx] - ts_vals[idx - 1]
        if elapsed <= 0 or any(coord is None for coord in ends):
            continue
        # metres -> km, then km per `elapsed` seconds -> km/h
        instantaneous[idx] = _haversine_m(*ends) / 1000.0 / elapsed * 3600.0

    radius = 2  # two neighbours on each side -> 5-point window
    averaged: list[Optional[float]] = []
    for centre in range(count):
        window = [
            value
            for value in instantaneous[max(0, centre - radius):centre + radius + 1]
            if value is not None
        ]
        averaged.append(round(sum(window) / len(window), 2) if window else None)
    return averaged
def build_timeseries(
    points: list[DataPoint],
    started_at: datetime,
    privacy: str = "public",
) -> dict:
    """Build the BAS `timeseries` object as a dict of parallel arrays.

    Points are first reduced to at most one per whole second elapsed since
    `started_at`: points whose whole-second offset is negative, sub-second
    duplicates and non-monotonic points are dropped. The survivors are then
    thinned by `_spatial_downsample`.

    privacy in ('no_gps', 'private') → "lat" and "lon" are None instead of
    arrays. Every other privacy value (including 'unlisted') keeps GPS in
    the timeseries.

    When GPS is included and no retained point has a recorded speed,
    "speed_kmh" is derived from the coordinates via `_gps_speed_kmh`.

    Returns {"t": []} when `points` is empty.
    """
    if not points:
        return {"t": []}

    # NOTE(review): "private" is described as a legacy alias for "unlisted",
    # yet it strips GPS here while "unlisted" keeps it — confirm the intent.
    include_gps = privacy not in ("no_gps", "private")

    def offset_s(point: DataPoint) -> int:
        """Whole seconds elapsed between `started_at` and `point`."""
        return int((point.timestamp - started_at).total_seconds())

    def rounded(value: Optional[float], ndigits: int) -> Optional[float]:
        """Round `value` to `ndigits` decimals, passing None through."""
        return None if value is None else round(value, ndigits)

    # Temporal pass: keep at most one point per second, in increasing order.
    per_second: list[DataPoint] = []
    newest_t: Optional[int] = None
    for point in points:
        t = offset_s(point)
        if t < 0 or (newest_t is not None and t <= newest_t):
            continue
        per_second.append(point)
        newest_t = t

    # Spatial pass.
    kept = _spatial_downsample(per_second)

    ts_vals = [offset_s(p) for p in kept]
    lat_vals = [rounded(p.lat, 7) for p in kept] if include_gps else None
    lon_vals = [rounded(p.lon, 7) for p in kept] if include_gps else None

    spd_vals = [rounded(p.speed_kmh, 2) for p in kept]
    # Derive speed from GPS when the device recorded no speed at all.
    if include_gps and lat_vals and lon_vals and all(v is None for v in spd_vals):
        spd_vals = _gps_speed_kmh(lat_vals, lon_vals, ts_vals)

    return {
        "t": ts_vals,
        "lat": lat_vals,
        "lon": lon_vals,
        "elevation_m": [rounded(p.elevation_m, 1) for p in kept],
        "speed_kmh": spd_vals,
        "hr_bpm": [p.hr_bpm for p in kept],
        "cadence_rpm": [p.cadence_rpm for p in kept],
        "power_w": [p.power_w for p in kept],
        "temperature_c": [rounded(p.temperature_c, 1) for p in kept],
    }