"""Merge and unmerge activity endpoints.""" from __future__ import annotations import json import shutil from datetime import datetime, timezone from pathlib import Path from typing import Any from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel from bincio.serve import deps, tasks from bincio.serve.db import User router = APIRouter() _MERGES_FILE = "_merges.json" _BACKUP_DIR = "_merge_backup" def _read_merges(user_dir: Path) -> dict: p = user_dir / _MERGES_FILE if p.exists(): try: return json.loads(p.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): pass return {"hidden": [], "merges": {}} def _write_merges(user_dir: Path, data: dict) -> None: (user_dir / _MERGES_FILE).write_text( json.dumps(data, ensure_ascii=False), encoding="utf-8" ) def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() @router.get("/api/merges") async def get_merges(user: User = Depends(deps._require_auth)) -> JSONResponse: user_dir = deps._get_data_dir() / user.handle return JSONResponse(_read_merges(user_dir)) class MergeRequest(BaseModel): activity_ids: list[str] @router.post("/api/merge") async def merge_activities( body: MergeRequest, user: User = Depends(deps._require_auth), ) -> JSONResponse: if len(body.activity_ids) < 2: raise HTTPException(400, "Need at least 2 activities to merge") dd = deps._get_data_dir() user_dir = dd / user.handle acts_dir = user_dir / "activities" activities: list[dict] = [] for aid in body.activity_ids: deps._check_id(aid) p = acts_dir / f"{aid}.json" if not p.exists(): raise HTTPException(404, f"Activity {aid} not found") try: a = json.loads(p.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): raise HTTPException(500, f"Could not read activity {aid}") a["_id"] = aid activities.append(a) sports = {a.get("sport") for a in activities} if len(sports) > 1: raise HTTPException(400, "Cannot merge activities of different sports") activities.sort(key=lambda a: a.get("started_at", "")) primary = activities[0] secondaries = activities[1:] primary_id = primary["_id"] secondary_ids = [a["_id"] for a in secondaries] def _sum(key: str) -> float | None: vals = [a.get(key) for a in activities if a.get(key) is not None] return sum(vals) if vals else None def _wavg(key: str) -> float | None: pairs = [(a.get(key), a.get("moving_time_s")) for a in activities] pairs = [(v, w) for v, w in pairs if v is not None and w and w > 0] if not pairs: return None total_w = sum(w for _, w in pairs) return sum(v * w for v, w in pairs) / total_w if total_w > 0 else None distance_m = _sum("distance_m") duration_s = _sum("duration_s") moving_time_s = _sum("moving_time_s") elevation_gain_m = _sum("elevation_gain_m") avg_speed_kmh = (distance_m / moving_time_s * 3.6) if distance_m and moving_time_s else None avg_hr_bpm_val = _wavg("avg_hr_bpm") avg_power_w_val = _wavg("avg_power_w") backup_dir = user_dir / _BACKUP_DIR backup_dir.mkdir(exist_ok=True) for suffix in (".json", ".geojson", ".timeseries.json"): src = acts_dir / f"{primary_id}{suffix}" if src.exists(): shutil.copy2(src, backup_dir / f"{primary_id}{suffix}.bak") primary_data: dict[str, Any] = {k: v for k, v in primary.items() if not k.startswith("_")} primary_data["distance_m"] = distance_m primary_data["duration_s"] = duration_s primary_data["moving_time_s"] = moving_time_s primary_data["elevation_gain_m"] = elevation_gain_m if avg_speed_kmh is not None: primary_data["avg_speed_kmh"] = round(avg_speed_kmh, 3) if avg_hr_bpm_val is not None: primary_data["avg_hr_bpm"] = round(avg_hr_bpm_val) if avg_power_w_val is not None: primary_data["avg_power_w"] = round(avg_power_w_val) primary_data["merged_ids"] = secondary_ids (acts_dir / f"{primary_id}.json").write_text( json.dumps(primary_data, ensure_ascii=False, indent=2), encoding="utf-8" ) _merge_geojson(acts_dir, primary_id, secondary_ids) _merge_timeseries(acts_dir, primary_id, secondary_ids, activities) merges = _read_merges(user_dir) for sid in secondary_ids: if sid not in merges["hidden"]: merges["hidden"].append(sid) merges.setdefault("merges", {})[primary_id] = { "secondary_ids": secondary_ids, "merged_at": _now_iso(), } _write_merges(user_dir, merges) from bincio.render.merge import merge_one merge_one(user_dir, primary_id) tasks._trigger_rebuild(user.handle) return JSONResponse({"ok": True, "primary_id": primary_id, "hidden": secondary_ids}) @router.post("/api/unmerge/{primary_id}") async def unmerge_activity( primary_id: str, user: User = Depends(deps._require_auth), ) -> JSONResponse: deps._check_id(primary_id) dd = deps._get_data_dir() user_dir = dd / user.handle acts_dir = user_dir / "activities" backup_dir = user_dir / _BACKUP_DIR merges = _read_merges(user_dir) merge_info = merges.get("merges", {}).get(primary_id) if not merge_info: raise HTTPException(404, "No merge record found for this activity") secondary_ids: list[str] = merge_info.get("secondary_ids", []) for suffix in (".json", ".geojson", ".timeseries.json"): bak = backup_dir / f"{primary_id}{suffix}.bak" if bak.exists(): shutil.copy2(bak, acts_dir / f"{primary_id}{suffix}") bak.unlink() merges["hidden"] = [h for h in merges.get("hidden", []) if h not in secondary_ids] merges["merges"].pop(primary_id, None) _write_merges(user_dir, merges) from bincio.render.merge import merge_one merge_one(user_dir, primary_id) tasks._trigger_rebuild(user.handle) return JSONResponse({"ok": True, "restored": secondary_ids}) def _merge_geojson(acts_dir: Path, primary_id: str, secondary_ids: list[str]) -> None: primary_path = acts_dir / f"{primary_id}.geojson" if not primary_path.exists(): return try: geo = json.loads(primary_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return features: list = list(geo.get("features", [])) for sid in secondary_ids: p = acts_dir / f"{sid}.geojson" if not p.exists(): continue try: sec = json.loads(p.read_text(encoding="utf-8")) features.extend(sec.get("features", [])) except (OSError, json.JSONDecodeError): continue geo["features"] = features primary_path.write_text(json.dumps(geo, ensure_ascii=False), encoding="utf-8") def _merge_timeseries( acts_dir: Path, primary_id: str, secondary_ids: list[str], activities: list[dict], ) -> None: primary_path = acts_dir / f"{primary_id}.timeseries.json" if not primary_path.exists(): return try: merged = json.loads(primary_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return merged = {k: list(v) if isinstance(v, list) else v for k, v in merged.items()} _ARRAY_KEYS = ("lat", "lon", "elevation_m", "speed_kmh", "hr_bpm", "cadence_rpm", "power_w", "temperature_c") for i, sid in enumerate(secondary_ids): sec_path = acts_dir / f"{sid}.timeseries.json" if not sec_path.exists(): continue try: sec = json.loads(sec_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): continue sec_t: list = sec.get("t") or [] if not sec_t: continue try: p_start = datetime.fromisoformat( activities[0].get("started_at", "").replace("Z", "+00:00") ).timestamp() s_start = datetime.fromisoformat( activities[i + 1].get("started_at", "").replace("Z", "+00:00") ).timestamp() offset = s_start - p_start except (ValueError, TypeError): cur_t: list = merged.get("t") or [] offset = (cur_t[-1] + 1) if cur_t else 0 adjusted_t = [offset + t for t in sec_t] primary_len = len(merged.get("t") or []) if merged.get("t") is None: merged["t"] = [] merged["t"].extend(adjusted_t) for key in _ARRAY_KEYS: sec_vals = sec.get(key) if sec_vals is None: if merged.get(key) is not None: merged[key] = merged[key] + [None] * len(sec_t) else: if merged.get(key) is None: merged[key] = [None] * primary_len + list(sec_vals) else: merged[key] = merged[key] + list(sec_vals) primary_path.write_text(json.dumps(merged, ensure_ascii=False), encoding="utf-8")