7db7bf91e0
Move gear backfill logic from the route handler into import_garmin_gear(data_dir, user_dir) in garmin_sync.py so it can be called both from the API and from the CLI script. scripts/backfill_garmin_gear.py finds all users with Garmin credentials and runs the backfill for each, printing a per-user summary.
389 lines
15 KiB
Python
389 lines
15 KiB
Python
"""Garmin Connect incremental sync — generator-based, mirrors strava_sync_iter.
|
|
|
|
Sync state is stored in {user_dir}/garmin_sync.json:
  {
    "last_sync_at": "2026-04-12",   ← UTC date of the last completed sync (YYYY-MM-DD)
    "total_imported": 42            ← running total of activities imported by all syncs
  }
|
|
|
|
We query Garmin for all activities from (last_sync_at - 1 day) to today,
|
|
then skip any that already exist (FileExistsError from ingest_parsed).
|
|
The one-day buffer catches activities that were recorded just before midnight
but saved to Garmin shortly after it.
|
|
|
|
Each yielded dict has a ``type`` key:
|
|
- ``"fetching"`` — about to contact Garmin
|
|
- ``"progress"`` — one activity processed; keys: n, total, name, status, garmin_id
|
|
- ``"done"`` — final summary; keys: imported, skipped, error_count, errors
|
|
- ``"error"`` — fatal error; key: message
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import zipfile
|
|
from collections.abc import Generator
|
|
from datetime import UTC, datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
_SYNC_FILE = "garmin_sync.json"
|
|
|
|
|
|
# ── Sync state helpers ────────────────────────────────────────────────────────
|
|
|
|
def _load_sync_state(user_dir: Path) -> dict:
    """Read the persisted Garmin sync state for one user.

    Args:
        user_dir: Per-user directory that may contain the sync-state file.

    Returns:
        The decoded state mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    state_path = user_dir / _SYNC_FILE
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's locale encoding.
        loaded = json.loads(state_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers treat the result as a dict (``state.get(...)``); a file holding
    # valid JSON of another shape (``null``, a list, ...) must not leak through.
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def _save_sync_state(user_dir: Path, state: dict) -> None:
    """Serialise *state* as indented JSON into the user's sync-state file.

    Args:
        user_dir: Per-user directory that receives the sync-state file.
        state: Mapping to persist; must be JSON-serialisable.
    """
    target = user_dir / _SYNC_FILE
    payload = json.dumps(state, indent=2)
    target.write_text(payload)
|
|
|
|
|
|
# ── FIT extraction from ZIP ───────────────────────────────────────────────────
|
|
|
|
def _extract_fit(zip_bytes: bytes) -> tuple[bytes, str]:
    """Pull the FIT payload out of a Garmin activity ZIP held in memory.

    The first archive member whose name ends in ``.fit`` (case-insensitive)
    is taken to be the original recording.

    Args:
        zip_bytes: Raw bytes of the ZIP archive.

    Returns:
        ``(fit_bytes, member_name)`` for the first FIT member.

    Raises:
        ValueError: If the archive contains no ``.fit`` member.
    """
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        members = archive.namelist()
        for member in members:
            if member.lower().endswith(".fit"):
                return archive.read(member), member
        raise ValueError(f"No FIT file in archive. Contents: {members}")
|
|
|
|
|
|
# ── Main generator ────────────────────────────────────────────────────────────
|
|
|
|
def garmin_sync_iter(
    data_dir: Path,
    user_dir: Path,
) -> Generator[dict, None, None]:
    """Fetch new activities from Garmin Connect and ingest them.

    Phases: log in, mirror Garmin's gear list into the local gear registry
    (best-effort), list the activities in the sync window, then download,
    parse and ingest each one.  A failure on a single activity is recorded
    and reported but never aborts the run.

    Args:
        data_dir: Root data directory (used for encryption key lookup).
        user_dir: Per-user directory (contains activities/, garmin_creds.json, etc.).

    Yields:
        Event dicts distinguished by their ``type`` key:

        - ``"fetching"`` — login succeeded, about to query Garmin.
        - ``"progress"`` — one activity handled; keys ``n``, ``total``,
          ``name``, ``status`` (``"imported"``/``"skipped"``/``"error"``)
          and ``garmin_id``.
        - ``"done"`` — final summary; keys ``imported``, ``skipped``,
          ``error_count`` and ``errors`` (at most the first five messages).
        - ``"error"`` — fatal error (login or activity listing failed); key
          ``message``.  Nothing is yielded after it.
    """
    import logging
    import uuid as _uuid

    from bincio.extract.garmin_api import GarminError, get_client
    from bincio.extract.ingest import ingest_parsed
    from bincio.extract.parsers.fit import FitParser
    from bincio.serve.routers.gear import _load as _gear_load
    from bincio.serve.routers.gear import _save as _gear_save

    log = logging.getLogger(__name__)

    # ── Login ──────────────────────────────────────────────────────────────────
    try:
        client = get_client(data_dir, user_dir)
    except GarminError as exc:
        yield {"type": "error", "message": str(exc)}
        return

    yield {"type": "fetching"}

    # ── Sync gear registry ─────────────────────────────────────────────────────
    # Maps Garmin gear UUID → display name; used below to label each activity.
    _garmin_uuid_to_name: dict[str, str] = {}
    try:
        prof = client.connectapi("/userprofile-service/socialProfile")
        profile_id = prof.get("profileId") if isinstance(prof, dict) else None
        if profile_id:
            garmin_gear = client.get_gear(profile_id)
            if isinstance(garmin_gear, list):
                registry = _gear_load(user_dir)
                known = {g.get("garmin_id") for g in registry if g.get("garmin_id")}
                for g in garmin_gear:
                    if not isinstance(g, dict):
                        continue
                    guuid = g.get("uuid") or ""
                    # ``or ''`` keeps an explicit null make/model from being
                    # rendered as the literal text "None" in the gear name.
                    name = (
                        g.get("customMakeModel")
                        or g.get("displayName")
                        or f"{g.get('gearMakeName') or ''} {g.get('gearModelName') or ''}".strip()
                    )
                    if not name or not guuid:
                        continue
                    _garmin_uuid_to_name[guuid] = name
                    if guuid not in known:
                        # ``or ""`` guards against the key being present but null.
                        gear_type = (g.get("gearTypeName") or "").lower()
                        if gear_type not in ("bike", "shoes", "skis"):
                            gear_type = "other"
                        retired = g.get("gearStatusName") == "retired"
                        registry.append({
                            "id": str(_uuid.uuid4()),
                            "name": name,
                            "type": gear_type,
                            "retired": retired,
                            "garmin_id": guuid,
                        })
                        known.add(guuid)
                    else:
                        # Update name in case it changed
                        for item in registry:
                            if item.get("garmin_id") == guuid:
                                item["name"] = name
                _gear_save(user_dir, registry)
    except Exception:
        # Gear sync is best-effort; don't abort activity sync — but leave a
        # trace so the failure can be diagnosed.
        log.warning("Garmin gear sync failed; continuing without it", exc_info=True)

    # ── Determine date range ───────────────────────────────────────────────────
    state = _load_sync_state(user_dir)
    last = state.get("last_sync_at")

    # Default (first sync, or unusable stored date): import everything Garmin has.
    start_dt = datetime(2000, 1, 1)
    if last:
        try:
            # Start one day before last sync to catch edge cases around midnight
            start_dt = datetime.fromisoformat(last) - timedelta(days=1)
        except (TypeError, ValueError):
            # A hand-edited or corrupted state file must not kill the sync;
            # already-ingested activities are skipped as duplicates anyway.
            log.warning("Ignoring invalid last_sync_at %r; running a full sync", last)

    start_date = start_dt.strftime("%Y-%m-%d")
    end_date = datetime.now().strftime("%Y-%m-%d")

    # ── Fetch activity list ────────────────────────────────────────────────────
    try:
        activities = client.get_activities_by_date(
            startdate=start_date,
            enddate=end_date,
        ) or []
    except Exception as exc:
        yield {"type": "error", "message": f"Failed to fetch activity list: {exc}"}
        return

    total = len(activities)
    imported = 0
    skipped = 0
    errors: list[str] = []
    parser = FitParser()

    # ── Process each activity ──────────────────────────────────────────────────
    for n, meta in enumerate(activities, 1):
        garmin_id = meta.get("activityId")
        name = meta.get("activityName") or "Untitled"

        try:
            # Download original FIT (wrapped in a ZIP by Garmin)
            try:
                zip_bytes = client.download_activity(
                    garmin_id,
                    dl_fmt=client.ActivityDownloadFormat.ORIGINAL,
                )
            except Exception as exc:
                raise RuntimeError(f"Download failed: {exc}") from exc

            try:
                fit_bytes, fit_name = _extract_fit(zip_bytes)
            except Exception as exc:
                raise RuntimeError(f"ZIP extraction failed: {exc}") from exc

            # Parse FIT — pass a dummy Path so the parser has a filename for
            # any format-detection logic; raw bytes are the actual data.
            fake_path = Path(fit_name)
            try:
                parsed = parser.parse(fake_path, fit_bytes)
            except Exception as exc:
                raise RuntimeError(f"FIT parse error: {exc}") from exc

            # Resolve gear for this activity (only the first gear item is used)
            if garmin_id and _garmin_uuid_to_name:
                try:
                    act_gear = client.get_activity_gear(garmin_id)
                    if isinstance(act_gear, list) and act_gear:
                        guuid = act_gear[0].get("uuid") or ""
                        parsed.gear = _garmin_uuid_to_name.get(guuid) or None
                except Exception:
                    # Gear labelling is optional; import the activity regardless.
                    log.debug("Gear lookup failed for activity %s", garmin_id, exc_info=True)

            # Ingest — raises FileExistsError if already present (dedup)
            ingest_parsed(parsed, user_dir)
            imported += 1
            status = "imported"

        except FileExistsError:
            skipped += 1
            status = "skipped"

        except Exception as exc:
            errors.append(f"{garmin_id} ({name}): {type(exc).__name__}: {exc}")
            status = "error"

        # Yield outside the try so an exception thrown into the generator by
        # the consumer is not mistaken for a failure of this activity.
        yield {
            "type": "progress",
            "n": n, "total": total, "name": name,
            "status": status,
            "garmin_id": garmin_id,
        }

    # ── Persist sync state ─────────────────────────────────────────────────────
    state["last_sync_at"] = datetime.now(UTC).strftime("%Y-%m-%d")
    state["total_imported"] = state.get("total_imported", 0) + imported
    _save_sync_state(user_dir, state)

    yield {
        "type": "done",
        "imported": imported,
        "skipped": skipped,
        "error_count": len(errors),
        "errors": errors[:5],
    }
|
|
|
|
|
|
def run_garmin_sync(data_dir: Path, user_dir: Path) -> dict:
    """Drive ``garmin_sync_iter`` to completion for callers that don't stream.

    Args:
        data_dir: Root data directory, forwarded to ``garmin_sync_iter``.
        user_dir: Per-user directory, forwarded to ``garmin_sync_iter``.

    Returns:
        The ``"done"`` event emitted by the generator, or an empty dict if it
        finished without emitting one.

    Raises:
        RuntimeError: As soon as the generator emits an ``"error"`` event,
            carrying that event's ``message``.
    """
    summary: dict = {}
    for evt in garmin_sync_iter(data_dir, user_dir):
        kind = evt["type"]
        if kind == "error":
            raise RuntimeError(evt["message"])
        if kind == "done":
            summary = evt
    return summary
|
|
|
|
|
|
def import_garmin_gear(data_dir: Path, user_dir: Path) -> dict:
    """Backfill gear for all existing activities by querying Garmin's gear-activities API.

    For each gear item, fetches the list of activities from Garmin and matches them
    to local activities by UTC start timestamp (±60 s). Writes a sidecar and calls
    merge_one for each match that doesn't already have gear set.

    Existing sidecars are never clobbered: a sidecar without front matter keeps
    its text as the body, and a sidecar that cannot be read or whose front
    matter cannot be parsed as a YAML mapping is left untouched.

    Args:
        data_dir: Root data directory, passed to ``get_client``.
        user_dir: Per-user directory holding the index shards, ``activities/``
            and ``edits/``.

    Returns:
        {"gear_added": int, "activities_updated": int}.

    Raises:
        GarminError: If the Garmin profile ID cannot be read.  Errors raised
            by ``get_client`` and by the profile/gear-list requests are not
            caught here and propagate unchanged.
    """
    import logging
    import re
    import uuid

    import yaml

    from bincio.extract.garmin_api import GarminError, get_client
    from bincio.render.merge import merge_one
    from bincio.serve.routers.gear import _load as _gear_load
    from bincio.serve.routers.gear import _save as _gear_save

    log = logging.getLogger(__name__)
    # Front-matter fence: a line consisting of ``---`` plus optional blanks.
    fence = re.compile(r"^---[ \t]*$", flags=re.MULTILINE)
    match_window_s = 60  # tolerance when matching start timestamps

    client = get_client(data_dir, user_dir)

    # Fetch gear list from Garmin
    prof = client.connectapi("/userprofile-service/socialProfile")
    profile_id = prof.get("profileId") if isinstance(prof, dict) else None
    if not profile_id:
        raise GarminError("Could not read Garmin profile ID")
    garmin_gear = client.get_gear(profile_id)

    if not isinstance(garmin_gear, list) or not garmin_gear:
        return {"gear_added": 0, "activities_updated": 0}

    # Build / update local gear registry
    registry = _gear_load(user_dir)
    known = {g.get("garmin_id") for g in registry if g.get("garmin_id")}
    uuid_to_name: dict[str, str] = {}
    gear_added = 0

    for g in garmin_gear:
        if not isinstance(g, dict):
            continue
        guuid = g.get("uuid") or ""
        # ``or ''`` keeps an explicit null make/model from being rendered as
        # the literal text "None" in the gear name.
        name = (
            g.get("customMakeModel")
            or g.get("displayName")
            or f"{g.get('gearMakeName') or ''} {g.get('gearModelName') or ''}".strip()
        )
        if not name or not guuid:
            continue
        uuid_to_name[guuid] = name
        if guuid not in known:
            # ``or ""`` guards against the key being present but null.
            gear_type = (g.get("gearTypeName") or "").lower()
            if gear_type not in ("bike", "shoes", "skis"):
                gear_type = "other"
            retired = g.get("gearStatusName") == "retired"
            registry.append({
                "id": str(uuid.uuid4()),
                "name": name,
                "type": gear_type,
                "retired": retired,
                "garmin_id": guuid,
            })
            known.add(guuid)
            gear_added += 1
        else:
            # Already registered: refresh the name in case it changed.
            for item in registry:
                if item.get("garmin_id") == guuid:
                    item["name"] = name

    _gear_save(user_dir, registry)

    # Build timestamp → activity_id map from index shards
    ts_to_id: dict[int, str] = {}
    merged_dir = user_dir / "_merged"
    shard_dir = merged_dir if merged_dir.exists() else user_dir
    for shard_path in sorted(shard_dir.glob("index*.json")):
        try:
            idx = json.loads(shard_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Unreadable or malformed shard: skip it, keep the others.
            continue
        entries = idx.get("activities", []) if isinstance(idx, dict) else []
        if not isinstance(entries, list):
            continue
        for a in entries:
            if not isinstance(a, dict):
                continue
            started = a.get("started_at") or ""
            local_id = a.get("id")
            if not local_id or not isinstance(started, str) or not started:
                continue
            try:
                dt = datetime.fromisoformat(started.replace("Z", "+00:00"))
            except ValueError:
                # One bad timestamp must not discard the rest of the shard
                # (or abort the whole backfill).
                continue
            # NOTE(review): a ``started_at`` without a UTC offset is interpreted
            # as local time by ``astimezone`` — confirm the index always
            # stores offsets.
            ts_to_id[int(dt.astimezone(UTC).timestamp())] = local_id

    edits_dir = user_dir / "edits"
    edits_dir.mkdir(exist_ok=True)
    activities_updated = 0

    for guuid, gear_name in uuid_to_name.items():
        try:
            gear_acts = client.get_gear_activities(guuid, limit=10000)
        except Exception as exc:
            # Best-effort per gear item: report and move on to the next one.
            log.warning("Could not fetch activities for gear %s: %s", gear_name, exc)
            continue
        if not isinstance(gear_acts, list):
            continue

        for ga in gear_acts:
            if not isinstance(ga, dict):
                continue
            gmt = ga.get("startTimeGMT") or ""
            if not gmt:
                continue
            try:
                dt = datetime.strptime(gmt, "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
                ts = int(dt.timestamp())
            except (TypeError, ValueError):
                continue

            # Closest local activity within the tolerance window, searching
            # outward from an exact match.
            act_id = None
            for delta in range(match_window_s + 1):
                act_id = ts_to_id.get(ts + delta) or ts_to_id.get(ts - delta)
                if act_id:
                    break
            if not act_id:
                continue

            # Skip if activity already has gear set
            act_json = user_dir / "activities" / f"{act_id}.json"
            if act_json.exists():
                try:
                    if json.loads(act_json.read_text(encoding="utf-8")).get("gear"):
                        continue
                except (OSError, ValueError, AttributeError):
                    # Unreadable / malformed activity file: fall through and
                    # let the sidecar carry the gear.
                    pass

            sidecar = edits_dir / f"{act_id}.md"
            fm: dict = {}
            body = ""
            if sidecar.exists():
                try:
                    text = sidecar.read_text(encoding="utf-8")
                except (OSError, UnicodeDecodeError):
                    # Can't read it, so don't risk overwriting it.
                    continue
                parts = fence.split(text, maxsplit=2)
                if len(parts) >= 3 and not parts[0].strip():
                    try:
                        loaded = yaml.safe_load(parts[1]) or {}
                    except yaml.YAMLError:
                        # Rewriting would destroy the user's front matter.
                        continue
                    if not isinstance(loaded, dict):
                        continue
                    fm = loaded
                    body = parts[2].strip()
                else:
                    # No front matter: keep the whole file as the body.
                    body = text.strip()
                if fm.get("gear"):
                    continue

            fm["gear"] = gear_name
            fm_text = yaml.safe_dump(fm, default_flow_style=False, allow_unicode=True).strip()
            content = f"---\n{fm_text}\n---\n"
            if body:
                content += f"\n{body}\n"
            sidecar.write_text(content, encoding="utf-8")
            try:
                merge_one(user_dir, act_id)
            except Exception as exc:
                # The sidecar is already written; a failed merge is reported
                # but does not stop the backfill.
                log.warning("merge_one failed for activity %s: %s", act_id, exc)
            activities_updated += 1

    return {"gear_added": gear_added, "activities_updated": activities_updated}
|