"""FastAPI edit sidecar for BincioWiki — reads and writes markdown pages.""" from __future__ import annotations import asyncio import os import re import secrets import sqlite3 import time from contextlib import contextmanager from pathlib import Path import bcrypt from fastapi import Cookie, Depends, FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel # Resolved at startup relative to the project root (one level above this file) _ROOT = Path(__file__).parent.parent pages_dir: Path = _ROOT / os.environ.get("WIKI_PAGES_DIR", "site/src/content/entries") stories_dir: Path = _ROOT / os.environ.get("WIKI_STORIES_DIR", "site/src/content/blog") site_dir: Path = _ROOT / "site" _DB_PATH = _ROOT / "data" / "wiki.db" _SAFE_SLUG = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_\-/]*$") _SESSION_TTL = 7 * 24 * 3600 # 7 days def _hash_password(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def _verify_password(password: str, hashed: str) -> bool: return bcrypt.checkpw(password.encode(), hashed.encode()) # ── Database ────────────────────────────────────────────────────────────────── @contextmanager def _db(): con = sqlite3.connect(_DB_PATH) con.row_factory = sqlite3.Row try: yield con finally: con.close() def _init_db() -> None: _DB_PATH.parent.mkdir(parents=True, exist_ok=True) with _db() as con: con.executescript(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS sessions ( token TEXT PRIMARY KEY, user_id INTEGER NOT NULL, expires_at INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ); """) con.commit() # Seed first admin user from env vars if the table is empty count = con.execute("SELECT COUNT(*) FROM users").fetchone()[0] if count == 0: admin_user = os.environ.get("WIKI_ADMIN_USER") admin_pass = os.environ.get("WIKI_ADMIN_PASSWORD") if admin_user and admin_pass: ph = _hash_password(admin_pass) con.execute( "INSERT INTO users (username, password_hash) VALUES (?, ?)", (admin_user, ph), ) con.commit() _init_db() # ── Auth helpers ────────────────────────────────────────────────────────────── def _get_session_user(token: str | None) -> dict | None: if not token: return None with _db() as con: row = con.execute( """SELECT u.id, u.username FROM sessions s JOIN users u ON u.id = s.user_id WHERE s.token = ? AND s.expires_at > ?""", (token, int(time.time())), ).fetchone() return dict(row) if row else None async def require_auth(bincio_session: str | None = Cookie(default=None)) -> dict: user = _get_session_user(bincio_session) if not user: raise HTTPException(401, "Authentication required") return user # ── App setup ───────────────────────────────────────────────────────────────── _extra_origin = os.environ.get("WIKI_ORIGIN", "") _origins = [_extra_origin] if _extra_origin else [] app = FastAPI(title="BincioWiki Edit Sidecar", docs_url=None, redoc_url=None) app.add_middleware(GZipMiddleware, minimum_size=1024) app.add_middleware( CORSMiddleware, allow_origins=_origins, allow_origin_regex=r"https?://localhost(:\d+)?", allow_credentials=True, allow_methods=["GET", "POST", "DELETE"], allow_headers=["Content-Type"], ) # ── Auth endpoints ──────────────────────────────────────────────────────────── class LoginBody(BaseModel): username: str password: str @app.post("/api/auth/login") async def login(body: LoginBody) -> JSONResponse: with _db() as con: row = con.execute( "SELECT id, username, password_hash FROM users WHERE username = ?", (body.username,), ).fetchone() if not row or not _verify_password(body.password, row["password_hash"]): raise HTTPException(401, "Credenziali non valide") token = secrets.token_urlsafe(32) expires = int(time.time()) + _SESSION_TTL with _db() as con: con.execute( "INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)", (token, row["id"], expires), ) con.commit() resp = JSONResponse({"username": row["username"]}) resp.set_cookie("bincio_session", token, httponly=True, samesite="lax", max_age=_SESSION_TTL) return resp @app.post("/api/auth/logout") async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse: if bincio_session: with _db() as con: con.execute("DELETE FROM sessions WHERE token = ?", (bincio_session,)) con.commit() resp = JSONResponse({"ok": True}) resp.delete_cookie("bincio_session", httponly=True, samesite="lax") return resp @app.get("/api/me") async def me(user: dict = Depends(require_auth)) -> JSONResponse: return JSONResponse({"username": user["username"]}) # ── File helpers ────────────────────────────────────────────────────────────── def _slug_to_path(slug: str, base: Path) -> Path: if not _SAFE_SLUG.match(slug): raise HTTPException(400, "Invalid slug — only alphanumeric, hyphens, underscores, and slashes allowed") resolved = (base / f"{slug}.md").resolve() if not str(resolved).startswith(str(base.resolve())): raise HTTPException(400, "Path traversal detected") return resolved class PageBody(BaseModel): content: str # ── Page endpoints (all require auth) ──────────────────────────────────────── def _list(base: Path) -> list[str]: if not base.exists(): return [] return [str(p.relative_to(base).with_suffix("")) for p in sorted(base.rglob("*.md"))] def _get(slug: str, base: Path) -> JSONResponse: path = _slug_to_path(slug, base) if not path.exists(): raise HTTPException(404, "Not found") return JSONResponse({"slug": slug, "content": path.read_text(encoding="utf-8")}) def _save(slug: str, body: PageBody, base: Path) -> JSONResponse: path = _slug_to_path(slug, base) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(body.content, encoding="utf-8") return JSONResponse({"slug": slug, "saved": True}) def _delete(slug: str, base: Path) -> JSONResponse: path = _slug_to_path(slug, base) if not path.exists(): raise HTTPException(404, "Not found") path.unlink() return JSONResponse({"slug": slug, "deleted": True}) # ── Page endpoints ──────────────────────────────────────────────────────────── @app.get("/pages") async def list_pages(user: dict = Depends(require_auth)) -> JSONResponse: return JSONResponse({"pages": _list(pages_dir)}) @app.get("/pages/{slug:path}") async def get_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse: return _get(slug, pages_dir) @app.post("/pages/{slug:path}") async def save_page(slug: str, body: PageBody, user: dict = Depends(require_auth)) -> JSONResponse: return _save(slug, body, pages_dir) @app.delete("/pages/{slug:path}") async def delete_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse: return _delete(slug, pages_dir) # ── Story endpoints ─────────────────────────────────────────────────────────── @app.get("/stories") async def list_stories(user: dict = Depends(require_auth)) -> JSONResponse: return JSONResponse({"stories": _list(stories_dir)}) @app.get("/stories/{slug:path}") async def get_story(slug: str, user: dict = Depends(require_auth)) -> JSONResponse: return _get(slug, stories_dir) @app.post("/stories/{slug:path}") async def save_story(slug: str, body: PageBody, user: dict = Depends(require_auth)) -> JSONResponse: return _save(slug, body, stories_dir) @app.delete("/stories/{slug:path}") async def delete_story(slug: str, user: dict = Depends(require_auth)) -> JSONResponse: return _delete(slug, stories_dir) @app.post("/rebuild") async def rebuild(user: dict = Depends(require_auth)) -> JSONResponse: """Trigger an astro build of the site (non-blocking).""" try: proc = await asyncio.create_subprocess_exec( "npm", "run", "build", cwd=site_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120) except asyncio.TimeoutError: proc.kill() raise HTTPException(504, "Build timed out after 120s") return JSONResponse({ "success": proc.returncode == 0, "stdout": stdout.decode()[-3000:] if stdout else "", "stderr": stderr.decode()[-3000:] if stderr else "", }) except HTTPException: raise except Exception as e: raise HTTPException(500, str(e))