refactor: extract import_garmin_gear() + add backfill script
Move gear backfill logic from the route handler into import_garmin_gear(data_dir, user_dir) in garmin_sync.py so it can be called both from the API and from the CLI script. scripts/backfill_garmin_gear.py finds all users with Garmin credentials and runs the backfill for each, printing a per-user summary.
This commit is contained in:
@@ -0,0 +1,63 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Backfill Garmin gear for all users who have stored Garmin credentials.
|
||||
|
||||
Usage (on VPS):
|
||||
cd /opt/bincio
|
||||
uv run python3 scripts/backfill_garmin_gear.py --data-dir /var/bincio/data
|
||||
|
||||
# Limit to specific users:
|
||||
uv run python3 scripts/backfill_garmin_gear.py --data-dir /var/bincio/data --users plagzo12
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Backfill Garmin gear for all users")
|
||||
parser.add_argument("--data-dir", required=True, type=Path, help="Root data directory")
|
||||
parser.add_argument("--users", nargs="*", help="Limit to these user handles (default: all)")
|
||||
args = parser.parse_args()
|
||||
|
||||
data_dir: Path = args.data_dir.resolve()
|
||||
if not data_dir.is_dir():
|
||||
sys.exit(f"data-dir not found: {data_dir}")
|
||||
|
||||
from bincio.extract.garmin_api import GarminError, has_credentials
|
||||
from bincio.extract.garmin_sync import import_garmin_gear
|
||||
|
||||
candidates = (
|
||||
[data_dir / h for h in args.users]
|
||||
if args.users
|
||||
else sorted(p for p in data_dir.iterdir() if p.is_dir())
|
||||
)
|
||||
|
||||
garmin_users = [p for p in candidates if has_credentials(p)]
|
||||
if not garmin_users:
|
||||
print("No users with Garmin credentials found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(garmin_users)} Garmin user(s): {[p.name for p in garmin_users]}\n")
|
||||
|
||||
for user_dir in garmin_users:
|
||||
handle = user_dir.name
|
||||
print(f"[{handle}] importing gear...", flush=True)
|
||||
try:
|
||||
result = import_garmin_gear(data_dir, user_dir)
|
||||
print(
|
||||
f"[{handle}] done — "
|
||||
f"gear_added={result['gear_added']}, "
|
||||
f"activities_updated={result['activities_updated']}"
|
||||
)
|
||||
except GarminError as exc:
|
||||
print(f"[{handle}] Garmin error: {exc}")
|
||||
except Exception as exc:
|
||||
print(f"[{handle}] unexpected error: {type(exc).__name__}: {exc}")
|
||||
|
||||
print("\nAll done. Run merge_all or trigger a rebuild to refresh the index shards.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user