"""Pre-bake per-handle GPS tracks for the Explore page. Reads all activity GeoJSON files for a handle, applies RDP simplification, and writes per-year tracks_YYYY.json shards plus a tracks_index.json manifest for progressive client-side loading. """ from __future__ import annotations import json import time from pathlib import Path from bincio.extract.simplify import _rdp_mask _VERSION = 2 _RDP_EPSILON = 0.0001 # ~10 m on the ground _SPORT_MAP: dict[str, str] = { "cycling": "cycling", "road_cycling": "cycling", "gravel_cycling": "cycling", "mountain_biking": "cycling", "e_biking": "cycling", "indoor_cycling": "cycling", "biking": "cycling", "bike": "cycling", "ride": "cycling", "running": "running", "trail_running": "running", "treadmill_running": "running", "jogging": "running", "hiking": "hiking", "walking": "hiking", "trekking": "hiking", "mountaineering": "hiking", "skiing": "skiing", "cross_country_skiing": "skiing", "alpine_skiing": "skiing", "snowboarding": "skiing", } def _sport_to_type(sport: str | None) -> str: if not sport: return "other" return _SPORT_MAP.get(sport.lower(), "other") def bake_tracks(handle: str, data_dir: Path) -> int: """Build tracks.json for handle. Returns number of tracks included.""" acts_dir = data_dir / handle / "activities" if not acts_dir.exists(): return 0 tracks = [] for gj_path in sorted(acts_dir.glob("*.geojson")): act_id = gj_path.stem meta: dict = {} meta_path = acts_dir / f"{act_id}.json" if meta_path.exists(): try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): pass else: # bare-timestamp geojson with no metadata — superseded by a slug version if list(acts_dir.glob(f"{act_id}-*.geojson")): continue if meta.get("virtual") or meta.get("sub_sport") == "indoor": continue try: gj = json.loads(gj_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue raw_coords = gj.get("geometry", {}).get("coordinates") or [] if len(raw_coords) < 2: continue lng_lat = [[float(c[0]), float(c[1])] for c in raw_coords if len(c) >= 2] if len(lng_lat) < 2: continue mask = _rdp_mask(lng_lat, epsilon=_RDP_EPSILON) simplified = [pt for pt, keep in zip(lng_lat, mask) if keep] if len(simplified) < 2: continue tracks.append({ "id": act_id, "date": (meta.get("started_at") or "")[:10], "type": _sport_to_type(meta.get("sport")), "name": meta.get("title") or act_id, "dist": int(meta.get("distance_m") or 0), "coords": simplified, }) tracks.sort(key=lambda t: t["date"], reverse=True) user_dir = data_dir / handle now = int(time.time()) # Group into per-year buckets by_year: dict[str, list] = {} for t in tracks: year = t["date"][:4] or "0000" by_year.setdefault(year, []).append(t) # Remove stale year shards that no longer have data for old in user_dir.glob("tracks_*.json"): stem = old.stem # e.g. "tracks_2024" or "tracks_index" if stem == "tracks_index": continue year_part = stem[len("tracks_"):] if year_part not in by_year: old.unlink(missing_ok=True) # Write per-year shards for year, year_tracks in by_year.items(): shard_path = user_dir / f"tracks_{year}.json" shard_path.write_text( json.dumps({ "v": _VERSION, "handle": handle, "year": year, "generated_at": now, "tracks": year_tracks, }), encoding="utf-8", ) # Write manifest years_sorted = sorted(by_year.keys(), reverse=True) index_path = user_dir / "tracks_index.json" index_path.write_text( json.dumps({ "v": _VERSION, "handle": handle, "generated_at": now, "total": len(tracks), "years": years_sorted, }), encoding="utf-8", ) # Remove legacy monolithic file if present legacy = user_dir / "tracks.json" legacy.unlink(missing_ok=True) return len(tracks)