"""BincioPlanner API — save/load route plans. Auth: reads bincio_session cookie, validates against shared instance.db (same strategy as bincio_wiki). Storage: JSON files at $PLANS_DIR/{handle}/{plan_id}.json Run: uvicorn server:app --host 127.0.0.1 --port 8002 Env vars: SHARED_DB_PATH path to bincio_activity instance.db (default: /var/bincio/data/instance.db) PLANS_DIR root dir for plan JSON files (default: /var/bincio_planner) """ from __future__ import annotations import json import os import re import secrets import sqlite3 import time from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path from typing import Optional from fastapi import Cookie, Depends, FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel _SHARED_DB_PATH = Path(os.environ.get("SHARED_DB_PATH", "/var/bincio/data/instance.db")) _PLANS_DIR = Path(os.environ.get("PLANS_DIR", "/var/bincio_planner")) _SESSION_COOKIE = "bincio_session" _SAFE_ID = re.compile(r"^[A-Za-z0-9_-]{1,32}$") # ── Auth ─────────────────────────────────────────────────────────────────────── @dataclass class User: handle: str display_name: str @contextmanager def _db(): if not _SHARED_DB_PATH.exists(): raise HTTPException(503, f"Shared DB not found at {_SHARED_DB_PATH}. " "Set SHARED_DB_PATH or run bincio_activity first.") con = sqlite3.connect(_SHARED_DB_PATH, check_same_thread=False) con.row_factory = sqlite3.Row con.execute("PRAGMA journal_mode=WAL") try: yield con finally: con.close() def _get_session_user(token: str) -> Optional[User]: try: with _db() as con: row = con.execute( "SELECT s.handle, s.expires_at, u.display_name, u.activity_access, u.suspended " "FROM sessions s JOIN users u ON s.handle = u.handle " "WHERE s.token = ?", (token,), ).fetchone() except HTTPException: raise except Exception: return None if not row: return None if row["expires_at"] < int(time.time()): return None if not row["activity_access"]: return None if row["suspended"]: return None return User(handle=row["handle"], display_name=row["display_name"]) async def require_auth(bincio_session: Optional[str] = Cookie(default=None)) -> User: if not bincio_session: raise HTTPException(401, "Authentication required") user = _get_session_user(bincio_session) if not user: raise HTTPException(401, "Authentication required") return user # ── App ──────────────────────────────────────────────────────────────────────── app = FastAPI(title="BincioPlanner API", docs_url=None, redoc_url=None) app.add_middleware(GZipMiddleware, minimum_size=1024) app.add_middleware( CORSMiddleware, allow_origin_regex=r"https?://localhost(:\d+)?", allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Content-Type"], ) # ── Storage helpers ──────────────────────────────────────────────────────────── def _user_dir(handle: str) -> Path: d = _PLANS_DIR / handle d.mkdir(parents=True, exist_ok=True) return d def _plan_path(handle: str, plan_id: str) -> Path: if not _SAFE_ID.match(plan_id): raise HTTPException(400, "Invalid plan id") return _PLANS_DIR / handle / f"{plan_id}.json" # ── Endpoints ────────────────────────────────────────────────────────────────── class PlanBody(BaseModel): name: str waypoints: list[dict] profile: str geojson: dict | None = None @app.get("/api/plans") async def list_plans(user: User = Depends(require_auth)) -> JSONResponse: d = _PLANS_DIR / user.handle if not d.exists(): return JSONResponse({"plans": []}) plans = [] for p in sorted(d.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True): try: data = json.loads(p.read_text(encoding="utf-8")) plans.append({k: v for k, v in data.items() if k != "geojson"}) except Exception: pass return JSONResponse({"plans": plans}) @app.post("/api/plans") async def create_plan(body: PlanBody, user: User = Depends(require_auth)) -> JSONResponse: plan_id = secrets.token_urlsafe(12) now = int(time.time()) data: dict = { "id": plan_id, "name": body.name.strip() or "Unnamed route", "waypoints": body.waypoints, "profile": body.profile, "created_at": now, "updated_at": now, } if body.geojson is not None: data["geojson"] = body.geojson _user_dir(user.handle).joinpath(f"{plan_id}.json").write_text( json.dumps(data), encoding="utf-8" ) return JSONResponse({"id": plan_id, "created_at": now}) @app.get("/api/plans/{plan_id}") async def get_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") return JSONResponse(json.loads(p.read_text(encoding="utf-8"))) @app.put("/api/plans/{plan_id}") async def update_plan( plan_id: str, body: PlanBody, user: User = Depends(require_auth) ) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") data = json.loads(p.read_text(encoding="utf-8")) data.update({ "name": body.name.strip() or data.get("name", "Unnamed route"), "waypoints": body.waypoints, "profile": body.profile, "updated_at": int(time.time()), }) if body.geojson is not None: data["geojson"] = body.geojson p.write_text(json.dumps(data), encoding="utf-8") return JSONResponse({"id": plan_id, "updated": True}) @app.delete("/api/plans/{plan_id}") async def delete_plan(plan_id: str, user: User = Depends(require_auth)) -> JSONResponse: p = _plan_path(user.handle, plan_id) if not p.exists(): raise HTTPException(404, "Plan not found") p.unlink() return JSONResponse({"id": plan_id, "deleted": True})