"""Facade for writing a parsed or Strava-sourced activity into a BAS data store. Callers (edit/ops.py) import from here instead of reaching into extract.metrics, extract.writer, and extract.strava_api individually. If the internal structure of the extract package changes, only this file needs updating. """ from __future__ import annotations import json from pathlib import Path from typing import Any, Optional from bincio.extract.models import ParsedActivity def ingest_parsed( parsed: ParsedActivity, data_dir: Path, privacy: str = "public", rdp_epsilon: float = 0.0001, ) -> str: """Compute metrics, write activity files, and update index.json. Args: parsed: Activity produced by any parser or Strava converter. data_dir: Per-user output directory (contains activities/, index.json). privacy: BAS privacy level — "public", "no_gps", or "private". rdp_epsilon: RDP simplification threshold in degrees. Returns: The BAS activity ID of the written activity. Raises: FileExistsError: If an activity with the same ID already exists. """ from bincio.extract.metrics import compute from bincio.extract.writer import ( build_summary, make_activity_id, write_activity, write_index, ) activity_id = make_activity_id(parsed) if (data_dir / "activities" / f"{activity_id}.json").exists(): raise FileExistsError(f"Activity already exists: {activity_id}") metrics = compute(parsed) write_activity(parsed, metrics, data_dir, privacy=privacy, rdp_epsilon=rdp_epsilon) summary = build_summary(parsed, metrics, activity_id, privacy) index_path = data_dir / "index.json" if index_path.exists(): index_data = json.loads(index_path.read_text(encoding="utf-8")) else: index_data = {"owner": {"handle": "unknown"}, "activities": []} owner = index_data.get("owner", {}) summaries: dict[str, Any] = {s["id"]: s for s in index_data.get("activities", [])} summaries[activity_id] = summary write_index(list(summaries.values()), data_dir, owner) return activity_id def strava_sync( data_dir: Path, client_id: str, client_secret: str, originals_dir: Optional[Path] = None, ) -> dict[str, Any]: """Fetch new Strava activities and ingest them into data_dir. Args: data_dir: Per-user data directory. client_id: Strava OAuth client ID. client_secret: Strava OAuth client secret. Returns: Dict with keys: ok, imported, skipped, error_count, errors. Raises: RuntimeError: If Strava credentials are missing or API calls fail. """ import time from bincio.extract.strava_api import ( StravaError, ensure_fresh, fetch_activities, fetch_streams, save_token, strava_meta_to_partial, strava_to_parsed, ) from bincio.extract.writer import make_activity_id if not client_id or not client_secret: raise RuntimeError("Strava not configured (missing client_id or client_secret)") try: token = ensure_fresh(data_dir, client_id, client_secret) except StravaError as e: raise RuntimeError(str(e)) from e after: Optional[int] = token.get("last_sync_at") try: activities = fetch_activities(token["access_token"], after=after) except StravaError as e: raise RuntimeError(str(e)) from e imported = 0 skipped = 0 errors: list[str] = [] for meta in activities: try: activity_id = make_activity_id(strava_meta_to_partial(meta)) if (data_dir / "activities" / f"{activity_id}.json").exists(): skipped += 1 continue streams = fetch_streams(token["access_token"], meta["id"]) if originals_dir is not None: orig_path = originals_dir / f"{activity_id}.json" orig_path.write_text( json.dumps({"meta": meta, "streams": streams}, indent=2), encoding="utf-8", ) parsed = strava_to_parsed(meta, streams) ingest_parsed(parsed, data_dir, privacy="public", rdp_epsilon=0.0001) imported += 1 except Exception as exc: errors.append(f"{meta.get('id')}: {type(exc).__name__}") token["last_sync_at"] = int(time.time()) save_token(data_dir, token) return { "ok": True, "imported": imported, "skipped": skipped, "error_count": len(errors), "errors": errors[:5], }