182 lines
6.1 KiB
Python
182 lines
6.1 KiB
Python
"""Read/write segments and segment efforts to/from /var/bincio."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from bincio.segments.models import Segment, SegmentEffort
|
|
|
|
# /var/bincio/segments/{id}.json
|
|
_SEGMENTS_DIR = "segments"
|
|
# /var/bincio/data/{handle}/segment_efforts/{segment_id}.json
|
|
_EFFORTS_SUBDIR = "segment_efforts"
|
|
|
|
|
|
# ── helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _segments_dir(data_dir: Path) -> Path:
|
|
d = data_dir / _SEGMENTS_DIR
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
return d
|
|
|
|
|
|
def _efforts_dir(data_dir: Path, handle: str) -> Path:
|
|
d = data_dir / handle / _EFFORTS_SUBDIR
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
return d
|
|
|
|
|
|
def _slugify(name: str) -> str:
|
|
s = name.lower().strip()
|
|
s = re.sub(r"[^a-z0-9]+", "-", s)
|
|
return s.strip("-")[:48]
|
|
|
|
|
|
def _make_id(name: str) -> str:
|
|
slug = _slugify(name)
|
|
suffix = hashlib.sha256(name.encode()).hexdigest()[:4]
|
|
return f"{slug}-{suffix}"
|
|
|
|
|
|
def _dt(s: str) -> datetime:
|
|
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
|
|
|
|
def _iso(dt: datetime) -> str:
|
|
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
# ── serialisation ─────────────────────────────────────────────────────────────
|
|
|
|
def _segment_to_dict(seg: Segment) -> dict:
|
|
return {
|
|
"id": seg.id,
|
|
"name": seg.name,
|
|
"sport": seg.sport,
|
|
"polyline": seg.polyline,
|
|
"distance_m": round(seg.distance_m, 1),
|
|
"bbox": [round(v, 6) for v in seg.bbox],
|
|
"created_by": seg.created_by,
|
|
"created_at": _iso(seg.created_at),
|
|
}
|
|
|
|
|
|
def _segment_from_dict(d: dict) -> Segment:
|
|
return Segment(
|
|
id=d["id"],
|
|
name=d["name"],
|
|
sport=d.get("sport"),
|
|
polyline=d["polyline"],
|
|
distance_m=float(d["distance_m"]),
|
|
bbox=d["bbox"],
|
|
created_by=d["created_by"],
|
|
created_at=_dt(d["created_at"]),
|
|
)
|
|
|
|
|
|
def _effort_to_dict(e: SegmentEffort) -> dict:
|
|
return {
|
|
"activity_id": e.activity_id,
|
|
"started_at": _iso(e.started_at),
|
|
"elapsed_s": e.elapsed_s,
|
|
"avg_speed_kmh": e.avg_speed_kmh,
|
|
"avg_hr_bpm": e.avg_hr_bpm,
|
|
"avg_power_w": e.avg_power_w,
|
|
"np_power_w": e.np_power_w,
|
|
"detected_at": _iso(e.detected_at),
|
|
}
|
|
|
|
|
|
def _effort_from_dict(d: dict) -> SegmentEffort:
|
|
return SegmentEffort(
|
|
activity_id=d["activity_id"],
|
|
started_at=_dt(d["started_at"]),
|
|
elapsed_s=int(d["elapsed_s"]),
|
|
avg_speed_kmh=d.get("avg_speed_kmh"),
|
|
avg_hr_bpm=d.get("avg_hr_bpm"),
|
|
avg_power_w=d.get("avg_power_w"),
|
|
np_power_w=d.get("np_power_w"),
|
|
detected_at=_dt(d["detected_at"]),
|
|
)
|
|
|
|
|
|
# ── public API ────────────────────────────────────────────────────────────────
|
|
|
|
def make_segment_id(name: str) -> str:
|
|
return _make_id(name)
|
|
|
|
|
|
def save_segment(data_dir: Path, seg: Segment) -> None:
|
|
path = _segments_dir(data_dir) / f"{seg.id}.json"
|
|
path.write_text(json.dumps(_segment_to_dict(seg), ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
|
def load_segment(data_dir: Path, segment_id: str) -> Optional[Segment]:
|
|
path = _segments_dir(data_dir) / f"{segment_id}.json"
|
|
if not path.exists():
|
|
return None
|
|
return _segment_from_dict(json.loads(path.read_text(encoding="utf-8")))
|
|
|
|
|
|
def delete_segment(data_dir: Path, segment_id: str) -> bool:
|
|
path = _segments_dir(data_dir) / f"{segment_id}.json"
|
|
if not path.exists():
|
|
return False
|
|
path.unlink()
|
|
return True
|
|
|
|
|
|
def list_segments(data_dir: Path, bbox: Optional[list[float]] = None) -> list[Segment]:
|
|
"""Return all segments, optionally filtered to those overlapping bbox.
|
|
|
|
bbox = [lon_min, lat_min, lon_max, lat_max]
|
|
"""
|
|
segs = []
|
|
for path in sorted(_segments_dir(data_dir).glob("*.json")):
|
|
try:
|
|
seg = _segment_from_dict(json.loads(path.read_text(encoding="utf-8")))
|
|
except Exception:
|
|
continue
|
|
if bbox is not None and not _bboxes_overlap(seg.bbox, bbox):
|
|
continue
|
|
segs.append(seg)
|
|
return segs
|
|
|
|
|
|
def _bboxes_overlap(a: list[float], b: list[float]) -> bool:
|
|
"""True if two [lon_min, lat_min, lon_max, lat_max] boxes overlap."""
|
|
return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])
|
|
|
|
|
|
# ── efforts ───────────────────────────────────────────────────────────────────
|
|
|
|
def load_efforts(data_dir: Path, handle: str, segment_id: str) -> list[SegmentEffort]:
|
|
path = _efforts_dir(data_dir, handle) / f"{segment_id}.json"
|
|
if not path.exists():
|
|
return []
|
|
try:
|
|
return [_effort_from_dict(d) for d in json.loads(path.read_text(encoding="utf-8"))]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def save_efforts(data_dir: Path, handle: str, segment_id: str, efforts: list[SegmentEffort]) -> None:
|
|
path = _efforts_dir(data_dir, handle) / f"{segment_id}.json"
|
|
data = [_effort_to_dict(e) for e in efforts]
|
|
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
|
|
def add_effort(data_dir: Path, handle: str, segment_id: str, effort: SegmentEffort) -> None:
|
|
"""Append an effort, replacing any existing effort for the same activity."""
|
|
efforts = load_efforts(data_dir, handle, segment_id)
|
|
efforts = [e for e in efforts if e.activity_id != effort.activity_id or
|
|
e.started_at != effort.started_at]
|
|
efforts.append(effort)
|
|
efforts.sort(key=lambda e: e.started_at, reverse=True)
|
|
save_efforts(data_dir, handle, segment_id, efforts)
|