"""Garmin Connect incremental sync — generator-based, mirrors strava_sync_iter. Sync state is stored in {user_dir}/garmin_sync.json: { "last_sync_at": "2026-04-12" ← date of last successful sync (YYYY-MM-DD) } We query Garmin for all activities from (last_sync_at - 1 day) to today, then skip any that already exist (FileExistsError from ingest_parsed). The -1 day buffer catches activities that were saved to Garmin slightly after their recorded end time crosses midnight. Each yielded dict has a ``type`` key: - ``"fetching"`` — about to contact Garmin - ``"progress"`` — one activity processed; keys: n, total, name, status, garmin_id - ``"done"`` — final summary; keys: imported, skipped, error_count, errors - ``"error"`` — fatal error; key: message """ from __future__ import annotations import io import json import zipfile from collections.abc import Generator from datetime import UTC, datetime, timedelta from pathlib import Path _SYNC_FILE = "garmin_sync.json" # ── Sync state helpers ──────────────────────────────────────────────────────── def _load_sync_state(user_dir: Path) -> dict: p = user_dir / _SYNC_FILE if not p.exists(): return {} try: return json.loads(p.read_text()) except Exception: return {} def _save_sync_state(user_dir: Path, state: dict) -> None: (user_dir / _SYNC_FILE).write_text(json.dumps(state, indent=2)) # ── FIT extraction from ZIP ─────────────────────────────────────────────────── def _extract_fit(zip_bytes: bytes) -> tuple[bytes, str]: """Return (fit_bytes, filename) from a Garmin activity ZIP. Garmin always packages the original FIT as the first .fit entry. Raises ValueError if no FIT file is found. """ with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: fit_names = [n for n in zf.namelist() if n.lower().endswith(".fit")] if not fit_names: raise ValueError(f"No FIT file in archive. Contents: {zf.namelist()}") name = fit_names[0] return zf.read(name), name # ── Main generator ──────────────────────────────────────────────────────────── def garmin_sync_iter( data_dir: Path, user_dir: Path, ) -> Generator[dict, None, None]: """Fetch new activities from Garmin Connect and ingest them. Args: data_dir: Root data directory (used for encryption key lookup). user_dir: Per-user directory (contains activities/, garmin_creds.json, etc.). """ import uuid as _uuid from bincio.extract.garmin_api import GarminError, get_client from bincio.extract.ingest import ingest_parsed from bincio.extract.parsers.fit import FitParser from bincio.serve.routers.gear import _load as _gear_load from bincio.serve.routers.gear import _save as _gear_save # ── Login ────────────────────────────────────────────────────────────────── try: client = get_client(data_dir, user_dir) except GarminError as exc: yield {"type": "error", "message": str(exc)} return yield {"type": "fetching"} # ── Sync gear registry ───────────────────────────────────────────────────── _garmin_uuid_to_name: dict[str, str] = {} try: prof = client.connectapi("/userprofile-service/socialProfile") profile_id = prof.get("profileId") if isinstance(prof, dict) else None if profile_id: garmin_gear = client.get_gear(profile_id) if isinstance(garmin_gear, list): registry = _gear_load(user_dir) known = {g.get("garmin_id") for g in registry if g.get("garmin_id")} for g in garmin_gear: guuid = g.get("uuid") or "" name = (g.get("customMakeModel") or g.get("displayName") or f"{g.get('gearMakeName','')} {g.get('gearModelName','')}".strip()) if not name or not guuid: continue _garmin_uuid_to_name[guuid] = name if guuid not in known: gear_type = g.get("gearTypeName", "").lower() if gear_type not in ("bike", "shoes", "skis"): gear_type = "other" retired = g.get("gearStatusName") == "retired" registry.append({"id": str(_uuid.uuid4()), "name": name, "type": gear_type, "retired": retired, "garmin_id": guuid}) known.add(guuid) else: # Update name in case it changed for item in registry: if item.get("garmin_id") == guuid: item["name"] = name _gear_save(user_dir, registry) except Exception: pass # gear sync is best-effort; don't abort activity sync # ── Determine date range ─────────────────────────────────────────────────── state = _load_sync_state(user_dir) last = state.get("last_sync_at") if last: # Start one day before last sync to catch edge cases around midnight start_dt = datetime.fromisoformat(last) - timedelta(days=1) else: # First sync: import everything Garmin has start_dt = datetime(2000, 1, 1) start_date = start_dt.strftime("%Y-%m-%d") end_date = datetime.now().strftime("%Y-%m-%d") # ── Fetch activity list ──────────────────────────────────────────────────── try: activities = client.get_activities_by_date( startdate=start_date, enddate=end_date, ) except Exception as exc: yield {"type": "error", "message": f"Failed to fetch activity list: {exc}"} return total = len(activities) imported = 0 skipped = 0 errors: list[str] = [] parser = FitParser() # ── Process each activity ────────────────────────────────────────────────── for n, meta in enumerate(activities, 1): garmin_id = meta.get("activityId") name = meta.get("activityName") or "Untitled" try: # Download original FIT (wrapped in a ZIP by Garmin) try: zip_bytes = client.download_activity( garmin_id, dl_fmt=client.ActivityDownloadFormat.ORIGINAL, ) except Exception as exc: raise RuntimeError(f"Download failed: {exc}") from exc try: fit_bytes, fit_name = _extract_fit(zip_bytes) except Exception as exc: raise RuntimeError(f"ZIP extraction failed: {exc}") from exc # Parse FIT — pass a dummy Path so the parser has a filename for # any format-detection logic; raw bytes are the actual data. fake_path = Path(fit_name) try: parsed = parser.parse(fake_path, fit_bytes) except Exception as exc: raise RuntimeError(f"FIT parse error: {exc}") from exc # Resolve gear for this activity if garmin_id and _garmin_uuid_to_name: try: act_gear = client.get_activity_gear(garmin_id) if isinstance(act_gear, list) and act_gear: guuid = act_gear[0].get("uuid") or "" parsed.gear = _garmin_uuid_to_name.get(guuid) or None except Exception: pass # Ingest — raises FileExistsError if already present (dedup) ingest_parsed(parsed, user_dir) imported += 1 yield { "type": "progress", "n": n, "total": total, "name": name, "status": "imported", "garmin_id": garmin_id, } except FileExistsError: skipped += 1 yield { "type": "progress", "n": n, "total": total, "name": name, "status": "skipped", "garmin_id": garmin_id, } except Exception as exc: errors.append(f"{garmin_id} ({name}): {type(exc).__name__}: {exc}") yield { "type": "progress", "n": n, "total": total, "name": name, "status": "error", "garmin_id": garmin_id, } # ── Persist sync state ───────────────────────────────────────────────────── state["last_sync_at"] = datetime.now(UTC).strftime("%Y-%m-%d") state["total_imported"] = state.get("total_imported", 0) + imported _save_sync_state(user_dir, state) yield { "type": "done", "imported": imported, "skipped": skipped, "error_count": len(errors), "errors": errors[:5], } def run_garmin_sync(data_dir: Path, user_dir: Path) -> dict: """Blocking wrapper around garmin_sync_iter for non-SSE callers.""" result: dict = {} for event in garmin_sync_iter(data_dir, user_dir): if event["type"] == "done": result = event elif event["type"] == "error": raise RuntimeError(event["message"]) return result def import_garmin_gear(data_dir: Path, user_dir: Path) -> dict: """Backfill gear for all existing activities by querying Garmin's gear-activities API. For each gear item, fetches the list of activities from Garmin and matches them to local activities by UTC start timestamp (±60 s). Writes a sidecar and calls merge_one for each match that doesn't already have gear set. Returns {"gear_added": int, "activities_updated": int}. """ import contextlib import re import uuid import yaml from bincio.extract.garmin_api import GarminError, get_client from bincio.render.merge import merge_one from bincio.serve.routers.gear import _load as _gear_load from bincio.serve.routers.gear import _save as _gear_save client = get_client(data_dir, user_dir) # Fetch gear list from Garmin prof = client.connectapi("/userprofile-service/socialProfile") profile_id = prof.get("profileId") if isinstance(prof, dict) else None if not profile_id: raise GarminError("Could not read Garmin profile ID") garmin_gear = client.get_gear(profile_id) if not isinstance(garmin_gear, list) or not garmin_gear: return {"gear_added": 0, "activities_updated": 0} # Build / update local gear registry registry = _gear_load(user_dir) known = {g.get("garmin_id") for g in registry if g.get("garmin_id")} uuid_to_name: dict[str, str] = {} gear_added = 0 for g in garmin_gear: guuid = g.get("uuid") or "" name = (g.get("customMakeModel") or g.get("displayName") or f"{g.get('gearMakeName', '')} {g.get('gearModelName', '')}".strip()) if not name or not guuid: continue uuid_to_name[guuid] = name if guuid not in known: gear_type = g.get("gearTypeName", "").lower() if gear_type not in ("bike", "shoes", "skis"): gear_type = "other" retired = g.get("gearStatusName") == "retired" registry.append({"id": str(uuid.uuid4()), "name": name, "type": gear_type, "retired": retired, "garmin_id": guuid}) known.add(guuid) gear_added += 1 else: for item in registry: if item.get("garmin_id") == guuid: item["name"] = name _gear_save(user_dir, registry) # Build timestamp → activity_id map from index shards ts_to_id: dict[int, str] = {} merged_dir = user_dir / "_merged" shard_dir = merged_dir if merged_dir.exists() else user_dir for shard_path in sorted(shard_dir.glob("index*.json")): try: idx = json.loads(shard_path.read_text(encoding="utf-8")) for a in idx.get("activities", []): started = a.get("started_at") or "" if started and a.get("id"): dt = datetime.fromisoformat(started.replace("Z", "+00:00")) ts_to_id[int(dt.astimezone(UTC).timestamp())] = a["id"] except (OSError, json.JSONDecodeError, KeyError): continue edits_dir = user_dir / "edits" edits_dir.mkdir(exist_ok=True) activities_updated = 0 for guuid, gear_name in uuid_to_name.items(): try: gear_acts = client.get_gear_activities(guuid, limit=10000) except Exception: continue if not isinstance(gear_acts, list): continue for ga in gear_acts: gmt = ga.get("startTimeGMT") or "" if not gmt: continue try: dt = datetime.strptime(gmt, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC) ts = int(dt.timestamp()) except ValueError: continue act_id = None for delta in range(0, 61): act_id = ts_to_id.get(ts + delta) or ts_to_id.get(ts - delta) if act_id: break if not act_id: continue # Skip if activity already has gear set act_json = user_dir / "activities" / f"{act_id}.json" if act_json.exists(): try: if json.loads(act_json.read_text(encoding="utf-8")).get("gear"): continue except (OSError, json.JSONDecodeError): pass sidecar = edits_dir / f"{act_id}.md" fm, body = {}, "" if sidecar.exists(): try: text = sidecar.read_text(encoding="utf-8") parts = re.split(r"^---[ \t]*$", text, maxsplit=2, flags=re.MULTILINE) if len(parts) >= 3: fm = yaml.safe_load(parts[1]) or {} body = parts[2].strip() except Exception: pass if fm.get("gear"): continue fm["gear"] = gear_name fm_text = yaml.safe_dump(fm, default_flow_style=False, allow_unicode=True).strip() content = f"---\n{fm_text}\n---\n" if body: content += f"\n{body}\n" sidecar.write_text(content, encoding="utf-8") with contextlib.suppress(Exception): merge_one(user_dir, act_id) activities_updated += 1 return {"gear_added": gear_added, "activities_updated": activities_updated}