"""Compute aggregated metrics from a ParsedActivity.

All calculations are self-contained — no external state needed.
"""

import math
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

from geopy.distance import geodesic

from bincio.extract.models import DataPoint, ParsedActivity

# Speed below which we consider the athlete stopped (km/h)
_STOPPED_THRESHOLD_KMH = 1.0


@dataclass
class ComputedMetrics:
    """Summary values derived from one activity's data points.

    Every field is ``None`` when the source points do not carry the data
    needed to compute it.
    """

    distance_m: Optional[float]
    duration_s: Optional[int]
    moving_time_s: Optional[int]
    elevation_gain_m: Optional[float]
    # Reported as a positive magnitude (compute() applies abs()).
    elevation_loss_m: Optional[float]
    # Average over moving time only, not over total elapsed time.
    avg_speed_kmh: Optional[float]
    max_speed_kmh: Optional[float]
    avg_hr_bpm: Optional[int]
    max_hr_bpm: Optional[int]
    avg_cadence_rpm: Optional[int]
    avg_power_w: Optional[int]
    max_power_w: Optional[int]
    bbox: Optional[tuple[float, float, float, float]]  # min_lon, min_lat, max_lon, max_lat
    start_latlng: Optional[tuple[float, float]]
    end_latlng: Optional[tuple[float, float]]


def compute(activity: ParsedActivity) -> ComputedMetrics:
    """Aggregate the points of *activity* into a ``ComputedMetrics`` record.

    Args:
        activity: Parsed activity whose ``points`` sequence is summarised.

    Returns:
        A ``ComputedMetrics`` instance. When the activity has no points every
        field is ``None``; otherwise each field is ``None`` only if the
        corresponding helper could not derive a value.
    """
    pts = activity.points
    if not pts:
        return _empty()

    duration_s = _duration(pts)
    distance_m = _distance(pts)
    moving_time_s, moving_speed_kmh = _moving_stats(pts)
    gain, loss = _elevation(pts)
    max_speed = _max_speed(pts)
    avg_hr, max_hr = _hr_stats(pts)
    avg_cad = _avg_nonnull([p.cadence_rpm for p in pts])
    avg_pow = _avg_nonnull([p.power_w for p in pts])
    max_pow = _max_nonnull([p.power_w for p in pts])
    bbox = _bbox(pts)
    start_ll, end_ll = _endpoints(pts)

    return ComputedMetrics(
        distance_m=distance_m,
        duration_s=duration_s,
        moving_time_s=moving_time_s,
        elevation_gain_m=round(gain, 1) if gain is not None else None,
        # The helper accumulates loss as a negative sum; expose its magnitude.
        elevation_loss_m=round(abs(loss), 1) if loss is not None else None,
        # Test against None explicitly so that a genuine 0.0 km/h is reported
        # as 0.0 instead of being mistaken for "no data".
        avg_speed_kmh=(
            round(moving_speed_kmh, 2) if moving_speed_kmh is not None else None
        ),
        max_speed_kmh=round(max_speed, 2) if max_speed is not None else None,
        avg_hr_bpm=avg_hr,
        max_hr_bpm=max_hr,
        avg_cadence_rpm=avg_cad,
        avg_power_w=avg_pow,
        max_power_w=max_pow,
        bbox=bbox,
        start_latlng=start_ll,
        end_latlng=end_ll,
    )
# ── helpers ──────────────────────────────────────────────────────────────────


def _has_gps(p: DataPoint) -> bool:
    """Return True when *p* carries both a latitude and a longitude."""
    return p.lat is not None and p.lon is not None


def _duration(pts: list[DataPoint]) -> Optional[int]:
    """Return whole seconds between the first and last point.

    Returns None when fewer than two points are available.
    """
    if len(pts) < 2:
        return None
    return int((pts[-1].timestamp - pts[0].timestamp).total_seconds())


def _distance(pts: list[DataPoint]) -> Optional[float]:
    """Prefer device-recorded cumulative distance; fall back to GPS geodesic.

    Returns the distance in metres rounded to one decimal, or None when the
    points carry neither a device distance nor a pair of adjacent GPS fixes.
    """
    # Use the last device distance that is actually present.
    last_dist = next(
        (p.distance_m for p in reversed(pts) if p.distance_m is not None), None
    )
    if last_dist is not None:
        return round(last_dist, 1)

    # GPS fallback: sum the geodesic length of each adjacent pair of fixes.
    total = 0.0
    has_gps = False
    for a, b in zip(pts, pts[1:]):
        if not (_has_gps(a) and _has_gps(b)):
            continue
        has_gps = True
        total += geodesic((a.lat, a.lon), (b.lat, b.lon)).meters
    return round(total, 1) if has_gps else None


def _moving_stats(pts: list[DataPoint]) -> tuple[Optional[int], Optional[float]]:
    """Return (moving_time_s, avg_speed_kmh_over_moving_time).

    An interval between two adjacent points counts as "moving" when its speed
    is at least ``_STOPPED_THRESHOLD_KMH``. Interval speed comes from the GPS
    fixes when both ends have one, otherwise from the first point's device
    speed. Returns ``(None, None)`` when no moving interval was found.
    """
    # Accumulate integer microseconds: truncating each interval to whole
    # seconds would drop sub-second samples entirely, and summing float
    # seconds would drift.
    moving_us = 0
    moving_dist_m = 0.0

    for a, b in zip(pts, pts[1:]):
        dt = (b.timestamp - a.timestamp).total_seconds()
        if dt <= 0:
            # Duplicate or out-of-order timestamps carry no usable duration.
            continue

        if _has_gps(a) and _has_gps(b):
            seg_m = geodesic((a.lat, a.lon), (b.lat, b.lon)).meters
            seg_kmh = (seg_m / dt) * 3.6
        elif a.speed_kmh is not None:
            seg_kmh = a.speed_kmh
            seg_m = (seg_kmh / 3.6) * dt
        else:
            continue

        if seg_kmh >= _STOPPED_THRESHOLD_KMH:
            moving_us += round(dt * 1_000_000)
            moving_dist_m += seg_m

    if moving_us == 0:
        return None, None

    avg_kmh = (moving_dist_m / (moving_us / 1_000_000)) * 3.6
    # Truncate once, on the total, to match how _duration reports seconds.
    return moving_us // 1_000_000, avg_kmh


def _elevation(pts: list[DataPoint]) -> tuple[Optional[float], Optional[float]]:
    """Return (gain, loss) summed over consecutive known elevations.

    ``gain`` is the sum of positive steps; ``loss`` is the sum of the
    non-positive steps and is therefore zero or negative. Returns
    ``(None, None)`` when fewer than two points have an elevation.
    """
    elevations = [p.elevation_m for p in pts if p.elevation_m is not None]
    if len(elevations) < 2:
        return None, None

    gain = loss = 0.0
    for a, b in zip(elevations, elevations[1:]):
        diff = b - a
        if diff > 0:
            gain += diff
        else:
            loss += diff
    return gain, loss


def _max_speed(pts: list[DataPoint]) -> Optional[float]:
    """Return the maximum speed in km/h, or None when it cannot be derived.

    Device-recorded speeds win when any are present; otherwise the speed of
    each adjacent pair of GPS fixes is derived from distance over time.
    """
    device_speeds = [p.speed_kmh for p in pts if p.speed_kmh is not None]
    if device_speeds:
        return max(device_speeds)

    gps_speeds = []
    for a, b in zip(pts, pts[1:]):
        # Require complete fixes: a point with lat but no lon cannot be
        # handed to geodesic().
        if not (_has_gps(a) and _has_gps(b)):
            continue
        dt = (b.timestamp - a.timestamp).total_seconds()
        if dt <= 0:
            continue
        m = geodesic((a.lat, a.lon), (b.lat, b.lon)).meters
        gps_speeds.append((m / dt) * 3.6)
    return max(gps_speeds) if gps_speeds else None


def _hr_stats(pts: list[DataPoint]) -> tuple[Optional[int], Optional[int]]:
    """Return (average, maximum) heart rate, or (None, None) without HR data."""
    hrs = [p.hr_bpm for p in pts]
    return _avg_nonnull(hrs), _max_nonnull(hrs)


def _avg_nonnull(values: list) -> Optional[int]:
    """Return the mean of the non-None values truncated to int, or None."""
    v = [x for x in values if x is not None]
    return int(sum(v) / len(v)) if v else None


def _max_nonnull(values: list) -> Optional[int]:
    """Return the largest non-None value, or None when there is none."""
    v = [x for x in values if x is not None]
    return max(v) if v else None


def _bbox(pts: list[DataPoint]) -> Optional[tuple[float, float, float, float]]:
    """Return (min_lon, min_lat, max_lon, max_lat) over complete GPS fixes.

    Points missing either coordinate are ignored so latitudes and longitudes
    always come from the same set of points. Returns None without any fix.
    """
    fixes = [(p.lat, p.lon) for p in pts if _has_gps(p)]
    if not fixes:
        return None
    lats = [lat for lat, _ in fixes]
    lons = [lon for _, lon in fixes]
    return (min(lons), min(lats), max(lons), max(lats))


def _endpoints(
    pts: list[DataPoint],
) -> tuple[Optional[tuple[float, float]], Optional[tuple[float, float]]]:
    """Return the first and last (lat, lon) fix, or (None, None) without GPS."""
    gps = [(p.lat, p.lon) for p in pts if _has_gps(p)]
    if not gps:
        return None, None
    return gps[0], gps[-1]


def _empty() -> ComputedMetrics:
    """Return a ComputedMetrics with every field set to None."""
    return ComputedMetrics(
        distance_m=None,
        duration_s=None,
        moving_time_s=None,
        elevation_gain_m=None,
        elevation_loss_m=None,
        avg_speed_kmh=None,
        max_speed_kmh=None,
        avg_hr_bpm=None,
        max_hr_bpm=None,
        avg_cadence_rpm=None,
        avg_power_w=None,
        max_power_w=None,
        bbox=None,
        start_latlng=None,
        end_latlng=None,
    )