Files
bincio-activity/bincio/extract/parsers/fit.py
T
Davide Scaini 1ce94b8536 records: exclude indoor/treadmill/virtual sub_sport; rebuild athlete.json on bake
- fit.py: map FIT sub_sport 'treadmill' and 'virtual' to 'indoor'
- writer.py: broaden _is_outdoor to catch all indoor sub_sport variants
- render/cli.py: rebuild athlete.json from index.json on every bake so
  records never go stale when the exclusion logic changes
2026-05-14 23:37:54 +02:00

165 lines
6.3 KiB
Python

"""FIT file parser (Garmin binary format)."""
from datetime import timezone
from pathlib import Path
from typing import Any
import fitdecode
from bincio.extract.models import DataPoint, LapData, ParsedActivity, strip_bogus_leading_points
from bincio.extract.sport import normalise_sport
class FitParser:
    """Parser for FIT (Garmin binary format) activity files, decoded with ``fitdecode``."""

    def parse(self, path: Path, raw_bytes: bytes) -> ParsedActivity:
        """Decode an in-memory FIT file into a ``ParsedActivity``.

        Args:
            path: Location of the source file. Only ``path.name`` is used
                (for ``source_file`` and the error message); nothing is read
                from disk here.
            raw_bytes: Complete contents of the FIT file.

        Returns:
            A ``ParsedActivity`` whose points come from ``record`` messages,
            laps from ``lap`` messages, sport/sub-sport from the ``sport`` or
            ``session`` message and device from ``device_info`` messages.
            ``source_hash`` is returned as an empty string.

        Raises:
            ValueError: If no data points remain after
                ``strip_bogus_leading_points`` has been applied.
        """
        import io

        points: list[DataPoint] = []
        laps: list[LapData] = []
        sport: str = "other"
        sub_sport: str | None = None
        device: str | None = None
        has_baro_alt = False  # True if any record used enhanced_altitude

        with fitdecode.FitReader(io.BytesIO(raw_bytes)) as fit:
            for frame in fit:
                # Headers, definitions, CRCs etc. carry no activity data.
                if not isinstance(frame, fitdecode.FitDataMessage):
                    continue

                if frame.name == "sport":
                    sport = normalise_sport(_get(frame, "sport"))
                    sub_sport = _normalise_sub_sport(_get(frame, "sub_sport"))

                elif frame.name == "session":
                    # Karoo and Strava-generated FIT files store sport here
                    # instead of (or in addition to) a separate 'sport' message.
                    # The session value is only used while sport is still the
                    # initial "other", so a 'sport' message seen earlier wins.
                    if sport == "other":
                        raw_sport = _get(frame, "sport")
                        if raw_sport is not None:
                            sport = normalise_sport(raw_sport)
                            sub_sport = _normalise_sub_sport(_get(frame, "sub_sport"))

                elif frame.name == "device_info":
                    mfr = _get(frame, "manufacturer")
                    prod = _get(frame, "product_name") or _get(frame, "garmin_product")
                    # Every device_info message that names a product
                    # overwrites the previous one, so the last one wins.
                    if mfr and prod:
                        device = f"{mfr} {prod}"
                    elif prod:
                        device = str(prod)

                elif frame.name == "record":
                    ts = _get(frame, "timestamp")
                    if ts is None:
                        continue
                    # Naive timestamps are tagged as UTC.
                    if hasattr(ts, "tzinfo") and ts.tzinfo is None:
                        ts = ts.replace(tzinfo=timezone.utc)

                    lat = _semicircles_to_deg(_get(frame, "position_lat"))
                    lon = _semicircles_to_deg(_get(frame, "position_long"))

                    # Read the legacy field first so files that carry it parse
                    # exactly as before, then fall back to enhanced_speed:
                    # a file may contain only the enhanced_* variant, just as
                    # with altitude below.
                    speed_raw = _get(frame, "speed")  # m/s
                    if speed_raw is None:
                        speed_raw = _get(frame, "enhanced_speed")

                    # enhanced_altitude is written by barometric altimeters (most
                    # modern Garmins). Fall back to GPS-derived altitude if absent.
                    _alt = _get(frame, "enhanced_altitude")
                    if _alt is not None:
                        has_baro_alt = True
                    else:
                        _alt = _get(frame, "altitude")

                    dp = DataPoint(
                        timestamp=ts,
                        lat=lat,
                        lon=lon,
                        elevation_m=_alt,
                        hr_bpm=_get(frame, "heart_rate"),
                        cadence_rpm=_get(frame, "cadence"),
                        speed_kmh=speed_raw * 3.6 if speed_raw is not None else None,
                        power_w=_get(frame, "power"),
                        temperature_c=_get(frame, "temperature"),
                        distance_m=_get(frame, "distance"),
                    )
                    points.append(dp)

                elif frame.name == "lap":
                    ts = _get(frame, "start_time")
                    # Laps without a start time are skipped.
                    if ts is not None:
                        if hasattr(ts, "tzinfo") and ts.tzinfo is None:
                            ts = ts.replace(tzinfo=timezone.utc)
                        elapsed = _get(frame, "total_elapsed_time")
                        # Same legacy-then-enhanced lookup as for record speed.
                        speed_raw = _get(frame, "avg_speed")
                        if speed_raw is None:
                            speed_raw = _get(frame, "enhanced_avg_speed")
                        laps.append(
                            LapData(
                                index=len(laps),
                                started_at=ts,
                                # Truthiness test: a missing *or zero* elapsed
                                # time is stored as None.
                                duration_s=int(elapsed) if elapsed else None,
                                distance_m=_get(frame, "total_distance"),
                                elevation_gain_m=_get(frame, "total_ascent"),
                                avg_speed_kmh=speed_raw * 3.6 if speed_raw is not None else None,
                                avg_hr_bpm=_get(frame, "avg_heart_rate"),
                                avg_power_w=_get(frame, "avg_power"),
                            )
                        )

        points = strip_bogus_leading_points(points)
        if not points:
            raise ValueError(f"No record messages found in {path.name}")

        altitude_source = "barometric" if has_baro_alt else "gps"
        return ParsedActivity(
            points=points,
            sport=sport,
            sub_sport=sub_sport,
            started_at=points[0].timestamp,
            device=device,
            laps=laps,
            source_file=path.name,
            source_hash="",
            altitude_source=altitude_source,
        )
def _get(frame: fitdecode.FitDataMessage, field: str, default: Any = None) -> Any:
    """Look up *field* on *frame*, tolerating messages that lack it.

    Returns whatever ``frame.get_value(field)`` yields. *default* is used
    only when that call raises ``KeyError``; a field that is present but
    holds ``None`` is returned as ``None``.
    """
    try:
        value = frame.get_value(field)
    except KeyError:
        value = default
    return value
def _semicircles_to_deg(value: Any) -> float | None:
if value is None:
return None
try:
deg = float(value) * (180.0 / 2**31)
# Sanity check: invalid semicircle values often come out as ±180+
if abs(deg) > 180:
return None
return deg
except (TypeError, ValueError):
return None
def _normalise_sub_sport(value: Any) -> str | None:
if value is None:
return None
s = str(value).lower().replace(" ", "_")
mapping = {
"generic": None, # FIT default — unspecified
"virtual_activity": "indoor",
"virtual": "indoor",
"road": "road",
"mountain": "mountain",
"gravel_cycling": "gravel",
"cyclocross": "gravel",
"indoor_cycling": "indoor",
"treadmill": "indoor",
"trail": "trail",
"track": "track",
"cross_country_skiing": "nordic",
"nordic_skiing": "nordic",
"skate_skiing": "nordic",
"backcountry_skiing":"nordic",
}
return mapping.get(s, s) or None