Files
bincio-activity/bincio/segments/detect.py
T
Davide Scaini 4d2df860ce Segments Phase 3: detection algorithm, CLI, ingest hook, and efforts API
- detect.py: ActivityTrack + detect_one/detect_all (bbox pre-filter →
  start/end proximity 25m → path conformance 50m/30% → effort extraction
  with avg speed/HR/power and Coggan NP)
- cli.py: `bincio segments detect` for retroactive detection over stored
  timeseries JSONs, with optional --activity-id / --segment-id filters
- ingest.py: non-fatal hook at end of ingest_parsed runs detect_all
- server.py: GET /api/segments/{id}/efforts and POST /api/segments/{id}/detect
2026-05-13 00:50:39 +02:00

279 lines
9.8 KiB
Python

"""Segment effort detection.
Matches GPS tracks against stored segment polylines and produces SegmentEffort
records. Works from either a live ParsedActivity (ingest path) or from a
stored timeseries JSON (retroactive path).
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from bincio.segments.models import Segment, SegmentEffort
# ── tuning constants ──────────────────────────────────────────────────────────
MATCH_RADIUS_M = 25 # max distance to segment start/end to open/close an effort
CONFORMANCE_MAX_DEV_M = 50 # max allowed deviation for each interior segment point
CONFORMANCE_MAX_FRAC = 0.30 # max fraction of interior points allowed to deviate
# ── fast distance approximation ───────────────────────────────────────────────
_R = 6_371_000.0 # Earth radius in metres
def _dist(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Equirectangular approximation — fast, accurate to <0.1% within 100 km."""
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
mlat = math.radians((lat1 + lat2) / 2.0)
return math.hypot(dlat * _R, dlon * _R * math.cos(mlat))
# ── activity track representation ────────────────────────────────────────────
@dataclass
class ActivityTrack:
"""Common internal representation for detection, independent of source format."""
activity_id: str
sport: str
started_at: datetime
# Parallel arrays — all same length, GPS-only points (lat/lon not None).
lats: list[float]
lons: list[float]
times: list[int] # seconds from started_at
speeds: list[Optional[float]]
hrs: list[Optional[int]]
powers: list[Optional[int]]
bbox: list[float] = field(default_factory=list) # [lon_min, lat_min, lon_max, lat_max]
def __post_init__(self) -> None:
if self.lats and not self.bbox:
self.bbox = [
min(self.lons), min(self.lats),
max(self.lons), max(self.lats),
]
def track_from_parsed(parsed: "ParsedActivity", activity_id: str) -> Optional[ActivityTrack]: # noqa: F821
"""Build an ActivityTrack from a ParsedActivity (used during ingest)."""
lats, lons, times, speeds, hrs, powers = [], [], [], [], [], []
last_t = -1
for p in parsed.points:
if p.lat is None or p.lon is None:
continue
t = int((p.timestamp - parsed.started_at).total_seconds())
if t < 0 or t == last_t:
continue
last_t = t
lats.append(p.lat)
lons.append(p.lon)
times.append(t)
speeds.append(p.speed_kmh)
hrs.append(p.hr_bpm)
powers.append(p.power_w)
if len(lats) < 2:
return None
return ActivityTrack(
activity_id=activity_id,
sport=parsed.sport,
started_at=parsed.started_at,
lats=lats, lons=lons, times=times,
speeds=speeds, hrs=hrs, powers=powers,
)
def track_from_timeseries_json(
ts: dict,
activity_id: str,
sport: str,
started_at: datetime,
) -> Optional[ActivityTrack]:
"""Build an ActivityTrack from a stored timeseries JSON dict."""
raw_lats = ts.get("lat") or []
raw_lons = ts.get("lon") or []
raw_t = ts.get("t") or []
raw_spd = ts.get("speed_kmh") or []
raw_hr = ts.get("hr_bpm") or []
raw_pwr = ts.get("power_w") or []
n = len(raw_t)
if n < 2 or not raw_lats or len(raw_lats) != n:
return None
def _pad(arr: list, length: int) -> list:
return arr + [None] * (length - len(arr))
raw_spd = _pad(raw_spd, n)
raw_hr = _pad(raw_hr, n)
raw_pwr = _pad(raw_pwr, n)
lats, lons, times, speeds, hrs, powers = [], [], [], [], [], []
for i in range(n):
if raw_lats[i] is None or raw_lons[i] is None:
continue
lats.append(float(raw_lats[i]))
lons.append(float(raw_lons[i]))
times.append(int(raw_t[i]))
speeds.append(raw_spd[i])
hrs.append(raw_hr[i])
powers.append(raw_pwr[i])
if len(lats) < 2:
return None
return ActivityTrack(
activity_id=activity_id,
sport=sport,
started_at=started_at,
lats=lats, lons=lons, times=times,
speeds=speeds, hrs=hrs, powers=powers,
)
# ── effort metric helpers ─────────────────────────────────────────────────────
def _avg_nonnull(vals: list, lo: int, hi: int) -> Optional[float]:
nums = [v for v in vals[lo:hi + 1] if v is not None]
return sum(nums) / len(nums) if nums else None
def _np_power(powers: list[Optional[int]], lo: int, hi: int) -> Optional[int]:
"""Coggan NP from a slice of 1Hz power data (may have gaps/nulls)."""
WIN = 30
chunk = powers[lo:hi + 1]
filled = [v if v is not None else 0 for v in chunk]
n = len(filled)
if n < WIN:
# Too short for rolling average — just return avg power.
non_null = [v for v in chunk if v is not None]
return int(round(sum(non_null) / len(non_null))) if non_null else None
half = WIN // 2
window_sum = sum(filled[:WIN])
fourth_powers = []
for i in range(half, n - half):
fourth_powers.append((window_sum / WIN) ** 4)
if i + half + 1 < n:
window_sum += filled[i + half + 1] - filled[i - half]
if not fourth_powers:
return None
return int(round((sum(fourth_powers) / len(fourth_powers)) ** 0.25))
# ── detection algorithm ───────────────────────────────────────────────────────
def _bboxes_overlap(a: list[float], b: list[float]) -> bool:
return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])
def _conformance_ok(
track: ActivityTrack,
seg: Segment,
i: int,
j: int,
) -> bool:
"""Check that the track slice [i..j] follows the segment polyline."""
interior = seg.polyline[1:-1]
if not interior:
return True # trivial 2-point segment
failing = 0
for sp in interior:
slat, slon = sp[0], sp[1]
min_d = min(
_dist(slat, slon, track.lats[k], track.lons[k])
for k in range(i, j + 1)
)
if min_d > CONFORMANCE_MAX_DEV_M:
failing += 1
return (failing / len(interior)) <= CONFORMANCE_MAX_FRAC
def _extract_effort(
track: ActivityTrack,
seg: Segment,
i: int,
j: int,
) -> SegmentEffort:
elapsed_s = track.times[j] - track.times[i]
started_at = track.started_at + timedelta(seconds=track.times[i])
avg_speed = _avg_nonnull(track.speeds, i, j)
avg_hr_raw = _avg_nonnull(track.hrs, i, j)
avg_hr = int(round(avg_hr_raw)) if avg_hr_raw is not None else None
avg_pwr_raw = _avg_nonnull(track.powers, i, j)
avg_pwr = int(round(avg_pwr_raw)) if avg_pwr_raw is not None else None
np_pwr = _np_power(track.powers, i, j) if any(v is not None for v in track.powers[i:j + 1]) else None
return SegmentEffort(
activity_id=track.activity_id,
started_at=started_at,
elapsed_s=max(1, elapsed_s),
avg_speed_kmh=round(avg_speed, 2) if avg_speed is not None else None,
avg_hr_bpm=avg_hr,
avg_power_w=avg_pwr,
np_power_w=np_pwr,
detected_at=datetime.now(timezone.utc),
)
def detect_one(track: ActivityTrack, seg: Segment) -> list[SegmentEffort]:
"""Return all matching efforts for a single segment against a track."""
if not track.bbox or not _bboxes_overlap(track.bbox, seg.bbox):
return []
if seg.sport and seg.sport != track.sport:
return []
seg_start_lat, seg_start_lon = seg.polyline[0][0], seg.polyline[0][1]
seg_end_lat, seg_end_lon = seg.polyline[-1][0], seg.polyline[-1][1]
n = len(track.lats)
efforts: list[SegmentEffort] = []
search_from = 0
while search_from < n - 1:
# Find next start candidate from search_from.
start_idx = None
for i in range(search_from, n):
if _dist(seg_start_lat, seg_start_lon, track.lats[i], track.lons[i]) <= MATCH_RADIUS_M:
start_idx = i
break
if start_idx is None:
break
# Scan forward from start_idx for an end candidate.
end_idx = None
for j in range(start_idx + 1, n):
if _dist(seg_end_lat, seg_end_lon, track.lats[j], track.lons[j]) <= MATCH_RADIUS_M:
end_idx = j
break
if end_idx is None:
# No end found — no more efforts possible starting at or after start_idx.
break
if _conformance_ok(track, seg, start_idx, end_idx):
efforts.append(_extract_effort(track, seg, start_idx, end_idx))
search_from = end_idx + 1
else:
# Conformance failed; try next start candidate after start_idx.
search_from = start_idx + 1
return efforts
def detect_all(
track: ActivityTrack,
handle: str,
data_dir: Path,
) -> int:
"""Detect efforts for all segments and persist them. Returns effort count."""
from bincio.segments import store as _store
segments = _store.list_segments(data_dir)
total = 0
for seg in segments:
efforts = detect_one(track, seg)
for effort in efforts:
_store.add_effort(data_dir, handle, seg.id, effort)
total += len(efforts)
return total