2287d6e2ee
Each sync run now writes _strava_sync_status.json per user (status, imported count, error message). New admin endpoints expose this data and allow triggering an on-demand sync. The admin page gains a Strava Sync section showing per-user token/credentials state, total imported, last sync time, and last-run status with inline error messages.
273 lines
9.8 KiB
Python
273 lines
9.8 KiB
Python
"""Headless multi-user Strava sync — designed to run as a systemd timer.
|
|
|
|
For each user directory that contains both strava_token.json and
|
|
strava_credentials.json, refreshes the token, fetches new activities,
|
|
writes them to the user's data dir, merges sidecars, and updates the
|
|
_strava_sync.json checkpoint.
|
|
|
|
After all users are synced, optionally POSTs to a server endpoint
|
|
to trigger an Astro rebuild + rsync.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import urllib.error
|
|
import urllib.request
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
import click
|
|
|
|
# Per-user file names; each one lives directly inside a user's data directory.
_TOKEN_FILE = "strava_token.json"        # same name sync_all() globs for to find users
_CREDS_FILE = "strava_credentials.json"  # holds client_id / client_secret (see _load_creds)
_SYNC_FILE = "_strava_sync.json"         # incremental checkpoint: imported_ids + last_sync
_STATUS_FILE = "_strava_sync_status.json"  # outcome of the most recent run (see _write_status)

# Module logger; handlers/levels are configured by the CLI entry point.
log = logging.getLogger("bincio.sync_strava")
|
|
|
|
|
|
def _write_status(
    user_dir: Path,
    status: str,
    imported: int,
    errors: int,
    error_message: str | None = None,
) -> None:
    """Record the outcome of one sync run in the user's status file.

    Writes ``_strava_sync_status.json`` inside ``user_dir`` with the keys
    ``status``, ``imported``, ``errors`` and ``last_run`` (current UTC time,
    ISO 8601). ``error_message`` is added only when one is supplied.

    The write is best-effort: a failure to persist the status must never
    abort a sync, so it is logged as a warning instead of being raised.
    """
    payload: dict = {
        "status": status,
        "imported": imported,
        "errors": errors,
        "last_run": datetime.now(timezone.utc).isoformat(),
    }
    if error_message is not None:
        payload["error_message"] = error_message

    try:
        (user_dir / _STATUS_FILE).write_text(json.dumps(payload, indent=2), encoding="utf-8")
    except (OSError, TypeError, ValueError) as exc:
        # OSError: unwritable directory / disk full.
        # TypeError / ValueError: payload that json cannot serialise.
        log.warning("sync[%s]: could not write %s: %s", user_dir.name, _STATUS_FILE, exc)
|
|
|
|
|
|
def _load_creds(user_dir: Path) -> tuple[str, str] | None:
    """Return ``(client_id, client_secret)`` from strava_credentials.json, or None.

    ``None`` is returned when the file is missing, unreadable, not a JSON
    object, or when either value is absent, ``null`` or blank. Values are
    converted with ``str()`` and stripped, so a numeric ``client_id`` in the
    JSON is accepted.
    """
    path = user_dir / _CREDS_FILE
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No credentials configured for this user: the normal "skip" case.
        return None
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and invalid UTF-8.
        log.warning("sync[%s]: cannot read %s: %s", user_dir.name, _CREDS_FILE, exc)
        return None

    if not isinstance(raw, dict):
        log.warning("sync[%s]: %s is not a JSON object", user_dir.name, _CREDS_FILE)
        return None

    def _clean(value: object) -> str:
        # A JSON null must count as "missing", not as the literal text "None".
        return "" if value is None else str(value).strip()

    client_id = _clean(raw.get("client_id"))
    client_secret = _clean(raw.get("client_secret"))
    if client_id and client_secret:
        return client_id, client_secret
    return None
|
|
|
|
|
|
def sync_user(user_dir: Path) -> tuple[int, int]:
    """Sync one user's Strava activities into ``user_dir``.

    Steps: load credentials, refresh the token, list activities newer than
    the checkpoint, import every activity not yet recorded in
    ``imported_ids``, rewrite the index and checkpoint, merge sidecars, and
    write the per-user status file.

    Returns:
        ``(new_count, error_count)``. ``(0, 0)`` when the user has no usable
        credentials (status ``no_credentials``) or nothing new was listed;
        ``(0, 1)`` when the token refresh or the activity listing fails.

    Raises:
        Whatever the checkpoint/index loading or the writer helpers raise
        (for example invalid JSON in ``_strava_sync.json``); per-activity
        failures are caught, logged and counted instead.
    """
    from bincio.extract.strava_api import ensure_fresh, fetch_activities, fetch_streams, StravaError
    from bincio.extract.metrics import compute
    from bincio.extract.writer import build_summary, make_activity_id, write_activity, write_index
    from bincio.import_.strava import _strava_to_parsed, _patch_from_summary
    from bincio.render.merge import merge_all

    handle = user_dir.name

    creds = _load_creds(user_dir)
    if creds is None:
        log.debug("sync[%s]: no strava_credentials.json — skipped", handle)
        _write_status(user_dir, "no_credentials", 0, 0)
        return 0, 0

    client_id, client_secret = creds

    try:
        token = ensure_fresh(user_dir, client_id, client_secret)
    except StravaError as exc:
        log.error("sync[%s]: token refresh failed: %s", handle, exc)
        _write_status(user_dir, "token_error", 0, 1, str(exc))
        return 0, 1

    access_token = token["access_token"]

    # Load incremental sync state
    sync_path = user_dir / _SYNC_FILE
    sync_state: dict = (
        json.loads(sync_path.read_text(encoding="utf-8"))
        if sync_path.exists() else {}
    )
    # Normalise to str so the membership test below matches str(act["id"])
    # even if a checkpoint ever stored the ids as JSON numbers.
    imported_ids: set[str] = {str(i) for i in sync_state.get("imported_ids", [])}

    after_ts: int | None = None
    if sync_state.get("last_sync"):
        last = datetime.fromisoformat(sync_state["last_sync"])
        # 1-hour overlap to catch activities saved late to Strava
        after_ts = int((last - timedelta(hours=1)).timestamp())

    # The checkpoint cursor is the moment the listing was taken, not the
    # moment the (possibly long) import loop finishes; otherwise activities
    # uploaded during a long run could fall outside the next window.
    listed_at = datetime.now(timezone.utc)

    try:
        all_acts = fetch_activities(access_token, after=after_ts)
    except StravaError as exc:
        log.error("sync[%s]: fetch_activities failed: %s", handle, exc)
        _write_status(user_dir, "api_error", 0, 1, str(exc))
        return 0, 1

    new_acts = [a for a in all_acts if str(a["id"]) not in imported_ids]
    log.info(
        "sync[%s]: %d new, %d already imported",
        handle, len(new_acts), len(all_acts) - len(new_acts),
    )
    if not new_acts:
        _write_status(user_dir, "ok", 0, 0)
        return 0, 0

    # Load existing index so we can update it in place
    index_path = user_dir / "index.json"
    if index_path.exists():
        index_data = json.loads(index_path.read_text(encoding="utf-8"))
    else:
        index_data = {"owner": {"handle": handle}, "activities": []}
    owner = index_data.get("owner", {})
    summaries: dict[str, dict] = {s["id"]: s for s in index_data.get("activities", [])}

    imported = 0
    errors = 0

    for act in new_acts:
        strava_id = str(act["id"])
        try:
            try:
                streams = fetch_streams(access_token, int(strava_id))
            except StravaError as exc:
                if "404" in str(exc):
                    # Activity exists in list but has no accessible streams (old/deleted GPS).
                    # Still import it using summary-only stats via _patch_from_summary.
                    streams = {}
                else:
                    raise

            # strava_api.fetch_streams returns {type: {"data": [...], ...}};
            # _strava_to_parsed (from import_/strava.py) expects {type: [...]}
            flat_streams = {
                k: v["data"] for k, v in streams.items()
                if isinstance(v, dict) and "data" in v
            }

            parsed = _strava_to_parsed(act, flat_streams)
            metrics = compute(parsed)
            metrics = _patch_from_summary(metrics, act)
            act_id = make_activity_id(parsed)

            # Respect Strava visibility: only_me → unlisted
            visibility = act.get("visibility") or ""
            privacy = "unlisted" if (act.get("private") or visibility == "only_me") else "public"

            write_activity(parsed, metrics, user_dir, privacy=privacy)
            summaries[act_id] = build_summary(parsed, metrics, act_id, privacy)
            imported_ids.add(strava_id)
            imported += 1
        except Exception as exc:
            # One bad activity must not abort the whole user; count and move on.
            log.error("sync[%s]: activity %s failed: %s", handle, strava_id, exc)
            errors += 1

    # Persist index and sync checkpoint
    write_index(list(summaries.values()), user_dir, owner)
    sync_state["imported_ids"] = sorted(imported_ids)
    if errors == 0:
        sync_state["last_sync"] = listed_at.isoformat()
    else:
        # Failed activities are not in imported_ids. Advancing the cursor
        # would push them outside the next listing window and lose them for
        # good, so keep the previous cursor and let the next run retry.
        log.warning(
            "sync[%s]: %d activity error(s) — last_sync not advanced so they are retried",
            handle, errors,
        )
    sync_path.write_text(json.dumps(sync_state, indent=2), encoding="utf-8")

    # Merge sidecars so _merged/ reflects any edits
    merge_all(user_dir)

    log.info("sync[%s]: done — %d imported, %d errors", handle, imported, errors)
    _write_status(user_dir, "ok", imported, errors)
    return imported, errors
|
|
|
|
|
|
def sync_all(root_data_dir: Path) -> dict[str, tuple[int, int]]:
    """Sync every user directory under ``root_data_dir`` that has a strava_token.json.

    Users are processed in sorted path order. A failure in one user never
    stops the others.

    Returns:
        ``{handle: (new, errors)}``. A user whose sync raised an unexpected
        exception is reported as ``(0, -1)``; the ``-1`` is a sentinel, not
        a count.
    """
    results: dict[str, tuple[int, int]] = {}
    token_files = sorted(root_data_dir.glob("*/strava_token.json"))
    if not token_files:
        log.info("sync_all: no users with strava_token.json found in %s", root_data_dir)
        return results

    log.info("sync_all: %d user(s) with Strava token", len(token_files))
    for tf in token_files:
        user_dir = tf.parent
        handle = user_dir.name
        try:
            results[handle] = sync_user(user_dir)
        except Exception as exc:
            log.exception("sync_all[%s]: unexpected error: %s", handle, exc)
            # Without this the status file would still show the previous
            # run's outcome, hiding the crash from anyone reading it.
            _write_status(user_dir, "error", 0, 1, f"{type(exc).__name__}: {exc}")
            results[handle] = (0, -1)
    return results
|
|
|
|
|
|
def _post_rebuild(url: str, secret: str | None) -> None:
    """POST an empty JSON object to ``url`` to request a site rebuild.

    When ``secret`` is truthy it is sent as the ``X-Sync-Secret`` header.
    The call is best-effort: every failure is logged and nothing is raised,
    so a broken rebuild endpoint cannot fail the sync itself.
    """
    headers: dict[str, str] = {"Content-Type": "application/json"}
    if secret:
        headers["X-Sync-Secret"] = secret

    req = urllib.request.Request(url, data=b"{}", headers=headers, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            log.info("rebuild triggered: HTTP %d", resp.status)
    except urllib.error.HTTPError as exc:
        # Reading or decoding the error body can itself fail (connection
        # reset, non-UTF-8 bytes); that must not escape this handler.
        try:
            body = exc.read().decode("utf-8", errors="replace")[:100]
        except OSError:
            body = ""
        log.error("rebuild trigger failed: HTTP %d %s", exc.code, body)
    except Exception as exc:
        log.error("rebuild trigger failed: %s", exc)
|
|
|
|
|
|
@click.command("sync-strava")
@click.option("--data-dir", "data_dir_str", required=True,
              help="Root data dir (parent of all user dirs, e.g. /var/bincio/data).")
@click.option("--user", "only_user", default=None,
              help="Sync only this handle instead of all users.")
@click.option("--rebuild-url", default=None, envvar="BINCIO_REBUILD_URL",
              help="POST here after a successful sync to trigger a site rebuild.")
@click.option("--rebuild-secret", default=None, envvar="BINCIO_SYNC_SECRET",
              help="Value sent as X-Sync-Secret header to the rebuild endpoint.")
def sync_strava_cmd(
    data_dir_str: str,
    only_user: str | None,
    rebuild_url: str | None,
    rebuild_secret: str | None,
) -> None:
    """Headless Strava sync for all users (designed for systemd timer).

    Discovers every user directory that has both strava_token.json and
    strava_credentials.json, syncs new activities, and optionally triggers
    a site rebuild via an HTTP POST.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s — %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )

    root = Path(data_dir_str).expanduser().resolve()
    if not root.is_dir():
        raise click.ClickException(f"Data dir not found: {root}")

    if only_user:
        user_dir = root / only_user
        if not user_dir.is_dir():
            raise click.ClickException(f"User dir not found: {user_dir}")
        new_count, err_count = sync_user(user_dir)
        click.echo(f"{only_user}: {new_count} imported, {err_count} errors")
        total_new = new_count
    else:
        results = sync_all(root)
        total_new = sum(n for n, _ in results.values())
        # sync_all reports a crashed user as (0, -1). Count that sentinel as
        # one error; summing it raw would cancel out real errors.
        total_err = sum(e if e >= 0 else 1 for _, e in results.values())
        click.echo(
            f"Sync complete: {len(results)} users, "
            f"{total_new} new activities, {total_err} errors"
        )

    # Only rebuild when something new was actually imported.
    if total_new > 0 and rebuild_url:
        _post_rebuild(rebuild_url, rebuild_secret)
|