Segments Phase 3: detection algorithm, CLI, ingest hook, and efforts API

- detect.py: ActivityTrack + detect_one/detect_all (bbox pre-filter →
  start/end proximity 25m → path conformance 50m/30% → effort extraction
  with avg speed/HR/power and Coggan NP)
- cli.py: `bincio segments detect` for retroactive detection over stored
  timeseries JSONs, with optional --activity-id / --segment-id filters
- ingest.py: non-fatal hook at end of ingest_parsed runs detect_all
- server.py: GET /api/segments/{id}/efforts and POST /api/segments/{id}/detect
This commit is contained in:
Davide Scaini
2026-05-13 00:50:39 +02:00
parent 61db0734d2
commit 4d2df860ce
6 changed files with 673 additions and 0 deletions
+114
View File
@@ -0,0 +1,114 @@
"""bincio segments — segment management CLI commands."""
from __future__ import annotations
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
import click
def _dt(s: str) -> datetime:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
@click.group("segments")
def segments_group() -> None:
"""Manage segments and detect efforts."""
@segments_group.command("detect")
@click.option("--data-dir", required=True, type=click.Path(), help="BAS data directory (e.g. /var/bincio)")
@click.option("--handle", required=True, help="User handle to run detection for")
@click.option("--activity-id", default=None, help="Limit to a single activity ID (optional)")
@click.option("--segment-id", default=None, help="Limit to a single segment ID (optional)")
def detect_cmd(data_dir: str, handle: str, activity_id: str | None, segment_id: str | None) -> None:
"""Retroactively detect segment efforts for stored activities.
Walks every activity with GPS data, runs the detection algorithm against
all (or a single) segment, and persists any new efforts found.
"""
from bincio.segments.detect import track_from_timeseries_json, detect_one, detect_all
from bincio.segments import store as _store
dd = Path(data_dir).expanduser().resolve()
user_dir = dd / handle
acts_dir = user_dir / "activities"
if not acts_dir.exists():
click.echo(f"No activities directory at {acts_dir}", err=True)
sys.exit(1)
# Choose which segments to check.
if segment_id:
seg = _store.load_segment(dd, segment_id)
if seg is None:
click.echo(f"Segment not found: {segment_id}", err=True)
sys.exit(1)
segments = [seg]
else:
segments = _store.list_segments(dd)
if not segments:
click.echo("No segments defined.", err=True)
sys.exit(0)
# Choose which activities to process.
if activity_id:
detail_files = [acts_dir / f"{activity_id}.json"]
else:
detail_files = sorted(acts_dir.glob("*.json"))
# Exclude timeseries files.
detail_files = [f for f in detail_files if ".timeseries." not in f.name]
total_efforts = 0
processed = 0
for detail_path in detail_files:
try:
detail = json.loads(detail_path.read_text(encoding="utf-8"))
except Exception:
continue
ts_url = detail.get("timeseries_url")
if not ts_url:
continue
act_id = detail.get("id", detail_path.stem)
sport = detail.get("sport", "other")
started = detail.get("started_at")
if not started:
continue
try:
started_at = _dt(started)
except Exception:
continue
ts_path = user_dir / ts_url
if not ts_path.exists():
continue
try:
ts = json.loads(ts_path.read_text(encoding="utf-8"))
except Exception:
continue
track = track_from_timeseries_json(ts, act_id, sport, started_at)
if track is None:
continue
processed += 1
for seg in segments:
from bincio.segments.detect import detect_one
efforts = detect_one(track, seg)
for effort in efforts:
_store.add_effort(dd, handle, seg.id, effort)
if efforts:
click.echo(
f" {act_id}: {len(efforts)} effort(s) on '{seg.name}' "
f"({', '.join(str(e.elapsed_s) + 's' for e in efforts)})"
)
total_efforts += len(efforts)
click.echo(f"\nProcessed {processed} activities, found {total_efforts} effort(s).")
+278
View File
@@ -0,0 +1,278 @@
"""Segment effort detection.
Matches GPS tracks against stored segment polylines and produces SegmentEffort
records. Works from either a live ParsedActivity (ingest path) or from a
stored timeseries JSON (retroactive path).
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from bincio.segments.models import Segment, SegmentEffort
# ── tuning constants ──────────────────────────────────────────────────────────
MATCH_RADIUS_M = 25 # max distance to segment start/end to open/close an effort
CONFORMANCE_MAX_DEV_M = 50 # max allowed deviation for each interior segment point
CONFORMANCE_MAX_FRAC = 0.30 # max fraction of interior points allowed to deviate
# ── fast distance approximation ───────────────────────────────────────────────
_R = 6_371_000.0 # Earth radius in metres
def _dist(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Equirectangular approximation — fast, accurate to <0.1% within 100 km."""
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
mlat = math.radians((lat1 + lat2) / 2.0)
return math.hypot(dlat * _R, dlon * _R * math.cos(mlat))
# ── activity track representation ────────────────────────────────────────────
@dataclass
class ActivityTrack:
"""Common internal representation for detection, independent of source format."""
activity_id: str
sport: str
started_at: datetime
# Parallel arrays — all same length, GPS-only points (lat/lon not None).
lats: list[float]
lons: list[float]
times: list[int] # seconds from started_at
speeds: list[Optional[float]]
hrs: list[Optional[int]]
powers: list[Optional[int]]
bbox: list[float] = field(default_factory=list) # [lon_min, lat_min, lon_max, lat_max]
def __post_init__(self) -> None:
if self.lats and not self.bbox:
self.bbox = [
min(self.lons), min(self.lats),
max(self.lons), max(self.lats),
]
def track_from_parsed(parsed: "ParsedActivity", activity_id: str) -> Optional[ActivityTrack]: # noqa: F821
"""Build an ActivityTrack from a ParsedActivity (used during ingest)."""
lats, lons, times, speeds, hrs, powers = [], [], [], [], [], []
last_t = -1
for p in parsed.points:
if p.lat is None or p.lon is None:
continue
t = int((p.timestamp - parsed.started_at).total_seconds())
if t < 0 or t == last_t:
continue
last_t = t
lats.append(p.lat)
lons.append(p.lon)
times.append(t)
speeds.append(p.speed_kmh)
hrs.append(p.hr_bpm)
powers.append(p.power_w)
if len(lats) < 2:
return None
return ActivityTrack(
activity_id=activity_id,
sport=parsed.sport,
started_at=parsed.started_at,
lats=lats, lons=lons, times=times,
speeds=speeds, hrs=hrs, powers=powers,
)
def track_from_timeseries_json(
ts: dict,
activity_id: str,
sport: str,
started_at: datetime,
) -> Optional[ActivityTrack]:
"""Build an ActivityTrack from a stored timeseries JSON dict."""
raw_lats = ts.get("lat") or []
raw_lons = ts.get("lon") or []
raw_t = ts.get("t") or []
raw_spd = ts.get("speed_kmh") or []
raw_hr = ts.get("hr_bpm") or []
raw_pwr = ts.get("power_w") or []
n = len(raw_t)
if n < 2 or not raw_lats or len(raw_lats) != n:
return None
def _pad(arr: list, length: int) -> list:
return arr + [None] * (length - len(arr))
raw_spd = _pad(raw_spd, n)
raw_hr = _pad(raw_hr, n)
raw_pwr = _pad(raw_pwr, n)
lats, lons, times, speeds, hrs, powers = [], [], [], [], [], []
for i in range(n):
if raw_lats[i] is None or raw_lons[i] is None:
continue
lats.append(float(raw_lats[i]))
lons.append(float(raw_lons[i]))
times.append(int(raw_t[i]))
speeds.append(raw_spd[i])
hrs.append(raw_hr[i])
powers.append(raw_pwr[i])
if len(lats) < 2:
return None
return ActivityTrack(
activity_id=activity_id,
sport=sport,
started_at=started_at,
lats=lats, lons=lons, times=times,
speeds=speeds, hrs=hrs, powers=powers,
)
# ── effort metric helpers ─────────────────────────────────────────────────────
def _avg_nonnull(vals: list, lo: int, hi: int) -> Optional[float]:
nums = [v for v in vals[lo:hi + 1] if v is not None]
return sum(nums) / len(nums) if nums else None
def _np_power(powers: list[Optional[int]], lo: int, hi: int) -> Optional[int]:
"""Coggan NP from a slice of 1Hz power data (may have gaps/nulls)."""
WIN = 30
chunk = powers[lo:hi + 1]
filled = [v if v is not None else 0 for v in chunk]
n = len(filled)
if n < WIN:
# Too short for rolling average — just return avg power.
non_null = [v for v in chunk if v is not None]
return int(round(sum(non_null) / len(non_null))) if non_null else None
half = WIN // 2
window_sum = sum(filled[:WIN])
fourth_powers = []
for i in range(half, n - half):
fourth_powers.append((window_sum / WIN) ** 4)
if i + half + 1 < n:
window_sum += filled[i + half + 1] - filled[i - half]
if not fourth_powers:
return None
return int(round((sum(fourth_powers) / len(fourth_powers)) ** 0.25))
# ── detection algorithm ───────────────────────────────────────────────────────
def _bboxes_overlap(a: list[float], b: list[float]) -> bool:
return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])
def _conformance_ok(
track: ActivityTrack,
seg: Segment,
i: int,
j: int,
) -> bool:
"""Check that the track slice [i..j] follows the segment polyline."""
interior = seg.polyline[1:-1]
if not interior:
return True # trivial 2-point segment
failing = 0
for sp in interior:
slat, slon = sp[0], sp[1]
min_d = min(
_dist(slat, slon, track.lats[k], track.lons[k])
for k in range(i, j + 1)
)
if min_d > CONFORMANCE_MAX_DEV_M:
failing += 1
return (failing / len(interior)) <= CONFORMANCE_MAX_FRAC
def _extract_effort(
track: ActivityTrack,
seg: Segment,
i: int,
j: int,
) -> SegmentEffort:
elapsed_s = track.times[j] - track.times[i]
started_at = track.started_at + timedelta(seconds=track.times[i])
avg_speed = _avg_nonnull(track.speeds, i, j)
avg_hr_raw = _avg_nonnull(track.hrs, i, j)
avg_hr = int(round(avg_hr_raw)) if avg_hr_raw is not None else None
avg_pwr_raw = _avg_nonnull(track.powers, i, j)
avg_pwr = int(round(avg_pwr_raw)) if avg_pwr_raw is not None else None
np_pwr = _np_power(track.powers, i, j) if any(v is not None for v in track.powers[i:j + 1]) else None
return SegmentEffort(
activity_id=track.activity_id,
started_at=started_at,
elapsed_s=max(1, elapsed_s),
avg_speed_kmh=round(avg_speed, 2) if avg_speed is not None else None,
avg_hr_bpm=avg_hr,
avg_power_w=avg_pwr,
np_power_w=np_pwr,
detected_at=datetime.now(timezone.utc),
)
def detect_one(track: ActivityTrack, seg: Segment) -> list[SegmentEffort]:
"""Return all matching efforts for a single segment against a track."""
if not track.bbox or not _bboxes_overlap(track.bbox, seg.bbox):
return []
if seg.sport and seg.sport != track.sport:
return []
seg_start_lat, seg_start_lon = seg.polyline[0][0], seg.polyline[0][1]
seg_end_lat, seg_end_lon = seg.polyline[-1][0], seg.polyline[-1][1]
n = len(track.lats)
efforts: list[SegmentEffort] = []
search_from = 0
while search_from < n - 1:
# Find next start candidate from search_from.
start_idx = None
for i in range(search_from, n):
if _dist(seg_start_lat, seg_start_lon, track.lats[i], track.lons[i]) <= MATCH_RADIUS_M:
start_idx = i
break
if start_idx is None:
break
# Scan forward from start_idx for an end candidate.
end_idx = None
for j in range(start_idx + 1, n):
if _dist(seg_end_lat, seg_end_lon, track.lats[j], track.lons[j]) <= MATCH_RADIUS_M:
end_idx = j
break
if end_idx is None:
# No end found — no more efforts possible starting at or after start_idx.
break
if _conformance_ok(track, seg, start_idx, end_idx):
efforts.append(_extract_effort(track, seg, start_idx, end_idx))
search_from = end_idx + 1
else:
# Conformance failed; try next start candidate after start_idx.
search_from = start_idx + 1
return efforts
def detect_all(
track: ActivityTrack,
handle: str,
data_dir: Path,
) -> int:
"""Detect efforts for all segments and persist them. Returns effort count."""
from bincio.segments import store as _store
segments = _store.list_segments(data_dir)
total = 0
for seg in segments:
efforts = detect_one(track, seg)
for effort in efforts:
_store.add_effort(data_dir, handle, seg.id, effort)
total += len(efforts)
return total