308 lines
11 KiB
Python
308 lines
11 KiB
Python
"""Segment effort detection.

Matches GPS tracks against stored segment polylines and produces SegmentEffort
records. Works from either a live ParsedActivity (ingest path) or from a
stored timeseries JSON (retroactive path).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from bincio.segments.models import Segment, SegmentEffort
|
|
|
|
# ── tuning constants ──────────────────────────────────────────────────────────
|
|
|
|
# Endpoint tolerance: a track point opens or closes an effort only when it lies
# within this many metres of the segment's first / last vertex.
MATCH_RADIUS_M: int = 25
# Interior tolerance: an interior segment vertex counts as missed when no track
# point of the candidate slice comes within this many metres of it.
CONFORMANCE_MAX_DEV_M: int = 50
# Largest share of interior vertices that may be missed before a candidate
# slice is rejected.
CONFORMANCE_MAX_FRAC: float = 0.30
|
|
|
|
# Minimum geometric speed (segment_distance / elapsed_s) per sport, in m/s.
|
|
# Rejects false matches from long circuit rides where the track passes the
|
|
# segment start early and the segment end hours later.
|
|
_MIN_SPEED_MS: dict[str, float] = {
|
|
'cycling': 2.0, # ~7.2 km/h — below any realistic cyclist even on brutal climbs
|
|
'running': 0.8, # ~2.9 km/h
|
|
}
|
|
_MIN_SPEED_DEFAULT = 0.3 # hiking / walking / unknown
|
|
|
|
# Maximum geometric speed per sport in m/s — rejects GPS glitch matches.
|
|
_MAX_SPEED_MS: dict[str, float] = {
|
|
'cycling': 30.0, # ~108 km/h
|
|
'running': 12.0, # ~43 km/h
|
|
}
|
|
_MAX_SPEED_DEFAULT = 20.0
|
|
|
|
# ── fast distance approximation ───────────────────────────────────────────────
|
|
|
|
_R = 6_371_000.0 # Earth radius in metres
|
|
|
|
|
|
def _dist(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
"""Equirectangular approximation — fast, accurate to <0.1% within 100 km."""
|
|
dlat = math.radians(lat2 - lat1)
|
|
dlon = math.radians(lon2 - lon1)
|
|
mlat = math.radians((lat1 + lat2) / 2.0)
|
|
return math.hypot(dlat * _R, dlon * _R * math.cos(mlat))
|
|
|
|
|
|
# ── activity track representation ────────────────────────────────────────────
|
|
|
|
@dataclass
class ActivityTrack:
    """Source-agnostic view of one activity's GPS track, as consumed by detection.

    Both builders in this module (live parse and stored timeseries) produce
    this shape, so the matching code never needs to know where data came from.
    """

    activity_id: str
    sport: str
    started_at: datetime
    # Per-sample columns: index k in every list refers to the same sample, and
    # only samples that carry a GPS fix (lat/lon not None) are stored.
    lats: list[float]
    lons: list[float]
    times: list[int]  # offset from started_at, in seconds
    speeds: list[Optional[float]]
    hrs: list[Optional[int]]
    powers: list[Optional[int]]
    # Bounding box as [lon_min, lat_min, lon_max, lat_max].
    bbox: list[float] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Derive ``bbox`` from the coordinates unless one was supplied."""
        if self.bbox or not self.lats:
            # Either the caller provided a box or there is nothing to measure.
            return
        west, east = min(self.lons), max(self.lons)
        south, north = min(self.lats), max(self.lats)
        self.bbox = [west, south, east, north]
|
|
|
|
|
|
def track_from_parsed(parsed: "ParsedActivity", activity_id: str) -> Optional[ActivityTrack]:  # noqa: F821
    """Build an ActivityTrack from a ParsedActivity (used during ingest).

    Only points with both ``lat`` and ``lon`` set are kept. Each kept point's
    time is its whole-second offset from ``parsed.started_at``, and the
    resulting ``times`` list is strictly increasing: points before the start,
    points repeating the previous second, and points that jump backwards in
    time are all dropped.

    Args:
        parsed: Parsed activity exposing ``points``, ``started_at`` and
            ``sport``; each point exposes ``lat``, ``lon``, ``timestamp``,
            ``speed_kmh``, ``hr_bpm`` and ``power_w``.
        activity_id: Identifier stored on the resulting track.

    Returns:
        The track, or None when fewer than two usable GPS points remain.
    """
    lats: list[float] = []
    lons: list[float] = []
    times: list[int] = []
    speeds: list[Optional[float]] = []
    hrs: list[Optional[int]] = []
    powers: list[Optional[int]] = []

    last_t = -1
    for p in parsed.points:
        if p.lat is None or p.lon is None:
            continue
        t = int((p.timestamp - parsed.started_at).total_seconds())
        # last_t starts at -1, so this single test rejects pre-start samples
        # (t < 0), same-second duplicates and out-of-order samples. Keeping a
        # backwards sample would later produce negative elapsed times.
        if t <= last_t:
            continue
        last_t = t
        lats.append(p.lat)
        lons.append(p.lon)
        times.append(t)
        speeds.append(p.speed_kmh)
        hrs.append(p.hr_bpm)
        powers.append(p.power_w)

    if len(lats) < 2:
        return None
    return ActivityTrack(
        activity_id=activity_id,
        sport=parsed.sport,
        started_at=parsed.started_at,
        lats=lats, lons=lons, times=times,
        speeds=speeds, hrs=hrs, powers=powers,
    )
|
|
|
|
|
|
def track_from_timeseries_json(
    ts: dict,
    activity_id: str,
    sport: str,
    started_at: datetime,
) -> Optional[ActivityTrack]:
    """Build an ActivityTrack from a stored timeseries JSON dict.

    Reads the columns ``t``, ``lat``, ``lon``, ``speed_kmh``, ``hr_bpm`` and
    ``power_w`` from ``ts``; a missing or null column is treated as empty.
    ``t`` sets the sample count: ``lat`` and ``lon`` must match it exactly,
    while the three metric columns are padded with None when shorter. Samples
    with a null ``lat`` or ``lon`` are dropped.

    Args:
        ts: Column-oriented timeseries dict.
        activity_id: Identifier stored on the resulting track.
        sport: Sport label stored on the resulting track.
        started_at: Activity start time stored on the resulting track.

    Returns:
        The track, or None when the coordinate columns are unusable (fewer
        than two samples, length mismatch with ``t``) or fewer than two GPS
        samples remain.
    """
    raw_lats = ts.get("lat") or []
    raw_lons = ts.get("lon") or []
    raw_t = ts.get("t") or []
    raw_spd = ts.get("speed_kmh") or []
    raw_hr = ts.get("hr_bpm") or []
    raw_pwr = ts.get("power_w") or []

    n = len(raw_t)
    # Both coordinate columns must line up with the time column; a short
    # ``lon`` column would otherwise fail with IndexError further down.
    if n < 2 or len(raw_lats) != n or len(raw_lons) != n:
        return None

    def _pad(arr: list, length: int) -> list:
        # A negative repeat count yields [], so longer columns pass unchanged.
        return arr + [None] * (length - len(arr))

    lats: list[float] = []
    lons: list[float] = []
    times: list[int] = []
    speeds: list[Optional[float]] = []
    hrs: list[Optional[int]] = []
    powers: list[Optional[int]] = []

    columns = zip(raw_lats, raw_lons, raw_t, _pad(raw_spd, n), _pad(raw_hr, n), _pad(raw_pwr, n))
    for lat, lon, t, spd, hr, pwr in columns:
        if lat is None or lon is None:
            continue  # sample without a GPS fix
        lats.append(float(lat))
        lons.append(float(lon))
        times.append(int(t))
        speeds.append(spd)
        hrs.append(hr)
        powers.append(pwr)

    if len(lats) < 2:
        return None
    return ActivityTrack(
        activity_id=activity_id,
        sport=sport,
        started_at=started_at,
        lats=lats, lons=lons, times=times,
        speeds=speeds, hrs=hrs, powers=powers,
    )
|
|
|
|
|
|
# ── effort metric helpers ─────────────────────────────────────────────────────
|
|
|
|
def _avg_nonnull(vals: list, lo: int, hi: int) -> Optional[float]:
|
|
nums = [v for v in vals[lo:hi + 1] if v is not None]
|
|
return sum(nums) / len(nums) if nums else None
|
|
|
|
|
|
def _np_power(powers: list[Optional[int]], lo: int, hi: int) -> Optional[int]:
|
|
"""Coggan NP from a slice of 1Hz power data (may have gaps/nulls)."""
|
|
WIN = 30
|
|
chunk = powers[lo:hi + 1]
|
|
filled = [v if v is not None else 0 for v in chunk]
|
|
n = len(filled)
|
|
if n < WIN:
|
|
# Too short for rolling average — just return avg power.
|
|
non_null = [v for v in chunk if v is not None]
|
|
return int(round(sum(non_null) / len(non_null))) if non_null else None
|
|
half = WIN // 2
|
|
window_sum = sum(filled[:WIN])
|
|
fourth_powers = []
|
|
for i in range(half, n - half):
|
|
fourth_powers.append((window_sum / WIN) ** 4)
|
|
if i + half + 1 < n:
|
|
window_sum += filled[i + half + 1] - filled[i - half]
|
|
if not fourth_powers:
|
|
return None
|
|
return int(round((sum(fourth_powers) / len(fourth_powers)) ** 0.25))
|
|
|
|
|
|
# ── detection algorithm ───────────────────────────────────────────────────────
|
|
|
|
def _bboxes_overlap(a: list[float], b: list[float]) -> bool:
|
|
return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])
|
|
|
|
|
|
def _conformance_ok(
|
|
track: ActivityTrack,
|
|
seg: Segment,
|
|
i: int,
|
|
j: int,
|
|
) -> bool:
|
|
"""Check that the track slice [i..j] follows the segment polyline."""
|
|
interior = seg.polyline[1:-1]
|
|
if not interior:
|
|
return True # trivial 2-point segment
|
|
failing = 0
|
|
for sp in interior:
|
|
slat, slon = sp[0], sp[1]
|
|
min_d = min(
|
|
_dist(slat, slon, track.lats[k], track.lons[k])
|
|
for k in range(i, j + 1)
|
|
)
|
|
if min_d > CONFORMANCE_MAX_DEV_M:
|
|
failing += 1
|
|
return (failing / len(interior)) <= CONFORMANCE_MAX_FRAC
|
|
|
|
|
|
def _extract_effort(
    track: ActivityTrack,
    seg: Segment,
    i: int,
    j: int,
) -> SegmentEffort:
    """Build the SegmentEffort record for track points ``i..j`` (inclusive).

    Elapsed time is the difference between the two endpoint offsets and is
    stored with a floor of 1 second. Heart rate and power are means of the
    non-null samples in the slice, rounded to ints; each metric is None when
    the slice carries no such data.
    """
    t_start = track.times[i]
    elapsed_s = track.times[j] - t_start
    started_at = (track.started_at + timedelta(seconds=t_start)).replace(microsecond=0)

    # Average speed always comes from segment distance / elapsed time.
    # Device-recorded speed is unreliable across formats (m/s vs km/h in older
    # FIT files), and averaging instantaneous GPS speed over a slice would
    # give a different figure anyway.
    if elapsed_s > 0:
        avg_speed_kmh = round(seg.distance_m / elapsed_s * 3.6, 2)
    else:
        avg_speed_kmh = None

    def _rounded_mean(series: list) -> Optional[int]:
        mean = _avg_nonnull(series, i, j)
        return None if mean is None else int(round(mean))

    avg_hr = _rounded_mean(track.hrs)
    avg_pwr = _rounded_mean(track.powers)

    # Normalized power is only attempted when the slice has some power data.
    has_power = any(v is not None for v in track.powers[i:j + 1])
    np_pwr = _np_power(track.powers, i, j) if has_power else None

    return SegmentEffort(
        activity_id=track.activity_id,
        started_at=started_at,
        elapsed_s=max(1, elapsed_s),
        avg_speed_kmh=avg_speed_kmh,
        avg_hr_bpm=avg_hr,
        avg_power_w=avg_pwr,
        np_power_w=np_pwr,
        detected_at=datetime.now(timezone.utc),
    )
|
|
|
|
|
|
def detect_one(track: ActivityTrack, seg: Segment) -> list[SegmentEffort]:
    """Return all matching efforts for a single segment against a track.

    Cheap rejections come first: no bounding-box overlap, a sport mismatch
    (when the segment declares a sport), or a segment without a polyline.
    The track is then scanned left to right:

    1. Find the first point at or after the cursor within MATCH_RADIUS_M of
       the segment's first vertex (start candidate).
    2. Find the first later point within MATCH_RADIUS_M of the segment's last
       vertex (end candidate). If none exists, stop.
    3. Accept the pair only if elapsed time is positive, the geometric speed
       (segment distance / elapsed) lies within the sport's min/max bounds,
       and the slice passes the conformance check.
    4. After an accepted effort the cursor moves past its end point, so
       efforts never overlap; after a rejection it moves one point past the
       start candidate so the next start is tried.

    Args:
        track: Activity track to search.
        seg: Segment to match; reads ``bbox``, ``sport``, ``polyline`` and
            ``distance_m``.

    Returns:
        Efforts in track order; empty when nothing matches.
    """
    if not track.bbox or not _bboxes_overlap(track.bbox, seg.bbox):
        return []
    if seg.sport and seg.sport != track.sport:
        return []
    if not seg.polyline:
        # No endpoints to match against.
        return []

    start_lat, start_lon = seg.polyline[0][0], seg.polyline[0][1]
    end_lat, end_lon = seg.polyline[-1][0], seg.polyline[-1][1]
    # The speed bounds depend only on the track's sport, so look them up once.
    min_speed = _MIN_SPEED_MS.get(track.sport, _MIN_SPEED_DEFAULT)
    max_speed = _MAX_SPEED_MS.get(track.sport, _MAX_SPEED_DEFAULT)

    lats, lons, times = track.lats, track.lons, track.times
    n = len(lats)
    efforts: list[SegmentEffort] = []

    search_from = 0
    while search_from < n - 1:
        # Next start candidate at or after the cursor.
        start_idx = next(
            (
                i for i in range(search_from, n)
                if _dist(start_lat, start_lon, lats[i], lons[i]) <= MATCH_RADIUS_M
            ),
            None,
        )
        if start_idx is None:
            break

        # First end candidate strictly after the start.
        end_idx = next(
            (
                j for j in range(start_idx + 1, n)
                if _dist(end_lat, end_lon, lats[j], lons[j]) <= MATCH_RADIUS_M
            ),
            None,
        )
        if end_idx is None:
            # No end found — no later start can have one either.
            break

        # Reject implausibly slow or fast matches. A non-positive elapsed time
        # (duplicate or out-of-order timestamps) is never a real traversal and
        # would otherwise be stored as a bogus 1-second effort.
        elapsed = times[end_idx] - times[start_idx]
        plausible = elapsed > 0 and min_speed <= seg.distance_m / elapsed <= max_speed

        if plausible and _conformance_ok(track, seg, start_idx, end_idx):
            efforts.append(_extract_effort(track, seg, start_idx, end_idx))
            search_from = end_idx + 1
        else:
            # Try the next start candidate after this one.
            search_from = start_idx + 1

    return efforts
|
|
|
|
|
|
def detect_all(
    track: ActivityTrack,
    handle: str,
    data_dir: Path,
) -> int:
    """Run detection against every stored segment and persist the results.

    Segments are listed from the store under ``data_dir``; each effort found
    by ``detect_one`` is saved through the store under ``handle`` and the
    segment's id.

    Returns:
        The number of efforts persisted.
    """
    from bincio.segments import store as _store

    persisted = 0
    for seg in _store.list_segments(data_dir):
        for effort in detect_one(track, seg):
            _store.add_effort(data_dir, handle, seg.id, effort)
            persisted += 1
    return persisted
|