"""Segment effort detection.

Matches GPS tracks against stored segment polylines and produces SegmentEffort
records. Works from either a live ParsedActivity (ingest path) or from a stored
timeseries JSON (retroactive path).
"""

from __future__ import annotations

import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

from bincio.segments.models import Segment, SegmentEffort

# ── tuning constants ──────────────────────────────────────────────────────────

MATCH_RADIUS_M = 25  # max distance to segment start/end to open/close an effort
CONFORMANCE_MAX_DEV_M = 50  # max allowed deviation for each interior segment point
CONFORMANCE_MAX_FRAC = 0.30  # max fraction of interior points allowed to deviate

# ── fast distance approximation ───────────────────────────────────────────────

_R = 6_371_000.0  # Earth radius in metres


def _dist(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Equirectangular approximation — fast, accurate to <0.1% within 100 km.

    Takes two points in decimal degrees and returns their distance in metres.
    """
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    # Scale the longitude difference by the cosine of the mean latitude.
    mlat = math.radians((lat1 + lat2) / 2.0)
    return math.hypot(dlat * _R, dlon * _R * math.cos(mlat))


# ── activity track representation ─────────────────────────────────────────────


@dataclass
class ActivityTrack:
    """Common internal representation for detection, independent of source format."""

    activity_id: str
    sport: str
    started_at: datetime
    # Parallel arrays — all same length, GPS-only points (lat/lon not None).
    lats: list[float]
    lons: list[float]
    times: list[int]  # seconds from started_at
    speeds: list[Optional[float]]
    hrs: list[Optional[int]]
    powers: list[Optional[int]]
    bbox: list[float] = field(default_factory=list)  # [lon_min, lat_min, lon_max, lat_max]

    def __post_init__(self) -> None:
        """Derive the bounding box from the points when none was supplied."""
        if self.lats and not self.bbox:
            self.bbox = [
                min(self.lons),
                min(self.lats),
                max(self.lons),
                max(self.lats),
            ]


def track_from_parsed(parsed: "ParsedActivity", activity_id: str) -> Optional[ActivityTrack]:  # noqa: F821
    """Build an ActivityTrack from a ParsedActivity (used during ingest).

    Points without a position are dropped, as are points timestamped before
    ``parsed.started_at`` and points that fall on the same whole second as the
    previously kept point. Returns ``None`` when fewer than two points remain.
    """
    lats, lons, times, speeds, hrs, powers = [], [], [], [], [], []
    last_t = -1
    for p in parsed.points:
        if p.lat is None or p.lon is None:
            continue
        t = int((p.timestamp - parsed.started_at).total_seconds())
        if t < 0 or t == last_t:
            continue
        last_t = t
        lats.append(p.lat)
        lons.append(p.lon)
        times.append(t)
        speeds.append(p.speed_kmh)
        hrs.append(p.hr_bpm)
        powers.append(p.power_w)

    if len(lats) < 2:
        return None

    return ActivityTrack(
        activity_id=activity_id,
        sport=parsed.sport,
        started_at=parsed.started_at,
        lats=lats,
        lons=lons,
        times=times,
        speeds=speeds,
        hrs=hrs,
        powers=powers,
    )


def track_from_timeseries_json(
    ts: dict,
    activity_id: str,
    sport: str,
    started_at: datetime,
) -> Optional[ActivityTrack]:
    """Build an ActivityTrack from a stored timeseries JSON dict.

    Reads the ``t``, ``lat``, ``lon``, ``speed_kmh``, ``hr_bpm`` and
    ``power_w`` arrays. ``t``, ``lat`` and ``lon`` must all have the same
    length; the optional metric arrays are padded with ``None`` when shorter.
    Samples without a position or a time are skipped.

    Returns ``None`` when the position arrays are missing or inconsistent, or
    when fewer than two usable samples remain.
    """
    raw_lats = ts.get("lat") or []
    raw_lons = ts.get("lon") or []
    raw_t = ts.get("t") or []
    raw_spd = ts.get("speed_kmh") or []
    raw_hr = ts.get("hr_bpm") or []
    raw_pwr = ts.get("power_w") or []

    n = len(raw_t)
    # Both position arrays are indexed up to n below, so both must match ``t``.
    if n < 2 or len(raw_lats) != n or len(raw_lons) != n:
        return None

    def _pad(arr: list, length: int) -> list:
        # A negative multiplier yields an empty list, so longer arrays pass through.
        return arr + [None] * (length - len(arr))

    raw_spd = _pad(raw_spd, n)
    raw_hr = _pad(raw_hr, n)
    raw_pwr = _pad(raw_pwr, n)

    lats, lons, times, speeds, hrs, powers = [], [], [], [], [], []
    for i in range(n):
        if raw_lats[i] is None or raw_lons[i] is None or raw_t[i] is None:
            continue
        lats.append(float(raw_lats[i]))
        lons.append(float(raw_lons[i]))
        times.append(int(raw_t[i]))
        speeds.append(raw_spd[i])
        hrs.append(raw_hr[i])
        powers.append(raw_pwr[i])

    if len(lats) < 2:
        return None

    return ActivityTrack(
        activity_id=activity_id,
        sport=sport,
        started_at=started_at,
        lats=lats,
        lons=lons,
        times=times,
        speeds=speeds,
        hrs=hrs,
        powers=powers,
    )


# ── effort metric helpers ─────────────────────────────────────────────────────


def _avg_nonnull(vals: list, lo: int, hi: int) -> Optional[float]:
    """Mean of the non-None values in ``vals[lo..hi]`` (inclusive), or None."""
    nums = [v for v in vals[lo:hi + 1] if v is not None]
    return sum(nums) / len(nums) if nums else None


def _np_power(powers: list[Optional[int]], lo: int, hi: int) -> Optional[int]:
    """Coggan NP from a slice of 1Hz power data (may have gaps/nulls).

    Takes the 30-sample rolling mean over ``powers[lo..hi]`` (inclusive),
    raises each mean to the fourth power, averages those and takes the fourth
    root. Null samples count as 0 W inside the rolling window.

    Returns ``None`` when the slice holds no power values at all. Slices
    shorter than the window fall back to the plain mean of the non-null
    samples.
    """
    WIN = 30
    chunk = powers[lo:hi + 1]
    non_null = [v for v in chunk if v is not None]
    if not non_null:
        return None

    n = len(chunk)
    if n < WIN:
        # Too short for rolling average — just return avg power.
        return int(round(sum(non_null) / len(non_null)))

    filled = [v if v is not None else 0 for v in chunk]

    # Slide the window one sample at a time over every full window
    # [k - WIN + 1 .. k], so each sample is counted and n == WIN yields
    # exactly one window.
    window_sum = sum(filled[:WIN])
    fourth_sum = (window_sum / WIN) ** 4
    for k in range(WIN, n):
        window_sum += filled[k] - filled[k - WIN]
        fourth_sum += (window_sum / WIN) ** 4

    window_count = n - WIN + 1
    return int(round((fourth_sum / window_count) ** 0.25))


# ── detection algorithm ───────────────────────────────────────────────────────


def _bboxes_overlap(a: list[float], b: list[float]) -> bool:
    """True when two ``[lon_min, lat_min, lon_max, lat_max]`` boxes intersect."""
    return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])


def _conformance_ok(
    track: ActivityTrack,
    seg: Segment,
    i: int,
    j: int,
) -> bool:
    """Check that the track slice [i..j] follows the segment polyline.

    Each interior polyline point (everything but the first and last) must
    have a track point within ``CONFORMANCE_MAX_DEV_M`` of it. The slice
    conforms when at most ``CONFORMANCE_MAX_FRAC`` of the interior points
    fail that check.
    """
    interior = seg.polyline[1:-1]
    if not interior:
        return True  # trivial 2-point segment

    # Slice once so the per-point scan does not re-index the track.
    lats = track.lats[i:j + 1]
    lons = track.lons[i:j + 1]

    failing = 0
    for sp in interior:
        slat, slon = sp[0], sp[1]
        min_d = min(_dist(slat, slon, lat, lon) for lat, lon in zip(lats, lons))
        if min_d > CONFORMANCE_MAX_DEV_M:
            failing += 1

    return (failing / len(interior)) <= CONFORMANCE_MAX_FRAC


def _extract_effort(
    track: ActivityTrack,
    seg: Segment,
    i: int,
    j: int,
) -> SegmentEffort:
    """Build a SegmentEffort from the track slice [i..j] (inclusive).

    ``seg`` is not read here. Elapsed time is clamped to at least one second,
    and heart-rate and power averages are rounded to whole numbers.
    """
    elapsed_s = track.times[j] - track.times[i]
    started_at = track.started_at + timedelta(seconds=track.times[i])

    avg_speed = _avg_nonnull(track.speeds, i, j)
    avg_hr_raw = _avg_nonnull(track.hrs, i, j)
    avg_hr = int(round(avg_hr_raw)) if avg_hr_raw is not None else None
    avg_pwr_raw = _avg_nonnull(track.powers, i, j)
    avg_pwr = int(round(avg_pwr_raw)) if avg_pwr_raw is not None else None
    # Only compute NP when the slice holds at least one power sample.
    has_power = any(v is not None for v in track.powers[i:j + 1])
    np_pwr = _np_power(track.powers, i, j) if has_power else None

    return SegmentEffort(
        activity_id=track.activity_id,
        started_at=started_at,
        elapsed_s=max(1, elapsed_s),
        avg_speed_kmh=round(avg_speed, 2) if avg_speed is not None else None,
        avg_hr_bpm=avg_hr,
        avg_power_w=avg_pwr,
        np_power_w=np_pwr,
        detected_at=datetime.now(timezone.utc),
    )


def detect_one(track: ActivityTrack, seg: Segment) -> list[SegmentEffort]:
    """Return all matching efforts for a single segment against a track.

    An effort opens at the first track point within ``MATCH_RADIUS_M`` of the
    segment start and closes at the first later point within
    ``MATCH_RADIUS_M`` of the segment end. It is kept only if the points in
    between conform to the polyline. After a match the search resumes past
    the effort's end, so repeated laps yield several efforts.

    Returns an empty list when the bounding boxes do not overlap, the segment
    is restricted to a different sport, or the segment has no polyline points.
    """
    if not track.bbox or not _bboxes_overlap(track.bbox, seg.bbox):
        return []
    if seg.sport and seg.sport != track.sport:
        return []
    if not seg.polyline:
        return []  # nothing to match against

    seg_start_lat, seg_start_lon = seg.polyline[0][0], seg.polyline[0][1]
    seg_end_lat, seg_end_lon = seg.polyline[-1][0], seg.polyline[-1][1]

    n = len(track.lats)
    efforts: list[SegmentEffort] = []
    search_from = 0

    while search_from < n - 1:
        # Find next start candidate from search_from.
        start_idx = None
        for i in range(search_from, n):
            if _dist(seg_start_lat, seg_start_lon, track.lats[i], track.lons[i]) <= MATCH_RADIUS_M:
                start_idx = i
                break
        if start_idx is None:
            break

        # Scan forward from start_idx for an end candidate.
        end_idx = None
        for j in range(start_idx + 1, n):
            if _dist(seg_end_lat, seg_end_lon, track.lats[j], track.lons[j]) <= MATCH_RADIUS_M:
                end_idx = j
                break
        if end_idx is None:
            # No end found — no more efforts possible starting at or after start_idx.
            break

        if _conformance_ok(track, seg, start_idx, end_idx):
            efforts.append(_extract_effort(track, seg, start_idx, end_idx))
            search_from = end_idx + 1
        else:
            # Conformance failed; try next start candidate after start_idx.
            search_from = start_idx + 1

    return efforts


def detect_all(
    track: ActivityTrack,
    handle: str,
    data_dir: Path,
) -> int:
    """Detect efforts for all segments and persist them. Returns effort count.

    Segments are listed from ``data_dir`` through the segment store, and each
    detected effort is saved under ``handle`` and the segment's id.
    """
    # NOTE(review): imported at call time — presumably to avoid an import
    # cycle with the store module; confirm before hoisting.
    from bincio.segments import store as _store

    segments = _store.list_segments(data_dir)
    total = 0
    for seg in segments:
        efforts = detect_one(track, seg)
        for effort in efforts:
            _store.add_effort(data_dir, handle, seg.id, effort)
        total += len(efforts)
    return total