Files
bincio-wiki/edit/server.py
T

245 lines
8.7 KiB
Python

"""FastAPI edit sidecar for BincioWiki — reads and writes markdown pages."""
from __future__ import annotations
import os
import re
import secrets
import sqlite3
import subprocess
import time
from contextlib import contextmanager
from pathlib import Path
import bcrypt
from fastapi import Cookie, Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Resolved at startup relative to the project root (one level above this file)
_ROOT = Path(__file__).parent.parent
pages_dir: Path = _ROOT / os.environ.get("WIKI_PAGES_DIR", "pages")
site_dir: Path = _ROOT / "site"
_DB_PATH = _ROOT / "data" / "wiki.db"
_SAFE_SLUG = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_\-/]*$")
_SESSION_TTL = 7 * 24 * 3600 # 7 days
def _hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def _verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode(), hashed.encode())
# ── Database ──────────────────────────────────────────────────────────────────
@contextmanager
def _db():
con = sqlite3.connect(_DB_PATH)
con.row_factory = sqlite3.Row
try:
yield con
finally:
con.close()
def _init_db() -> None:
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
with _db() as con:
con.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
""")
con.commit()
# Seed first admin user from env vars if the table is empty
count = con.execute("SELECT COUNT(*) FROM users").fetchone()[0]
if count == 0:
admin_user = os.environ.get("WIKI_ADMIN_USER")
admin_pass = os.environ.get("WIKI_ADMIN_PASSWORD")
if admin_user and admin_pass:
ph = _hash_password(admin_pass)
con.execute(
"INSERT INTO users (username, password_hash) VALUES (?, ?)",
(admin_user, ph),
)
con.commit()
_init_db()
# ── Auth helpers ──────────────────────────────────────────────────────────────
def _get_session_user(token: str | None) -> dict | None:
if not token:
return None
with _db() as con:
row = con.execute(
"""SELECT u.id, u.username
FROM sessions s JOIN users u ON u.id = s.user_id
WHERE s.token = ? AND s.expires_at > ?""",
(token, int(time.time())),
).fetchone()
return dict(row) if row else None
async def require_auth(bincio_session: str | None = Cookie(default=None)) -> dict:
user = _get_session_user(bincio_session)
if not user:
raise HTTPException(401, "Authentication required")
return user
# ── App setup ─────────────────────────────────────────────────────────────────
_extra_origin = os.environ.get("WIKI_ORIGIN", "")
_origins = [_extra_origin] if _extra_origin else []
app = FastAPI(title="BincioWiki Edit Sidecar", docs_url=None, redoc_url=None)
app.add_middleware(GZipMiddleware, minimum_size=1024)
app.add_middleware(
CORSMiddleware,
allow_origins=_origins,
allow_origin_regex=r"https?://localhost(:\d+)?",
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE"],
allow_headers=["Content-Type"],
)
# ── Auth endpoints ────────────────────────────────────────────────────────────
class LoginBody(BaseModel):
username: str
password: str
@app.post("/api/auth/login")
async def login(body: LoginBody) -> JSONResponse:
with _db() as con:
row = con.execute(
"SELECT id, username, password_hash FROM users WHERE username = ?",
(body.username,),
).fetchone()
if not row or not _verify_password(body.password, row["password_hash"]):
raise HTTPException(401, "Credenziali non valide")
token = secrets.token_urlsafe(32)
expires = int(time.time()) + _SESSION_TTL
with _db() as con:
con.execute(
"INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)",
(token, row["id"], expires),
)
con.commit()
resp = JSONResponse({"username": row["username"]})
resp.set_cookie("bincio_session", token, httponly=True, samesite="lax", max_age=_SESSION_TTL)
return resp
@app.post("/api/auth/logout")
async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
if bincio_session:
with _db() as con:
con.execute("DELETE FROM sessions WHERE token = ?", (bincio_session,))
con.commit()
resp = JSONResponse({"ok": True})
resp.delete_cookie("bincio_session", httponly=True, samesite="lax")
return resp
@app.get("/api/me")
async def me(user: dict = Depends(require_auth)) -> JSONResponse:
return JSONResponse({"username": user["username"]})
# ── Page helpers ──────────────────────────────────────────────────────────────
def _slug_to_path(slug: str) -> Path:
if not _SAFE_SLUG.match(slug):
raise HTTPException(400, "Invalid page slug — only alphanumeric, hyphens, underscores, and slashes allowed")
resolved = (pages_dir / f"{slug}.md").resolve()
if not str(resolved).startswith(str(pages_dir.resolve())):
raise HTTPException(400, "Path traversal detected")
return resolved
class PageBody(BaseModel):
content: str
# ── Page endpoints (all require auth) ────────────────────────────────────────
@app.get("/pages")
async def list_pages(user: dict = Depends(require_auth)) -> JSONResponse:
"""Return all wiki page slugs."""
if not pages_dir.exists():
return JSONResponse({"pages": []})
slugs = [
str(p.relative_to(pages_dir).with_suffix(""))
for p in sorted(pages_dir.rglob("*.md"))
]
return JSONResponse({"pages": slugs})
@app.get("/pages/{slug:path}")
async def get_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
"""Return raw markdown content for a page."""
path = _slug_to_path(slug)
if not path.exists():
raise HTTPException(404, "Page not found")
return JSONResponse({"slug": slug, "content": path.read_text(encoding="utf-8")})
@app.post("/pages/{slug:path}")
async def save_page(slug: str, body: PageBody, user: dict = Depends(require_auth)) -> JSONResponse:
"""Create or update a wiki page."""
path = _slug_to_path(slug)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(body.content, encoding="utf-8")
return JSONResponse({"slug": slug, "saved": True})
@app.delete("/pages/{slug:path}")
async def delete_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
"""Delete a wiki page."""
path = _slug_to_path(slug)
if not path.exists():
raise HTTPException(404, "Page not found")
path.unlink()
return JSONResponse({"slug": slug, "deleted": True})
@app.post("/rebuild")
async def rebuild(user: dict = Depends(require_auth)) -> JSONResponse:
"""Trigger an astro build of the site."""
try:
result = subprocess.run(
["npm", "run", "build"],
cwd=site_dir,
capture_output=True,
text=True,
timeout=120,
)
return JSONResponse({
"success": result.returncode == 0,
"stdout": result.stdout[-3000:] if result.stdout else "",
"stderr": result.stderr[-3000:] if result.stderr else "",
})
except subprocess.TimeoutExpired:
raise HTTPException(504, "Build timed out after 120s")
except Exception as e:
raise HTTPException(500, str(e))