baf20b51ba
Extract pipeline now computes two VAM metrics per activity (cycling, running, hiking, walking): - climbing_vam_mh: VAM on ascending segments only, using 30 s forward lookahead to classify climbing vs. flat/descent (stored in detail JSON) - vam_curve: [[duration_s, vam_mh], ...] best VAM per standard duration (60 s – 1 h), sliding window on 30 s smoothed elevation, only windows with ≥ 10 m net gain count (stored in summary + detail) Athlete JSON aggregates vam_curve across all activities (all_time, last_365d, last_90d), same structure as power_curve. Frontend: - ActivityDetail shows "Climbing VAM" stat (grouped with elevation) - AthleteView adds a "VAM Curve" tab that appears only when the athlete has climbing data; renders VamChart (new component, mirrors MmpChart) vam_curve stripped from combined global feed; kept in user year shards for season-based on-the-fly aggregation in VamChart. Requires bincio reextract to backfill existing activities.
507 lines
20 KiB
Python
507 lines
20 KiB
Python
"""Apply sidecar .md edits to BAS JSON files.
|
||
|
||
Produces data_dir/_merged/ — a mirror of data_dir where:
|
||
- Files without sidecars are symlinked to the originals (cheap, preserves extracted data)
|
||
- Files with sidecars are written as merged copies
|
||
- index.json is rewritten with private filtering + highlight sort
|
||
|
||
This keeps data_dir/activities/*.json pristine (re-running extract never clobbers
|
||
user edits, and removing a sidecar always reverts fully on the next render).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
import shutil
|
||
import threading
|
||
from pathlib import Path
|
||
|
||
import yaml
|
||
|
||
# Per-user-directory locks (see `_merge_locks` and `_merge_lock` below) ensure
# that concurrent upload requests and the dev file-watcher cannot run merge_all
# or merge_one simultaneously on the same directory.
|
||
|
||
|
||
def _fix_surrogates(obj: object) -> object:
    """Recursively repair surrogate code points in strings so they encode as UTF-8.

    Surrogates (U+D800–U+DFFF) are valid in a Python ``str`` but cannot be
    encoded as UTF-8. They typically arise when emoji from UTF-16-encoded
    sources (Strava, some FIT devices) are decoded incorrectly.

    Strings that already encode cleanly are returned unchanged. Otherwise the
    string is round-tripped through UTF-16 with ``surrogatepass``, which joins
    each high/low surrogate pair into the code point it stands for. A lone
    (unpaired) surrogate cannot be reconstructed and is replaced with U+FFFD
    instead of raising ``UnicodeDecodeError``.

    Dicts (string keys and all values) and lists are walked recursively and
    rebuilt; every other value is returned as-is.
    """
    if isinstance(obj, str):
        try:
            obj.encode("utf-8")
        except UnicodeEncodeError:
            # "replace" on decode keeps unpaired surrogates from aborting the write.
            return obj.encode("utf-16", "surrogatepass").decode("utf-16", "replace")
        return obj
    if isinstance(obj, dict):
        # Keys end up in the JSON output too, so string keys need the same repair.
        return {
            (_fix_surrogates(k) if isinstance(k, str) else k): _fix_surrogates(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [_fix_surrogates(v) for v in obj]
    return obj
|
||
|
||
|
||
def _dumps(obj: object) -> str:
    """Serialize *obj* to pretty-printed JSON text.

    The value is first passed through ``_fix_surrogates``; the result is
    dumped with a two-space indent and with non-ASCII characters left
    unescaped (``ensure_ascii=False``).
    """
    cleaned = _fix_surrogates(obj)
    return json.dumps(cleaned, ensure_ascii=False, indent=2)
|
||
# Registry of one lock per resolved data directory, guarded by its own mutex
# so that two threads asking for the same directory always get the same lock.
_merge_locks: dict[str, threading.Lock] = {}
_merge_locks_mu = threading.Lock()


def _merge_lock(data_dir: Path) -> threading.Lock:
    """Return the lock associated with *data_dir*, creating it on first use.

    The registry key is the string form of ``data_dir.resolve()``, so different
    spellings of the same directory share one lock.
    """
    resolved = str(data_dir.resolve())
    with _merge_locks_mu:
        lock = _merge_locks.get(resolved)
        if lock is None:
            lock = threading.Lock()
            _merge_locks[resolved] = lock
        return lock
|
||
|
||
|
||
def parse_sidecar(path: Path) -> tuple[dict, str]:
    """Return (frontmatter_dict, markdown_body) from a sidecar .md file.

    A sidecar may open with a YAML frontmatter block delimited by two lines
    consisting of ``---`` (optionally followed by spaces/tabs). The text
    between the delimiters is parsed with ``yaml.safe_load``; everything after
    the second delimiter is the body.

    The whole file is returned as the body, with empty frontmatter, when:
    - the file does not start with ``---``;
    - the first delimiter line is not the first line of the file (for example
      the file starts with ``---something`` and only later contains horizontal
      rules); or
    - there is no closing delimiter.

    Frontmatter that parses to something other than a mapping (a bare scalar
    or a list) is treated as empty, because callers use the result as a dict.

    Raises:
        OSError: if the file cannot be read.
        yaml.YAMLError: if the frontmatter is not valid YAML.
    """
    # utf-8-sig is identical to utf-8 except that it drops a leading BOM, which
    # would otherwise hide the opening "---" from the check below.
    text = path.read_text(encoding="utf-8-sig")
    if text.startswith("---"):
        parts = re.split(r"^---[ \t]*$", text, maxsplit=2, flags=re.MULTILINE)
        # parts[0] is whatever precedes the first delimiter line; it must be
        # empty for that delimiter to be the opening one.
        if len(parts) >= 3 and not parts[0].strip():
            fm = yaml.safe_load(parts[1]) or {}
            if not isinstance(fm, dict):
                fm = {}
            return fm, parts[2].strip()
    return {}, text.strip()
|
||
|
||
|
||
def apply_sidecar(detail: dict, fm: dict, body: str, *, download_disabled_default: bool = False) -> dict:
    """Apply sidecar overrides to a detail JSON dict. Returns a modified copy.

    Args:
        detail: Activity detail dict. It is not mutated: the top-level dict and
            its ``custom`` dict are copied before anything is written.
        fm: Sidecar frontmatter. Keys read here: ``title``, ``sport``,
            ``sub_sport``, ``gear``, ``description``, ``highlight``,
            ``private``, ``hide_stats`` and ``download_disabled``.
        body: Markdown body of the sidecar. When non-empty it becomes the
            description and takes precedence over ``fm["description"]``.
        download_disabled_default: Applied only when the sidecar gives no
            ``download_disabled`` value; if true, ``download_disabled`` is set.

    Returns:
        A new dict with the overrides applied.
    """
    from bincio.extract.writer import _infer_indoor_title

    d = dict(detail)
    # Copy the nested dict so the caller's data is never mutated. A missing or
    # null "custom" becomes an empty dict instead of making dict(None) raise.
    d["custom"] = dict(d.get("custom") or {})

    if "title" in fm:
        d["title"] = str(fm["title"])
    if "sport" in fm:
        d["sport"] = str(fm["sport"])
    if "sub_sport" in fm:
        d["sub_sport"] = str(fm["sub_sport"]) if fm["sub_sport"] else None
    # Infer indoor from title when sub_sport is still absent after sidecar
    if not d.get("sub_sport") and _infer_indoor_title(d.get("title") or ""):
        d["sub_sport"] = "indoor"
    if "gear" in fm:
        # An empty gear value keeps whatever the extracted data had.
        d["gear"] = str(fm["gear"]) if fm["gear"] else d.get("gear")
    if body:
        d["description"] = body
    elif "description" in fm:
        d["description"] = str(fm["description"])
    if "highlight" in fm:
        d["custom"]["highlight"] = bool(fm["highlight"])
    if "private" in fm:
        # private: false falls back to the extracted privacy, not to "public".
        d["privacy"] = "unlisted" if fm["private"] else detail.get("privacy", "public")
    if "hide_stats" in fm:
        hidden = fm["hide_stats"] or []
        if isinstance(hidden, str):
            # A bare YAML scalar names one stat; iterating it would yield characters.
            hidden = [hidden]
        d["custom"]["hide_stats"] = [str(s) for s in hidden]

    dd = fm.get("download_disabled")  # True, False, or None (absent)
    if dd is True:
        d["download_disabled"] = True
    elif dd is None and download_disabled_default:
        d["download_disabled"] = True
    # dd is False → explicit per-activity opt-in, leave unset

    return d
|
||
|
||
|
||
def _apply_sidecar_summary(summary: dict, fm: dict) -> dict:
    """Apply sidecar overrides to an index summary entry.

    Only the fields a summary carries are touched: ``title``, ``sport``,
    ``sub_sport``, ``custom.highlight`` and ``privacy``. The input is not
    mutated; the top-level dict and its ``custom`` dict are copied first.

    Returns:
        A new summary dict with the overrides applied.
    """
    from bincio.extract.writer import _infer_indoor_title

    s = dict(summary)
    # A missing or null "custom" becomes an empty dict instead of making
    # dict(None) raise.
    s["custom"] = dict(s.get("custom") or {})

    if "title" in fm:
        s["title"] = str(fm["title"])
    if "sport" in fm:
        s["sport"] = str(fm["sport"])
    if "sub_sport" in fm:
        s["sub_sport"] = str(fm["sub_sport"]) if fm["sub_sport"] else None
    if "highlight" in fm:
        s["custom"]["highlight"] = bool(fm["highlight"])
    if "private" in fm:
        # private: false falls back to the extracted privacy, not to "public".
        s["privacy"] = "unlisted" if fm["private"] else summary.get("privacy", "public")
    # Infer indoor from title when sub_sport is still absent
    if not s.get("sub_sport") and _infer_indoor_title(s.get("title") or ""):
        s["sub_sport"] = "indoor"

    return s
|
||
|
||
|
||
def merge_one(data_dir: Path, activity_id: str) -> None:
    """Re-merge a single activity after its sidecar was added, changed or removed.

    Refreshes data_dir/_merged/activities/{id}.json and rewrites
    _merged/index.json. This is the fast path for interactive edits: only one
    activity file is touched rather than the whole _merged/activities/
    directory being rebuilt.

    For bulk operations (first run, Strava sync, etc.) use merge_all().
    The work runs while holding the per-directory merge lock.
    """
    lock = _merge_lock(data_dir)
    with lock:
        _merge_one_locked(data_dir, activity_id)
|
||
|
||
|
||
def _merge_one_locked(data_dir: Path, activity_id: str) -> None:
    """Rebuild the merged copy of one activity, then rewrite the merged index.

    The caller is expected to hold the merge lock for *data_dir* (``merge_one``
    does). Does nothing if ``activities/{activity_id}.json`` is missing.

    Steps:
    1. Decide whether a real merged file is needed: a sidecar exists, images
       were uploaded, or the title implies an indoor activity with no
       ``sub_sport`` set. Otherwise the merged entry is a symlink.
    2. Re-point the timeseries symlink at the extract output.
    3. Replace ``_merged/activities/{id}.json`` with the merged copy or symlink.
    4. If ``index.json`` exists, re-apply every sidecar to its summaries and
       rewrite the merged index shards.
    """
    edits_dir = data_dir / "edits"
    acts_dir = data_dir / "activities"
    merged_dir = data_dir / "_merged"
    merged_acts = merged_dir / "activities"
    merged_acts.mkdir(parents=True, exist_ok=True)

    src = acts_dir / f"{activity_id}.json"
    if not src.exists():
        return

    dest = merged_acts / f"{activity_id}.json"

    # Determine if a sidecar or image list applies to this activity
    have_edits = edits_dir.exists()
    sidecar_path = edits_dir / f"{activity_id}.md"
    images_dir = edits_dir / "images" / activity_id
    has_sidecar = have_edits and sidecar_path.exists()
    image_files: list[str] = []
    if have_edits and images_dir.exists():
        image_files = sorted(
            p.name for p in images_dir.iterdir()
            if p.is_file() and not p.name.startswith(".")
        )

    needs_merge = has_sidecar or bool(image_files)
    detail: dict | None = None
    # Also need a real file (not symlink) when title inference would change sub_sport
    if not needs_merge:
        from bincio.extract.writer import _infer_indoor_title
        detail = json.loads(src.read_text(encoding="utf-8"))
        if not detail.get("sub_sport") and _infer_indoor_title(detail.get("title") or ""):
            needs_merge = True

    # Symlink the timeseries file (never merged — always points to the extract output)
    ts_src = acts_dir / f"{activity_id}.timeseries.json"
    ts_dest = merged_acts / f"{activity_id}.timeseries.json"
    if ts_dest.exists() or ts_dest.is_symlink():
        ts_dest.unlink()
    if ts_src.exists():
        ts_dest.symlink_to(ts_src.resolve())

    # Remove the old dest (symlink or file) before writing the new one
    if dest.exists() or dest.is_symlink():
        dest.unlink()

    if needs_merge:
        if detail is None:
            detail = json.loads(src.read_text(encoding="utf-8"))

        # Honour the same per-user default that merge_all() applies, so a
        # single-activity merge does not drop download_disabled.
        settings_path = data_dir / "_user_settings.json"
        try:
            settings = (
                json.loads(settings_path.read_text(encoding="utf-8"))
                if settings_path.exists()
                else {}
            )
        except (OSError, json.JSONDecodeError):
            settings = {}
        dl_default = (
            bool(settings.get("download_disabled_default", False))
            if isinstance(settings, dict)
            else False
        )

        # No sidecar — still apply title inference via empty frontmatter
        fm, body = parse_sidecar(sidecar_path) if has_sidecar else ({}, "")
        detail = apply_sidecar(detail, fm, body, download_disabled_default=dl_default)
        if image_files:
            detail["custom"] = dict(detail.get("custom") or {})
            detail["custom"]["images"] = image_files
        # _dumps keeps non-ASCII characters unescaped, so the encoding must be
        # explicit rather than the locale default.
        dest.write_text(_dumps(detail), encoding="utf-8")
    else:
        dest.symlink_to(src.resolve())

    # Rewrite index — load the full sidecar map so all summaries stay consistent
    index_path = data_dir / "index.json"
    if not index_path.exists():
        return

    all_sidecars: dict[str, tuple[dict, str]] = {}
    if have_edits:
        for md_path in edits_dir.glob("*.md"):
            all_sidecars[md_path.stem] = parse_sidecar(md_path)

    index = json.loads(index_path.read_text(encoding="utf-8"))
    activities = []
    for summary in index.get("activities", []):
        summary_fm, _ = all_sidecars.get(summary.get("id", ""), ({}, ""))
        activities.append(_apply_sidecar_summary(summary, summary_fm))

    # Two stable sorts: newest first, then highlighted activities to the top.
    activities.sort(key=lambda a: a.get("started_at", ""), reverse=True)
    activities.sort(key=lambda a: 0 if a.get("custom", {}).get("highlight") else 1)

    _write_year_shards(merged_dir, activities, index)
|
||
|
||
|
||
def merge_all(data_dir: Path) -> int:
    """Rebuild data_dir/_merged/ with every sidecar override applied.

    The rebuild runs while holding the per-directory merge lock.

    Returns the number of sidecars found and applied.
    """
    lock = _merge_lock(data_dir)
    with lock:
        sidecar_count = _merge_all_locked(data_dir)
    return sidecar_count
|
||
|
||
|
||
def _merge_all_locked(data_dir: Path) -> int:
    """Rebuild data_dir/_merged/ from scratch and return the sidecar count.

    The caller is expected to hold the merge lock for *data_dir*
    (``merge_all`` does).

    Steps, in order:
    1. Read ``_user_settings.json`` for ``download_disabled_default``.
    2. Parse every ``edits/*.md`` sidecar and list uploaded images per activity.
    3. Pick the activities needing a real merged file: those with a sidecar,
       with images, or whose title implies indoor while ``sub_sport`` is unset.
    4. Wipe ``_merged/activities/`` and mirror ``activities/`` into it,
       writing merged copies for the picked activities and symlinks for the rest.
    5. Symlink each ``edits/images/<id>/`` directory under
       ``_merged/activities/images/``.
    6. Produce ``_merged/athlete.json``: a copy overlaid with the editable keys
       from ``edits/athlete.yaml`` when that file has content, else a symlink.
    7. Rewrite the merged index shards, or remove them if ``index.json`` is gone.

    Returns:
        The number of sidecar files found under ``edits/``.
    """
    edits_dir = data_dir / "edits"
    acts_dir = data_dir / "activities"
    merged_dir = data_dir / "_merged"
    merged_acts = merged_dir / "activities"

    # Unreadable or malformed settings fall back to "no defaults".
    settings_path = data_dir / "_user_settings.json"
    try:
        user_settings = (
            json.loads(settings_path.read_text(encoding="utf-8"))
            if settings_path.exists()
            else {}
        )
    except (OSError, json.JSONDecodeError):
        user_settings = {}
    if not isinstance(user_settings, dict):
        user_settings = {}
    dl_default = bool(user_settings.get("download_disabled_default", False))

    have_edits = edits_dir.exists()

    # Collect sidecars upfront
    sidecars: dict[str, tuple[dict, str]] = {}
    if have_edits:
        for md_path in sorted(edits_dir.glob("*.md")):
            sidecars[md_path.stem] = parse_sidecar(md_path)

    # Collect image lists — activities with uploaded images get custom.images even
    # if they have no sidecar text yet
    image_lists: dict[str, list[str]] = {}
    images_root = edits_dir / "images"
    have_images = have_edits and images_root.exists()
    if have_images:
        for img_dir in sorted(images_root.iterdir()):
            if not img_dir.is_dir():
                continue
            files = sorted(
                p.name for p in img_dir.iterdir()
                if p.is_file() and not p.name.startswith(".")
            )
            if files:
                image_lists[img_dir.name] = files

    to_merge = set(sidecars) | set(image_lists)

    # Also include activities whose title implies indoor (no sidecar required)
    index_path = data_dir / "index.json"
    cached_index: dict | None = None
    if index_path.exists():
        from bincio.extract.writer import _infer_indoor_title
        cached_index = json.loads(index_path.read_text(encoding="utf-8"))
        for summary in cached_index.get("activities", []):
            aid = summary.get("id", "")
            if aid and not summary.get("sub_sport") and _infer_indoor_title(summary.get("title") or ""):
                to_merge.add(aid)

    # Wipe and recreate _merged/activities/
    shutil.rmtree(merged_acts, ignore_errors=True)
    merged_acts.mkdir(parents=True, exist_ok=True)

    # Mirror activities/ — symlink unmodified, write merged copies for overridden
    if acts_dir.exists():
        for src in sorted(acts_dir.iterdir()):
            if not src.is_file():
                continue
            dest = merged_acts / src.name
            activity_id = src.stem
            if src.suffix == ".json" and activity_id in to_merge:
                detail = json.loads(src.read_text(encoding="utf-8"))
                # Empty frontmatter still runs the indoor-title inference.
                fm, body = sidecars.get(activity_id, ({}, ""))
                detail = apply_sidecar(detail, fm, body, download_disabled_default=dl_default)
                if activity_id in image_lists:
                    detail["custom"] = dict(detail.get("custom") or {})
                    detail["custom"]["images"] = image_lists[activity_id]
                # _dumps keeps non-ASCII characters unescaped, so the encoding
                # must be explicit rather than the locale default.
                dest.write_text(_dumps(detail), encoding="utf-8")
            elif not dest.exists() and not dest.is_symlink():
                dest.symlink_to(src.resolve())

    # Mirror edits/images/ → _merged/activities/images/ so the site can serve them
    if have_images:
        merged_images = merged_acts / "images"
        merged_images.mkdir(exist_ok=True)
        for img_dir in images_root.iterdir():
            if img_dir.is_dir():
                dest_img = merged_images / img_dir.name
                if not dest_img.exists():
                    dest_img.symlink_to(img_dir.resolve())

    # Produce merged athlete.json — base from extract overlaid with edits/athlete.yaml
    athlete_src = data_dir / "athlete.json"
    athlete_dest = merged_dir / "athlete.json"
    if athlete_dest.exists() or athlete_dest.is_symlink():
        athlete_dest.unlink()
    if athlete_src.exists():
        edits: dict = {}
        athlete_edits_path = edits_dir / "athlete.yaml"
        if athlete_edits_path.exists():
            # Best effort: an unreadable or malformed edits file means "no edits".
            try:
                loaded = yaml.safe_load(athlete_edits_path.read_text(encoding="utf-8"))
            except (OSError, ValueError, yaml.YAMLError):
                loaded = None
            # Only a mapping can be overlaid; anything else is ignored.
            if isinstance(loaded, dict):
                edits = loaded
        editable_keys = {"max_hr", "ftp_w", "hr_zones", "power_zones", "seasons", "gear"}
        if edits:
            athlete_data = json.loads(athlete_src.read_text(encoding="utf-8"))
            athlete_data.update({k: v for k, v in edits.items() if k in editable_keys})
            athlete_dest.write_text(_dumps(athlete_data), encoding="utf-8")
        else:
            athlete_dest.symlink_to(athlete_src.resolve())

    # Write merged index.json (private filtered, highlight sorted)
    if index_path.exists():
        index = (
            cached_index
            if cached_index is not None
            else json.loads(index_path.read_text(encoding="utf-8"))
        )
        activities = []
        for summary in index.get("activities", []):
            fm, _ = sidecars.get(summary.get("id", ""), ({}, ""))
            activities.append(_apply_sidecar_summary(summary, fm))

        # "unlisted" (and legacy "private") activities are kept in the index so
        # the owner can reach them by direct URL; the feed UI filters them out
        # for non-owners client-side.
        # Sort: newest first, then bring highlighted activities to the top.
        activities.sort(key=lambda a: a.get("started_at", ""), reverse=True)
        activities.sort(key=lambda a: 0 if a.get("custom", {}).get("highlight") else 1)

        _write_year_shards(merged_dir, activities, index)
    else:
        # Remove any stale year shard files if the source index disappeared
        for stale in merged_dir.glob("index-*.json"):
            stale.unlink()
        if (merged_dir / "index.json").exists():
            (merged_dir / "index.json").unlink()

    return len(sidecars)
|
||
|
||
|
||
# Fields only needed for athlete.json aggregation at extract time — they add
# bulk to every summary entry but are never read by the feed UI.
_FEED_STRIP = {"best_efforts", "best_climb_m", "source"}


def _write_year_shards(merged_dir: Path, activities: list[dict], index_meta: dict) -> None:
    """Split activities by year and write index-{year}.json shards.

    Replaces merged_dir/index.json with a shard manifest so the feed can
    load only the most-recent year on first paint and fetch older years lazily.

    Args:
        merged_dir: Directory receiving ``index.json`` and the shard files.
            Existing ``index-*.json`` files in it are deleted first.
        activities: Summary entries, already in display order; that order is
            preserved inside each shard. The year is the first four characters
            of ``started_at``; entries without one go to an "unknown" shard.
        index_meta: Source index document. Every key except ``activities`` and
            ``shards`` is copied into the manifest and into each shard.
    """
    from collections import defaultdict

    # Remove stale year shard files from previous runs
    for stale in merged_dir.glob("index-*.json"):
        stale.unlink(missing_ok=True)

    by_year: dict[str, list[dict]] = defaultdict(list)
    for a in activities:
        year = (a.get("started_at") or "")[:4] or "unknown"
        # Strip aggregation-only fields to keep shard files small
        by_year[year].append({k: v for k, v in a.items() if k not in _FEED_STRIP})

    # Numeric years newest first. Non-numeric buckets such as "unknown" go last;
    # a plain reverse string sort would list them ahead of every real year.
    years = sorted(by_year, key=lambda y: (y.isdigit(), y), reverse=True)

    meta = {k: v for k, v in index_meta.items() if k not in ("activities", "shards")}

    shards = []
    for year in years:
        fname = f"index-{year}.json"
        shard_doc = {**meta, "shards": [], "activities": by_year[year]}
        # _dumps keeps non-ASCII characters unescaped, so the encoding must be
        # explicit rather than the locale default.
        (merged_dir / fname).write_text(_dumps(shard_doc), encoding="utf-8")
        shards.append({
            "url": fname,
            "year": int(year) if year.isdigit() else 0,
            "count": len(by_year[year]),
        })

    root_doc = {**meta, "shards": shards, "activities": []}
    (merged_dir / "index.json").write_text(_dumps(root_doc), encoding="utf-8")
|
||
|
||
|
||
FEED_PAGE_SIZE = 50

# Extra fields stripped from the combined feed — preview_coords is the biggest
# contributor (~24% of shard size) but the feed cards need it for thumbnails,
# so we keep it. mmp is never displayed in feed cards; vam_curve is likewise
# dropped here (it is not in _FEED_STRIP, so per-user year shards keep it).
_COMBINED_FEED_STRIP = _FEED_STRIP | {"mmp", "vam_curve"}


def write_combined_feed(data_dir: Path) -> int:
    """Build data_dir/feed.json and per-month data_dir/feed-YYYY-MM.json shards.

    feed.json is a BAS shard index (same format as per-user index.json).
    Each feed-YYYY-MM.json contains all activities for that month across all users,
    sorted newest-first.

    Every sub-directory of *data_dir* that contains an ``activities`` directory
    is treated as a user, named by the directory. A user's ``_merged/index.json``
    is used when ``_merged`` exists, otherwise the user's own ``index.json``.
    Each activity is tagged with ``handle``, and relative ``detail_url`` /
    ``track_url`` values are prefixed so they resolve from *data_dir*.

    Existing ``feed*.json`` files in *data_dir* are deleted first. Activities
    whose ``started_at`` does not start with ``YYYY-MM`` are skipped.

    Returns:
        The number of activities written to the month shards.
    """
    user_dirs = sorted(
        p for p in data_dir.iterdir()
        if p.is_dir() and (p / "activities").exists()
    )

    all_activities: list[dict] = []
    for user_dir in user_dirs:
        handle = user_dir.name
        merged = user_dir / "_merged"
        has_merged = merged.exists()
        index_path = merged / "index.json" if has_merged else user_dir / "index.json"
        if not index_path.exists():
            continue

        # URLs inside a user's index are relative to the directory holding it.
        url_prefix = f"{handle}/_merged/" if has_merged else f"{handle}/"

        index = json.loads(index_path.read_text(encoding="utf-8"))
        shards = index.get("shards", [])

        if shards:
            # Only per-year shards are followed; other manifest entries are ignored.
            user_activities: list[dict] = []
            for shard in shards:
                if not re.match(r"index-\d{4}\.json$", shard.get("url", "")):
                    continue
                shard_path = index_path.parent / shard["url"]
                if shard_path.exists():
                    shard_data = json.loads(shard_path.read_text(encoding="utf-8"))
                    user_activities.extend(shard_data.get("activities", []))
        else:
            user_activities = index.get("activities", [])

        # Sharded and flat indexes get the same URL treatment, so feed entries
        # resolve the same way whichever layout the user directory has.
        for a in user_activities:
            tagged = {**a, "handle": handle}
            for key in ("detail_url", "track_url"):
                url = tagged.get(key, "")
                if url and not url.startswith(("http", "/")):
                    tagged[key] = url_prefix + url
            all_activities.append(tagged)

    all_activities.sort(key=lambda a: a.get("started_at", ""), reverse=True)

    # Remove stale feed files (sequential pages and old year shards)
    for stale in data_dir.glob("feed*.json"):
        stale.unlink(missing_ok=True)

    if not all_activities:
        return 0

    # Group by YYYY-MM (month), preserving newest-first order within each bucket
    by_month: dict[str, list[dict]] = {}
    for a in all_activities:
        ym = (a.get("started_at") or "")[:7]  # "YYYY-MM"
        if len(ym) == 7 and ym[4] == "-":
            by_month.setdefault(ym, []).append(a)

    months_desc = sorted(by_month, reverse=True)
    # Count only what lands in a shard; activities without a usable month are
    # dropped above and must not inflate the total.
    written = sum(len(acts) for acts in by_month.values())

    # Write per-month shard files (~150-200 acts each → ~25 KB gzip)
    for ym, acts in by_month.items():
        slim = [{k: v for k, v in a.items() if k not in _COMBINED_FEED_STRIP} for a in acts]
        doc: dict = {"bas_version": "1.0", "activities": slim}
        # _dumps keeps non-ASCII characters unescaped, so the encoding must be
        # explicit rather than the locale default.
        (data_dir / f"feed-{ym}.json").write_text(_dumps(doc), encoding="utf-8")

    # Write feed.json as a BAS shard index (same pattern as per-user index.json)
    index_doc: dict = {
        "bas_version": "1.0",
        "total_activities": written,
        "shards": [{"url": f"feed-{ym}.json"} for ym in months_desc],
        "activities": [],
    }
    (data_dir / "feed.json").write_text(_dumps(index_doc), encoding="utf-8")

    return written
|