"""BincioPlanner API — save/load route plans. Auth: validates bincio_session JWT from bincio-auth. Accepts RS256 OIDC tokens (BINCIO_OIDC_ISSUER) and HS256 session tokens (BINCIO_AUTH_JWT_SECRET). Storage: JSON files at $PLANS_DIR/{handle}/{plan_id}.json Collections: $PLANS_DIR/{handle}/_collections.json Shared plans: $PLANS_DIR/_shared/{plan_id}.json (any authed user can read/write/delete) Run: uvicorn server:app --host 127.0.0.1 --port 8002 Env vars: BINCIO_OIDC_ISSUER OIDC issuer URL — JWKS fetched from {issuer}/.well-known/jwks.json BINCIO_AUTH_JWT_SECRET HS256 fallback secret PLANS_DIR root dir for plan JSON files (default: /var/bincio_planner) DEV_HANDLE bypass auth in local dev """ from __future__ import annotations import json import os import re import secrets import time import urllib.request from dataclasses import dataclass from pathlib import Path from typing import Optional import jwt as _jwt from jwt.algorithms import RSAAlgorithm as _RSAAlgorithm from fastapi import Cookie, Depends, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel _JWT_SECRET = os.environ.get("BINCIO_AUTH_JWT_SECRET", "") _OIDC_ISSUER = os.environ.get("BINCIO_OIDC_ISSUER", "") _PLANS_DIR = Path(os.environ.get("PLANS_DIR", "/var/bincio_planner")) _DEV_HANDLE = os.environ.get("DEV_HANDLE", "") _SAFE_ID = re.compile(r"^[A-Za-z0-9_-]{1,32}$") # Load RS256 public key from JWKS at startup. _rs256_key = None if _OIDC_ISSUER: try: with urllib.request.urlopen(f"{_OIDC_ISSUER}/.well-known/jwks.json", timeout=5) as _r: _jwks = json.loads(_r.read()) for _k in _jwks.get("keys", []): if _k.get("kty") == "RSA": _rs256_key = _RSAAlgorithm.from_jwk(json.dumps(_k)) break except Exception as _e: print(f"Warning: could not load JWKS from {_OIDC_ISSUER}: {_e}") # ── Auth ─────────────────────────────────────────────────────────────────────── @dataclass class User: handle: str display_name: str def _decode_session(token: str) -> Optional[User]: # Try RS256 (OIDC id_token from bincio-auth) if _rs256_key: try: payload = _jwt.decode(token, _rs256_key, algorithms=["RS256"], options={"verify_aud": False}) handle = payload.get("sub") if handle and payload.get("activity_access"): return User(handle=handle, display_name=payload.get("name") or payload.get("display_name") or handle) except _jwt.PyJWTError: pass # Fall back to HS256 session token if not _JWT_SECRET: return None try: payload = _jwt.decode(token, _JWT_SECRET, algorithms=["HS256"]) except _jwt.PyJWTError: return None handle = payload.get("sub") if not handle or not payload.get("activity_access"): return None return User(handle=handle, display_name=payload.get("display_name", handle)) async def require_auth(bincio_session: Optional[str] = Cookie(default=None)) -> User: if _DEV_HANDLE: return User(handle=_DEV_HANDLE, display_name=_DEV_HANDLE) if not bincio_session: raise HTTPException(401, "Authentication required") user = _decode_session(bincio_session) if not user: raise HTTPException(401, "Authentication required") return user # ── App ──────────────────────────────────────────────────────────────────────── app = FastAPI(title="BincioPlanner API", docs_url=None, redoc_url=None) app.add_middleware(GZipMiddleware, minimum_size=1024) app.add_middleware( CORSMiddleware, allow_origin_regex=r"https?://localhost(:\d+)?", allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Content-Type"], ) # ── Storage helpers ──────────────────────────────────────────────────────────── def _user_dir(handle: str) -> Path: d = _PLANS_DIR / handle d.mkdir(parents=True, exist_ok=True) return d def _plan_path(handle: str, plan_id: str) -> Path: if not _SAFE_ID.match(plan_id): raise HTTPException(400, "Invalid plan id") return _PLANS_DIR / handle / f"{plan_id}.json" def _collections_path(handle: str) -> Path: return _PLANS_DIR / handle / "_collections.json" def _read_collections(handle: str) -> list[dict]: p = _collections_path(handle) if not p.exists(): return [] try: return json.loads(p.read_text(encoding="utf-8")) except Exception: return [] def _write_collections(handle: str, cols: list[dict]) -> None: _user_dir(handle) _collections_path(handle).write_text(json.dumps(cols), encoding="utf-8") def _shared_dir() -> Path: d = _PLANS_DIR / "_shared" d.mkdir(parents=True, exist_ok=True) return d def _shared_plan_path(plan_id: str) -> Path: if not _SAFE_ID.match(plan_id): raise HTTPException(400, "Invalid plan id") return _PLANS_DIR / "_shared" / f"{plan_id}.json" # ── Endpoints ────────────────────────────────────────────────────────────────── class PlanBody(BaseModel): name: str waypoints: list[dict] profile: str description: str | None = None geojson: dict | None = None gpxTrack: dict | None = None # full GeoJSON of reference track, if any GPX segments used gpxColorIdx: int | None = None gpxOpacity: float | None = None collection_id: str | None = None dist_km: float | None = None elevation_gain: int | None = None class CollectionBody(BaseModel): name: str @app.get("/api/plans") async def list_plans(user: User = Depends(require_auth)) -> JSONResponse: d = _PLANS_DIR / user.handle if not d.exists(): return JSONResponse({"plans": []}) plans = [] _STRIP = {"geojson", "gpxTrack"} for p in sorted(d.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True): if p.name.startswith("_"): continue try: data = json.loads(p.read_text(encoding="utf-8")) plans.append({k: v for k, v in data.items() if k not in _STRIP}) except Exception: pass return JSONResponse({"plans": plans}) @app.post("/api/plans") async def create_plan(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse: plan_id = secrets.token_urlsafe(12) now = int(time.time()) data: dict = { "id": plan_id, "name": body.name.strip() or "Unnamed route", "waypoints": body.waypoints, "profile": body.profile, "created_at": now, "updated_at": now, } if body.description: data["description"] = body.description if body.collection_id: data["collection_id"] = body.collection_id if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity _user_dir(user.handle).joinpath(f"{plan_id}.json").write_text( json.dumps(data), encoding="utf-8" ) return JSONResponse({"id": plan_id, "created_at": now}) @app.get("/api/plans/{plan_id}") async def get_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") return JSONResponse(json.loads(p.read_text(encoding="utf-8"))) @app.put("/api/plans/{plan_id}") async def update_plan( plan_id: str, body: PlanBody, user: User = Depends(require_auth) ) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") data = json.loads(p.read_text(encoding="utf-8")) data.update({ "name": body.name.strip() or data.get("name", "Unnamed route"), "waypoints": body.waypoints, "profile": body.profile, "updated_at": int(time.time()), }) # description: non-null sets, null clears if body.description: data["description"] = body.description else: data.pop("description", None) # collection_id: non-null assigns, null unassigns if body.collection_id: data["collection_id"] = body.collection_id else: data.pop("collection_id", None) if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity elif "gpxTrack" in data: del data["gpxTrack"] data.pop("gpxColorIdx", None) data.pop("gpxOpacity", None) p.write_text(json.dumps(data), encoding="utf-8") return JSONResponse({"id": plan_id, "updated": True}) @app.delete("/api/plans/{plan_id}") async def delete_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") p.unlink() return JSONResponse({"id": plan_id, "deleted": True}) # ── Collection endpoints ─────────────────────────────────────────────────────── @app.get("/api/collections") async def list_collections(user: User = Depends(require_auth)) -> JSONResponse: return JSONResponse({"collections": _read_collections(user.handle)}) @app.post("/api/collections") async def create_collection(body: CollectionBody, user: User = Depends(require_auth)) -> JSONResponse: col_id = secrets.token_urlsafe(12) now = int(time.time()) cols = _read_collections(user.handle) cols.append({"id": col_id, "name": body.name.strip() or "Unnamed collection", "created_at": now}) _write_collections(user.handle, cols) return JSONResponse({"id": col_id, "created_at": now}) @app.put("/api/collections/{col_id}") async def update_collection( col_id: str, body: CollectionBody, user: User = Depends(require_auth) ) -> JSONResponse: cols = _read_collections(user.handle) for c in cols: if c["id"] == col_id: c["name"] = body.name.strip() or c["name"] _write_collections(user.handle, cols) return JSONResponse({"id": col_id, "updated": True}) raise HTTPException(404, "Collection not found") @app.delete("/api/collections/{col_id}") async def delete_collection( col_id: str, mode: str = Query(default="unassign"), user: User = Depends(require_auth), ) -> JSONResponse: cols = _read_collections(user.handle) if not any(c["id"] == col_id for c in cols): raise HTTPException(404, "Collection not found") _write_collections(user.handle, [c for c in cols if c["id"] != col_id]) d = _PLANS_DIR / user.handle if d.exists(): for p in d.glob("*.json"): if p.name.startswith("_"): continue try: data = json.loads(p.read_text(encoding="utf-8")) if data.get("collection_id") == col_id: if mode == "delete": p.unlink() else: data.pop("collection_id", None) p.write_text(json.dumps(data), encoding="utf-8") except Exception: pass return JSONResponse({"id": col_id, "deleted": True}) # ── Shared plan endpoints ────────────────────────────────────────────────────── _SHARED_STRIP = {"geojson", "gpxTrack"} @app.get("/api/shared") async def list_shared(user: User = Depends(require_auth)) -> JSONResponse: d = _PLANS_DIR / "_shared" if not d.exists(): return JSONResponse({"plans": []}) plans = [] for p in sorted(d.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True): try: data = json.loads(p.read_text(encoding="utf-8")) plans.append({k: v for k, v in data.items() if k not in _SHARED_STRIP}) except Exception: pass return JSONResponse({"plans": plans}) @app.post("/api/shared") async def create_shared(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse: plan_id = secrets.token_urlsafe(12) now = int(time.time()) data: dict = { "id": plan_id, "author": user.handle, "name": body.name.strip() or "Unnamed route", "waypoints": body.waypoints, "profile": body.profile, "created_at": now, "updated_at": now, } if body.description: data["description"] = body.description if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity _shared_dir().joinpath(f"{plan_id}.json").write_text(json.dumps(data), encoding="utf-8") return JSONResponse({"id": plan_id, "created_at": now}) @app.get("/api/shared/{plan_id}") async def get_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _shared_plan_path(plan_id) if not p.exists(): raise HTTPException(404, "Shared plan not found") return JSONResponse(json.loads(p.read_text(encoding="utf-8"))) @app.put("/api/shared/{plan_id}") async def update_shared( plan_id: str, body: PlanBody, user: User = Depends(require_auth) ) -> JSONResponse: p = _shared_plan_path(plan_id) if not p.exists(): raise HTTPException(404, "Shared plan not found") data = json.loads(p.read_text(encoding="utf-8")) data.update({ "name": body.name.strip() or data.get("name", "Unnamed route"), "waypoints": body.waypoints, "profile": body.profile, "updated_at": int(time.time()), }) if body.description: data["description"] = body.description else: data.pop("description", None) if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity elif "gpxTrack" in data: del data["gpxTrack"] data.pop("gpxColorIdx", None) data.pop("gpxOpacity", None) p.write_text(json.dumps(data), encoding="utf-8") return JSONResponse({"id": plan_id, "updated": True}) @app.delete("/api/shared/{plan_id}") async def delete_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _shared_plan_path(plan_id) if not p.exists(): raise HTTPException(404, "Shared plan not found") p.unlink() return JSONResponse({"id": plan_id, "deleted": True})