Files
bincio-activity/bincio/extract/ingest.py
T
Davide Scaini 7dcb1e6dd0 refactor: extract/ingest facade, merge_one, deduplicate ops constants
- Add bincio/extract/ingest.py as a facade over the extract internals (ingest_parsed, strava_sync), reducing coupling from 6+ imports to one
  - Add merge_one() to merge.py — fast single-activity path for interactive edits (rewrites one file + index, skips full directory rebuild)
  - Rewrite edit/ops.py to delegate to the new facade; fix broken run_strava_sync return (was referencing undefined locals)
  - Remove duplicated SPORTS, STAT_PANELS, VALID_ACTIVITY_ID from edit/server.py — now imported from ops.py
2026-04-09 12:05:01 +02:00

138 lines
4.2 KiB
Python

"""Facade for writing a parsed or Strava-sourced activity into a BAS data store.
Callers (edit/ops.py) import from here instead of reaching into extract.metrics,
extract.writer, and extract.strava_api individually. If the internal structure
of the extract package changes, only this file needs updating.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Optional
from bincio.extract.models import ParsedActivity
def ingest_parsed(
parsed: ParsedActivity,
data_dir: Path,
privacy: str = "public",
rdp_epsilon: float = 0.0001,
) -> str:
"""Compute metrics, write activity files, and update index.json.
Args:
parsed: Activity produced by any parser or Strava converter.
data_dir: Per-user output directory (contains activities/, index.json).
privacy: BAS privacy level — "public", "no_gps", or "private".
rdp_epsilon: RDP simplification threshold in degrees.
Returns:
The BAS activity ID of the written activity.
Raises:
FileExistsError: If an activity with the same ID already exists.
"""
from bincio.extract.metrics import compute
from bincio.extract.writer import (
build_summary,
make_activity_id,
write_activity,
write_index,
)
activity_id = make_activity_id(parsed)
if (data_dir / "activities" / f"{activity_id}.json").exists():
raise FileExistsError(f"Activity already exists: {activity_id}")
metrics = compute(parsed)
write_activity(parsed, metrics, data_dir, privacy=privacy, rdp_epsilon=rdp_epsilon)
summary = build_summary(parsed, metrics, activity_id, privacy)
index_path = data_dir / "index.json"
if index_path.exists():
index_data = json.loads(index_path.read_text(encoding="utf-8"))
else:
index_data = {"owner": {"handle": "unknown"}, "activities": []}
owner = index_data.get("owner", {})
summaries: dict[str, Any] = {s["id"]: s for s in index_data.get("activities", [])}
summaries[activity_id] = summary
write_index(list(summaries.values()), data_dir, owner)
return activity_id
def strava_sync(
data_dir: Path,
client_id: str,
client_secret: str,
) -> dict[str, Any]:
"""Fetch new Strava activities and ingest them into data_dir.
Args:
data_dir: Per-user data directory.
client_id: Strava OAuth client ID.
client_secret: Strava OAuth client secret.
Returns:
Dict with keys: ok, imported, skipped, error_count, errors.
Raises:
RuntimeError: If Strava credentials are missing or API calls fail.
"""
import time
from bincio.extract.strava_api import (
StravaError,
ensure_fresh,
fetch_activities,
fetch_streams,
save_token,
strava_meta_to_partial,
strava_to_parsed,
)
from bincio.extract.writer import make_activity_id
if not client_id or not client_secret:
raise RuntimeError("Strava not configured (missing client_id or client_secret)")
try:
token = ensure_fresh(data_dir, client_id, client_secret)
except StravaError as e:
raise RuntimeError(str(e)) from e
after: Optional[int] = token.get("last_sync_at")
try:
activities = fetch_activities(token["access_token"], after=after)
except StravaError as e:
raise RuntimeError(str(e)) from e
imported = 0
skipped = 0
errors: list[str] = []
for meta in activities:
try:
activity_id = make_activity_id(strava_meta_to_partial(meta))
if (data_dir / "activities" / f"{activity_id}.json").exists():
skipped += 1
continue
streams = fetch_streams(token["access_token"], meta["id"])
parsed = strava_to_parsed(meta, streams)
ingest_parsed(parsed, data_dir, privacy="public", rdp_epsilon=0.0001)
imported += 1
except Exception as exc:
errors.append(f"{meta.get('id')}: {type(exc).__name__}")
token["last_sync_at"] = int(time.time())
save_token(data_dir, token)
return {
"ok": True,
"imported": imported,
"skipped": skipped,
"error_count": len(errors),
"errors": errors[:5],
}