Files
bincio-activity/bincio/serve/routers/segments.py
T
Davide Scaini 8380b1d2cc Refactor: split serve/server.py (3220 lines) into focused modules
serve/server.py is now 69 lines — app factory, middleware, and router
registration only.

New modules:
  deps.py    (168 lines) — module-level globals + auth dependency functions
  models.py   (85 lines) — all Pydantic request/response models
  tasks.py   (136 lines) — background workers and job tracker
  routers/               — one file per domain (10 routers, ~2750 lines total)
    auth.py, me.py, admin.py, activities.py, uploads.py,
    segments.py, strava.py, garmin.py, ideas.py, feed.py

cli.py updated to set globals on deps instead of server.

88 new regression tests in tests/serve/ cover auth guards and key
behaviours for every router; 294 total passing after the split.
2026-05-13 23:47:19 +02:00

294 lines
10 KiB
Python

"""Segments endpoints (/api/segments/*)."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Cookie, HTTPException
from fastapi.responses import JSONResponse
from bincio.serve import deps
from bincio.serve.models import CreateSegmentRequest
from bincio.segments import models as _seg_models
from bincio.segments import store as _seg_store
# Router that the endpoint decorators in this module attach their routes to.
router = APIRouter()
def _scan_segment_for_user(dd: Path, handle: str, segment_id: str) -> int:
    """Scan all of a user's activities against one segment.

    Every effort detected is stored through ``_seg_store.add_effort``.
    The scan is best-effort: an activity whose detail or timeseries file
    is missing, unreadable or malformed is skipped instead of aborting
    the whole scan.

    Args:
        dd: Data directory; the user's files are read from ``dd / handle``.
        handle: Handle of the user whose activities are scanned.
        segment_id: Id of the segment to match the activities against.

    Returns:
        The number of efforts detected and stored, or ``0`` when the
        segment cannot be loaded.
    """
    from datetime import datetime as _datetime
    from bincio.segments.detect import track_from_timeseries_json, detect_one

    seg = _seg_store.load_segment(dd, segment_id)
    if seg is None:
        return 0

    user_dir = dd / handle
    acts_dir = user_dir / "activities"
    total = 0
    for detail_path in sorted(acts_dir.glob("*.json")):
        # Files with ".timeseries." in their name also match the glob but
        # are not activity detail files.
        if ".timeseries." in detail_path.name:
            continue
        try:
            detail = json.loads(detail_path.read_text(encoding="utf-8"))
        except Exception:
            # Best-effort: an unreadable or invalid detail file is skipped.
            continue
        # Valid JSON is not necessarily an object; anything else has no
        # ``.get`` and would otherwise crash the scan.
        if not isinstance(detail, dict):
            continue

        ts_url = detail.get("timeseries_url")
        # A non-string value cannot be joined onto ``user_dir``.
        if not (isinstance(ts_url, str) and ts_url):
            continue
        ts_path = user_dir / ts_url
        if not ts_path.exists():
            continue
        try:
            ts = json.loads(ts_path.read_text(encoding="utf-8"))
        except Exception:
            continue

        started_raw = detail.get("started_at")
        if not (isinstance(started_raw, str) and started_raw):
            continue
        try:
            # Rewrite a trailing "Z" as an explicit UTC offset before parsing.
            started_at = _datetime.fromisoformat(started_raw.replace("Z", "+00:00"))
        except ValueError:
            continue

        track = track_from_timeseries_json(
            ts,
            detail.get("id", detail_path.stem),
            detail.get("sport", "other"),
            started_at,
        )
        if track is None:
            continue

        efforts = detect_one(track, seg)
        for effort in efforts:
            _seg_store.add_effort(dd, handle, segment_id, effort)
        total += len(efforts)
    return total
@router.get("/api/segments")
async def get_segments(
    bbox: Optional[str] = None,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """List segments, optionally filtered to a map viewport bbox.

    Args:
        bbox: Optional viewport as ``lon_min,lat_min,lon_max,lat_max``.
            When omitted or empty, no bbox filter is passed to the store.
        bincio_session: Session cookie, checked by ``deps._require_user``.

    Returns:
        A JSON array with one object per segment.

    Raises:
        HTTPException: 400 when ``bbox`` is given but is not exactly four
            comma-separated floats.
    """
    deps._require_user(bincio_session)

    parsed_bbox: Optional[list[float]] = None
    if bbox:
        try:
            parts = [float(x) for x in bbox.split(",")]
        except ValueError:
            raise HTTPException(400, "bbox must be four comma-separated floats") from None
        # A bbox with the wrong number of values used to be ignored silently,
        # returning the unfiltered list; reject it like any other bad bbox.
        if len(parts) != 4:
            raise HTTPException(400, "bbox must be four comma-separated floats")
        parsed_bbox = parts

    dd = deps._get_data_dir()
    segs = _seg_store.list_segments(dd, parsed_bbox)
    return JSONResponse([
        {
            "id": s.id,
            "name": s.name,
            "sport": s.sport,
            "distance_m": s.distance_m,
            "bbox": s.bbox,
            "polyline": s.polyline,
            "created_by": s.created_by,
            "created_at": _seg_store._iso(s.created_at),
        }
        for s in segs
    ])
@router.get("/api/segments/{segment_id}")
async def get_segment(segment_id: str) -> JSONResponse:
    """Look up one segment by id and return its metadata as JSON.

    Raises:
        HTTPException: 404 when the store has no segment with this id.
    """
    segment = _seg_store.load_segment(deps._get_data_dir(), segment_id)
    if segment is None:
        raise HTTPException(404, "Segment not found")

    payload = {
        "id": segment.id,
        "name": segment.name,
        "sport": segment.sport,
        "polyline": segment.polyline,
        "distance_m": segment.distance_m,
        "bbox": segment.bbox,
        "created_by": segment.created_by,
        "created_at": _seg_store._iso(segment.created_at),
    }
    return JSONResponse(payload)
@router.post("/api/segments")
async def create_segment(
    body: CreateSegmentRequest,
    background_tasks: BackgroundTasks,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Create a segment owned by the logged-in user.

    The segment is saved, then a background task scans the creator's
    activities for efforts on it.

    Returns:
        ``{"id": <segment id>}`` with status 201.

    Raises:
        HTTPException: 400 when the polyline has fewer than two points or
            the declared distance is under 500 m.
    """
    from datetime import datetime, timezone as _tz

    user = deps._require_user(bincio_session)

    points = body.polyline
    if len(points) < 2:
        raise HTTPException(400, "polyline must have at least 2 points")
    if body.distance_m < 500:
        raise HTTPException(400, "segment must be at least 500 m long")

    # Index 0 of each point is treated as latitude and index 1 as longitude;
    # the bbox is ordered lon_min, lat_min, lon_max, lat_max.
    lat_values = [pt[0] for pt in points]
    lon_values = [pt[1] for pt in points]
    south, north = min(lat_values), max(lat_values)
    west, east = min(lon_values), max(lon_values)

    seg_id = _seg_store.make_segment_id(body.name)
    created_at = datetime.now(_tz.utc)
    segment = _seg_models.Segment(
        id=seg_id,
        name=body.name,
        sport=body.sport or None,
        polyline=points,
        distance_m=body.distance_m,
        bbox=[west, south, east, north],
        created_by=user.handle,
        created_at=created_at,
    )

    data_dir = deps._get_data_dir()
    _seg_store.save_segment(data_dir, segment)
    background_tasks.add_task(_scan_segment_for_user, data_dir, user.handle, seg_id)
    return JSONResponse({"id": seg_id}, status_code=201)
@router.delete("/api/segments/{segment_id}")
async def delete_segment(
    segment_id: str,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Delete a segment; only its creator or an admin may do so.

    Raises:
        HTTPException: 404 when the segment does not exist, 403 when the
            caller is neither its creator nor an admin.
    """
    user = deps._require_user(bincio_session)
    data_dir = deps._get_data_dir()

    segment = _seg_store.load_segment(data_dir, segment_id)
    if segment is None:
        raise HTTPException(404, "Segment not found")

    may_delete = segment.created_by == user.handle or user.is_admin
    if not may_delete:
        raise HTTPException(403, "Not allowed")

    _seg_store.delete_segment(data_dir, segment_id)
    return JSONResponse({"ok": True})
@router.get("/api/segments/{segment_id}/efforts")
async def get_segment_efforts(
    segment_id: str,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """List the logged-in user's efforts on one segment.

    Raises:
        HTTPException: 404 when the segment does not exist.
    """
    user = deps._require_user(bincio_session)
    data_dir = deps._get_data_dir()

    # The segment is loaded only to confirm that it exists.
    if _seg_store.load_segment(data_dir, segment_id) is None:
        raise HTTPException(404, "Segment not found")

    rows = []
    for effort in _seg_store.load_efforts(data_dir, user.handle, segment_id):
        rows.append({
            "activity_id": effort.activity_id,
            "started_at": _seg_store._iso(effort.started_at),
            "elapsed_s": effort.elapsed_s,
            "avg_speed_kmh": effort.avg_speed_kmh,
            "avg_hr_bpm": effort.avg_hr_bpm,
            "avg_power_w": effort.avg_power_w,
            "np_power_w": effort.np_power_w,
        })
    return JSONResponse(rows)
@router.post("/api/segments/{segment_id}/detect")
async def trigger_detect(
    segment_id: str,
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Re-run effort detection on one segment for the logged-in user.

    The user's stored efforts for the segment are replaced with an empty
    list, then all of the user's activities are scanned again within the
    request.

    Raises:
        HTTPException: 404 when the segment does not exist.
    """
    user = deps._require_user(bincio_session)
    data_dir = deps._get_data_dir()

    segment = _seg_store.load_segment(data_dir, segment_id)
    if segment is None:
        raise HTTPException(404, "Segment not found")

    # Start from a clean slate so that the rescan does not add to old efforts.
    _seg_store.save_efforts(data_dir, user.handle, segment_id, [])
    found = _scan_segment_for_user(data_dir, user.handle, segment_id)
    return JSONResponse({"ok": True, "efforts_found": found})
@router.post("/api/me/segment-rescan")
async def me_segment_rescan(
    bincio_session: Optional[str] = Cookie(default=None),
) -> JSONResponse:
    """Re-detect efforts on ALL segments across ALL of the logged-in user's activities.

    The user's stored efforts are first replaced with an empty list for
    every segment, then each activity is loaded once and matched against
    every segment. The scan is best-effort: an activity whose detail or
    timeseries file is missing, unreadable or malformed is skipped, so a
    single bad file cannot abort the rescan after the efforts were cleared.

    Returns:
        ``{"ok": True, "efforts_found": <count>}``.
    """
    user = deps._require_user(bincio_session)
    dd = deps._get_data_dir()
    user_dir = dd / user.handle
    acts_dir = user_dir / "activities"

    from datetime import datetime as _datetime
    from bincio.segments.detect import track_from_timeseries_json, detect_one

    segments = _seg_store.list_segments(dd)
    if not segments:
        return JSONResponse({"ok": True, "efforts_found": 0})

    # Clear previous results up front so that the rescan does not add to them.
    for seg in segments:
        _seg_store.save_efforts(dd, user.handle, seg.id, [])

    total = 0
    for detail_path in sorted(acts_dir.glob("*.json")):
        # Files with ".timeseries." in their name also match the glob but
        # are not activity detail files.
        if ".timeseries." in detail_path.name:
            continue
        try:
            detail = json.loads(detail_path.read_text(encoding="utf-8"))
        except Exception:
            # Best-effort: an unreadable or invalid detail file is skipped.
            continue
        # Valid JSON is not necessarily an object; anything else has no
        # ``.get`` and would otherwise fail the request.
        if not isinstance(detail, dict):
            continue

        ts_url = detail.get("timeseries_url")
        # A non-string value cannot be joined onto ``user_dir``.
        if not (isinstance(ts_url, str) and ts_url):
            continue
        ts_path = user_dir / ts_url
        if not ts_path.exists():
            continue
        try:
            ts = json.loads(ts_path.read_text(encoding="utf-8"))
        except Exception:
            continue

        started_raw = detail.get("started_at")
        if not (isinstance(started_raw, str) and started_raw):
            continue
        try:
            # Rewrite a trailing "Z" as an explicit UTC offset before parsing.
            started_at = _datetime.fromisoformat(started_raw.replace("Z", "+00:00"))
        except ValueError:
            continue

        track = track_from_timeseries_json(
            ts,
            detail.get("id", detail_path.stem),
            detail.get("sport", "other"),
            started_at,
        )
        if track is None:
            continue

        for seg in segments:
            efforts = detect_one(track, seg)
            for effort in efforts:
                _seg_store.add_effort(dd, user.handle, seg.id, effort)
            total += len(efforts)
    return JSONResponse({"ok": True, "efforts_found": total})
@router.get("/api/users/{handle}/segment_summary")
async def user_segment_summary(handle: str) -> JSONResponse:
    """Public endpoint: segments where this user has efforts, with best time and count.

    Args:
        handle: User handle taken from the URL. It is used as a directory
            name under the data dir, so a value that is not a single plain
            path component is treated like an unknown user.

    Returns:
        A JSON array sorted case-insensitively by segment name. Each entry
        holds the segment's id, name, sport and distance, the user's
        shortest elapsed time with the activity it came from, and the
        number of efforts. The array is empty when the user has no effort
        files.
    """
    # The handle comes from an unauthenticated request and is joined into a
    # filesystem path; refuse anything that could point outside ``dd``.
    if handle in ("", ".", "..") or "/" in handle or "\\" in handle:
        return JSONResponse([])

    dd = deps._get_data_dir()
    efforts_dir = dd / handle / "segment_efforts"
    result = []
    if efforts_dir.exists():
        for ef_file in sorted(efforts_dir.glob("*.json")):
            # The effort file's stem is used as the segment id.
            seg_id = ef_file.stem
            efforts = _seg_store.load_efforts(dd, handle, seg_id)
            if not efforts:
                continue
            seg = _seg_store.load_segment(dd, seg_id)
            if not seg:
                # No matching segment could be loaded: skip these efforts.
                continue
            best = min(efforts, key=lambda e: e.elapsed_s)
            result.append({
                "segment": {
                    "id": seg.id,
                    "name": seg.name,
                    "sport": seg.sport,
                    "distance_m": seg.distance_m,
                },
                "best_elapsed_s": best.elapsed_s,
                "best_activity_id": best.activity_id,
                "effort_count": len(efforts),
            })
    result.sort(key=lambda x: x["segment"]["name"].lower())
    return JSONResponse(result)