Segments phase 1: models, store, and API endpoints (GET/POST/DELETE /api/segments)

This commit is contained in:
Davide Scaini
2026-05-13 00:19:15 +02:00
parent 6b2698c0c5
commit 79cad29ff1
4 changed files with 297 additions and 0 deletions
View File
+29
View File
@@ -0,0 +1,29 @@
"""Segment and SegmentEffort data models."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class Segment:
id: str
name: str
polyline: list[list[float]] # [[lat, lon], ...]
distance_m: float
bbox: list[float] # [lon_min, lat_min, lon_max, lat_max]
created_by: str
created_at: datetime
sport: Optional[str] = None # None = any sport
@dataclass
class SegmentEffort:
activity_id: str
started_at: datetime
elapsed_s: int
detected_at: datetime
avg_speed_kmh: Optional[float] = None
avg_hr_bpm: Optional[int] = None
avg_power_w: Optional[int] = None
np_power_w: Optional[int] = None
+181
View File
@@ -0,0 +1,181 @@
"""Read/write segments and segment efforts to/from /var/bincio."""
from __future__ import annotations
import hashlib
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from bincio.segments.models import Segment, SegmentEffort
# /var/bincio/segments/{id}.json
_SEGMENTS_DIR = "segments"
# /var/bincio/data/{handle}/segment_efforts/{segment_id}.json
_EFFORTS_SUBDIR = "segment_efforts"
# ── helpers ───────────────────────────────────────────────────────────────────
def _segments_dir(data_dir: Path) -> Path:
d = data_dir / _SEGMENTS_DIR
d.mkdir(parents=True, exist_ok=True)
return d
def _efforts_dir(data_dir: Path, handle: str) -> Path:
d = data_dir / handle / _EFFORTS_SUBDIR
d.mkdir(parents=True, exist_ok=True)
return d
def _slugify(name: str) -> str:
s = name.lower().strip()
s = re.sub(r"[^a-z0-9]+", "-", s)
return s.strip("-")[:48]
def _make_id(name: str) -> str:
slug = _slugify(name)
suffix = hashlib.sha256(name.encode()).hexdigest()[:4]
return f"{slug}-{suffix}"
def _dt(s: str) -> datetime:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
def _iso(dt: datetime) -> str:
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# ── serialisation ─────────────────────────────────────────────────────────────
def _segment_to_dict(seg: Segment) -> dict:
return {
"id": seg.id,
"name": seg.name,
"sport": seg.sport,
"polyline": seg.polyline,
"distance_m": round(seg.distance_m, 1),
"bbox": [round(v, 6) for v in seg.bbox],
"created_by": seg.created_by,
"created_at": _iso(seg.created_at),
}
def _segment_from_dict(d: dict) -> Segment:
return Segment(
id=d["id"],
name=d["name"],
sport=d.get("sport"),
polyline=d["polyline"],
distance_m=float(d["distance_m"]),
bbox=d["bbox"],
created_by=d["created_by"],
created_at=_dt(d["created_at"]),
)
def _effort_to_dict(e: SegmentEffort) -> dict:
return {
"activity_id": e.activity_id,
"started_at": _iso(e.started_at),
"elapsed_s": e.elapsed_s,
"avg_speed_kmh": e.avg_speed_kmh,
"avg_hr_bpm": e.avg_hr_bpm,
"avg_power_w": e.avg_power_w,
"np_power_w": e.np_power_w,
"detected_at": _iso(e.detected_at),
}
def _effort_from_dict(d: dict) -> SegmentEffort:
return SegmentEffort(
activity_id=d["activity_id"],
started_at=_dt(d["started_at"]),
elapsed_s=int(d["elapsed_s"]),
avg_speed_kmh=d.get("avg_speed_kmh"),
avg_hr_bpm=d.get("avg_hr_bpm"),
avg_power_w=d.get("avg_power_w"),
np_power_w=d.get("np_power_w"),
detected_at=_dt(d["detected_at"]),
)
# ── public API ────────────────────────────────────────────────────────────────
def make_segment_id(name: str) -> str:
return _make_id(name)
def save_segment(data_dir: Path, seg: Segment) -> None:
path = _segments_dir(data_dir) / f"{seg.id}.json"
path.write_text(json.dumps(_segment_to_dict(seg), ensure_ascii=False, indent=2), encoding="utf-8")
def load_segment(data_dir: Path, segment_id: str) -> Optional[Segment]:
path = _segments_dir(data_dir) / f"{segment_id}.json"
if not path.exists():
return None
return _segment_from_dict(json.loads(path.read_text(encoding="utf-8")))
def delete_segment(data_dir: Path, segment_id: str) -> bool:
path = _segments_dir(data_dir) / f"{segment_id}.json"
if not path.exists():
return False
path.unlink()
return True
def list_segments(data_dir: Path, bbox: Optional[list[float]] = None) -> list[Segment]:
"""Return all segments, optionally filtered to those overlapping bbox.
bbox = [lon_min, lat_min, lon_max, lat_max]
"""
segs = []
for path in sorted(_segments_dir(data_dir).glob("*.json")):
try:
seg = _segment_from_dict(json.loads(path.read_text(encoding="utf-8")))
except Exception:
continue
if bbox is not None and not _bboxes_overlap(seg.bbox, bbox):
continue
segs.append(seg)
return segs
def _bboxes_overlap(a: list[float], b: list[float]) -> bool:
"""True if two [lon_min, lat_min, lon_max, lat_max] boxes overlap."""
return not (a[2] < b[0] or b[2] < a[0] or a[3] < b[1] or b[3] < a[1])
# ── efforts ───────────────────────────────────────────────────────────────────
def load_efforts(data_dir: Path, handle: str, segment_id: str) -> list[SegmentEffort]:
path = _efforts_dir(data_dir, handle) / f"{segment_id}.json"
if not path.exists():
return []
try:
return [_effort_from_dict(d) for d in json.loads(path.read_text(encoding="utf-8"))]
except Exception:
return []
def save_efforts(data_dir: Path, handle: str, segment_id: str, efforts: list[SegmentEffort]) -> None:
path = _efforts_dir(data_dir, handle) / f"{segment_id}.json"
data = [_effort_to_dict(e) for e in efforts]
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def add_effort(data_dir: Path, handle: str, segment_id: str, effort: SegmentEffort) -> None:
"""Append an effort, replacing any existing effort for the same activity."""
efforts = load_efforts(data_dir, handle, segment_id)
efforts = [e for e in efforts if e.activity_id != effort.activity_id or
e.started_at != effort.started_at]
efforts.append(effort)
efforts.sort(key=lambda e: e.started_at, reverse=True)
save_efforts(data_dir, handle, segment_id, efforts)
+87
View File
@@ -120,6 +120,13 @@ class GenericResponse(BaseModel):
ok: bool = Field(True, description="Success flag")
class CreateSegmentRequest(BaseModel):
name: str = Field(..., description="Segment name")
sport: Optional[str] = Field(default=None, description="Sport filter (e.g. cycling)")
polyline: list[list[float]] = Field(..., description="[[lat, lon], ...] ordered GPS points")
distance_m: float = Field(..., description="Segment length in metres")
# ── Active job tracker ───────────────────────────────────────────────────────
# Tracks in-progress upload/processing jobs so admins can see what's running.
# Jobs are added when a streaming upload starts and removed when it finishes.
@@ -235,6 +242,8 @@ app.add_middleware(
_VALID_HANDLE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,29}$')
from bincio.edit.ops import VALID_ACTIVITY_ID as _VALID_ACTIVITY_ID
from bincio.segments import models as _seg_models
from bincio.segments import store as _seg_store
_SESSION_COOKIE = "bincio_session"
_COOKIE_MAX_AGE = 30 * 86400 # 30 days
_SESSION_DOMAIN = os.environ.get("SESSION_DOMAIN") or None # e.g. ".bincio.org" in production
@@ -2397,6 +2406,84 @@ async def upload_strava_zip(
)
# ── Segments ──────────────────────────────────────────────────────────────────
@app.get("/api/segments")
async def get_segments(
bbox: Optional[str] = None,
bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
"""List segments, optionally filtered to a map viewport bbox (lon_min,lat_min,lon_max,lat_max)."""
_require_user(bincio_session)
parsed_bbox: Optional[list[float]] = None
if bbox:
try:
parts = [float(x) for x in bbox.split(",")]
if len(parts) == 4:
parsed_bbox = parts
except ValueError:
raise HTTPException(400, "bbox must be four comma-separated floats")
dd = _get_data_dir()
segs = _seg_store.list_segments(dd, parsed_bbox)
return JSONResponse([{
"id": s.id,
"name": s.name,
"sport": s.sport,
"distance_m": s.distance_m,
"bbox": s.bbox,
"polyline": s.polyline,
"created_by": s.created_by,
"created_at": _seg_store._iso(s.created_at),
} for s in segs])
@app.post("/api/segments")
async def create_segment(
body: CreateSegmentRequest,
bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
user = _require_user(bincio_session)
if len(body.polyline) < 2:
raise HTTPException(400, "polyline must have at least 2 points")
if body.distance_m <= 0:
raise HTTPException(400, "distance_m must be positive")
lats = [p[0] for p in body.polyline]
lons = [p[1] for p in body.polyline]
bbox = [min(lons), min(lats), max(lons), max(lats)]
seg_id = _seg_store.make_segment_id(body.name)
from datetime import datetime, timezone as _tz
seg = _seg_models.Segment(
id=seg_id,
name=body.name,
sport=body.sport or None,
polyline=body.polyline,
distance_m=body.distance_m,
bbox=bbox,
created_by=user.handle,
created_at=datetime.now(_tz.utc),
)
_seg_store.save_segment(_get_data_dir(), seg)
return JSONResponse({"id": seg_id}, status_code=201)
@app.delete("/api/segments/{segment_id}")
async def delete_segment(
segment_id: str,
bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
user = _require_user(bincio_session)
dd = _get_data_dir()
seg = _seg_store.load_segment(dd, segment_id)
if seg is None:
raise HTTPException(404, "Segment not found")
if seg.created_by != user.handle and not user.is_admin:
raise HTTPException(403, "Not allowed")
_seg_store.delete_segment(dd, segment_id)
return JSONResponse({"ok": True})
# ── Feedback ──────────────────────────────────────────────────────────────────
_FEEDBACK_IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic"}