Files
bincio-planner/server/server.py
T
2026-06-03 21:24:03 +02:00

446 lines
16 KiB
Python

"""BincioPlanner API — save/load route plans.
Auth: validates bincio_session JWT from bincio-auth.
Accepts RS256 OIDC tokens (BINCIO_OIDC_ISSUER) and HS256 session tokens (BINCIO_AUTH_JWT_SECRET).
Storage: JSON files at $PLANS_DIR/{handle}/{plan_id}.json
Collections: $PLANS_DIR/{handle}/_collections.json
Shared plans: $PLANS_DIR/_shared/{plan_id}.json (any authed user can read/write/delete)
Run: uvicorn server:app --host 127.0.0.1 --port 8002
Env vars:
BINCIO_OIDC_ISSUER OIDC issuer URL — JWKS fetched from {issuer}/.well-known/jwks.json
BINCIO_AUTH_JWT_SECRET HS256 fallback secret
PLANS_DIR root dir for plan JSON files (default: /var/bincio_planner)
DEV_HANDLE bypass auth in local dev
"""
from __future__ import annotations
import json
import os
import re
import secrets
import time
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import jwt as _jwt
from jwt.algorithms import RSAAlgorithm as _RSAAlgorithm
from fastapi import Cookie, Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
_JWT_SECRET = os.environ.get("BINCIO_AUTH_JWT_SECRET", "")
_OIDC_ISSUER = os.environ.get("BINCIO_OIDC_ISSUER", "")
_PLANS_DIR = Path(os.environ.get("PLANS_DIR", "/var/bincio_planner"))
_DEV_HANDLE = os.environ.get("DEV_HANDLE", "")
_SAFE_ID = re.compile(r"^[A-Za-z0-9_-]{1,32}$")
# Load RS256 public key from JWKS at startup.
_rs256_key = None
if _OIDC_ISSUER:
try:
with urllib.request.urlopen(f"{_OIDC_ISSUER}/.well-known/jwks.json", timeout=5) as _r:
_jwks = json.loads(_r.read())
for _k in _jwks.get("keys", []):
if _k.get("kty") == "RSA":
_rs256_key = _RSAAlgorithm.from_jwk(json.dumps(_k))
break
except Exception as _e:
print(f"Warning: could not load JWKS from {_OIDC_ISSUER}: {_e}")
# ── Auth ───────────────────────────────────────────────────────────────────────
@dataclass
class User:
handle: str
display_name: str
def _decode_session(token: str) -> Optional[User]:
# Try RS256 (OIDC id_token from bincio-auth)
if _rs256_key:
try:
payload = _jwt.decode(token, _rs256_key, algorithms=["RS256"],
options={"verify_aud": False})
handle = payload.get("sub")
if handle and payload.get("activity_access"):
return User(handle=handle,
display_name=payload.get("name") or payload.get("display_name") or handle)
except _jwt.PyJWTError:
pass
# Fall back to HS256 session token
if not _JWT_SECRET:
return None
try:
payload = _jwt.decode(token, _JWT_SECRET, algorithms=["HS256"])
except _jwt.PyJWTError:
return None
handle = payload.get("sub")
if not handle or not payload.get("activity_access"):
return None
return User(handle=handle, display_name=payload.get("display_name", handle))
async def require_auth(bincio_session: Optional[str] = Cookie(default=None)) -> User:
if _DEV_HANDLE:
return User(handle=_DEV_HANDLE, display_name=_DEV_HANDLE)
if not bincio_session:
raise HTTPException(401, "Authentication required")
user = _decode_session(bincio_session)
if not user:
raise HTTPException(401, "Authentication required")
return user
# ── App ────────────────────────────────────────────────────────────────────────
app = FastAPI(title="BincioPlanner API", docs_url=None, redoc_url=None)
app.add_middleware(GZipMiddleware, minimum_size=1024)
app.add_middleware(
CORSMiddleware,
allow_origin_regex=r"https?://localhost(:\d+)?",
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Content-Type"],
)
# ── Storage helpers ────────────────────────────────────────────────────────────
def _user_dir(handle: str) -> Path:
d = _PLANS_DIR / handle
d.mkdir(parents=True, exist_ok=True)
return d
def _plan_path(handle: str, plan_id: str) -> Path:
if not _SAFE_ID.match(plan_id):
raise HTTPException(400, "Invalid plan id")
return _PLANS_DIR / handle / f"{plan_id}.json"
def _collections_path(handle: str) -> Path:
return _PLANS_DIR / handle / "_collections.json"
def _read_collections(handle: str) -> list[dict]:
p = _collections_path(handle)
if not p.exists():
return []
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception:
return []
def _write_collections(handle: str, cols: list[dict]) -> None:
_user_dir(handle)
_collections_path(handle).write_text(json.dumps(cols), encoding="utf-8")
def _shared_dir() -> Path:
d = _PLANS_DIR / "_shared"
d.mkdir(parents=True, exist_ok=True)
return d
def _shared_plan_path(plan_id: str) -> Path:
if not _SAFE_ID.match(plan_id):
raise HTTPException(400, "Invalid plan id")
return _PLANS_DIR / "_shared" / f"{plan_id}.json"
# ── Endpoints ──────────────────────────────────────────────────────────────────
class PlanBody(BaseModel):
name: str
waypoints: list[dict]
profile: str
description: str | None = None
geojson: dict | None = None
gpxTrack: dict | None = None # full GeoJSON of reference track, if any GPX segments used
gpxColorIdx: int | None = None
gpxOpacity: float | None = None
collection_id: str | None = None
dist_km: float | None = None
elevation_gain: int | None = None
class CollectionBody(BaseModel):
name: str
@app.get("/api/plans")
async def list_plans(user: User = Depends(require_auth)) -> JSONResponse:
d = _PLANS_DIR / user.handle
if not d.exists():
return JSONResponse({"plans": []})
plans = []
_STRIP = {"geojson", "gpxTrack"}
for p in sorted(d.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True):
if p.name.startswith("_"):
continue
try:
data = json.loads(p.read_text(encoding="utf-8"))
plans.append({k: v for k, v in data.items() if k not in _STRIP})
except Exception:
pass
return JSONResponse({"plans": plans})
@app.post("/api/plans")
async def create_plan(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse:
plan_id = secrets.token_urlsafe(12)
now = int(time.time())
data: dict = {
"id": plan_id,
"name": body.name.strip() or "Unnamed route",
"waypoints": body.waypoints,
"profile": body.profile,
"created_at": now,
"updated_at": now,
}
if body.description:
data["description"] = body.description
if body.collection_id:
data["collection_id"] = body.collection_id
if body.dist_km is not None:
data["dist_km"] = body.dist_km
if body.elevation_gain is not None:
data["elevation_gain"] = body.elevation_gain
if body.geojson is not None:
data["geojson"] = body.geojson
if body.gpxTrack is not None:
data["gpxTrack"] = body.gpxTrack
data["gpxColorIdx"] = body.gpxColorIdx
data["gpxOpacity"] = body.gpxOpacity
_user_dir(user.handle).joinpath(f"{plan_id}.json").write_text(
json.dumps(data), encoding="utf-8"
)
return JSONResponse({"id": plan_id, "created_at": now})
@app.get("/api/plans/{plan_id}")
async def get_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
p = _plan_path(user.handle, plan_id)
if not p.exists():
raise HTTPException(404, "Plan not found")
return JSONResponse(json.loads(p.read_text(encoding="utf-8")))
@app.put("/api/plans/{plan_id}")
async def update_plan(
plan_id: str, body: PlanBody, user: User = Depends(require_auth)
) -> JSONResponse:
p = _plan_path(user.handle, plan_id)
if not p.exists():
raise HTTPException(404, "Plan not found")
data = json.loads(p.read_text(encoding="utf-8"))
data.update({
"name": body.name.strip() or data.get("name", "Unnamed route"),
"waypoints": body.waypoints,
"profile": body.profile,
"updated_at": int(time.time()),
})
# description: non-null sets, null clears
if body.description:
data["description"] = body.description
else:
data.pop("description", None)
# collection_id: non-null assigns, null unassigns
if body.collection_id:
data["collection_id"] = body.collection_id
else:
data.pop("collection_id", None)
if body.dist_km is not None:
data["dist_km"] = body.dist_km
if body.elevation_gain is not None:
data["elevation_gain"] = body.elevation_gain
if body.geojson is not None:
data["geojson"] = body.geojson
if body.gpxTrack is not None:
data["gpxTrack"] = body.gpxTrack
data["gpxColorIdx"] = body.gpxColorIdx
data["gpxOpacity"] = body.gpxOpacity
elif "gpxTrack" in data:
del data["gpxTrack"]
data.pop("gpxColorIdx", None)
data.pop("gpxOpacity", None)
p.write_text(json.dumps(data), encoding="utf-8")
return JSONResponse({"id": plan_id, "updated": True})
@app.delete("/api/plans/{plan_id}")
async def delete_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
p = _plan_path(user.handle, plan_id)
if not p.exists():
raise HTTPException(404, "Plan not found")
p.unlink()
return JSONResponse({"id": plan_id, "deleted": True})
# ── Collection endpoints ───────────────────────────────────────────────────────
@app.get("/api/collections")
async def list_collections(user: User = Depends(require_auth)) -> JSONResponse:
return JSONResponse({"collections": _read_collections(user.handle)})
@app.post("/api/collections")
async def create_collection(body: CollectionBody, user: User = Depends(require_auth)) -> JSONResponse:
col_id = secrets.token_urlsafe(12)
now = int(time.time())
cols = _read_collections(user.handle)
cols.append({"id": col_id, "name": body.name.strip() or "Unnamed collection", "created_at": now})
_write_collections(user.handle, cols)
return JSONResponse({"id": col_id, "created_at": now})
@app.put("/api/collections/{col_id}")
async def update_collection(
col_id: str, body: CollectionBody, user: User = Depends(require_auth)
) -> JSONResponse:
cols = _read_collections(user.handle)
for c in cols:
if c["id"] == col_id:
c["name"] = body.name.strip() or c["name"]
_write_collections(user.handle, cols)
return JSONResponse({"id": col_id, "updated": True})
raise HTTPException(404, "Collection not found")
@app.delete("/api/collections/{col_id}")
async def delete_collection(
col_id: str,
mode: str = Query(default="unassign"),
user: User = Depends(require_auth),
) -> JSONResponse:
cols = _read_collections(user.handle)
if not any(c["id"] == col_id for c in cols):
raise HTTPException(404, "Collection not found")
_write_collections(user.handle, [c for c in cols if c["id"] != col_id])
d = _PLANS_DIR / user.handle
if d.exists():
for p in d.glob("*.json"):
if p.name.startswith("_"):
continue
try:
data = json.loads(p.read_text(encoding="utf-8"))
if data.get("collection_id") == col_id:
if mode == "delete":
p.unlink()
else:
data.pop("collection_id", None)
p.write_text(json.dumps(data), encoding="utf-8")
except Exception:
pass
return JSONResponse({"id": col_id, "deleted": True})
# ── Shared plan endpoints ──────────────────────────────────────────────────────
_SHARED_STRIP = {"geojson", "gpxTrack"}
@app.get("/api/shared")
async def list_shared(user: User = Depends(require_auth)) -> JSONResponse:
d = _PLANS_DIR / "_shared"
if not d.exists():
return JSONResponse({"plans": []})
plans = []
for p in sorted(d.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True):
try:
data = json.loads(p.read_text(encoding="utf-8"))
plans.append({k: v for k, v in data.items() if k not in _SHARED_STRIP})
except Exception:
pass
return JSONResponse({"plans": plans})
@app.post("/api/shared")
async def create_shared(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse:
plan_id = secrets.token_urlsafe(12)
now = int(time.time())
data: dict = {
"id": plan_id,
"author": user.handle,
"name": body.name.strip() or "Unnamed route",
"waypoints": body.waypoints,
"profile": body.profile,
"created_at": now,
"updated_at": now,
}
if body.description:
data["description"] = body.description
if body.dist_km is not None:
data["dist_km"] = body.dist_km
if body.elevation_gain is not None:
data["elevation_gain"] = body.elevation_gain
if body.geojson is not None:
data["geojson"] = body.geojson
if body.gpxTrack is not None:
data["gpxTrack"] = body.gpxTrack
data["gpxColorIdx"] = body.gpxColorIdx
data["gpxOpacity"] = body.gpxOpacity
_shared_dir().joinpath(f"{plan_id}.json").write_text(json.dumps(data), encoding="utf-8")
return JSONResponse({"id": plan_id, "created_at": now})
@app.get("/api/shared/{plan_id}")
async def get_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
p = _shared_plan_path(plan_id)
if not p.exists():
raise HTTPException(404, "Shared plan not found")
return JSONResponse(json.loads(p.read_text(encoding="utf-8")))
@app.put("/api/shared/{plan_id}")
async def update_shared(
plan_id: str, body: PlanBody, user: User = Depends(require_auth)
) -> JSONResponse:
p = _shared_plan_path(plan_id)
if not p.exists():
raise HTTPException(404, "Shared plan not found")
data = json.loads(p.read_text(encoding="utf-8"))
data.update({
"name": body.name.strip() or data.get("name", "Unnamed route"),
"waypoints": body.waypoints,
"profile": body.profile,
"updated_at": int(time.time()),
})
if body.description:
data["description"] = body.description
else:
data.pop("description", None)
if body.dist_km is not None:
data["dist_km"] = body.dist_km
if body.elevation_gain is not None:
data["elevation_gain"] = body.elevation_gain
if body.geojson is not None:
data["geojson"] = body.geojson
if body.gpxTrack is not None:
data["gpxTrack"] = body.gpxTrack
data["gpxColorIdx"] = body.gpxColorIdx
data["gpxOpacity"] = body.gpxOpacity
elif "gpxTrack" in data:
del data["gpxTrack"]
data.pop("gpxColorIdx", None)
data.pop("gpxOpacity", None)
p.write_text(json.dumps(data), encoding="utf-8")
return JSONResponse({"id": plan_id, "updated": True})
@app.delete("/api/shared/{plan_id}")
async def delete_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse:
p = _shared_plan_path(plan_id)
if not p.exists():
raise HTTPException(404, "Shared plan not found")
p.unlink()
return JSONResponse({"id": plan_id, "deleted": True})