Files
bincio-planner/server/server.py
T
2026-05-18 21:24:11 +02:00

427 lines
15 KiB
Python

"""BincioPlanner API — save/load route plans.
Auth: reads bincio_session cookie, validates against shared instance.db (same strategy as bincio_wiki).
Storage: JSON files at $PLANS_DIR/{handle}/{plan_id}.json
Collections: $PLANS_DIR/{handle}/_collections.json
Shared plans: $PLANS_DIR/_shared/{plan_id}.json (any authed user can read/write/delete)
Run: uvicorn server:app --host 127.0.0.1 --port 8002
Env vars:
SHARED_DB_PATH path to bincio_activity instance.db (default: /var/bincio/data/instance.db)
PLANS_DIR root dir for plan JSON files (default: /var/bincio_planner)
DEV_HANDLE if set, cookie auth is skipped and every request runs as this handle (default: unset)
"""
from __future__ import annotations
import json
import os
import re
import secrets
import sqlite3
import time
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from fastapi import Cookie, Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# SQLite database queried for the `sessions` and `users` tables during auth.
_SHARED_DB_PATH = Path(os.environ.get("SHARED_DB_PATH", "/var/bincio/data/instance.db"))
# Root directory under which per-user and shared plan JSON files are stored.
_PLANS_DIR = Path(os.environ.get("PLANS_DIR", "/var/bincio_planner"))
# Name of the session cookie.
# NOTE(review): not referenced in the visible code; require_auth binds the
# cookie through its `bincio_session` parameter name instead.
_SESSION_COOKIE = "bincio_session"
# When non-empty, require_auth skips the session lookup and returns a User
# with this handle for every request.
_DEV_HANDLE = os.environ.get("DEV_HANDLE", "")
# Allowed shape of a plan id used as a file name: 1-32 chars of [A-Za-z0-9_-].
# Note that `$` also matches just before a trailing newline, so `.match()`
# alone accepts an id that ends in "\n"; `.fullmatch()` does not.
_SAFE_ID = re.compile(r"^[A-Za-z0-9_-]{1,32}$")
# ── Auth ───────────────────────────────────────────────────────────────────────
@dataclass
class User:
    """Authenticated caller, as resolved from the session cookie."""

    # Account handle; also used as the per-user directory name under the plans root.
    handle: str
    # Display name, taken from the `users` table (or the dev handle).
    display_name: str
@contextmanager
def _db():
    """Yield a connection to the shared SQLite database and close it afterwards.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.

    Raises:
        HTTPException: 503 if the database file does not exist.
    """
    if not _SHARED_DB_PATH.exists():
        raise HTTPException(503, f"Shared DB not found at {_SHARED_DB_PATH}. "
                                 "Set SHARED_DB_PATH or run bincio_activity first.")
    con = sqlite3.connect(_SHARED_DB_PATH, check_same_thread=False)
    try:
        # Configure inside the try block: if the PRAGMA raises (e.g. the
        # database is locked), the connection is still closed, not leaked.
        con.row_factory = sqlite3.Row
        con.execute("PRAGMA journal_mode=WAL")
        yield con
    finally:
        con.close()
def _get_session_user(token: str) -> Optional[User]:
    """Resolve a session token to a ``User``, or ``None`` if it is not usable.

    ``None`` is returned when the token is unknown, the session has expired,
    the user has no ``activity_access``, the user is ``suspended``, or the
    lookup fails with anything other than an ``HTTPException``.

    Raises:
        HTTPException: propagated unchanged from ``_db()``.
    """
    query = (
        "SELECT s.handle, s.expires_at, u.display_name, u.activity_access, u.suspended "
        "FROM sessions s JOIN users u ON s.handle = u.handle "
        "WHERE s.token = ?"
    )
    try:
        with _db() as con:
            row = con.execute(query, (token,)).fetchone()
    except HTTPException:
        # Keep the 503 from _db() visible to the client.
        raise
    except Exception:
        return None

    if not row:
        return None
    # Checks run in the same order as before: expiry, access flag, suspension.
    rejected = (
        row["expires_at"] < int(time.time())
        or not row["activity_access"]
        or row["suspended"]
    )
    if rejected:
        return None
    return User(handle=row["handle"], display_name=row["display_name"])
async def require_auth(bincio_session: Optional[str] = Cookie(default=None)) -> User:
    """FastAPI dependency that returns the authenticated ``User``.

    When ``_DEV_HANDLE`` is set, the cookie is ignored and a user with that
    handle is returned.

    Raises:
        HTTPException: 401 if the cookie is missing/empty or does not resolve
            to a usable session.
    """
    if _DEV_HANDLE:
        return User(handle=_DEV_HANDLE, display_name=_DEV_HANDLE)

    # A missing or empty cookie never reaches the database lookup.
    user = _get_session_user(bincio_session) if bincio_session else None
    if not user:
        raise HTTPException(401, "Authentication required")
    return user
# ── App ────────────────────────────────────────────────────────────────────────
# Application instance; docs_url/redoc_url are set to None (FastAPI's switches
# for the interactive documentation pages).
app = FastAPI(title="BincioPlanner API", docs_url=None, redoc_url=None)
# Gzip middleware, configured with a 1024-byte minimum size.
app.add_middleware(GZipMiddleware, minimum_size=1024)
# CORS: credentialed requests are allowed from http(s)://localhost on any port,
# limited to the listed methods and the Content-Type header.
app.add_middleware(
CORSMiddleware,
allow_origin_regex=r"https?://localhost(:\d+)?",
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Content-Type"],
)
# ── Storage helpers ────────────────────────────────────────────────────────────
def _user_dir(handle: str) -> Path:
    """Return *handle*'s plan directory, creating it (and parents) if needed."""
    user_root = _PLANS_DIR / handle
    user_root.mkdir(parents=True, exist_ok=True)
    return user_root
def _plan_path(handle: str, plan_id: str) -> Path:
    """Return the JSON file path of plan *plan_id* inside *handle*'s directory.

    The id is validated before it is used as a file name. ``fullmatch`` is
    used rather than ``match`` because ``$`` in the pattern also matches just
    before a trailing newline, so ``match`` would accept an id ending in a
    newline character.

    Raises:
        HTTPException: 400 if *plan_id* does not fully match ``_SAFE_ID``.
    """
    if not _SAFE_ID.fullmatch(plan_id):
        raise HTTPException(400, "Invalid plan id")
    return _PLANS_DIR / handle / f"{plan_id}.json"
def _collections_path(handle: str) -> Path:
    """Return the path of *handle*'s collections index file."""
    user_root = _PLANS_DIR / handle
    return user_root / "_collections.json"
def _read_collections(handle: str) -> list[dict]:
    """Load *handle*'s collections list, best-effort.

    Returns an empty list when the index file is missing, unreadable, not
    valid UTF-8/JSON, or holds JSON that is not a list. The last case matters
    because callers append to and iterate over the result.
    """
    try:
        # Read directly instead of checking exists() first: the file may
        # disappear between the check and the read.
        cols = json.loads(_collections_path(handle).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: covers both
        # UnicodeDecodeError and json.JSONDecodeError.
        return []
    return cols if isinstance(cols, list) else []
def _write_collections(handle: str, cols: list[dict]) -> None:
    """Serialise *cols* to *handle*'s collections index file, replacing it."""
    _user_dir(handle)  # called for its side effect: the directory must exist
    payload = json.dumps(cols)
    index_file = _collections_path(handle)
    index_file.write_text(payload, encoding="utf-8")
def _shared_dir() -> Path:
    """Return the shared-plans directory, creating it (and parents) if needed."""
    shared_root = _PLANS_DIR / "_shared"
    shared_root.mkdir(parents=True, exist_ok=True)
    return shared_root
def _shared_plan_path(plan_id: str) -> Path:
    """Return the JSON file path of shared plan *plan_id*.

    The id is validated before it is used as a file name. ``fullmatch`` is
    used rather than ``match`` because ``$`` in the pattern also matches just
    before a trailing newline, so ``match`` would accept an id ending in a
    newline character.

    Raises:
        HTTPException: 400 if *plan_id* does not fully match ``_SAFE_ID``.
    """
    if not _SAFE_ID.fullmatch(plan_id):
        raise HTTPException(400, "Invalid plan id")
    return _PLANS_DIR / "_shared" / f"{plan_id}.json"
# ── Endpoints ──────────────────────────────────────────────────────────────────
class PlanBody(BaseModel):
    """Request body accepted by the plan create/update endpoints.

    ``name``, ``waypoints`` and ``profile`` are required; every other field is
    optional and defaults to ``None``.
    """

    # Required fields.
    name: str
    waypoints: list[dict]  # element schema is not validated here
    profile: str

    # Optional route geometry.
    geojson: dict | None = None
    gpxTrack: dict | None = None  # full GeoJSON of reference track, if any GPX segments used
    # Stored alongside gpxTrack by the endpoints (only when gpxTrack is sent).
    gpxColorIdx: int | None = None
    gpxOpacity: float | None = None

    # Optional metadata.
    collection_id: str | None = None
    dist_km: float | None = None
    elevation_gain: int | None = None
class CollectionBody(BaseModel):
    """Request body accepted by the collection create/update endpoints."""

    # Collection name; the endpoints strip it and fall back when it is empty.
    name: str
@app.get("/api/plans")
async def list_plans(user: User = Depends(require_auth)) -> JSONResponse:
    """List the caller's plans, most recently modified file first.

    Each entry is the stored plan without its ``geojson`` and ``gpxTrack``
    keys. Files whose name starts with ``_`` are skipped, as are files that
    cannot be read or parsed.
    """
    plans_root = _PLANS_DIR / user.handle
    if not plans_root.exists():
        return JSONResponse({"plans": []})

    omitted = {"geojson", "gpxTrack"}
    newest_first = list(plans_root.glob("*.json"))
    newest_first.sort(key=lambda f: f.stat().st_mtime, reverse=True)

    summaries = []
    for plan_file in newest_first:
        if plan_file.name.startswith("_"):
            continue
        try:
            stored = json.loads(plan_file.read_text(encoding="utf-8"))
            summary = {key: value for key, value in stored.items() if key not in omitted}
        except Exception:
            # Best-effort listing: an unreadable plan is left out.
            continue
        summaries.append(summary)
    return JSONResponse({"plans": summaries})
@app.post("/api/plans")
async def create_plan(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse:
    """Store a new plan for the caller and return its id and creation time.

    The id is a random URL-safe token. Optional body fields are written only
    when provided; ``gpxColorIdx`` and ``gpxOpacity`` are written only
    together with ``gpxTrack``.
    """
    plan_id = secrets.token_urlsafe(12)
    now = int(time.time())
    record: dict = {
        "id": plan_id,
        "name": body.name.strip() or "Unnamed route",
        "waypoints": body.waypoints,
        "profile": body.profile,
        "created_at": now,
        "updated_at": now,
    }
    if body.collection_id:
        record["collection_id"] = body.collection_id
    # Copied in this order so the stored key order stays the same.
    for field in ("dist_km", "elevation_gain", "geojson"):
        value = getattr(body, field)
        if value is not None:
            record[field] = value
    if body.gpxTrack is not None:
        record["gpxTrack"] = body.gpxTrack
        record["gpxColorIdx"] = body.gpxColorIdx
        record["gpxOpacity"] = body.gpxOpacity

    target = _user_dir(user.handle) / f"{plan_id}.json"
    target.write_text(json.dumps(record), encoding="utf-8")
    return JSONResponse({"id": plan_id, "created_at": now})
@app.get("/api/plans/{plan_id}")
async def get_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
    """Return the full stored JSON of one of the caller's plans.

    Raises:
        HTTPException: 400 for an invalid id, 404 if the plan file is absent.
    """
    plan_file = _plan_path(user.handle, plan_id)
    if plan_file.exists():
        stored = json.loads(plan_file.read_text(encoding="utf-8"))
        return JSONResponse(stored)
    raise HTTPException(404, "Plan not found")
@app.put("/api/plans/{plan_id}")
async def update_plan(
    plan_id: str, body: PlanBody, user: User = Depends(require_auth)
) -> JSONResponse:
    """Update one of the caller's plans in place.

    ``name``, ``waypoints``, ``profile`` and ``updated_at`` are always
    rewritten (an empty name keeps the stored one). ``dist_km``,
    ``elevation_gain`` and ``geojson`` are overwritten only when provided.
    A missing ``collection_id`` or ``gpxTrack`` removes the stored value.

    Raises:
        HTTPException: 400 for an invalid id, 404 if the plan file is absent.
    """
    plan_file = _plan_path(user.handle, plan_id)
    if not plan_file.exists():
        raise HTTPException(404, "Plan not found")

    data = json.loads(plan_file.read_text(encoding="utf-8"))
    changes = {
        "name": body.name.strip() or data.get("name", "Unnamed route"),
        "waypoints": body.waypoints,
        "profile": body.profile,
        "updated_at": int(time.time()),
    }
    data.update(changes)

    # collection_id: non-null assigns, null unassigns
    if body.collection_id:
        data["collection_id"] = body.collection_id
    else:
        data.pop("collection_id", None)

    for field in ("dist_km", "elevation_gain", "geojson"):
        value = getattr(body, field)
        if value is not None:
            data[field] = value

    if body.gpxTrack is None:
        # The colour/opacity keys are dropped only when a stored track is removed.
        if "gpxTrack" in data:
            for key in ("gpxTrack", "gpxColorIdx", "gpxOpacity"):
                data.pop(key, None)
    else:
        data["gpxTrack"] = body.gpxTrack
        data["gpxColorIdx"] = body.gpxColorIdx
        data["gpxOpacity"] = body.gpxOpacity

    plan_file.write_text(json.dumps(data), encoding="utf-8")
    return JSONResponse({"id": plan_id, "updated": True})
@app.delete("/api/plans/{plan_id}")
async def delete_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
    """Delete one of the caller's plans.

    Raises:
        HTTPException: 400 for an invalid id, 404 if the plan file is absent.
    """
    p = _plan_path(user.handle, plan_id)
    try:
        # Unlink directly instead of checking exists() first: a concurrent
        # delete between the check and the unlink would otherwise surface as
        # an unhandled FileNotFoundError (HTTP 500) rather than a 404.
        p.unlink()
    except FileNotFoundError:
        raise HTTPException(404, "Plan not found") from None
    return JSONResponse({"id": plan_id, "deleted": True})
# ── Collection endpoints ───────────────────────────────────────────────────────
@app.get("/api/collections")
async def list_collections(user: User = Depends(require_auth)) -> JSONResponse:
    """Return the caller's collections as stored in their index file."""
    collections = _read_collections(user.handle)
    return JSONResponse({"collections": collections})
@app.post("/api/collections")
async def create_collection(body: CollectionBody, user: User = Depends(require_auth)) -> JSONResponse:
    """Append a new collection to the caller's index and return its id.

    The id is a random URL-safe token; an empty name falls back to
    ``"Unnamed collection"``.
    """
    col_id = secrets.token_urlsafe(12)
    now = int(time.time())
    existing = _read_collections(user.handle)
    entry = {
        "id": col_id,
        "name": body.name.strip() or "Unnamed collection",
        "created_at": now,
    }
    existing.append(entry)
    _write_collections(user.handle, existing)
    return JSONResponse({"id": col_id, "created_at": now})
@app.put("/api/collections/{col_id}")
async def update_collection(
    col_id: str, body: CollectionBody, user: User = Depends(require_auth)
) -> JSONResponse:
    """Rename one of the caller's collections (an empty name keeps the old one).

    Raises:
        HTTPException: 404 if no collection has id *col_id*.
    """
    cols = _read_collections(user.handle)
    # First entry with a matching id, as in a linear scan.
    target = next((c for c in cols if c["id"] == col_id), None)
    if target is None:
        raise HTTPException(404, "Collection not found")
    target["name"] = body.name.strip() or target["name"]
    _write_collections(user.handle, cols)
    return JSONResponse({"id": col_id, "updated": True})
@app.delete("/api/collections/{col_id}")
async def delete_collection(
    col_id: str,
    mode: str = Query(default="unassign"),
    user: User = Depends(require_auth),
) -> JSONResponse:
    """Remove a collection and handle the plans that belonged to it.

    With ``mode == "delete"`` the member plans are deleted; with any other
    value their ``collection_id`` is removed and the plans are kept.
    Per-plan failures are ignored.

    Raises:
        HTTPException: 404 if no collection has id *col_id*.
    """
    cols = _read_collections(user.handle)
    remaining = [c for c in cols if c["id"] != col_id]
    if len(remaining) == len(cols):
        # Nothing was filtered out, so the id is unknown.
        raise HTTPException(404, "Collection not found")
    _write_collections(user.handle, remaining)

    done = {"id": col_id, "deleted": True}
    plans_root = _PLANS_DIR / user.handle
    if not plans_root.exists():
        return JSONResponse(done)

    for plan_file in plans_root.glob("*.json"):
        if plan_file.name.startswith("_"):
            continue
        try:
            data = json.loads(plan_file.read_text(encoding="utf-8"))
            if data.get("collection_id") != col_id:
                continue
            if mode == "delete":
                plan_file.unlink()
            else:
                data.pop("collection_id", None)
                plan_file.write_text(json.dumps(data), encoding="utf-8")
        except Exception:
            # Best-effort cleanup: a plan that cannot be processed is left as is.
            pass
    return JSONResponse(done)
# ── Shared plan endpoints ──────────────────────────────────────────────────────
# Keys left out of shared-plan listings.
_SHARED_STRIP = {"geojson", "gpxTrack"}


@app.get("/api/shared")
async def list_shared(user: User = Depends(require_auth)) -> JSONResponse:
    """List all shared plans, most recently modified file first.

    Each entry is the stored plan without the ``_SHARED_STRIP`` keys. Files
    that cannot be read or parsed are left out.
    """
    shared_root = _PLANS_DIR / "_shared"
    if not shared_root.exists():
        return JSONResponse({"plans": []})

    newest_first = list(shared_root.glob("*.json"))
    newest_first.sort(key=lambda f: f.stat().st_mtime, reverse=True)

    summaries = []
    for plan_file in newest_first:
        try:
            stored = json.loads(plan_file.read_text(encoding="utf-8"))
            summary = {key: value for key, value in stored.items() if key not in _SHARED_STRIP}
        except Exception:
            # Best-effort listing: an unreadable plan is left out.
            continue
        summaries.append(summary)
    return JSONResponse({"plans": summaries})
@app.post("/api/shared")
async def create_shared(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse:
    """Store a new shared plan, recording the caller as ``author``.

    The id is a random URL-safe token. Optional body fields are written only
    when provided; ``gpxColorIdx`` and ``gpxOpacity`` are written only
    together with ``gpxTrack``. ``collection_id`` is not stored.
    """
    plan_id = secrets.token_urlsafe(12)
    now = int(time.time())
    record: dict = {
        "id": plan_id,
        "author": user.handle,
        "name": body.name.strip() or "Unnamed route",
        "waypoints": body.waypoints,
        "profile": body.profile,
        "created_at": now,
        "updated_at": now,
    }
    # Copied in this order so the stored key order stays the same.
    for field in ("dist_km", "elevation_gain", "geojson"):
        value = getattr(body, field)
        if value is not None:
            record[field] = value
    if body.gpxTrack is not None:
        record["gpxTrack"] = body.gpxTrack
        record["gpxColorIdx"] = body.gpxColorIdx
        record["gpxOpacity"] = body.gpxOpacity

    target = _shared_dir() / f"{plan_id}.json"
    target.write_text(json.dumps(record), encoding="utf-8")
    return JSONResponse({"id": plan_id, "created_at": now})
@app.get("/api/shared/{plan_id}")
async def get_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
    """Return the full stored JSON of a shared plan.

    Raises:
        HTTPException: 400 for an invalid id, 404 if the plan file is absent.
    """
    plan_file = _shared_plan_path(plan_id)
    if plan_file.exists():
        stored = json.loads(plan_file.read_text(encoding="utf-8"))
        return JSONResponse(stored)
    raise HTTPException(404, "Shared plan not found")
@app.put("/api/shared/{plan_id}")
async def update_shared(
    plan_id: str, body: PlanBody, user: User = Depends(require_auth)
) -> JSONResponse:
    """Update a shared plan in place; any authenticated user may do so.

    ``name``, ``waypoints``, ``profile`` and ``updated_at`` are always
    rewritten (an empty name keeps the stored one). ``dist_km``,
    ``elevation_gain`` and ``geojson`` are overwritten only when provided.
    A missing ``gpxTrack`` removes the stored track.

    Raises:
        HTTPException: 400 for an invalid id, 404 if the plan file is absent.
    """
    plan_file = _shared_plan_path(plan_id)
    if not plan_file.exists():
        raise HTTPException(404, "Shared plan not found")

    data = json.loads(plan_file.read_text(encoding="utf-8"))
    changes = {
        "name": body.name.strip() or data.get("name", "Unnamed route"),
        "waypoints": body.waypoints,
        "profile": body.profile,
        "updated_at": int(time.time()),
    }
    data.update(changes)

    for field in ("dist_km", "elevation_gain", "geojson"):
        value = getattr(body, field)
        if value is not None:
            data[field] = value

    if body.gpxTrack is None:
        # The colour/opacity keys are dropped only when a stored track is removed.
        if "gpxTrack" in data:
            for key in ("gpxTrack", "gpxColorIdx", "gpxOpacity"):
                data.pop(key, None)
    else:
        data["gpxTrack"] = body.gpxTrack
        data["gpxColorIdx"] = body.gpxColorIdx
        data["gpxOpacity"] = body.gpxOpacity

    plan_file.write_text(json.dumps(data), encoding="utf-8")
    return JSONResponse({"id": plan_id, "updated": True})
@app.delete("/api/shared/{plan_id}")
async def delete_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
    """Delete a shared plan; any authenticated user may do so.

    Raises:
        HTTPException: 400 for an invalid id, 404 if the plan file is absent.
    """
    p = _shared_plan_path(plan_id)
    try:
        # Unlink directly instead of checking exists() first: shared plans can
        # be deleted by several users, and losing that race would otherwise
        # surface as an unhandled FileNotFoundError (HTTP 500), not a 404.
        p.unlink()
    except FileNotFoundError:
        raise HTTPException(404, "Shared plan not found") from None
    return JSONResponse({"id": plan_id, "deleted": True})