refactor: extract/ingest facade, merge_one, deduplicate ops constants

- Add bincio/extract/ingest.py as a facade over the extract internals (ingest_parsed, strava_sync), reducing coupling from 6+ imports to one
  - Add merge_one() to merge.py — fast single-activity path for interactive edits (rewrites one file + index, skips full directory rebuild)
  - Rewrite edit/ops.py to delegate to the new facade; fix broken run_strava_sync return (was referencing undefined locals)
  - Remove duplicated SPORTS, STAT_PANELS, VALID_ACTIVITY_ID from edit/server.py — now imported from ops.py
This commit is contained in:
Davide Scaini
2026-04-09 12:03:06 +02:00
parent 0223d468c9
commit 7dcb1e6dd0
5 changed files with 229 additions and 77 deletions
+10 -68
View File
@@ -7,12 +7,15 @@ No FastAPI, no globals — all context is passed as explicit arguments.
from __future__ import annotations from __future__ import annotations
import json import json
import time import re
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
# ── Shared constants (imported by edit/server.py and serve/server.py) ─────────
SPORTS = ["cycling", "running", "hiking", "walking", "swimming", "skiing", "other"] SPORTS = ["cycling", "running", "hiking", "walking", "swimming", "skiing", "other"]
STAT_PANELS = ["elevation", "speed", "heart_rate", "cadence", "power"] STAT_PANELS = ["elevation", "speed", "heart_rate", "cadence", "power"]
VALID_ACTIVITY_ID = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9\-]{0,250}$')
def apply_sidecar_edit(activity_id: str, payload: dict[str, Any], data_dir: Path) -> None: def apply_sidecar_edit(activity_id: str, payload: dict[str, Any], data_dir: Path) -> None:
@@ -51,8 +54,8 @@ def apply_sidecar_edit(activity_id: str, payload: dict[str, Any], data_dir: Path
sidecar_path.write_text(content, encoding="utf-8") sidecar_path.write_text(content, encoding="utf-8")
from bincio.render.merge import merge_all from bincio.render.merge import merge_one
merge_all(data_dir) merge_one(data_dir, activity_id)
def run_strava_sync(data_dir: Path, client_id: str, client_secret: str) -> dict[str, Any]: def run_strava_sync(data_dir: Path, client_id: str, client_secret: str) -> dict[str, Any]:
@@ -69,72 +72,11 @@ def run_strava_sync(data_dir: Path, client_id: str, client_secret: str) -> dict[
Raises: Raises:
RuntimeError: If Strava credentials are missing or API calls fail. RuntimeError: If Strava credentials are missing or API calls fail.
""" """
if not client_id or not client_secret: from bincio.extract.ingest import strava_sync as _strava_sync
raise RuntimeError("Strava not configured (missing client_id or client_secret)")
from bincio.extract.strava_api import (
StravaError,
ensure_fresh,
fetch_activities,
fetch_streams,
save_token,
strava_meta_to_partial,
strava_to_parsed,
)
try:
token = ensure_fresh(data_dir, client_id, client_secret)
except StravaError as e:
raise RuntimeError(str(e)) from e
after: int | None = token.get("last_sync_at")
try:
activities = fetch_activities(token["access_token"], after=after)
except StravaError as e:
raise RuntimeError(str(e)) from e
from bincio.extract.metrics import compute
from bincio.extract.writer import build_summary, make_activity_id, write_activity, write_index
from bincio.render.merge import merge_all from bincio.render.merge import merge_all
index_path = data_dir / "index.json" result = _strava_sync(data_dir, client_id, client_secret)
if index_path.exists(): if result["imported"]:
index_data = json.loads(index_path.read_text(encoding="utf-8"))
else:
index_data = {"owner": {"handle": "unknown"}, "activities": []}
owner = index_data.get("owner", {})
summaries: dict[str, dict] = {s["id"]: s for s in index_data.get("activities", [])}
imported = 0
skipped = 0
errors: list[str] = []
for meta in activities:
try:
activity_id = make_activity_id(strava_meta_to_partial(meta))
if (data_dir / "activities" / f"{activity_id}.json").exists():
skipped += 1
continue
streams = fetch_streams(token["access_token"], meta["id"])
parsed = strava_to_parsed(meta, streams)
metrics = compute(parsed)
write_activity(parsed, metrics, data_dir, privacy="public", rdp_epsilon=0.0001)
summaries[activity_id] = build_summary(parsed, metrics, activity_id, "public")
imported += 1
except Exception as exc:
errors.append(f"{meta.get('id')}: {type(exc).__name__}")
if imported:
write_index(list(summaries.values()), data_dir, owner)
merge_all(data_dir) merge_all(data_dir)
token["last_sync_at"] = int(time.time()) return result
save_token(data_dir, token)
return {
"ok": True,
"imported": imported,
"skipped": skipped,
"error_count": len(errors),
"errors": errors[:5],
}
+3 -8
View File
@@ -3,7 +3,6 @@
from __future__ import annotations from __future__ import annotations
import json import json
import re
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -12,6 +11,8 @@ from fastapi import FastAPI, File, HTTPException, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from bincio.edit.ops import SPORTS, STAT_PANELS, VALID_ACTIVITY_ID
# Populated by the CLI before uvicorn starts # Populated by the CLI before uvicorn starts
data_dir: Path | None = None data_dir: Path | None = None
site_url: str = "http://localhost:4321" site_url: str = "http://localhost:4321"
@@ -28,18 +29,12 @@ app.add_middleware(
allow_headers=["Content-Type"], allow_headers=["Content-Type"],
) )
_VALID_ACTIVITY_ID = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9\-]{0,250}$')
def _check_id(activity_id: str) -> str: def _check_id(activity_id: str) -> str:
"""Reject activity IDs that contain path traversal sequences.""" """Reject activity IDs that contain path traversal sequences."""
if not _VALID_ACTIVITY_ID.match(activity_id): if not VALID_ACTIVITY_ID.match(activity_id):
raise HTTPException(400, "Invalid activity ID") raise HTTPException(400, "Invalid activity ID")
return activity_id return activity_id
SPORTS = ["cycling", "running", "hiking", "walking", "swimming", "skiing", "other"]
STAT_PANELS = ["elevation", "speed", "heart_rate", "cadence", "power"]
# ── HTML UI ─────────────────────────────────────────────────────────────────── # ── HTML UI ───────────────────────────────────────────────────────────────────
+137
View File
@@ -0,0 +1,137 @@
"""Facade for writing a parsed or Strava-sourced activity into a BAS data store.
Callers (edit/ops.py) import from here instead of reaching into extract.metrics,
extract.writer, and extract.strava_api individually. If the internal structure
of the extract package changes, only this file needs updating.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Optional
from bincio.extract.models import ParsedActivity
def ingest_parsed(
parsed: ParsedActivity,
data_dir: Path,
privacy: str = "public",
rdp_epsilon: float = 0.0001,
) -> str:
"""Compute metrics, write activity files, and update index.json.
Args:
parsed: Activity produced by any parser or Strava converter.
data_dir: Per-user output directory (contains activities/, index.json).
privacy: BAS privacy level — "public", "no_gps", or "private".
rdp_epsilon: RDP simplification threshold in degrees.
Returns:
The BAS activity ID of the written activity.
Raises:
FileExistsError: If an activity with the same ID already exists.
"""
from bincio.extract.metrics import compute
from bincio.extract.writer import (
build_summary,
make_activity_id,
write_activity,
write_index,
)
activity_id = make_activity_id(parsed)
if (data_dir / "activities" / f"{activity_id}.json").exists():
raise FileExistsError(f"Activity already exists: {activity_id}")
metrics = compute(parsed)
write_activity(parsed, metrics, data_dir, privacy=privacy, rdp_epsilon=rdp_epsilon)
summary = build_summary(parsed, metrics, activity_id, privacy)
index_path = data_dir / "index.json"
if index_path.exists():
index_data = json.loads(index_path.read_text(encoding="utf-8"))
else:
index_data = {"owner": {"handle": "unknown"}, "activities": []}
owner = index_data.get("owner", {})
summaries: dict[str, Any] = {s["id"]: s for s in index_data.get("activities", [])}
summaries[activity_id] = summary
write_index(list(summaries.values()), data_dir, owner)
return activity_id
def strava_sync(
data_dir: Path,
client_id: str,
client_secret: str,
) -> dict[str, Any]:
"""Fetch new Strava activities and ingest them into data_dir.
Args:
data_dir: Per-user data directory.
client_id: Strava OAuth client ID.
client_secret: Strava OAuth client secret.
Returns:
Dict with keys: ok, imported, skipped, error_count, errors.
Raises:
RuntimeError: If Strava credentials are missing or API calls fail.
"""
import time
from bincio.extract.strava_api import (
StravaError,
ensure_fresh,
fetch_activities,
fetch_streams,
save_token,
strava_meta_to_partial,
strava_to_parsed,
)
from bincio.extract.writer import make_activity_id
if not client_id or not client_secret:
raise RuntimeError("Strava not configured (missing client_id or client_secret)")
try:
token = ensure_fresh(data_dir, client_id, client_secret)
except StravaError as e:
raise RuntimeError(str(e)) from e
after: Optional[int] = token.get("last_sync_at")
try:
activities = fetch_activities(token["access_token"], after=after)
except StravaError as e:
raise RuntimeError(str(e)) from e
imported = 0
skipped = 0
errors: list[str] = []
for meta in activities:
try:
activity_id = make_activity_id(strava_meta_to_partial(meta))
if (data_dir / "activities" / f"{activity_id}.json").exists():
skipped += 1
continue
streams = fetch_streams(token["access_token"], meta["id"])
parsed = strava_to_parsed(meta, streams)
ingest_parsed(parsed, data_dir, privacy="public", rdp_epsilon=0.0001)
imported += 1
except Exception as exc:
errors.append(f"{meta.get('id')}: {type(exc).__name__}")
token["last_sync_at"] = int(time.time())
save_token(data_dir, token)
return {
"ok": True,
"imported": imported,
"skipped": skipped,
"error_count": len(errors),
"errors": errors[:5],
}
+78
View File
@@ -74,6 +74,84 @@ def _apply_sidecar_summary(summary: dict, fm: dict) -> dict:
return s return s
def merge_one(data_dir: Path, activity_id: str) -> None:
"""Apply (or remove) sidecar overrides for a single activity.
Updates data_dir/_merged/activities/{id}.json and rewrites
_merged/index.json. Faster than merge_all() for interactive edits
because it touches only one activity file instead of rebuilding the
whole _merged/activities/ directory.
Use merge_all() for bulk operations (first run, Strava sync, etc.).
"""
edits_dir = data_dir / "edits"
acts_dir = data_dir / "activities"
merged_dir = data_dir / "_merged"
merged_acts = merged_dir / "activities"
merged_acts.mkdir(parents=True, exist_ok=True)
src = acts_dir / f"{activity_id}.json"
if not src.exists():
return
dest = merged_acts / f"{activity_id}.json"
# Determine if a sidecar or image list applies to this activity
sidecar_path = edits_dir / f"{activity_id}.md" if edits_dir.exists() else None
images_dir = edits_dir / "images" / activity_id if edits_dir.exists() else None
has_sidecar = sidecar_path is not None and sidecar_path.exists()
image_files: list[str] = []
if images_dir and images_dir.exists():
image_files = sorted(
p.name for p in images_dir.iterdir()
if p.is_file() and not p.name.startswith(".")
)
needs_merge = has_sidecar or bool(image_files)
# Remove the old dest (symlink or file) before writing the new one
if dest.exists() or dest.is_symlink():
dest.unlink()
if needs_merge:
detail = json.loads(src.read_text(encoding="utf-8"))
if has_sidecar:
fm, body = parse_sidecar(sidecar_path) # type: ignore[arg-type]
detail = apply_sidecar(detail, fm, body)
if image_files:
detail["custom"] = dict(detail.get("custom") or {})
detail["custom"]["images"] = image_files
dest.write_text(json.dumps(detail, indent=2, ensure_ascii=False))
else:
dest.symlink_to(src.resolve())
# Rewrite index — load the full sidecar map so all summaries stay consistent
index_path = data_dir / "index.json"
if not index_path.exists():
return
all_sidecars: dict[str, tuple[dict, str]] = {}
if edits_dir and edits_dir.exists():
for md_path in edits_dir.glob("*.md"):
all_sidecars[md_path.stem] = parse_sidecar(md_path)
index = json.loads(index_path.read_text(encoding="utf-8"))
activities = []
for s in index.get("activities", []):
aid = s.get("id", "")
if aid in all_sidecars:
fm, _ = all_sidecars[aid]
s = _apply_sidecar_summary(s, fm)
activities.append(s)
activities = [a for a in activities if a.get("privacy") != "private"]
activities.sort(key=lambda a: a.get("started_at", ""), reverse=True)
activities.sort(key=lambda a: 0 if a.get("custom", {}).get("highlight") else 1)
index["activities"] = activities
(merged_dir / "index.json").write_text(json.dumps(index, indent=2, ensure_ascii=False))
def merge_all(data_dir: Path) -> int: def merge_all(data_dir: Path) -> int:
"""Build data_dir/_merged/ with all sidecar overrides applied. """Build data_dir/_merged/ with all sidecar overrides applied.
+1 -1
View File
@@ -70,7 +70,7 @@ app.add_middleware(
) )
_VALID_HANDLE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,29}$') _VALID_HANDLE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,29}$')
_VALID_ACTIVITY_ID = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9\-]{0,250}$') from bincio.edit.ops import VALID_ACTIVITY_ID as _VALID_ACTIVITY_ID
_SESSION_COOKIE = "bincio_session" _SESSION_COOKIE = "bincio_session"
_COOKIE_MAX_AGE = 30 * 86400 # 30 days _COOKIE_MAX_AGE = 30 * 86400 # 30 days