1ce94b8536
- fit.py: map FIT sub_sport 'treadmill' and 'virtual' to 'indoor' - writer.py: broaden _is_outdoor to catch all indoor sub_sport variants - render/cli.py: rebuild athlete.json from index.json on every bake so records never go stale when the exclusion logic changes
165 lines
6.3 KiB
Python
165 lines
6.3 KiB
Python
"""FIT file parser (Garmin binary format)."""
|
|
|
|
from datetime import timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import fitdecode
|
|
|
|
from bincio.extract.models import DataPoint, LapData, ParsedActivity, strip_bogus_leading_points
|
|
from bincio.extract.sport import normalise_sport
|
|
|
|
|
|
class FitParser:
    """Parser that turns the bytes of a FIT file into a ``ParsedActivity``."""

    def parse(self, path: Path, raw_bytes: bytes) -> ParsedActivity:
        """Decode ``raw_bytes`` as a FIT stream and collect the activity data.

        Data messages are read one by one; ``sport``, ``session``,
        ``device_info``, ``record`` and ``lap`` messages contribute to the
        result and every other message is ignored.

        Args:
            path: Location of the source file. Only ``path.name`` is used
                (for ``source_file`` and the error message).
            raw_bytes: Full content of the FIT file.

        Returns:
            A ``ParsedActivity`` whose ``started_at`` is the timestamp of the
            first remaining data point and whose ``source_hash`` is left as
            an empty string.

        Raises:
            ValueError: If no data points remain once
                ``strip_bogus_leading_points`` has been applied.
        """
        import io

        def as_utc(stamp: Any) -> Any:
            # A value exposing ``tzinfo`` that is None is naive: tag it as UTC.
            # Anything else (aware, or without ``tzinfo``) is returned untouched.
            if getattr(stamp, "tzinfo", False) is None:
                return stamp.replace(tzinfo=timezone.utc)
            return stamp

        samples: list[DataPoint] = []
        lap_list: list[LapData] = []
        sport: str = "other"
        sub_sport: str | None = None
        device: str | None = None

        # Flips to True as soon as one record carries enhanced_altitude.
        saw_enhanced_altitude = False

        with fitdecode.FitReader(io.BytesIO(raw_bytes)) as reader:
            for msg in reader:
                if not isinstance(msg, fitdecode.FitDataMessage):
                    continue
                kind = msg.name

                if kind == "sport":
                    sport = normalise_sport(_get(msg, "sport"))
                    sub_sport = _normalise_sub_sport(_get(msg, "sub_sport"))
                    continue

                if kind == "session":
                    # Karoo and Strava-generated FIT files store sport here
                    # instead of (or in addition to) a separate 'sport' message.
                    # Only use session sport if no 'sport' frame was seen yet.
                    if sport == "other":
                        session_sport = _get(msg, "sport")
                        if session_sport is not None:
                            sport = normalise_sport(session_sport)
                            sub_sport = _normalise_sub_sport(_get(msg, "sub_sport"))
                    continue

                if kind == "device_info":
                    maker = _get(msg, "manufacturer")
                    product = _get(msg, "product_name") or _get(msg, "garmin_product")
                    # Without a product the previous device value is kept.
                    if product:
                        device = f"{maker} {product}" if maker else str(product)
                    continue

                if kind == "record":
                    stamp = _get(msg, "timestamp")
                    if stamp is None:
                        continue
                    stamp = as_utc(stamp)

                    speed_ms = _get(msg, "speed")  # m/s

                    # enhanced_altitude is written by barometric altimeters (most
                    # modern Garmins). Fall back to GPS-derived altitude if absent.
                    altitude = _get(msg, "enhanced_altitude")
                    if altitude is None:
                        altitude = _get(msg, "altitude")
                    else:
                        saw_enhanced_altitude = True

                    samples.append(
                        DataPoint(
                            timestamp=stamp,
                            lat=_semicircles_to_deg(_get(msg, "position_lat")),
                            lon=_semicircles_to_deg(_get(msg, "position_long")),
                            elevation_m=altitude,
                            hr_bpm=_get(msg, "heart_rate"),
                            cadence_rpm=_get(msg, "cadence"),
                            speed_kmh=None if speed_ms is None else speed_ms * 3.6,
                            power_w=_get(msg, "power"),
                            temperature_c=_get(msg, "temperature"),
                            distance_m=_get(msg, "distance"),
                        )
                    )
                    continue

                if kind == "lap":
                    lap_start = _get(msg, "start_time")
                    if lap_start is None:
                        # Laps without a start time are not recorded.
                        continue
                    lap_start = as_utc(lap_start)
                    elapsed = _get(msg, "total_elapsed_time")
                    lap_speed_ms = _get(msg, "avg_speed")
                    lap_list.append(
                        LapData(
                            index=len(lap_list),
                            started_at=lap_start,
                            # A missing or zero elapsed time is stored as None.
                            duration_s=int(elapsed) if elapsed else None,
                            distance_m=_get(msg, "total_distance"),
                            elevation_gain_m=_get(msg, "total_ascent"),
                            avg_speed_kmh=(
                                None if lap_speed_ms is None else lap_speed_ms * 3.6
                            ),
                            avg_hr_bpm=_get(msg, "avg_heart_rate"),
                            avg_power_w=_get(msg, "avg_power"),
                        )
                    )

        samples = strip_bogus_leading_points(samples)
        if not samples:
            raise ValueError(f"No record messages found in {path.name}")

        return ParsedActivity(
            points=samples,
            sport=sport,
            sub_sport=sub_sport,
            started_at=samples[0].timestamp,
            device=device,
            laps=lap_list,
            source_file=path.name,
            source_hash="",
            altitude_source="barometric" if saw_enhanced_altitude else "gps",
        )
|
|
|
|
|
|
def _get(frame: fitdecode.FitDataMessage, field: str, default: Any = None) -> Any:
    """Return the value of ``field`` in ``frame``, or ``default`` on ``KeyError``.

    Only the ``KeyError`` raised by ``frame.get_value`` is translated into
    ``default``; whatever ``get_value`` returns is passed through unchanged.
    """
    try:
        value = frame.get_value(field)
    except KeyError:
        return default
    else:
        return value
|
|
|
|
|
|
def _semicircles_to_deg(value: Any) -> float | None:
|
|
if value is None:
|
|
return None
|
|
try:
|
|
deg = float(value) * (180.0 / 2**31)
|
|
# Sanity check: invalid semicircle values often come out as ±180+
|
|
if abs(deg) > 180:
|
|
return None
|
|
return deg
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _normalise_sub_sport(value: Any) -> str | None:
|
|
if value is None:
|
|
return None
|
|
s = str(value).lower().replace(" ", "_")
|
|
mapping = {
|
|
"generic": None, # FIT default — unspecified
|
|
"virtual_activity": "indoor",
|
|
"virtual": "indoor",
|
|
"road": "road",
|
|
"mountain": "mountain",
|
|
"gravel_cycling": "gravel",
|
|
"cyclocross": "gravel",
|
|
"indoor_cycling": "indoor",
|
|
"treadmill": "indoor",
|
|
"trail": "trail",
|
|
"track": "track",
|
|
"cross_country_skiing": "nordic",
|
|
"nordic_skiing": "nordic",
|
|
"skate_skiing": "nordic",
|
|
"backcountry_skiing":"nordic",
|
|
}
|
|
return mapping.get(s, s) or None
|