269 lines
9.9 KiB
Python
269 lines
9.9 KiB
Python
"""FastAPI edit sidecar for BincioWiki — reads and writes markdown pages."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import secrets
|
|
import sqlite3
|
|
import subprocess
|
|
import time
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
|
|
import bcrypt
|
|
from fastapi import Cookie, Depends, FastAPI, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.middleware.gzip import GZipMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel
|
|
|
|
# Resolved at startup relative to the project root (one level above this file)
|
|
_ROOT = Path(__file__).parent.parent
|
|
pages_dir: Path = _ROOT / os.environ.get("WIKI_PAGES_DIR", "pages")
|
|
stories_dir: Path = _ROOT / os.environ.get("WIKI_STORIES_DIR", "stories")
|
|
site_dir: Path = _ROOT / "site"
|
|
_DB_PATH = _ROOT / "data" / "wiki.db"
|
|
|
|
_SAFE_SLUG = re.compile(r"^[a-zA-Z0-9_][a-zA-Z0-9_\-/]*$")
|
|
_SESSION_TTL = 7 * 24 * 3600 # 7 days
|
|
|
|
|
|
def _hash_password(password: str) -> str:
|
|
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
|
|
|
|
|
def _verify_password(password: str, hashed: str) -> bool:
|
|
return bcrypt.checkpw(password.encode(), hashed.encode())
|
|
|
|
# ── Database ──────────────────────────────────────────────────────────────────
|
|
|
|
@contextmanager
|
|
def _db():
|
|
con = sqlite3.connect(_DB_PATH)
|
|
con.row_factory = sqlite3.Row
|
|
try:
|
|
yield con
|
|
finally:
|
|
con.close()
|
|
|
|
|
|
def _init_db() -> None:
|
|
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
with _db() as con:
|
|
con.executescript("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
password_hash TEXT NOT NULL
|
|
);
|
|
CREATE TABLE IF NOT EXISTS sessions (
|
|
token TEXT PRIMARY KEY,
|
|
user_id INTEGER NOT NULL,
|
|
expires_at INTEGER NOT NULL,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
);
|
|
""")
|
|
con.commit()
|
|
# Seed first admin user from env vars if the table is empty
|
|
count = con.execute("SELECT COUNT(*) FROM users").fetchone()[0]
|
|
if count == 0:
|
|
admin_user = os.environ.get("WIKI_ADMIN_USER")
|
|
admin_pass = os.environ.get("WIKI_ADMIN_PASSWORD")
|
|
if admin_user and admin_pass:
|
|
ph = _hash_password(admin_pass)
|
|
con.execute(
|
|
"INSERT INTO users (username, password_hash) VALUES (?, ?)",
|
|
(admin_user, ph),
|
|
)
|
|
con.commit()
|
|
|
|
|
|
_init_db()
|
|
|
|
# ── Auth helpers ──────────────────────────────────────────────────────────────
|
|
|
|
def _get_session_user(token: str | None) -> dict | None:
|
|
if not token:
|
|
return None
|
|
with _db() as con:
|
|
row = con.execute(
|
|
"""SELECT u.id, u.username
|
|
FROM sessions s JOIN users u ON u.id = s.user_id
|
|
WHERE s.token = ? AND s.expires_at > ?""",
|
|
(token, int(time.time())),
|
|
).fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
async def require_auth(bincio_session: str | None = Cookie(default=None)) -> dict:
|
|
user = _get_session_user(bincio_session)
|
|
if not user:
|
|
raise HTTPException(401, "Authentication required")
|
|
return user
|
|
|
|
|
|
# ── App setup ─────────────────────────────────────────────────────────────────
|
|
|
|
_extra_origin = os.environ.get("WIKI_ORIGIN", "")
|
|
_origins = [_extra_origin] if _extra_origin else []
|
|
|
|
app = FastAPI(title="BincioWiki Edit Sidecar", docs_url=None, redoc_url=None)
|
|
|
|
app.add_middleware(GZipMiddleware, minimum_size=1024)
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=_origins,
|
|
allow_origin_regex=r"https?://localhost(:\d+)?",
|
|
allow_credentials=True,
|
|
allow_methods=["GET", "POST", "DELETE"],
|
|
allow_headers=["Content-Type"],
|
|
)
|
|
|
|
# ── Auth endpoints ────────────────────────────────────────────────────────────
|
|
|
|
|
|
class LoginBody(BaseModel):
|
|
username: str
|
|
password: str
|
|
|
|
|
|
@app.post("/api/auth/login")
|
|
async def login(body: LoginBody) -> JSONResponse:
|
|
with _db() as con:
|
|
row = con.execute(
|
|
"SELECT id, username, password_hash FROM users WHERE username = ?",
|
|
(body.username,),
|
|
).fetchone()
|
|
if not row or not _verify_password(body.password, row["password_hash"]):
|
|
raise HTTPException(401, "Credenziali non valide")
|
|
token = secrets.token_urlsafe(32)
|
|
expires = int(time.time()) + _SESSION_TTL
|
|
with _db() as con:
|
|
con.execute(
|
|
"INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)",
|
|
(token, row["id"], expires),
|
|
)
|
|
con.commit()
|
|
resp = JSONResponse({"username": row["username"]})
|
|
resp.set_cookie("bincio_session", token, httponly=True, samesite="lax", max_age=_SESSION_TTL)
|
|
return resp
|
|
|
|
|
|
@app.post("/api/auth/logout")
|
|
async def logout(bincio_session: str | None = Cookie(default=None)) -> JSONResponse:
|
|
if bincio_session:
|
|
with _db() as con:
|
|
con.execute("DELETE FROM sessions WHERE token = ?", (bincio_session,))
|
|
con.commit()
|
|
resp = JSONResponse({"ok": True})
|
|
resp.delete_cookie("bincio_session", httponly=True, samesite="lax")
|
|
return resp
|
|
|
|
|
|
@app.get("/api/me")
|
|
async def me(user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return JSONResponse({"username": user["username"]})
|
|
|
|
|
|
# ── File helpers ──────────────────────────────────────────────────────────────
|
|
|
|
def _slug_to_path(slug: str, base: Path) -> Path:
|
|
if not _SAFE_SLUG.match(slug):
|
|
raise HTTPException(400, "Invalid slug — only alphanumeric, hyphens, underscores, and slashes allowed")
|
|
resolved = (base / f"{slug}.md").resolve()
|
|
if not str(resolved).startswith(str(base.resolve())):
|
|
raise HTTPException(400, "Path traversal detected")
|
|
return resolved
|
|
|
|
|
|
class PageBody(BaseModel):
|
|
content: str
|
|
|
|
|
|
# ── Page endpoints (all require auth) ────────────────────────────────────────
|
|
|
|
def _list(base: Path) -> list[str]:
|
|
if not base.exists():
|
|
return []
|
|
return [str(p.relative_to(base).with_suffix("")) for p in sorted(base.rglob("*.md"))]
|
|
|
|
def _get(slug: str, base: Path) -> JSONResponse:
|
|
path = _slug_to_path(slug, base)
|
|
if not path.exists():
|
|
raise HTTPException(404, "Not found")
|
|
return JSONResponse({"slug": slug, "content": path.read_text(encoding="utf-8")})
|
|
|
|
def _save(slug: str, body: PageBody, base: Path) -> JSONResponse:
|
|
path = _slug_to_path(slug, base)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(body.content, encoding="utf-8")
|
|
return JSONResponse({"slug": slug, "saved": True})
|
|
|
|
def _delete(slug: str, base: Path) -> JSONResponse:
|
|
path = _slug_to_path(slug, base)
|
|
if not path.exists():
|
|
raise HTTPException(404, "Not found")
|
|
path.unlink()
|
|
return JSONResponse({"slug": slug, "deleted": True})
|
|
|
|
|
|
# ── Page endpoints ────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/pages")
|
|
async def list_pages(user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return JSONResponse({"pages": _list(pages_dir)})
|
|
|
|
@app.get("/pages/{slug:path}")
|
|
async def get_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return _get(slug, pages_dir)
|
|
|
|
@app.post("/pages/{slug:path}")
|
|
async def save_page(slug: str, body: PageBody, user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return _save(slug, body, pages_dir)
|
|
|
|
@app.delete("/pages/{slug:path}")
|
|
async def delete_page(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return _delete(slug, pages_dir)
|
|
|
|
|
|
# ── Story endpoints ───────────────────────────────────────────────────────────
|
|
|
|
@app.get("/stories")
|
|
async def list_stories(user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return JSONResponse({"stories": _list(stories_dir)})
|
|
|
|
@app.get("/stories/{slug:path}")
|
|
async def get_story(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return _get(slug, stories_dir)
|
|
|
|
@app.post("/stories/{slug:path}")
|
|
async def save_story(slug: str, body: PageBody, user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return _save(slug, body, stories_dir)
|
|
|
|
@app.delete("/stories/{slug:path}")
|
|
async def delete_story(slug: str, user: dict = Depends(require_auth)) -> JSONResponse:
|
|
return _delete(slug, stories_dir)
|
|
|
|
|
|
@app.post("/rebuild")
|
|
async def rebuild(user: dict = Depends(require_auth)) -> JSONResponse:
|
|
"""Trigger an astro build of the site."""
|
|
try:
|
|
result = subprocess.run(
|
|
["npm", "run", "build"],
|
|
cwd=site_dir,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=120,
|
|
)
|
|
return JSONResponse({
|
|
"success": result.returncode == 0,
|
|
"stdout": result.stdout[-3000:] if result.stdout else "",
|
|
"stderr": result.stderr[-3000:] if result.stderr else "",
|
|
})
|
|
except subprocess.TimeoutExpired:
|
|
raise HTTPException(504, "Build timed out after 120s")
|
|
except Exception as e:
|
|
raise HTTPException(500, str(e))
|