"""BincioPlanner API — save/load route plans. Auth: reads bincio_session cookie, validates against shared instance.db (same strategy as bincio_wiki). Storage: JSON files at $PLANS_DIR/{handle}/{plan_id}.json Collections: $PLANS_DIR/{handle}/_collections.json Shared plans: $PLANS_DIR/_shared/{plan_id}.json (any authed user can read/write/delete) Run: uvicorn server:app --host 127.0.0.1 --port 8002 Env vars: SHARED_DB_PATH path to bincio_activity instance.db (default: /var/bincio/data/instance.db) PLANS_DIR root dir for plan JSON files (default: /var/bincio_planner) """ from __future__ import annotations import json import os import re import secrets import sqlite3 import time from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path from typing import Optional from fastapi import Cookie, Depends, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel _SHARED_DB_PATH = Path(os.environ.get("SHARED_DB_PATH", "/var/bincio/data/instance.db")) _PLANS_DIR = Path(os.environ.get("PLANS_DIR", "/var/bincio_planner")) _SESSION_COOKIE = "bincio_session" _DEV_HANDLE = os.environ.get("DEV_HANDLE", "") _SAFE_ID = re.compile(r"^[A-Za-z0-9_-]{1,32}$") # ── Auth ─────────────────────────────────────────────────────────────────────── @dataclass class User: handle: str display_name: str @contextmanager def _db(): if not _SHARED_DB_PATH.exists(): raise HTTPException(503, f"Shared DB not found at {_SHARED_DB_PATH}. " "Set SHARED_DB_PATH or run bincio_activity first.") con = sqlite3.connect(_SHARED_DB_PATH, check_same_thread=False) con.row_factory = sqlite3.Row con.execute("PRAGMA journal_mode=WAL") try: yield con finally: con.close() def _get_session_user(token: str) -> Optional[User]: try: with _db() as con: row = con.execute( "SELECT s.handle, s.expires_at, u.display_name, u.activity_access, u.suspended " "FROM sessions s JOIN users u ON s.handle = u.handle " "WHERE s.token = ?", (token,), ).fetchone() except HTTPException: raise except Exception: return None if not row: return None if row["expires_at"] < int(time.time()): return None if not row["activity_access"]: return None if row["suspended"]: return None return User(handle=row["handle"], display_name=row["display_name"]) async def require_auth(bincio_session: Optional[str] = Cookie(default=None)) -> User: if _DEV_HANDLE: return User(handle=_DEV_HANDLE, display_name=_DEV_HANDLE) if not bincio_session: raise HTTPException(401, "Authentication required") user = _get_session_user(bincio_session) if not user: raise HTTPException(401, "Authentication required") return user # ── App ──────────────────────────────────────────────────────────────────────── app = FastAPI(title="BincioPlanner API", docs_url=None, redoc_url=None) app.add_middleware(GZipMiddleware, minimum_size=1024) app.add_middleware( CORSMiddleware, allow_origin_regex=r"https?://localhost(:\d+)?", allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Content-Type"], ) # ── Storage helpers ──────────────────────────────────────────────────────────── def _user_dir(handle: str) -> Path: d = _PLANS_DIR / handle d.mkdir(parents=True, exist_ok=True) return d def _plan_path(handle: str, plan_id: str) -> Path: if not _SAFE_ID.match(plan_id): raise HTTPException(400, "Invalid plan id") return _PLANS_DIR / handle / f"{plan_id}.json" def _collections_path(handle: str) -> Path: return _PLANS_DIR / handle / "_collections.json" def _read_collections(handle: str) -> list[dict]: p = _collections_path(handle) if not p.exists(): return [] try: return json.loads(p.read_text(encoding="utf-8")) except Exception: return [] def _write_collections(handle: str, cols: list[dict]) -> None: _user_dir(handle) _collections_path(handle).write_text(json.dumps(cols), encoding="utf-8") def _shared_dir() -> Path: d = _PLANS_DIR / "_shared" d.mkdir(parents=True, exist_ok=True) return d def _shared_plan_path(plan_id: str) -> Path: if not _SAFE_ID.match(plan_id): raise HTTPException(400, "Invalid plan id") return _PLANS_DIR / "_shared" / f"{plan_id}.json" # ── Endpoints ────────────────────────────────────────────────────────────────── class PlanBody(BaseModel): name: str waypoints: list[dict] profile: str description: str | None = None geojson: dict | None = None gpxTrack: dict | None = None # full GeoJSON of reference track, if any GPX segments used gpxColorIdx: int | None = None gpxOpacity: float | None = None collection_id: str | None = None dist_km: float | None = None elevation_gain: int | None = None class CollectionBody(BaseModel): name: str @app.get("/api/plans") async def list_plans(user: User = Depends(require_auth)) -> JSONResponse: d = _PLANS_DIR / user.handle if not d.exists(): return JSONResponse({"plans": []}) plans = [] _STRIP = {"geojson", "gpxTrack"} for p in sorted(d.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True): if p.name.startswith("_"): continue try: data = json.loads(p.read_text(encoding="utf-8")) plans.append({k: v for k, v in data.items() if k not in _STRIP}) except Exception: pass return JSONResponse({"plans": plans}) @app.post("/api/plans") async def create_plan(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse: plan_id = secrets.token_urlsafe(12) now = int(time.time()) data: dict = { "id": plan_id, "name": body.name.strip() or "Unnamed route", "waypoints": body.waypoints, "profile": body.profile, "created_at": now, "updated_at": now, } if body.description: data["description"] = body.description if body.collection_id: data["collection_id"] = body.collection_id if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity _user_dir(user.handle).joinpath(f"{plan_id}.json").write_text( json.dumps(data), encoding="utf-8" ) return JSONResponse({"id": plan_id, "created_at": now}) @app.get("/api/plans/{plan_id}") async def get_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") return JSONResponse(json.loads(p.read_text(encoding="utf-8"))) @app.put("/api/plans/{plan_id}") async def update_plan( plan_id: str, body: PlanBody, user: User = Depends(require_auth) ) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") data = json.loads(p.read_text(encoding="utf-8")) data.update({ "name": body.name.strip() or data.get("name", "Unnamed route"), "waypoints": body.waypoints, "profile": body.profile, "updated_at": int(time.time()), }) # description: non-null sets, null clears if body.description: data["description"] = body.description else: data.pop("description", None) # collection_id: non-null assigns, null unassigns if body.collection_id: data["collection_id"] = body.collection_id else: data.pop("collection_id", None) if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity elif "gpxTrack" in data: del data["gpxTrack"] data.pop("gpxColorIdx", None) data.pop("gpxOpacity", None) p.write_text(json.dumps(data), encoding="utf-8") return JSONResponse({"id": plan_id, "updated": True}) @app.delete("/api/plans/{plan_id}") async def delete_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") p.unlink() return JSONResponse({"id": plan_id, "deleted": True}) # ── Collection endpoints ─────────────────────────────────────────────────────── @app.get("/api/collections") async def list_collections(user: User = Depends(require_auth)) -> JSONResponse: return JSONResponse({"collections": _read_collections(user.handle)}) @app.post("/api/collections") async def create_collection(body: CollectionBody, user: User = Depends(require_auth)) -> JSONResponse: col_id = secrets.token_urlsafe(12) now = int(time.time()) cols = _read_collections(user.handle) cols.append({"id": col_id, "name": body.name.strip() or "Unnamed collection", "created_at": now}) _write_collections(user.handle, cols) return JSONResponse({"id": col_id, "created_at": now}) @app.put("/api/collections/{col_id}") async def update_collection( col_id: str, body: CollectionBody, user: User = Depends(require_auth) ) -> JSONResponse: cols = _read_collections(user.handle) for c in cols: if c["id"] == col_id: c["name"] = body.name.strip() or c["name"] _write_collections(user.handle, cols) return JSONResponse({"id": col_id, "updated": True}) raise HTTPException(404, "Collection not found") @app.delete("/api/collections/{col_id}") async def delete_collection( col_id: str, mode: str = Query(default="unassign"), user: User = Depends(require_auth), ) -> JSONResponse: cols = _read_collections(user.handle) if not any(c["id"] == col_id for c in cols): raise HTTPException(404, "Collection not found") _write_collections(user.handle, [c for c in cols if c["id"] != col_id]) d = _PLANS_DIR / user.handle if d.exists(): for p in d.glob("*.json"): if p.name.startswith("_"): continue try: data = json.loads(p.read_text(encoding="utf-8")) if data.get("collection_id") == col_id: if mode == "delete": p.unlink() else: data.pop("collection_id", None) p.write_text(json.dumps(data), encoding="utf-8") except Exception: pass return JSONResponse({"id": col_id, "deleted": True}) # ── Shared plan endpoints ────────────────────────────────────────────────────── _SHARED_STRIP = {"geojson", "gpxTrack"} @app.get("/api/shared") async def list_shared(user: User = Depends(require_auth)) -> JSONResponse: d = _PLANS_DIR / "_shared" if not d.exists(): return JSONResponse({"plans": []}) plans = [] for p in sorted(d.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True): try: data = json.loads(p.read_text(encoding="utf-8")) plans.append({k: v for k, v in data.items() if k not in _SHARED_STRIP}) except Exception: pass return JSONResponse({"plans": plans}) @app.post("/api/shared") async def create_shared(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse: plan_id = secrets.token_urlsafe(12) now = int(time.time()) data: dict = { "id": plan_id, "author": user.handle, "name": body.name.strip() or "Unnamed route", "waypoints": body.waypoints, "profile": body.profile, "created_at": now, "updated_at": now, } if body.description: data["description"] = body.description if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity _shared_dir().joinpath(f"{plan_id}.json").write_text(json.dumps(data), encoding="utf-8") return JSONResponse({"id": plan_id, "created_at": now}) @app.get("/api/shared/{plan_id}") async def get_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _shared_plan_path(plan_id) if not p.exists(): raise HTTPException(404, "Shared plan not found") return JSONResponse(json.loads(p.read_text(encoding="utf-8"))) @app.put("/api/shared/{plan_id}") async def update_shared( plan_id: str, body: PlanBody, user: User = Depends(require_auth) ) -> JSONResponse: p = _shared_plan_path(plan_id) if not p.exists(): raise HTTPException(404, "Shared plan not found") data = json.loads(p.read_text(encoding="utf-8")) data.update({ "name": body.name.strip() or data.get("name", "Unnamed route"), "waypoints": body.waypoints, "profile": body.profile, "updated_at": int(time.time()), }) if body.description: data["description"] = body.description else: data.pop("description", None) if body.dist_km is not None: data["dist_km"] = body.dist_km if body.elevation_gain is not None: data["elevation_gain"] = body.elevation_gain if body.geojson is not None: data["geojson"] = body.geojson if body.gpxTrack is not None: data["gpxTrack"] = body.gpxTrack data["gpxColorIdx"] = body.gpxColorIdx data["gpxOpacity"] = body.gpxOpacity elif "gpxTrack" in data: del data["gpxTrack"] data.pop("gpxColorIdx", None) data.pop("gpxOpacity", None) p.write_text(json.dumps(data), encoding="utf-8") return JSONResponse({"id": plan_id, "updated": True}) @app.delete("/api/shared/{plan_id}") async def delete_shared(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _shared_plan_path(plan_id) if not p.exists(): raise HTTPException(404, "Shared plan not found") p.unlink() return JSONResponse({"id": plan_id, "deleted": True})