Files
bincio-activity/bincio/sync_strava.py
T
Davide Scaini 12693dbd60 feat: scheduled Strava sync + admin suspend/delete account
- Add bincio sync-strava command: headless multi-user Strava sync
  designed for systemd timer. Discovers users via strava_token.json,
  skips users without their own strava_credentials.json, respects
  Strava visibility (only_me → unlisted). Treats 404 stream errors as
  no-GPS activities rather than retrying every run.
- Add deploy/systemd/bincio-sync.{service,timer}: runs every 3 hours,
  Persistent=true to catch up after downtime.
- Add POST /api/internal/rebuild: webhook for sync timer to trigger
  site rebuild, authenticated via X-Sync-Secret header.
- Add suspended column to users table with auto-migration on open_db.
  Suspended users are blocked at login and session lookup (covers both
  activity site and wiki, which share instance.db).
- Add POST /api/admin/users/{handle}/suspend|unsuspend and
  DELETE /api/admin/users/{handle}/account endpoints.
- Admin panel: Suspend/Unsuspend toggle, Del account button, suspended
  badge on user row.
2026-05-08 10:36:21 +02:00

246 lines
9.0 KiB
Python

"""Headless multi-user Strava sync — designed to run as a systemd timer.
For each user directory that contains both strava_token.json and
strava_credentials.json, refreshes the token, fetches new activities,
writes them to the user's data dir, merges sidecars, and updates the
_strava_sync.json checkpoint.
After all users are synced, optionally POSTs to a server endpoint
to trigger an Astro rebuild + rsync.
"""
from __future__ import annotations
import json
import logging
import urllib.error
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
import click
_TOKEN_FILE = "strava_token.json"
_CREDS_FILE = "strava_credentials.json"
_SYNC_FILE = "_strava_sync.json"
log = logging.getLogger("bincio.sync_strava")
def _load_creds(user_dir: Path) -> tuple[str, str] | None:
"""Return (client_id, client_secret) from strava_credentials.json, or None."""
p = user_dir / _CREDS_FILE
if not p.exists():
return None
try:
d = json.loads(p.read_text(encoding="utf-8"))
cid = str(d.get("client_id", "")).strip()
csec = str(d.get("client_secret", "")).strip()
if cid and csec:
return cid, csec
except Exception:
pass
return None
def sync_user(user_dir: Path) -> tuple[int, int]:
"""Sync one user's Strava activities.
Returns (new_count, error_count). Skips silently if no credentials.
"""
from bincio.extract.strava_api import ensure_fresh, fetch_activities, fetch_streams, StravaError
from bincio.extract.metrics import compute
from bincio.extract.writer import build_summary, make_activity_id, write_activity, write_index
from bincio.import_.strava import _strava_to_parsed, _patch_from_summary
from bincio.render.merge import merge_all
handle = user_dir.name
creds = _load_creds(user_dir)
if creds is None:
log.debug("sync[%s]: no strava_credentials.json — skipped", handle)
return 0, 0
client_id, client_secret = creds
try:
token = ensure_fresh(user_dir, client_id, client_secret)
except StravaError as exc:
log.error("sync[%s]: token refresh failed: %s", handle, exc)
return 0, 1
access_token = token["access_token"]
# Load incremental sync state
sync_path = user_dir / _SYNC_FILE
sync_state: dict = (
json.loads(sync_path.read_text(encoding="utf-8"))
if sync_path.exists() else {}
)
imported_ids: set[str] = set(sync_state.get("imported_ids", []))
after_ts: int | None = None
if sync_state.get("last_sync"):
last = datetime.fromisoformat(sync_state["last_sync"])
# 1-hour overlap to catch activities saved late to Strava
after_ts = int((last - timedelta(hours=1)).timestamp())
try:
all_acts = fetch_activities(access_token, after=after_ts)
except StravaError as exc:
log.error("sync[%s]: fetch_activities failed: %s", handle, exc)
return 0, 1
new_acts = [a for a in all_acts if str(a["id"]) not in imported_ids]
log.info(
"sync[%s]: %d new, %d already imported",
handle, len(new_acts), len(all_acts) - len(new_acts),
)
if not new_acts:
return 0, 0
# Load existing index so we can update it in place
index_path = user_dir / "index.json"
if index_path.exists():
index_data = json.loads(index_path.read_text(encoding="utf-8"))
else:
index_data = {"owner": {"handle": handle}, "activities": []}
owner = index_data.get("owner", {})
summaries: dict[str, dict] = {s["id"]: s for s in index_data.get("activities", [])}
imported = 0
errors = 0
for act in new_acts:
strava_id = str(act["id"])
try:
try:
streams = fetch_streams(access_token, int(strava_id))
except StravaError as exc:
if "404" in str(exc):
# Activity exists in list but has no accessible streams (old/deleted GPS).
# Still import it using summary-only stats via _patch_from_summary.
streams = {}
else:
raise
# strava_api.fetch_streams returns {type: {"data": [...], ...}};
# _strava_to_parsed (from import_/strava.py) expects {type: [...]}
flat_streams = {
k: v["data"] for k, v in streams.items()
if isinstance(v, dict) and "data" in v
}
parsed = _strava_to_parsed(act, flat_streams)
metrics = compute(parsed)
metrics = _patch_from_summary(metrics, act)
act_id = make_activity_id(parsed)
# Respect Strava visibility: only_me → unlisted
visibility = act.get("visibility") or ""
privacy = "unlisted" if (act.get("private") or visibility == "only_me") else "public"
write_activity(parsed, metrics, user_dir, privacy=privacy)
summaries[act_id] = build_summary(parsed, metrics, act_id, privacy)
imported_ids.add(strava_id)
imported += 1
except Exception as exc:
log.error("sync[%s]: activity %s failed: %s", handle, strava_id, exc)
errors += 1
# Persist index and sync checkpoint
write_index(list(summaries.values()), user_dir, owner)
sync_state["imported_ids"] = sorted(imported_ids)
sync_state["last_sync"] = datetime.now(timezone.utc).isoformat()
sync_path.write_text(json.dumps(sync_state, indent=2), encoding="utf-8")
# Merge sidecars so _merged/ reflects any edits
merge_all(user_dir)
log.info("sync[%s]: done — %d imported, %d errors", handle, imported, errors)
return imported, errors
def sync_all(root_data_dir: Path) -> dict[str, tuple[int, int]]:
"""Sync all users that have a strava_token.json. Returns {handle: (new, errors)}."""
results: dict[str, tuple[int, int]] = {}
token_files = sorted(root_data_dir.glob("*/strava_token.json"))
if not token_files:
log.info("sync_all: no users with strava_token.json found in %s", root_data_dir)
return results
log.info("sync_all: %d user(s) with Strava token", len(token_files))
for tf in token_files:
user_dir = tf.parent
handle = user_dir.name
try:
results[handle] = sync_user(user_dir)
except Exception as exc:
log.exception("sync_all[%s]: unexpected error: %s", handle, exc)
results[handle] = (0, -1)
return results
def _post_rebuild(url: str, secret: str | None) -> None:
headers: dict[str, str] = {"Content-Type": "application/json"}
if secret:
headers["X-Sync-Secret"] = secret
req = urllib.request.Request(url, data=b"{}", headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=10) as resp:
log.info("rebuild triggered: HTTP %d", resp.status)
except urllib.error.HTTPError as exc:
log.error("rebuild trigger failed: HTTP %d %s", exc.code, exc.read().decode()[:100])
except Exception as exc:
log.error("rebuild trigger failed: %s", exc)
@click.command("sync-strava")
@click.option("--data-dir", "data_dir_str", required=True,
help="Root data dir (parent of all user dirs, e.g. /var/bincio/data).")
@click.option("--user", "only_user", default=None,
help="Sync only this handle instead of all users.")
@click.option("--rebuild-url", default=None, envvar="BINCIO_REBUILD_URL",
help="POST here after a successful sync to trigger a site rebuild.")
@click.option("--rebuild-secret", default=None, envvar="BINCIO_SYNC_SECRET",
help="Value sent as X-Sync-Secret header to the rebuild endpoint.")
def sync_strava_cmd(
data_dir_str: str,
only_user: str | None,
rebuild_url: str | None,
rebuild_secret: str | None,
) -> None:
"""Headless Strava sync for all users (designed for systemd timer).
Discovers every user directory that has both strava_token.json and
strava_credentials.json, syncs new activities, and optionally triggers
a site rebuild via an HTTP POST.
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
root = Path(data_dir_str).expanduser().resolve()
if not root.is_dir():
raise click.ClickException(f"Data dir not found: {root}")
if only_user:
user_dir = root / only_user
if not user_dir.is_dir():
raise click.ClickException(f"User dir not found: {user_dir}")
new_count, err_count = sync_user(user_dir)
click.echo(f"{only_user}: {new_count} imported, {err_count} errors")
total_new = new_count
else:
results = sync_all(root)
total_new = sum(n for n, _ in results.values())
total_err = sum(e for _, e in results.values())
click.echo(
f"Sync complete: {len(results)} users, "
f"{total_new} new activities, {total_err} errors"
)
if total_new > 0 and rebuild_url:
_post_rebuild(rebuild_url, rebuild_secret)