Files
bincio-activity/bincio/extract/timeseries.py
T
Davide Scaini 14a4a0b994 Activity detail: layout refactor + GPS-derived speed for map coloring
Layout: map + charts stacked left, stats panel (2-col) on the right.
Cadence moved to last stat. Charts sit directly below the map.

Speed coloring: most FIT files don't record per-second speed, leaving
timeseries speed_kmh all-null and the hover link dead. Fix: derive speed
from consecutive GPS coordinates (haversine + 5-pt moving average) when
the device didn't record it. Add --backfill-speed render flag to retrofit
existing timeseries files.
2026-05-16 23:24:29 +02:00

99 lines
3.6 KiB
Python

"""Downsample a list of DataPoints to at most 1 sample/second and build
the BAS timeseries object (parallel arrays)."""
from datetime import datetime
from math import atan2, cos, radians, sin, sqrt
from typing import Optional
from bincio.extract.models import DataPoint
def _gps_speed_kmh(
    lat_vals: list[Optional[float]],
    lon_vals: list[Optional[float]],
    ts_vals: list[int],
) -> list[Optional[float]]:
    """Compute speed (km/h) from consecutive GPS coordinates via haversine.

    For every sample ``i >= 1`` the great-circle distance to sample ``i - 1``
    is divided by the elapsed time to get a raw segment speed. The raw
    series is then smoothed with a 5-point centred moving average to reduce
    GPS noise.

    Args:
        lat_vals: Latitudes in degrees, ``None`` where there is no fix.
        lon_vals: Longitudes in degrees, ``None`` where there is no fix.
        ts_vals: Sample times in seconds; the output has this length.

    Returns:
        A list parallel to ``ts_vals`` with speeds rounded to 2 decimals.
        An entry is ``None`` only when no segment inside its smoothing
        window could be computed (missing coordinates or non-positive
        time step).
    """
    earth_radius_km = 6371.0
    half = 2  # samples on each side of the centre -> 5-point window

    n = len(ts_vals)
    raw: list[Optional[float]] = [None] * n
    for i in range(1, n):
        la0, lo0 = lat_vals[i - 1], lon_vals[i - 1]
        la1, lo1 = lat_vals[i], lon_vals[i]
        dt = ts_vals[i] - ts_vals[i - 1]
        if la0 is None or lo0 is None or la1 is None or lo1 is None or dt <= 0:
            continue
        dlat = radians(la1 - la0)
        dlon = radians(lo1 - lo0)
        a = sin(dlat / 2) ** 2 + cos(radians(la0)) * cos(radians(la1)) * sin(dlon / 2) ** 2
        # Rounding (or out-of-range input) can push `a` marginally outside
        # [0, 1]; sqrt() would then raise ValueError and abort the whole
        # extraction, so clamp it to the mathematically valid range.
        a = min(1.0, max(0.0, a))
        d_km = 2 * earth_radius_km * atan2(sqrt(a), sqrt(1 - a))
        raw[i] = d_km / dt * 3600.0

    # Centred moving average. None values inside the window are ignored, so
    # a sample without its own raw speed (index 0, or a dropped fix) is
    # filled from whatever valid neighbours the window contains.
    smoothed: list[Optional[float]] = [None] * n
    for i in range(n):
        window = [v for v in raw[max(0, i - half):i + half + 1] if v is not None]
        if window:
            smoothed[i] = round(sum(window) / len(window), 2)
    return smoothed
def build_timeseries(
    points: list[DataPoint],
    started_at: datetime,
    privacy: str = "public",
) -> dict:
    """Return the BAS `timeseries` object as a dict of parallel arrays.

    Points are downsampled so that at most one point per whole second
    (relative to ``started_at``) is emitted; points that precede
    ``started_at`` or do not advance the second counter are dropped.

    Args:
        points: Recorded samples, expected in chronological order.
        started_at: Activity start; ``t`` values are whole seconds since it.
        privacy: ``'no_gps'`` and ``'private'`` set ``lat``/``lon`` to null;
            every other value keeps GPS in the timeseries.

    Returns:
        ``{"t": []}`` when ``points`` is empty, otherwise a dict with the
        keys ``t``, ``lat``, ``lon``, ``elevation_m``, ``speed_kmh``,
        ``hr_bpm``, ``cadence_rpm``, ``power_w`` and ``temperature_c``.
        ``speed_kmh`` is derived from GPS when GPS is included and no point
        carries a recorded speed.
    """
    if not points:
        return {"t": []}

    # NOTE(review): the previous inline comment called "private" a legacy
    # alias for "unlisted", while the previous docstring said unlisted keeps
    # GPS. The code strips GPS for "private"; that behaviour is preserved
    # here -- confirm which one is intended.
    include_gps = privacy not in ("no_gps", "private")

    def _rounded(value: Optional[float], ndigits: int) -> Optional[float]:
        # Round a possibly-missing measurement, keeping None as None.
        return round(value, ndigits) if value is not None else None

    # Downsample: keep at most one point per second.
    sampled: list[DataPoint] = []
    ts_vals: list[int] = []
    for p in points:
        elapsed = (p.timestamp - started_at).total_seconds()
        # Test the sign before truncating: int() rounds toward zero, so a
        # point up to 1 s *before* the start would otherwise become t == 0
        # and displace the real first sample.
        if elapsed < 0:
            continue
        t = int(elapsed)
        if ts_vals and t <= ts_vals[-1]:
            continue  # skip sub-second duplicates and non-monotonic points
        sampled.append(p)
        ts_vals.append(t)

    lat_vals = [_rounded(p.lat, 7) for p in sampled] if include_gps else None
    lon_vals = [_rounded(p.lon, 7) for p in sampled] if include_gps else None
    ele_vals = [_rounded(p.elevation_m, 1) for p in sampled]
    spd_vals = [_rounded(p.speed_kmh, 2) for p in sampled]

    # Derive speed from GPS when the device didn't record per-second speed.
    if include_gps and lat_vals and lon_vals and all(v is None for v in spd_vals):
        spd_vals = _gps_speed_kmh(lat_vals, lon_vals, ts_vals)

    return {
        "t": ts_vals,
        "lat": lat_vals,
        "lon": lon_vals,
        "elevation_m": ele_vals,
        "speed_kmh": spd_vals,
        "hr_bpm": [p.hr_bpm for p in sampled],
        "cadence_rpm": [p.cadence_rpm for p in sampled],
        "power_w": [p.power_w for p in sampled],
        "temperature_c": [_rounded(p.temperature_c, 1) for p in sampled],
    }