Files
bincio-activity/scripts/backfill_garmin_gear.py
Davide Scaini 7db7bf91e0 refactor: extract import_garmin_gear() + add backfill script
Move gear backfill logic from the route handler into
import_garmin_gear(data_dir, user_dir) in garmin_sync.py so it can be
called both from the API and from the CLI script.

scripts/backfill_garmin_gear.py finds all users with Garmin credentials
and runs the backfill for each, printing a per-user summary.
2026-05-24 13:13:47 +02:00

64 lines
2.1 KiB
Python

#!/usr/bin/env python3
"""Backfill Garmin gear for all users who have stored Garmin credentials.
Usage (on VPS):

    cd /opt/bincio
    uv run python3 scripts/backfill_garmin_gear.py --data-dir /var/bincio/data

    # Limit to specific users:
    uv run python3 scripts/backfill_garmin_gear.py --data-dir /var/bincio/data --users plagzo12
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
def main() -> None:
    """Run the Garmin gear backfill for every user with stored credentials.

    Parses ``--data-dir`` (required) and ``--users`` (optional) from the
    command line, keeps the user directories under the data directory for
    which ``has_credentials`` is true, and calls ``import_garmin_gear`` for
    each of them, printing a per-user summary.

    A failure for one user is reported and does not prevent the remaining
    users from being processed.

    Raises:
        SystemExit: if the data directory does not exist, or -- once every
            user has been attempted -- if the backfill failed for at least
            one user, so that callers (cron, CI, shell scripts) can detect
            the failure from the exit status.
    """
    parser = argparse.ArgumentParser(description="Backfill Garmin gear for all users")
    parser.add_argument("--data-dir", required=True, type=Path, help="Root data directory")
    parser.add_argument("--users", nargs="*", help="Limit to these user handles (default: all)")
    args = parser.parse_args()

    data_dir: Path = args.data_dir.resolve()
    if not data_dir.is_dir():
        sys.exit(f"data-dir not found: {data_dir}")

    # Imported here, after argument validation: --help and a bad --data-dir
    # therefore exit before the bincio package is loaded.
    from bincio.extract.garmin_api import GarminError, has_credentials
    from bincio.extract.garmin_sync import import_garmin_gear

    if args.users:
        candidates = [data_dir / handle for handle in args.users]
    else:
        candidates = sorted(p for p in data_dir.iterdir() if p.is_dir())

    garmin_users: list[Path] = []
    skipped: list[str] = []
    for candidate in candidates:
        if has_credentials(candidate):
            garmin_users.append(candidate)
        elif args.users:
            # Only explicitly requested handles are worth reporting; when
            # scanning every directory, users without Garmin are expected.
            skipped.append(candidate.name)

    if skipped:
        print(
            f"Skipping requested user(s) without Garmin credentials: {skipped}",
            file=sys.stderr,
        )

    if not garmin_users:
        print("No users with Garmin credentials found.")
        return

    print(f"Found {len(garmin_users)} Garmin user(s): {[p.name for p in garmin_users]}\n")

    failed: list[str] = []
    for user_dir in garmin_users:
        handle = user_dir.name
        print(f"[{handle}] importing gear...", flush=True)
        try:
            result = import_garmin_gear(data_dir, user_dir)
            print(
                f"[{handle}] done — "
                f"gear_added={result['gear_added']}, "
                f"activities_updated={result['activities_updated']}"
            )
        except GarminError as exc:
            failed.append(handle)
            print(f"[{handle}] Garmin error: {exc}")
        except Exception as exc:
            # Top-level boundary of a batch job: report and continue so one
            # broken account does not block the remaining users.
            failed.append(handle)
            print(f"[{handle}] unexpected error: {type(exc).__name__}: {exc}")

    print("\nAll done. Run merge_all or trigger a rebuild to refresh the index shards.")

    if failed:
        # sys.exit with a string writes it to stderr and exits with status 1.
        sys.exit(f"Gear backfill failed for {len(failed)} user(s): {failed}")
# Run the backfill only when executed as a script, not when imported.
if __name__ == "__main__":
    main()